// Producer used for raw-data messages, typed with string and RawDataMessage
// (presumably the message key and value types — confirm against IKafkaCommonProducer).
private readonly IKafkaCommonProducer<string, RawDataMessage> _rawDataProducer;
// Excerpt of a fluent registration chain; the call that starts the chain is not visible here.
// Passes KafkaClusterOptions.DefaultConfigurationKey as the configuration key
// (presumably the section holding cluster-wide settings such as the broker list — confirm).
.WithConfiguration(KafkaClusterOptions.DefaultConfigurationKey)
// Registers a producer with the same <string, RawDataMessage> type arguments as the _rawDataProducer field.
.AddProducer<string, RawDataMessage>(
// NOTE(review): the receiver of this call (e.g. a builder lambda parameter) is missing from the excerpt.
// Selects the producer's configuration entry by ConfigKeys.RawDataKafkaProducer.
.WithConfigKey(ConfigKeys.RawDataKafkaProducer)
// Attaches the policy identified by DefaultPolicyNames.DefaultKafkaProducerSyncPolicy.
.WithResiliency(DefaultPolicyNames.DefaultKafkaProducerSyncPolicy)));
// Excerpt of a policy registration call; the receiver is not visible here.
// Supplies one retry configuration and one circuit-breaker configuration.
// NOTE(review): 3000 looks like a retry delay in ms, and (3, 5000) like a failure
// threshold plus a break duration in ms — confirm against the Cfg constructors.
.AddDefaultKafkaProducerSyncPolicies(
new ForeverRetryPolicyCfg(3000),
new CircuitBreakePolicyCfg(3, 5000)));
"Brokers": "kafka01.dev.tlv.local:9092, kafka02.dev.tlv.local:9092, kafka03.dev.tlv.local:9092",
"RawDataKafkaProducer": {
"EnableIdempotence": true,
"ProducerId": "RawDataProducer"
// Message-type header. nameof(rawDataMessage) yields the identifier text "rawDataMessage"
// (not the runtime type name); ToLowerInvariant keeps the header value independent of the
// current thread culture, since it is a machine-readable token rather than display text.
// NOTE(review): if the type name is what is meant, nameof(RawDataMessage) is the
// refactor-safe spelling — the lower-cased value is the same today.
.AppendHeader(HeaderTypes.MessageType, nameof(rawDataMessage).ToLowerInvariant())
// Message-version header, set to RawDataMessageVersions.Version1_0.
.AppendHeader(HeaderTypes.MessageVersion, RawDataMessageVersions.Version1_0)
// Correlation-id header; a null correlation id is sent as an empty string.
.AppendHeader(MetadataHeaders.CorrelationId, rawDataMessage.MetaData.CorrelationId ?? string.Empty)
// Callback argument: forwards the value it receives (presumably the delivery result — confirm)
// to _deliveryResultHandler.Post.
dr => _deliveryResultHandler.Post(dr),
// Arguments of a call whose opening is outside this excerpt (presumably an error log call):
// the ProducingRawDataError event, `e` (presumably the caught exception), and a DeviceLogInfo
// built from the message's device type and device-specific id.
KafkaProducerEvents.ProducingRawDataError, e, new DeviceLogInfo(rawDataMessage.DeviceId.DeviceType, rawDataMessage.DeviceId.DeviceSpecificId));