using System.Collections.Generic;
public static void Main()
Console.WriteLine("Hello World");
[XmlType("kafkaProducer")]
public class KafkaProducer : KafkaBase, IMessageSender, IXmlSerializable, IDisposable
#region Fields & Properties
private static ILog log = ScotiaLogManagerWrapper.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
public IProducer<Null, string> Producer { get; private set; }
public Dictionary<string, string> ProducerConfig { get; private set; }
public string ShortDescription { get { return String.Format("Connecting to servers: {0}", this.KafkaSettings.Servers); } }
public string LongDescription { get { return String.Format("Connecting to servers: {0} on topics: {1}", this.KafkaSettings.Servers, this.KafkaSettings.ProducerTopic); } }
#endregion Fields & Properties
this.Producer = new ProducerBuilder<Null, string>(this.BuildProducerConfig())
if (this.Producer != null)
log.Error("Error when connecting: " + ex.Message, ex);
throw new Exception("Error when connecting: " + ex.Message, ex);
Producer.Flush(this.KafkaSettings.FlushTimeoutTimeSpan);
log.Error("Error when disconnecting: " + ex.Message, ex);
throw new Exception("Error when disconnecting: " + ex.Message, ex);
public XCMessage Send(XCMessage message)
if (this.Producer == null)
return this.SendInternal(message, this.Producer);
private XCMessage SendInternal(XCMessage message, IProducer<Null, string> producer)
Message<Null, string> kafkaMessage = new Message<Null, string>() { Value = message.Text };
DeliveryResult<Null, string> result = producer.ProduceAsync(this.KafkaSettings.ProducerTopic, kafkaMessage).GetAwaiter().GetResult();
return new ScotiaXCMessage(result);
log.Error("Error when sending a message: " + ex.Message, ex);
throw new Exception("Error when sending a message: " + ex.Message, ex);
#region Serialization\Deserialization
public override void XmlSerialize(XmlSerializationContext context)
base.XmlSerialize(context);
public override void XmlDeserialize(XmlSerializationContext context)
base.XmlDeserialize(context);
#endregion Serialization\Deserialization