using System.Collections.Generic;
// Console entry point: writes the literal "Hello World" to standard output.
// NOTE(review): this sits directly above the Kafka consumer members and looks like leftover
// scaffolding - confirm it is still needed.
public static void Main()
Console.WriteLine("Hello World");
#region Fields & Properties
// Shared logger used by the error paths of this type.
// NOTE(review): the initializer is null and the field is readonly, so unless a static constructor
// (not visible in this view) assigns it, every log.Error(...) call will throw
// NullReferenceException - confirm where the logger is actually created.
private static readonly ILog log = null;
/// <summary>
/// The consumer instance used by the receive methods. Publicly readable; assignable only from
/// inside this type. The receive entry points compare it against null before delegating.
/// </summary>
public IConsumer<Ignore, string> Consumer { get; private set; }
/// <summary>
/// One-line description naming the servers taken from <c>KafkaSettings.Servers</c>.
/// </summary>
public string ShortDescription
{
    get
    {
        var servers = this.KafkaSettings.Servers;
        return String.Format("Connecting to servers: {0}", servers);
    }
}
/// <summary>
/// Description naming both the servers and the consumed topics. When
/// <c>KafkaSettings.ConsumerTopics</c> equals <c>KafkaSettings.AllTopics</c>, the topics are
/// rendered as the literal <c>*Any*</c>; otherwise the configured topics value is shown as-is.
/// </summary>
public string LongDescription
{
    get
    {
        // Servers is read first, then the topic settings, matching the original argument order.
        var servers = this.KafkaSettings.Servers;

        string topics;
        if (this.KafkaSettings.ConsumerTopics == this.KafkaSettings.AllTopics)
        {
            topics = "*Any*";
        }
        else
        {
            topics = this.KafkaSettings.ConsumerTopics;
        }

        return String.Format("Connecting to servers: {0}, listening on topics: {1}", servers, topics);
    }
}
#endregion Fields & Properties
// Starts building the consumer from the configuration returned by BuildConsumerConfig() and
// assigns the result to Consumer.
// NOTE(review): the remainder of this statement and the enclosing method header are not visible
// in this view.
this.Consumer = new ConsumerBuilder<Ignore, string>(this.BuildConsumerConfig())
// NOTE(review): the body guarded by this non-null check is not visible in this view.
if (this.Consumer != null)
// Connect failure path: log the error, then rethrow wrapped in a new Exception that carries
// `ex` as its inner exception.
log.Error("Error when connecting: " + ex.Message, ex);
throw new Exception("Error when connecting: " + ex.Message, ex);
// Disconnect failure path: same log-and-wrap handling, with a "disconnecting" message.
log.Error("Error when disconnecting: " + ex.Message, ex);
throw new Exception("Error when disconnecting: " + ex.Message, ex);
/// <summary>
/// Receives a single message by passing the current <c>Consumer</c> to <c>ReceiveInternal</c>.
/// </summary>
/// <returns>The <c>XCMessage</c> returned by <c>ReceiveInternal</c>.</returns>
public XCMessage Receive()
// NOTE(review): the action taken when Consumer is null is not visible in this view.
if (this.Consumer == null)
return this.ReceiveInternal(this.Consumer);
/// <summary>
/// Receives a batch of messages by passing the current <c>Consumer</c> and the requested size
/// to <c>ReceiveBatchInternal</c>.
/// </summary>
/// <param name="maxBatchSize">Upper bound on the batch size; forwarded unchanged and may be null.</param>
/// <returns>The <c>XCMessageBatch</c> returned by <c>ReceiveBatchInternal</c>.</returns>
public XCMessageBatch ReceiveBatch(int? maxBatchSize)
// NOTE(review): the action taken when Consumer is null is not visible in this view.
if (this.Consumer == null)
return this.ReceiveBatchInternal(this.Consumer, maxBatchSize);
/// <summary>
/// Subscribes the given consumer to the configured topics, consumes one record using the
/// configured timeout, and wraps the record's value in an <c>XCMessage</c>.
/// </summary>
/// <param name="consumer">Consumer to subscribe and read from.</param>
private XCMessage ReceiveInternal(IConsumer<Ignore, string> consumer)
XCMessage xcMessage = null;
// ConsumerTopics is placed in the list as a single entry.
// NOTE(review): Subscribe runs on every call to this method - confirm that is intended.
List<string> topics = new List<string>() { this.KafkaSettings.ConsumerTopics };
consumer.Subscribe(topics);
ConsumeResult<Ignore, string> result;
result = consumer.Consume(this.KafkaSettings.ConsumerMessageTimeoutTimeSpan);
// NOTE(review): presumably Consume can return null when the timeout elapses; no null check on
// `result` is visible in this view - verify before dereferencing result.Message.
xcMessage = new XCMessage() { Text = result.Message.Value };
// Failure path: log the error, then rethrow wrapped in a new Exception that carries `ex` as
// its inner exception.
log.Error("Error when receiving a message: " + ex.Message, ex);
throw new Exception("Error when receiving a message: " + ex.Message, ex);
/// <summary>
/// Subscribes the given consumer to the configured topics and consumes records, using the
/// configured timeout for each, until the batch holds <paramref name="maxMessages"/> entries.
/// </summary>
/// <param name="consumer">Consumer to subscribe and read from.</param>
/// <param name="maxMessages">Target batch size; a null value is replaced with 1.</param>
private XCMessageBatch ReceiveBatchInternal(IConsumer<Ignore, string> consumer, int? maxMessages)
List<XCMessage> xcMessageBatch = new List<XCMessage>();
// A missing batch size falls back to a single message.
maxMessages = maxMessages == null ? 1 : maxMessages;
consumer.Subscribe(new List<string>() { this.KafkaSettings.ConsumerTopics });
ConsumeResult<Ignore, string> result;
while ((xcMessageBatch.Count < maxMessages))
result = consumer.Consume(this.KafkaSettings.ConsumerMessageTimeoutTimeSpan);
// NOTE(review): the single-message path builds an XCMessage from result.Message.Value, while
// this path constructs a ScotiaXCMessage from the whole result - confirm the difference is intended.
// NOTE(review): presumably Consume can return null on timeout; no null check is visible here.
xcMessageBatch.Add(new ScotiaXCMessage(result));
// NOTE(review): this casts a List<XCMessage> instance to XCMessageBatch. Unless XCMessageBatch
// declares a conversion from List<XCMessage>, the cast looks like it would fail - verify.
return (XCMessageBatch)xcMessageBatch;
// Failure path: log the error, then rethrow wrapped in a new Exception that carries `ex` as
// its inner exception.
log.Error("Error when receiving a message: " + ex.Message, ex);
throw new Exception("Error when receiving a message: " + ex.Message, ex);
#region Serialization\Deserialization
/// <summary>
/// Serializes this instance by delegating to the base implementation; the visible code writes
/// no additional state of its own.
/// </summary>
/// <param name="context">Serialization context passed through to the base class.</param>
public override void XmlSerialize(XmlSerializationContext context)
base.XmlSerialize(context);
/// <summary>
/// Deserializes this instance by delegating to the base implementation; the visible code reads
/// no additional state of its own.
/// </summary>
/// <param name="context">Serialization context passed through to the base class.</param>
public override void XmlDeserialize(XmlSerializationContext context)
base.XmlDeserialize(context);
#endregion Serialization\Deserialization