Skip to content

Commit

Permalink
Add offloader and LikesCounterUpdater
Browse files Browse the repository at this point in the history
  • Loading branch information
ImoutoChan committed Apr 30, 2024
1 parent e4759fa commit a8b2e11
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 127 deletions.
54 changes: 13 additions & 41 deletions Source/WetPicsRebirth/Commands/UserCommands/LikeCallbackHandler.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,38 @@
using System.Text.RegularExpressions;
using Telegram.Bot.Exceptions;
using WetPicsRebirth.Commands.UserCommands.Abstract;
using WetPicsRebirth.Commands.UserCommands.Abstract;
using WetPicsRebirth.Data.Entities;
using WetPicsRebirth.Data.Repositories.Abstract;
using WetPicsRebirth.EntryPoint.Service.Notifications;
using WetPicsRebirth.Extensions;
using WetPicsRebirth.Services;
using WetPicsRebirth.Services.UserAccounts;
using WetPicsRebirth.Services.LikesCounterUpdater;
using WetPicsRebirth.Services.Offload;

namespace WetPicsRebirth.Commands.UserCommands;

public partial class LikeCallbackHandler : ICallbackHandler
public class LikeCallbackHandler : ICallbackHandler
{
[GeneratedRegex("retry after (?<after>\\d+)")]
private static partial Regex RetryAfterRegex();

private const string LikeData = "vote_l";
private readonly ITelegramBotClient _telegramBotClient;

private readonly IUsersRepository _usersRepository;
private readonly IVotesRepository _votesRepository;
private readonly ILikesToFavoritesTranslatorScheduler _likesToFavoritesTranslatorScheduler;
private readonly IOffloader<Vote> _likesToFavorites;
private readonly IOffloader<MessageToUpdateCounter> _likesCounterUpdater;
private readonly ILogger<LikeCallbackHandler> _logger;

public LikeCallbackHandler(
IUsersRepository usersRepository,
IVotesRepository votesRepository,
ITelegramBotClient telegramBotClient,
ILikesToFavoritesTranslatorScheduler likesToFavoritesTranslatorScheduler,
ILogger<LikeCallbackHandler> logger)
IOffloader<Vote> likesToFavorites,
ILogger<LikeCallbackHandler> logger,
IOffloader<MessageToUpdateCounter> likesCounterUpdater)
{
_usersRepository = usersRepository;
_votesRepository = votesRepository;
_telegramBotClient = telegramBotClient;
_likesToFavoritesTranslatorScheduler = likesToFavoritesTranslatorScheduler;
_likesToFavorites = likesToFavorites;
_logger = logger;
_likesCounterUpdater = likesCounterUpdater;
}

public async Task Handle(CallbackNotification notification, CancellationToken token)
Expand Down Expand Up @@ -66,33 +64,7 @@ await _telegramBotClient.AnswerCallbackQueryAsync(
if (counts <= 0)
return;

await _likesToFavoritesTranslatorScheduler.Schedule(vote);
await UpdateMessageWithRetries(chatId, messageId, 0, token);
}

private async Task UpdateMessageWithRetries(
long chatId,
int messageId,
int retryCount,
CancellationToken ct)
{
try
{
var currentCount = await _votesRepository.GetCountForPost(chatId, messageId);
await _telegramBotClient.EditMessageReplyMarkupAsync(
chatId,
messageId,
Keyboards.WithLikes(currentCount),
ct);
}
catch (ApiRequestException e) when (e.Message.Contains("retry after"))
{
if (retryCount > 2)
throw;

var after = int.Parse(RetryAfterRegex().Match(e.Message).Groups["after"].Value);
await Task.Delay(after * 1000, ct);
await UpdateMessageWithRetries(chatId, messageId, retryCount + 1, ct);
}
await _likesToFavorites.Offload(vote);
await _likesCounterUpdater.Offload(new(chatId, messageId));
}
}
2 changes: 0 additions & 2 deletions Source/WetPicsRebirth/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using WetPicsRebirth.Data;
using WetPicsRebirth.Extensions;
using WetPicsRebirth.Services;
using WetPicsRebirth.Services.UserAccounts;

namespace WetPicsRebirth;

Expand Down Expand Up @@ -36,7 +35,6 @@ private static IHostBuilder CreateHostBuilder(string[] args) =>
.ConfigureWebHostDefaults(webBuilder => webBuilder.UseStartup<Startup>())
.ConfigureSerilog()
.ConfigureServices(x => x.AddHostedService<TelegramHostedService>())
.ConfigureServices(x => x.AddHostedService<LikesToFavoritesTranslatorHostedService>())
.ConfigureServices(x => x.AddQuartzHostedService());


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace WetPicsRebirth.Services.LikesCounterUpdater;

public interface ILikesCounterUpdater
{
Task Update(MessageToUpdateCounter message);
}

public record MessageToUpdateCounter(long ChatId, int MessageId);
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System.Text.RegularExpressions;
using Telegram.Bot.Exceptions;
using WetPicsRebirth.Data.Repositories.Abstract;

namespace WetPicsRebirth.Services.LikesCounterUpdater;

