Skip to content

Commit

Permalink
Merge pull request #176 from AirQualityControl/feature/#173
Browse files Browse the repository at this point in the history
Feature/#173
  • Loading branch information
ArturLavrov authored Jan 19, 2024
2 parents 5d3e849 + 76454fd commit 056b155
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,36 @@ public UpdateStationInfoBlock(

private async Task<Message> Transform((Message, MonitoringStation) tuple)
{
var monitoringStation = tuple.Item2;
var existingStation = await _monitoringStationRepository.FindByProviderNameAsync(monitoringStation.DisplayName);
if (existingStation.IsEmpty)
try
{
_logger.LogInformation($"new station was added {monitoringStation.DisplayName}");
await _monitoringStationRepository.AddAsync(monitoringStation);
return tuple.Item1;
}
var monitoringStation = tuple.Item2;
var existingStation = await _monitoringStationRepository.FindByProviderNameAsync(monitoringStation.DisplayName);
if (existingStation.IsEmpty)
{
_logger.LogInformation($"new station was added {monitoringStation.DisplayName}");
await _monitoringStationRepository.AddAsync(monitoringStation);
return tuple.Item1;
}

var existingAirPollution = existingStation.GetAirPollution();
if(existingAirPollution == null)
{
_logger.LogWarning($"Air pollution for existing station {existingStation.DisplayName} is empty");
await UpdateStation(monitoringStation, existingStation);
}
var existingAirPollution = existingStation.GetAirPollution();
if(existingAirPollution == null)
{
_logger.LogWarning($"Air pollution for existing station {existingStation.DisplayName} is empty");
await UpdateStation(monitoringStation, existingStation);
}

var existingStationDateTime = existingStation.GetAirPollution()?.GetMeasurementsDateTime();
var receivedStationDateTime = monitoringStation.GetAirPollution()?.GetMeasurementsDateTime();
var existingStationDateTime = existingStation.GetAirPollution()?.GetMeasurementsDateTime();
var receivedStationDateTime = monitoringStation.GetAirPollution()?.GetMeasurementsDateTime();

if (receivedStationDateTime > existingStationDateTime)
if (receivedStationDateTime > existingStationDateTime)
{
await UpdateStation(monitoringStation, existingStation);
_logger.LogInformation($"A new monitoring data for station {monitoringStation.DisplayName} received. New measurement date: {receivedStationDateTime}");
}
}
catch (Exception ex)
{
await UpdateStation(monitoringStation, existingStation);
_logger.LogInformation($"A new monitoring data for station {monitoringStation.DisplayName} received. New measurement date: {receivedStationDateTime}");
_logger.LogError($"Error occured: {ex.Message}, {ex.StackTrace}");
}

return tuple.Item1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,44 @@
using System.Threading.Tasks.Dataflow;
using AirSnitch.Domain.Models;
using AirSnitch.Infrastructure.Abstract.MessageQueue;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using City = AirSnitch.Domain.Models.City;

namespace AirSnitch.Worker.AirPollutionConsumer.Pipeline
{
public class ValidateMessageBlock
{
private readonly ILogger<AirPollutionDataConsumer> _logger;
public ValidateMessageBlock(ILogger<AirPollutionDataConsumer> logger)
{
_logger = logger;
}
public TransformBlock<Message, ValueTuple<Message, MonitoringStation>> Instance => new TransformBlock<Message, ValueTuple<Message, MonitoringStation>>(Transform);

private async Task<(Message, MonitoringStation)> Transform(Message receivedMsg)

Check warning on line 22 in src/AirSnitch.Worker/AirPollutionConsumer/Pipeline/ValidateMessageBlock.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 22 in src/AirSnitch.Worker/AirPollutionConsumer/Pipeline/ValidateMessageBlock.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
var dataPoint = JsonConvert.DeserializeObject<DataPoint>(receivedMsg.Body);
if (dataPoint == null)
var airMonitoringStation = new MonitoringStation() { IsEmpty = false };
try
{
var dataPoint = JsonConvert.DeserializeObject<DataPoint>(receivedMsg.Body);
if (dataPoint == null)
{
throw new ArgumentException("Deserialized object is null");
}
airMonitoringStation.SetName(dataPoint.StationInfo?.StationName);
airMonitoringStation.SetLocation(GetStationLocation(dataPoint));
airMonitoringStation.SetAirPollution(GetAirPollution(dataPoint));
var stationOwner = new MonitoringStationOwner(dataPoint.DataProviderInfo?.Tag, dataPoint.DataProviderInfo?.Name);
stationOwner.SetWebSite(new Uri(dataPoint.DataProviderInfo?.Uri));
airMonitoringStation.SetOwnerInfo(stationOwner);
}
catch (Exception ex)
{
throw new ArgumentException("");
_logger.LogError($"Error occured: {ex.Message}, {ex.StackTrace}");
}

var airMonitoringStation = new MonitoringStation() { IsEmpty = false };
airMonitoringStation.SetName(dataPoint.StationInfo?.StationName);
airMonitoringStation.SetLocation(GetStationLocation(dataPoint));
airMonitoringStation.SetAirPollution(GetAirPollution(dataPoint));
var stationOwner = new MonitoringStationOwner(dataPoint.DataProviderInfo?.Tag, dataPoint.DataProviderInfo?.Name);
stationOwner.SetWebSite(new Uri(dataPoint.DataProviderInfo?.Uri));
airMonitoringStation.SetOwnerInfo(stationOwner);
return (receivedMsg, airMonitoringStation);

}

private AirPollution GetAirPollution(DataPoint dataPoint)
Expand Down

0 comments on commit 056b155

Please sign in to comment.