1+ using System ;
2+ using System . Threading . Tasks ;
3+ using Confluent . Kafka ;
4+ using Microsoft . Extensions . Configuration ;
5+ using Microsoft . Extensions . Logging ;
6+
7+ namespace Json2Kafka . Services
8+ {
9+ class ProducerService
10+ {
11+ private static IProducer < Null , string > _Producer ; // singleton du producer
12+ private static Boolean _SetupDone = false ; //pour ne pas rejouer x fois la config
13+ private static ProducerConfig _ProducerConfig ; // configuration du producer (depuis les variables d'environnement)
14+ private static string _Topic ; // (depuis les variables d'environnement)
15+ private static ILogger _logger ;
16+
17+
18+
19+ // Constructeur du singleton et de la gestion de l'obtention de son instance (paramètres interdits dans le constructeur, donc Setup depuis une méthode séparée)
20+ private static readonly ProducerService _ProducerServiceInstance = new ProducerService ( ) ;
21+ public static ProducerService GetInstance ( IConfiguration Configuration ) {
22+ if ( ! _SetupDone )
23+ {
24+ // création de la configuration Kafka à partir des infos récupèrées
25+ // dans appsettings.json qui peuvent être surchargées par
26+ // les variables d'environements pour tourner en container
27+
28+ // configs obligatoires
29+ _Topic = Configuration [ "Topic" ] ;
30+ _ProducerConfig = new ProducerConfig {
31+ ClientId = Configuration [ "ClientId" ] ,
32+ CompressionType = CompressionType . Snappy , //Compression active par défaut, faible charge CPU et 0.5x de traffic et empreinte stockage sur kafka
33+ BootstrapServers = Configuration [ "BootstrapServers" ] ,
34+ EnableSslCertificateVerification = bool . Parse ( Configuration [ "EnableSslCertificateVerification" ] ) ,
35+ EnableIdempotence = bool . Parse ( Configuration [ "EnableIdempotence" ] )
36+ } ;
37+ // configs optionnelles
38+ if ( "" != Configuration [ "SaslPassword" ] ) _ProducerConfig . SaslPassword = Configuration [ "SaslPassword" ] ;
39+ if ( "" != Configuration [ "SaslUsername" ] ) _ProducerConfig . SaslUsername = Configuration [ "SaslUsername" ] ;
40+ if ( "" != Configuration [ "SslCaLocation" ] ) _ProducerConfig . SslCaLocation = Configuration [ "SslCaLocation" ] ;
41+ if ( "" != Configuration [ "SaslMechanism" ] ) _ProducerConfig . SaslMechanism = ( SaslMechanism ) int . Parse ( Configuration [ "SaslMechanism" ] ) ;
42+ if ( "" != Configuration [ "SecurityProtocol" ] ) _ProducerConfig . SecurityProtocol = ( SecurityProtocol ) int . Parse ( Configuration [ "SecurityProtocol" ] ) ;
43+
44+ _Producer = new ProducerBuilder < Null , string > ( _ProducerConfig ) . Build ( ) ;
45+ _logger = LoggerFactory . Create ( builder => builder . AddConsole ( ) . AddConfiguration ( Configuration . GetSection ( "Logging" ) ) ) . CreateLogger ( "Json2Kafka.services.ProducerService" ) ;
46+ _SetupDone = true ;
47+ }
48+
49+ return _ProducerServiceInstance ;
50+ }
51+
52+ // méthode principale pour écrire dans Kafka
53+ public async Task < object > ProduceAwait ( object msg )
54+ {
55+ try
56+ {
57+ _logger . LogInformation ( $ "Sending message to producer [{ msg } ]") ;
58+ return await _Producer . ProduceAsync ( _Topic , new Message < Null , string > { Value = msg . ToString ( ) } ) ;
59+ }
60+ catch ( ProduceException < Null , string > e )
61+ {
62+ //Console.WriteLine($"Delivery failed: {e.Error.Reason}");
63+ _logger . LogError ( $ "Delivery failed: { e . Error . Reason } ") ;
64+ return e ;
65+ }
66+ }
67+ // méthode en cours de test
68+ private void Produce ( object msg )
69+ {
70+ Action < DeliveryReport < Null , string > > handler = r =>
71+ Console . WriteLine ( ! r . Error . IsError
72+ ? $ "Delivered message to { r . TopicPartitionOffset } "
73+ : $ "Delivery Error: { r . Error . Reason } ") ;
74+
75+ _Producer . Produce ( _Topic , new Message < Null , string > { Value = msg . ToString ( ) } , handler ) ;
76+
77+ // wait for up to 10 seconds for any inflight messages to be delivered.
78+ _Producer . Flush ( TimeSpan . FromSeconds ( 10 ) ) ;
79+
80+ }
81+ // méthode en cours de test
82+ private async Task ProduceAsync ( object msg )
83+ {
84+ try
85+ {
86+ var dr = await _Producer . ProduceAsync ( _Topic , new Message < Null , string > { Value = msg . ToString ( ) } ) ;
87+ Console . WriteLine ( $ "Delivered '{ dr . Value } ' to '{ dr . TopicPartitionOffset } '") ;
88+ }
89+ catch ( ProduceException < Null , string > e )
90+ {
91+ Console . WriteLine ( $ "Delivery failed: { e . Error . Reason } ") ;
92+ }
93+ }
94+
95+ }
96+
97+ }
0 commit comments