12
12
13
13
namespace Bluekiri . Consumer
14
14
{
15
+ /// <summary>
16
+ /// Worker for consume.
17
+ /// </summary>
18
+ /// <typeparam name="TConsumerOptions"><see cref="ConsumerOptions"/></typeparam>
15
19
public class ConsumerService < TConsumerOptions > : BackgroundService where TConsumerOptions : ConsumerOptions , new ( )
16
20
{
17
21
private readonly IBrokerConsumer _consumer ;
@@ -20,6 +24,17 @@ namespace Bluekiri.Consumer
20
24
private readonly IEnumerable < IMessageFormatter > _formatters ;
21
25
private readonly ILogger _logger ;
22
26
27
+ private const string ContentType = "ContentType" ;
28
+ private const string MessageType = "MessageType" ;
29
+
30
+ /// <summary>
31
+ /// Constructor.
32
+ /// </summary>
33
+ /// <param name="consumer"><see cref="IBrokerConsumer"/></param>
34
+ /// <param name="handlerManager"><see cref="IHandlerManager"/></param>
35
+ /// <param name="factory"><see cref="IHandlerMessageFactory"/></param>
36
+ /// <param name="formatters"><see cref="IEnumerable{IMessageFormatter}"/></param>
37
+ /// <param name="logger"><see cref="ILogger{ConsumerService}"/></param>
23
38
public ConsumerService (
24
39
IBrokerConsumer consumer ,
25
40
IHandlerManager handlerManager ,
@@ -33,7 +48,10 @@ public ConsumerService(
33
48
_formatters = formatters ;
34
49
_logger = logger ;
35
50
}
36
-
51
+ /// <summary>
52
+ /// Backgroundservice override.
53
+ /// </summary>
54
+ /// <param name="stoppingToken"><see cref="CancellationToken"/></param>
37
55
protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
38
56
{
39
57
while ( ! stoppingToken . IsCancellationRequested )
@@ -42,10 +60,19 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
42
60
{
43
61
var consumeResult = _consumer . Consume ( stoppingToken ) ;
44
62
if ( consumeResult is null ) continue ;
63
+ if ( ! consumeResult . Headers . ContainsKey ( ContentType ) )
64
+ {
65
+ _logger . LogError ( "Headers not contains key ContentType" ) ;
66
+ continue ;
67
+ }
68
+ if ( ! consumeResult . Headers . ContainsKey ( MessageType ) )
69
+ {
70
+ _logger . LogError ( "Headers not contains key MessageType" ) ;
71
+ continue ;
72
+ }
45
73
46
-
47
- var contentType = Encoding . UTF8 . GetString ( consumeResult . Headers [ "ContentType" ] ) ;
48
- var messageType = Encoding . UTF8 . GetString ( consumeResult . Headers [ "MessageType" ] ) ;
74
+ var contentType = Encoding . UTF8 . GetString ( consumeResult . Headers [ ContentType ] ) ;
75
+ var messageType = Encoding . UTF8 . GetString ( consumeResult . Headers [ MessageType ] ) ;
49
76
50
77
if ( string . IsNullOrWhiteSpace ( contentType ) )
51
78
{
@@ -59,7 +86,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
59
86
continue ;
60
87
}
61
88
62
- var formatter = _formatters . FirstOrDefault ( f => f . ContentType . Equals ( contentType ) ) ;
89
+ var formatter = _formatters . Where ( f => f . ContentType . Equals ( contentType ) ) . FirstOrDefault ( ) ;
63
90
64
91
if ( formatter is null )
65
92
{
@@ -73,9 +100,9 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
73
100
continue ;
74
101
}
75
102
var result = formatter . Deserialize ( consumeResult . Message , modelType ) ;
76
- await _factory . Execute ( result , stoppingToken ) . ConfigureAwait ( false ) ;
103
+ await _factory . ExecuteAsync ( result , stoppingToken ) . ConfigureAwait ( false ) ;
104
+
77
105
78
- //await handler.HandleAsync(result);
79
106
if ( ! _consumer . IsEnabledAutoCommit )
80
107
{
81
108
await consumeResult . CommitAsync ( ) . ConfigureAwait ( false ) ;
@@ -88,10 +115,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
88
115
}
89
116
}
90
117
91
-
118
+ /// <summary>
119
+ /// base dispose
120
+ /// </summary>
92
121
public override void Dispose ( )
93
122
{
94
- // _consumer?.Dispose();
123
+ _consumer ? . Dispose ( ) ;
95
124
base . Dispose ( ) ;
96
125
}
97
126
}
0 commit comments