3
3
using Microsoft . Extensions . Hosting ;
4
4
using Microsoft . Extensions . Logging ;
5
5
using Microsoft . Extensions . Options ;
6
- using System . Threading ;
7
- using Serilog . Data ;
8
6
9
7
namespace KernelMemory . FileWatcher . Services
10
8
{
11
- internal class HttpWorker : IHostedService , IDisposable
9
+ internal class HttpWorker : BackgroundService
12
10
{
13
11
private readonly ILogger < HttpWorker > logger ;
14
12
private readonly IHttpClientFactory httpClientFactory ;
15
13
private readonly IMessageStore store ;
16
14
private readonly KernelMemoryOptions options ;
17
- private PeriodicTimer ? timer ;
18
15
19
16
public HttpWorker ( ILogger < HttpWorker > logger , IOptions < KernelMemoryOptions > options , IHttpClientFactory httpClientFactory , IMessageStore messageStore )
20
17
{
@@ -24,79 +21,72 @@ public HttpWorker(ILogger<HttpWorker> logger, IOptions<KernelMemoryOptions> opti
24
21
this . store = messageStore ;
25
22
}
26
23
27
- public async Task StartAsync ( CancellationToken cancellationToken )
24
+ protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
28
25
{
29
- logger . LogInformation ( "Starting HttpWorker" ) ;
30
- timer = new PeriodicTimer ( options . Schedule ) ;
31
- while ( await timer . WaitForNextTickAsync ( cancellationToken ) )
26
+ this . logger . LogInformation ( "Starting HttpWorker" ) ;
27
+
28
+ using PeriodicTimer timer = new PeriodicTimer ( options . Schedule ) ;
29
+ try
32
30
{
33
- var messages = store . TakeAll ( ) ;
34
- if ( messages . Any ( ) )
31
+ while ( await timer . WaitForNextTickAsync ( stoppingToken ) && ! stoppingToken . IsCancellationRequested )
35
32
{
36
- var tasks = new List < Action > ( ) ;
37
-
38
- foreach ( var message in messages )
33
+ var messages = store . TakeAll ( ) ;
34
+ if ( messages . Any ( ) )
39
35
{
40
- tasks . Add ( async ( ) =>
36
+ var parallelOptions = new ParallelOptions
41
37
{
42
- if ( message . Event ? . EventType != FileEventType . Ignore )
43
- {
44
- logger . LogInformation ( $ "Processing message { message . DocumentId } for file { message . Event ? . FileName } of type { message . Event ? . EventType } ") ;
45
- var client = httpClientFactory . CreateClient ( "km-client" ) ;
46
- string endpoint ;
47
- HttpResponseMessage ? response = null ;
48
-
49
- if ( message . Event is { EventType : FileEventType . Upsert } )
50
- {
51
- endpoint = "/upload" ;
38
+ CancellationToken = stoppingToken ,
39
+ MaxDegreeOfParallelism = options . ParallelUploads
40
+ } ;
41
+ Parallel . Invoke ( parallelOptions , messages . Select ( message => ( Action ) ( ( ) => BuildMessageTask ( message , stoppingToken ) ) ) . ToArray ( ) ) ;
42
+ }
43
+ else
44
+ {
45
+ logger . LogInformation ( "Nothing to process" ) ;
46
+
47
+ }
48
+ }
49
+ } catch ( OperationCanceledException )
50
+ {
51
+ this . logger . LogInformation ( "Stopping HttpWorker" ) ;
52
+ }
53
+ }
52
54
53
- var content = new MultipartFormDataContent ( ) ;
54
- var fileContent = new StreamContent ( File . OpenRead ( message . Event . Directory ) ) ;
55
- content . Add ( fileContent , "file" , message . Event . FileName ) ;
56
- content . Add ( new StringContent ( message . Index ) , "index" ) ;
57
- content . Add ( new StringContent ( message . DocumentId ) , "documentid" ) ;
58
- response = await client . PostAsync ( endpoint , content ) ;
59
- }
60
- else if ( message . Event is { EventType : FileEventType . Delete } )
61
- {
62
- endpoint = $ "/documents?index={ message . Index } &documentId={ message . DocumentId } ";
63
- response = await client . DeleteAsync ( endpoint ) ;
64
- }
55
+ private async Task BuildMessageTask ( Message message , CancellationToken stoppingToken )
56
+ {
57
+ if ( message . Event ? . EventType != FileEventType . Ignore )
58
+ {
59
+ logger . LogInformation ( $ "Processing message { message . DocumentId } for file { message . Event ? . FileName } of type { message . Event ? . EventType } ") ;
60
+ var client = httpClientFactory . CreateClient ( "km-client" ) ;
61
+ string endpoint ;
62
+ HttpResponseMessage ? response = null ;
65
63
66
- if ( response is { IsSuccessStatusCode : true } )
67
- {
68
- logger . LogInformation ( $ "Sent message { message . DocumentId } to { options . Endpoint } ") ;
69
- }
70
- else
71
- {
72
- logger . LogError ( $ "Failed to send message { message . DocumentId } to { options . Endpoint } ") ;
73
- }
74
- }
75
- } ) ;
76
- }
64
+ if ( message . Event is { EventType : FileEventType . Upsert } )
65
+ {
66
+ endpoint = "/upload" ;
77
67
78
- var parallelOptions = new ParallelOptions
79
- {
80
- CancellationToken = cancellationToken ,
81
- MaxDegreeOfParallelism = options . ParallelUploads
82
- } ;
83
- Parallel . Invoke ( parallelOptions , tasks . ToArray ( ) ) ;
68
+ var content = new MultipartFormDataContent ( ) ;
69
+ var fileContent = new StreamContent ( File . OpenRead ( message . Event . Directory ) ) ;
70
+ content . Add ( fileContent , "file" , message . Event . FileName ) ;
71
+ content . Add ( new StringContent ( message . Index ) , "index" ) ;
72
+ content . Add ( new StringContent ( message . DocumentId ) , "documentid" ) ;
73
+ response = await client . PostAsync ( endpoint , content , stoppingToken ) ;
74
+ }
75
+ else if ( message . Event is { EventType : FileEventType . Delete } )
76
+ {
77
+ endpoint = $ "/documents?index={ message . Index } &documentId={ message . DocumentId } ";
78
+ response = await client . DeleteAsync ( endpoint , stoppingToken ) ;
79
+ }
84
80
81
+ if ( response is { IsSuccessStatusCode : true } )
82
+ {
83
+ logger . LogInformation ( $ "Sent message { message . DocumentId } to { options . Endpoint } ") ;
85
84
}
86
85
else
87
86
{
88
- logger . LogInformation ( "Nothing to process ") ;
87
+ logger . LogError ( $ "Failed to send message { message . DocumentId } to { options . Endpoint } ") ;
89
88
}
90
89
}
91
90
}
92
-
93
- public Task StopAsync ( CancellationToken cancellationToken )
94
- {
95
- return Task . CompletedTask ;
96
- }
97
- public void Dispose ( )
98
- {
99
- timer ? . Dispose ( ) ;
100
- }
101
91
}
102
92
}
0 commit comments