internal partial class LikesCounterUpdater : ILikesCounterUpdater
{
[GeneratedRegex("retry after (?<after>\\d+)")]
private static partial Regex RetryAfterRegex();

private readonly ITelegramBotClient _telegramBotClient;
private readonly IVotesRepository _votesRepository;

public LikesCounterUpdater(ITelegramBotClient telegramBotClient, IVotesRepository votesRepository)
{
_telegramBotClient = telegramBotClient;
_votesRepository = votesRepository;
}

public async Task Update(MessageToUpdateCounter message)
=> await UpdateMessageWithRetries(message.ChatId, message.MessageId, 0);

private async Task UpdateMessageWithRetries(
long chatId,
int messageId,
int retryCount,
CancellationToken ct = default)
{
try
{
var currentCount = await _votesRepository.GetCountForPost(chatId, messageId);
await _telegramBotClient.EditMessageReplyMarkupAsync(
chatId,
messageId,
Keyboards.WithLikes(currentCount),
ct);
}
catch (ApiRequestException e) when (e.Message.Contains("retry after"))
{
if (retryCount > 2)
throw;

var after = int.Parse(RetryAfterRegex().Match(e.Message).Groups["after"].Value);
await Task.Delay(after * 1000, ct);
await UpdateMessageWithRetries(chatId, messageId, retryCount + 1, ct);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using WetPicsRebirth.Services.Offload;

namespace WetPicsRebirth.Services.LikesCounterUpdater;

public static class LikesCounterUpdaterOffloadServiceCollectionExtensions
{
public static IServiceCollection AddLikesCounterUpdaterOffload(this IServiceCollection services)
{
services.AddTransient<ILikesCounterUpdater, LikesCounterUpdater>();
services.AddOffload<MessageToUpdateCounter>(options =>
{
options.ItemProcessor
= (x, message) => x.GetRequiredService<ILikesCounterUpdater>().Update(message);

options.ErrorLogger = (logger, message, exception) =>
logger.LogError(
exception,
"Unable to update likes counter for chat {ChatId} message {MessageId}",
message.ChatId,
message.MessageId);
});
return services;
}
}
10 changes: 10 additions & 0 deletions Source/WetPicsRebirth/Services/Offload/IOffloadReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Channels;

namespace WetPicsRebirth.Services.Offload;

internal interface IOffloadReader<T>
{
ChannelReader<T> Reader { get; }

void Complete();
}
6 changes: 6 additions & 0 deletions Source/WetPicsRebirth/Services/Offload/IOffloader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace WetPicsRebirth.Services.Offload;

public interface IOffloader<in T>
{
Task Offload(T vote);
}
55 changes: 55 additions & 0 deletions Source/WetPicsRebirth/Services/Offload/OffloadHostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Extensions.Options;

namespace WetPicsRebirth.Services.Offload;

internal class OffloadHostedService<T> : IHostedService
{
private readonly IOffloadReader<T> _offload;
private readonly IServiceProvider _serviceProvider;
private readonly IOptions<OffloadOptions<T>> _options;
private readonly ILogger<OffloadHostedService<T>> _logger;

protected OffloadHostedService(
IOffloadReader<T> offload,
IServiceProvider serviceProvider,
IOptions<OffloadOptions<T>> options,
ILogger<OffloadHostedService<T>> logger)
{
_offload = offload;
_serviceProvider = serviceProvider;
_options = options;
_logger = logger;
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(Process, cancellationToken);

return Task.CompletedTask;
}

private async Task Process()
{
var reader = _offload.Reader;

while (await reader.WaitToReadAsync())
while (reader.TryRead(out var item))
{
try
{
using var scope = _serviceProvider.CreateScope();
await _options.Value.ItemProcessor(scope.ServiceProvider, item);
}
catch (Exception e)
{
_options.Value.ErrorLogger(_logger, item, e);
}
}
}

public Task StopAsync(CancellationToken cancellationToken)
{
_offload.Complete();
return Task.CompletedTask;
}
}
8 changes: 8 additions & 0 deletions Source/WetPicsRebirth/Services/Offload/OffloadOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace WetPicsRebirth.Services.Offload;

public class OffloadOptions<T>
{
public required Func<IServiceProvider, T, Task> ItemProcessor { get; set; }

public required Action<ILogger, T, Exception> ErrorLogger { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace WetPicsRebirth.Services.Offload;

public static class OffloadServiceCollectionExtensions
{
public static IServiceCollection AddOffload<T>(
this IServiceCollection services, Action<OffloadOptions<T>> configure)
{
services.Configure(configure);

services.AddSingleton<Offloader<T>>();
services.AddTransient<IOffloader<T>>(x => x.GetRequiredService<Offloader<T>>());
services.AddTransient<IOffloadReader<T>>(x => x.GetRequiredService<Offloader<T>>());
services.AddHostedService<OffloadHostedService<T>>();

return services;
}
}
17 changes: 17 additions & 0 deletions Source/WetPicsRebirth/Services/Offload/Offloader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Threading.Channels;

namespace WetPicsRebirth.Services.Offload;

/// <remarks>
/// Should be registered as a singleton.
/// </remarks>>
internal class Offloader<T> : IOffloader<T>, IOffloadReader<T>
{
private Channel<T> VotesToTranslate { get; } = Channel.CreateUnbounded<T>();

public async Task Offload(T vote) => await VotesToTranslate.Writer.WriteAsync(vote);

public ChannelReader<T> Reader => VotesToTranslate.Reader;

public void Complete() => VotesToTranslate.Writer.Complete();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using WetPicsRebirth.Data.Entities;
using WetPicsRebirth.Services.Offload;

namespace WetPicsRebirth.Services.UserAccounts;

public static class LikesToFavoritesOffloadServiceCollectionExtensions
{
public static IServiceCollection AddLikesToFavoritesOffload(this IServiceCollection services)
{
services.AddTransient<ILikesToFavoritesTranslator, LikesToFavoritesTranslator>();
services.AddOffload<Vote>(options =>
{
options.ItemProcessor = (x, vote) => x.GetRequiredService<ILikesToFavoritesTranslator>().Translate(vote);

options.ErrorLogger = (logger, vote, exception) =>
logger.LogError(
exception,
"Unable to fav post chat {ChatId} message {MessageId}",
vote.ChatId,
vote.MessageId);
});
return services;
}
}
Loading

0 comments on commit a8b2e11

Please sign in to comment.