-
Notifications
You must be signed in to change notification settings - Fork 0
/
RedisReservationStreamService.cs
76 lines (64 loc) · 2.81 KB
/
RedisReservationStreamService.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
using System.Text.Json;
using System.Text.Json.Nodes;
using MediatR;
using PlayOfferService.Domain.Events;
using PlayOfferService.Domain.Events.Reservation;
using PlayOfferService.Domain.Repositories;
using StackExchange.Redis;
namespace PlayOfferService.Application;
/// <summary>
/// Background worker that reads entries from the Redis stream <c>court_service.events.baseevents</c>
/// through the consumer group <c>pos.reservation.events.group</c>, turns entries whose
/// <c>entityType</c> is <c>Reservation</c> into <see cref="TechnicalReservationEvent"/> instances
/// and dispatches them via MediatR.
/// </summary>
public class RedisReservationStreamService : BackgroundService
{
    private const string StreamName = "court_service.events.baseevents";
    private const string GroupName = "pos.reservation.events.group";
    private const string ConsumerName = "pos-member";

    // Pause between two polls of the stream.
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(10);

    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ConnectionMultiplexer _muxer;
    private readonly IDatabase _db;

    /// <summary>
    /// Creates the service and opens the Redis connection to the host named <c>redis</c>.
    /// </summary>
    /// <param name="serviceScopeFactory">Factory used to create the DI scope the mediator is resolved from.</param>
    public RedisReservationStreamService(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
        // Keep a reference to the multiplexer so it can be disposed together with the service.
        _muxer = ConnectionMultiplexer.Connect("redis");
        _db = _muxer.GetDatabase();
    }

    /// <summary>
    /// Releases the base service resources and closes the Redis connection owned by this service.
    /// </summary>
    public override void Dispose()
    {
        base.Dispose();
        _muxer.Dispose();
    }

    /// <summary>
    /// Ensures the consumer group exists, then polls the stream one entry at a time until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using IServiceScope scope = _serviceScopeFactory.CreateScope();
        IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();

        await EnsureConsumerGroupAsync();

        // Loop on the host-provided token so that the service actually stops on shutdown.
        while (!stoppingToken.IsCancellationRequested)
        {
            var entries = await _db.StreamReadGroupAsync(StreamName, GroupName, ConsumerName, ">", 1);
            if (entries.Length > 0)
            {
                var streamEntry = entries[0];
                var parsedEvent = FilterAndParseEvent(streamEntry);
                if (parsedEvent is not null)
                {
                    await mediator.Send(parsedEvent, stoppingToken);
                }

                // Acknowledge right after handling instead of on the next iteration, so the
                // last entry is not left pending when the loop exits.
                await _db.StreamAcknowledgeAsync(StreamName, GroupName, streamEntry.Id);

                if (parsedEvent is null)
                {
                    // Filtered-out entries are skipped without waiting for the poll interval.
                    continue;
                }
            }

            try
            {
                await Task.Delay(PollInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested while waiting; leave the loop without an error.
                break;
            }
        }
    }

    /// <summary>
    /// Creates the stream (if missing) and the consumer group (if missing), starting at id <c>0-0</c>.
    /// </summary>
    private async Task EnsureConsumerGroupAsync()
    {
        var streamExists = await _db.KeyExistsAsync(StreamName);
        // Group info can only be queried for an existing stream, hence the short-circuit.
        var groupExists = streamExists
            && (await _db.StreamGroupInfoAsync(StreamName)).Any(x => x.Name == GroupName);

        if (!groupExists)
        {
            await _db.StreamCreateConsumerGroupAsync(StreamName, GroupName, "0-0", true);
        }
    }

    /// <summary>
    /// Extracts the event JSON from the first field of a stream entry
    /// (<c>payload.after</c>, itself a JSON-encoded string) and parses it when its
    /// <c>entityType</c> is <c>Reservation</c>.
    /// </summary>
    /// <param name="value">The stream entry read from Redis.</param>
    /// <returns>
    /// The parsed event, or <c>null</c> when the entry has no fields, carries no
    /// <c>payload.after</c> content, or describes a different entity type.
    /// </returns>
    private TechnicalReservationEvent? FilterAndParseEvent(StreamEntry value)
    {
        if (value.Values is not { Length: > 0 } fields)
        {
            return null;
        }

        var rawJson = fields[0].Value.ToString();
        if (string.IsNullOrEmpty(rawJson))
        {
            return null;
        }

        // "after" may be absent or JSON null; skip such entries instead of faulting the service.
        var after = JsonNode.Parse(rawJson)?["payload"]?["after"];
        if (after is null)
        {
            return null;
        }

        var eventInfo = JsonNode.Parse(after.GetValue<string>());
        if (eventInfo is null)
        {
            return null;
        }

        var entityType = eventInfo["entityType"]?.GetValue<string>();
        if (entityType != "Reservation")
        {
            return null;
        }

        return EventParser.ParseEvent<TechnicalReservationEvent>(eventInfo);
    }
}