using Newtonsoft.Json.Linq;
using System.Collections.Generic;
private readonly static ILog log = null;
public string Name { get; set; }
public enum ConnectionPoolTypes { Static, Sniffying, Sticky }
public ConnectionPoolTypes ConnectionPoolType { get; set; }
public string ConnectionString { get; set; }
public int RequestTimeOut { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
public string LogPath { get; set; }
public static void Main()
Console.WriteLine("Hello World");
protected ElasticLowLevelClient esClient;
var uris = this.ConnectionString.Split(',').Select(uri => new Uri(uri));
IConnectionPool pool = null;
if (this.ConnectionPoolType == ElasticSearchBase.ConnectionPoolTypes.Sniffying)
pool = new SniffingConnectionPool(uris);
else if (this.ConnectionPoolType == ElasticSearchBase.ConnectionPoolTypes.Sticky)
pool = new StickyConnectionPool(uris);
pool = new StaticConnectionPool(uris);
var conn = new ConnectionConfiguration(pool);
conn.RequestTimeout(TimeSpan.FromSeconds(System.Convert.ToInt32(this.RequestTimeOut)))
.BasicAuthentication(this.UserName, this.Password)
log.Info($"{this.Name} - Initialize ES client");
this.esClient = new ElasticLowLevelClient(conn);
public JObject ScrollSearch(string indexName, string query, int scrollSpan)
string savedScrollId = null;
if (this.esClient == null)
throw new BnsException("No connection to ElasticSearch.");
log.Info($"{this.Name} - Scroll search: {query}");
var searchResponse = this.esClient.Search<DynamicResponse>(indexName, query,
new SearchRequestParameters { Scroll = TimeSpan.FromSeconds(scrollSpan) });
JArray resultArray = new JArray();
while (searchResponse.Success && ((List<dynamic>)searchResponse.Body["hits"]["hits"]).Any())
var stringResponse = Encoding.UTF8.GetString(searchResponse.ResponseBodyInBytes);
LogResponse(indexName, stringResponse);
var resp = JObject.Parse(stringResponse);
resultArray.Merge((JArray)((JObject)resp?["hits"])?["hits"]);
savedScrollId = searchResponse.Body["_scroll_id"].ToString();
log.Info($"{this.Name} - Scrolling");
searchResponse = esClient.Scroll<DynamicResponse>(PostData.Serializable(
scroll = $"{scrollSpan / 60}m",
scroll_id = savedScrollId
if (!searchResponse.Success)
throw searchResponse.OriginalException;
log.Info($"{this.Name} - Total records: {resultArray.Count}");
var json = new JObject(new JProperty("root", resultArray));
LogMessage(indexName, json);
log.Info($"{this.Name} - To clear the scroll once finished");
if (savedScrollId != null)
this.esClient?.ClearScroll<DynamicResponse>(PostData.Serializable(new { scroll_id = savedScrollId }));
public JObject NonScrollSearch(string indexName, string query)
if (this.esClient == null)
throw new BnsException("No connection to ElasticSearch.");
log.Info($"{this.Name} - Non-scroll search index: {indexName} query: {query}");
var searchResponse = this.esClient.Search<DynamicResponse>(indexName, query);
if (!searchResponse.Success)
throw searchResponse.OriginalException;
var stringResponse = Encoding.UTF8.GetString(searchResponse.ResponseBodyInBytes);
LogResponse(indexName, stringResponse);
JArray resultArray = new JArray();
var resp = JObject.Parse(stringResponse);
resultArray.Merge((JArray)((JObject)resp?["hits"])?["hits"]);
log.Info($"{this.Name} - Total records: {resultArray.Count}");
var json = new JObject(new JProperty("root", resultArray));
LogMessage(indexName, json);
public string GetFieldMapping(string indexName, string field)
if (this.esClient == null)
throw new BnsException("No connection to ElasticSearch.");
log.Info($"{this.Name} - GetFieldMapping for index: {indexName}, field: {field}");
var response = this.esClient.Indices.GetFieldMapping<StringResponse>(indexName, field);
log.Error($"{this.Name} - Error when GetFieldMapping index: {indexName}, field: {field}");
throw response.OriginalException;
LogResponse(indexName, response.Body);
var resp = JObject.Parse(Encoding.UTF8.GetString(response.ResponseBodyInBytes));
return resp.SelectTokens("$..type")?.First()?.ToString();
public void XmlDeserialize(XmlSerializationContext context)
var connCtx = context.GetChildContext("connection", true);
var poolType = connCtx.ReadElementString("connectionPoolType", "Sticky");
if ("Sniffing".Equals(poolType))
this.ConnectionPoolType = ConnectionPoolTypes.Sniffying;
else if ("Sticky".Equals(poolType))
this.ConnectionPoolType = ConnectionPoolTypes.Sticky;
this.ConnectionPoolType = ConnectionPoolTypes.Static;
this.ConnectionString = connCtx.ReadElementString("connectionString");
this.RequestTimeOut = connCtx.ReadElementAsInt32("requestTimeOut", 120);
this.UserName = connCtx.ReadElementString("userName");
var passwordFileLocation = connCtx.ReadElementString("passwordLocation");
string tempPass = File.ReadAllText(passwordFileLocation).Trim();
bool encrypt = connCtx.GetChildContext("passwordLocation").ReadAttributeAsBool("encrypted", false);
this.Password = encrypt ? BNSUtility.Decrypt(tempPass) : tempPass;
this.LogPath = connCtx.ReadElementString("logPath", String.Empty);
public void XmlSerialize(XmlSerializationContext context)
throw new NotImplementedException();
private void LogResponse(string indexName, string repsonse)
var logString = repsonse?.Length >= 2048 ? (repsonse.Substring(0, 1024) + "\n---OMITTED---\n" + repsonse.Substring(repsonse.Length - 1024)) : repsonse;
log.Info($"{this.Name} - Index: {indexName} Response: {repsonse?.Length} characters, content:\n{logString}");
private void LogMessage(string indexName, JObject json)
if (String.IsNullOrEmpty(this.LogPath))
Dictionary<string, object> parameters = new Dictionary<string, object>
{ "IndexName", indexName }
var path = FileUtility.GetPath(this.LogPath, parameters);
if (!Directory.Exists(Path.GetDirectoryName(path)))
Directory.CreateDirectory(Path.GetDirectoryName(path));
File.WriteAllText(path, json.ToString());