Skip to content

Commit dd7b817

Browse files
authored
fix(parser): add template cleanup
Added template cleanup by moving dictionary to a separate service. Now we also check whether template is already in dictionary, prevent ArgumentException.
1 parent 74eb0d8 commit dd7b817

File tree

8 files changed

+147
-45
lines changed

8 files changed

+147
-45
lines changed

Packrat/Fennec.Tests/Parsers/IpFixParserTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Fennec.Services;
55
using NSubstitute;
66
using Serilog;
7+
using IpFixTemplateRecord = DotNetFlow.Ipfix.TemplateRecord;
78

89
namespace Fennec.Tests.Parsers;
910

@@ -15,8 +16,10 @@ public void ParseIpfixMessage()
1516
// Arrange
1617
var substituteLogger = Substitute.For<ILogger>();
1718
var substituteMetricService = Substitute.For<IMetricService>();
19+
var substituteTemplateCleanupService = Substitute.For<IIpFixCleanupService>();
20+
substituteTemplateCleanupService.TemplateRecords.Returns(new Dictionary<(IPAddress, ushort), IpFixTemplateRecord>());
1821

19-
var parser = new IpFixParser(substituteLogger, substituteMetricService);
22+
var parser = new IpFixParser(substituteLogger, substituteMetricService,substituteTemplateCleanupService);
2023
var ipfixTemplates = Convert.FromBase64String(
2124
"AAoFgGUlYdkB1iL/AAAAAAACA3wBAAAVAAgABAAMAAQAAQAIAAIACACYAAgAmQAIAAcAAgALAAIACgAEAA4ABAFfAAgABAABAIgAAQAGAAEABQABADUAAQA9AAGDegACAAAa3IN4AAIAABrcg3kAAQAAGtwA0gABAQEAEgAIAAQADAAEAAEACAACAAgAmAAIAJkACAAKAAQADgAEAAQAAQCIAAEABQABADUAAQA9AAEBXwAIg3oAAgAAGtyDeAACAAAa3IN5AAEAABrcANIAAgECABUAGwAQABwAEAABAAgAAgAIAJgACACZAAgABwACAAsAAgAKAAQADgAEAV8ACAAEAAEAiAABAAYAAQAFAAEANQABAD0AAYN6AAIAABrcg3gAAgAAGtyDeQABAAAa3ADSAAEBAwASABsAEAAcABAAAQAIAAIACACYAAgAmQAIAAoABAAOAAQABAABAIgAAQAFAAEANQABAD0AAQFfAAiDegACAAAa3IN4AAIAABrcg3kAAQAAGtwA0gACAQQAGQAIAAQADAAEAAEACAACAAgAmAAIAJkACAAHAAIACwACAAoABAAOAAQABAABAIgAAQAGAAEABQABADUAAQA9AAEBXwAIg3EABAAAGtyDcgAEAAAa3IN2AAIAABrcg3cAAgAAGtyDcAABAAAa3IN6AAIAABrcg3gAAgAAGtyDeQABAAAa3AEFABcACAAEAAwABAABAAgAAgAIAJgACACZAAgABwACAAsAAgAKAAQADgAEAAQAAQCIAAEABQABADUAAQA9AAEBXwAIg3EABAAAGtyDcgAEAAAa3INwAAEAABrcg3oAAgAAGtyDeAACAAAa3IN5AAEAABrcANIAAQEGABkACAAEAAwABAABAAgAAgAIAJgACACZAAgABwACAAsAAgAKAAQADgAEAAQAAQCIAAEABgABAAUAAQA1AAEAPQABAV8ACINzABAAABrcg3QAEAAAGtyDdgACAAAa3IN3AAIAABrcg3AAAQAAGtyDegACAAAa3IN4AAIAABrcg3kAAQAAGtwBBwAXAAgABAAMAAQAAQAIAAIACACYAAgAmQAIAAcAAgALAAIACgAEAA4ABAAEAAEABQABADUAAQA9AAEAiAABAV8ACINzABAAABrcg3QAEAAAGtyDcAABAAAa3IN6AAIAABrcg3gAAgAAGtyDeQABAAAa3ADSAAEAAgDMAQgAFQAIAAQADAAEAAEACAACAAgAmAAIAJkACAAHAAIACwACAAoABAAOAAQBXwAIAAQAAQCIAAEABgABAAUAAQA1AAEAPQABg3oAAgAAGtyDeAACAAAa3IN5AAEAABrcANIAAQEJABUAGwAQABwAEAABAAgAAgAIAJgACACZAAgABwACAAsAAgAKAAQADgAEAV8ACAAEAAEAiAABAAYAAQAFAAEANQABAD0AAYN6AAIAABrcg3gAAgAAGtyDeQABAAAa3ADSAAEAAgDMAQoAFQAIAAQADAAEAAEACAACAAgAmAAIAJkACAAHAAIACwACAAoABAAOAAQBXwAIAAQAAQCIAAEABgABAAUAAQA1AAEAPQABg3oAAgAAGtyDeAACAAAa3IN5AAEAABrcANIAAQELABUAGwAQABwAEAABAAgAAgAIAJgACACZAAgABwACAAsAAgAKAAQADgAEAV8ACAAEAAEAiAABAAYAAQAFAAEANQABAD0AAYN6AAIAABrcg3gAAgAAGtyDeQABAAAa3ADSAAEAAgBcAQwAEgAIAAQADAAEAAEACAACAAgAmAAIAJkACAAKAAQADgAEAAQAAQCIAAEABQABADUAAQA9AAEBXwAIg3oAAgAAGtyDeAACAAAa3IN5AAEAABrcANIAAg==");
2225
var ipfixData = Convert.FromBase64String(

Packrat/Fennec.Tests/Parsers/NetFlow9ParserTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Net;
22
using System.Net.Sockets;
3+
using NetFlow9TemplateRecord = DotNetFlow.Netflow9.TemplateRecord;
34
using Fennec.Parsers;
45
using Fennec.Services;
56
using NSubstitute;
@@ -15,8 +16,10 @@ public void ParseNetFlow9Message()
1516
// Arrange
1617
var substituteLogger = Substitute.For<ILogger>();
1718
var substituteMetricService = Substitute.For<IMetricService>();
19+
var substituteTemplateCleanupService = Substitute.For<INetFlow9CleanupService>();
20+
substituteTemplateCleanupService.TemplateRecords.Returns(new Dictionary<(IPAddress, ushort), NetFlow9TemplateRecord>());
1821

19-
var parser = new NetFlow9Parser(substituteLogger, substituteMetricService);
22+
var parser = new NetFlow9Parser(substituteLogger, substituteMetricService, substituteTemplateCleanupService);
2023
var netflowTemplates = Convert.FromBase64String(
2124
"AAkAAgBaBI9k0SU7AAAAaAAAAAIBAwBRAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAzC8AxwKgAlgG71i0GAAAAAQBZiYAAWYmAAAApaQAAAAAAAABAAAAAAAAAAAEAAAA8AQMADQAbABAAHAAQAAgABAAMAAQABwACAAsAAgAEAAEAMAAEABYABAAVAAQACgAEAAEACAACAAg=");
2225
var netflowData = Convert.FromBase64String(
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Microsoft.Extensions.Options;
2+
3+
namespace Fennec.Options;
4+
5+
/// <summary>
6+
/// Options for cleaning up IPFIX and NetFlow9 templates.
7+
/// </summary>
8+
public class TemplateCleanupOptions
9+
{
10+
public TimeSpan IpFixCleanupInterval { get; set; } = TimeSpan.FromDays(2);
11+
public TimeSpan NetFlow9CleanupInterval { get; set; } = TimeSpan.FromDays(2);
12+
}

Packrat/Fennec/Parsers/IpFixParser.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Net;
1+
using System.Collections.Concurrent;
2+
using System.Net;
23
using System.Net.Sockets;
34
using DotNetFlow.Ipfix;
45
using Fennec.Database;
@@ -16,22 +17,21 @@ public class IpFixParser : IParser
1617
{
1718
private readonly ILogger _log;
1819
// TODO: expand _templateRecords to a service, can be used to display/monitor templates in frontend
19-
private readonly IDictionary<(IPAddress, ushort), TemplateRecord> _templateRecords;
2020
private readonly IMetricService _metricService;
21+
private readonly IIpFixCleanupService _templateCleanupService;
2122

22-
public IpFixParser(ILogger log, IMetricService metricService)
23+
public IpFixParser(ILogger log, IMetricService metricService, IIpFixCleanupService templateCleanupService)
2324
{
2425
_log = log.ForContext<IpFixParser>();
25-
_templateRecords = new Dictionary<(IPAddress, ushort), TemplateRecord>();
2626
_metricService = metricService;
27+
_templateCleanupService = templateCleanupService;
2728
}
2829

2930
public IEnumerable<TraceImportInfo> Parse(UdpReceiveResult result)
3031
{
31-
// result.RemoteEndPoint.Address --> address of exporter
3232
var stream = new MemoryStream(result.Buffer);
3333

34-
using var ipfixReader = new IpfixReader(stream, 0, _templateRecords.Values);
34+
using var ipfixReader = new IpfixReader(stream, 0, _templateCleanupService.TemplateRecords.Values);
3535
_ = ipfixReader.ReadPacketHeader();
3636

3737
// Process each set in the IPFIX message. Has to be while(true) because header doesn't provide a count of sets.
@@ -45,7 +45,7 @@ public IEnumerable<TraceImportInfo> Parse(UdpReceiveResult result)
4545
{
4646
case DataSet dataSet:
4747
var key = (result.RemoteEndPoint.Address, set.ID);
48-
if (!_templateRecords.TryGetValue(key, out var template))
48+
if (!_templateCleanupService.TemplateRecords.TryGetValue(key, out var template))
4949
{
5050
_log.Warning("Could not parse data set... " +
5151
"Reading this set requires a not yet transmitted " +
@@ -58,7 +58,9 @@ public IEnumerable<TraceImportInfo> Parse(UdpReceiveResult result)
5858
case TemplateSet templateSet:
5959
foreach (var templateRecord in templateSet.Records)
6060
{
61-
_templateRecords.Add((result.RemoteEndPoint.Address, templateRecord.ID), templateRecord);
61+
if (_templateCleanupService.TemplateRecords.ContainsKey((result.RemoteEndPoint.Address, templateRecord.ID)))
62+
continue;
63+
_templateCleanupService.TemplateRecords.Add((result.RemoteEndPoint.Address, templateRecord.ID), templateRecord);
6264
_log.Information("Received new template set with id #{TemplateSetId}", templateRecord.ID);
6365
}
6466

Packrat/Fennec/Parsers/NetFlow9Parser.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,33 @@ namespace Fennec.Parsers;
1313
public class NetFlow9Parser : IParser
1414
{
1515
private readonly ILogger _log;
16-
private readonly IDictionary<(IPAddress, ushort), TemplateRecord> _templateRecords; // matches (ExporterIp, TemplateId) to TemplateRecord
1716
private readonly IMetricService _metricService;
18-
19-
public NetFlow9Parser(ILogger log, IMetricService metricService)
17+
private readonly INetFlow9CleanupService _templateCleanupService;
18+
19+
public NetFlow9Parser(ILogger log, IMetricService metricService, INetFlow9CleanupService templateCleanupService)
2020
{
2121
_log = log.ForContext<NetFlow9Parser>();
22-
_templateRecords = new Dictionary<(IPAddress, ushort), TemplateRecord>();
2322
_metricService = metricService;
23+
_templateCleanupService = templateCleanupService;
2424
}
2525
public IEnumerable<TraceImportInfo> Parse(UdpReceiveResult result)
2626
{
2727
var stream = new MemoryStream(result.Buffer);
28-
using var nr = new NetflowReader(stream, 0, _templateRecords.Values);
28+
using var nr = new NetflowReader(stream, 0, _templateCleanupService.TemplateRecords.Values);
2929
var header = nr.ReadPacketHeader();
3030

3131
for (var i = 0; i < header.Count; i++)
3232
{
3333
try
3434
{
35-
var dict = _templateRecords.Values.ToDictionary(t => t.ID, t => t);
35+
var dict = _templateCleanupService.TemplateRecords.Values.ToDictionary(t => t.ID, t => t);
3636
var set = nr.ReadFlowSet(dict);
3737

3838
switch (set)
3939
{
4040
case DataFlowSet dataFlowSet:
4141
var key = (result.RemoteEndPoint.Address, set.ID);
42-
if (!_templateRecords.TryGetValue(key, out var template))
42+
if (!_templateCleanupService.TemplateRecords.TryGetValue(key, out var template))
4343
{
4444
_log.Warning("Could not parse data set... " +
4545
"Reading this set requires a not yet transmitted " +
@@ -52,7 +52,9 @@ public IEnumerable<TraceImportInfo> Parse(UdpReceiveResult result)
5252
case TemplateFlowSet templateFlowSet:
5353
foreach (var templateRecord in templateFlowSet.Records)
5454
{
55-
_templateRecords.Add((result.RemoteEndPoint.Address, templateRecord.ID), templateRecord);
55+
if (_templateCleanupService.TemplateRecords.ContainsKey((result.RemoteEndPoint.Address, templateRecord.ID)))
56+
continue;
57+
_templateCleanupService.TemplateRecords.Add((result.RemoteEndPoint.Address, templateRecord.ID), templateRecord);
5658
_log.Information("Received new template set with id #{TemplateSetId}", templateRecord.ID);
5759
}
5860

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
using System.Collections.Concurrent;
2+
using System.Net;
3+
using DotNetFlow.Ipfix;
4+
using Fennec.Options;
5+
using Microsoft.Extensions.Options;
6+
using IpFixTemplateRecord = DotNetFlow.Ipfix.TemplateRecord;
7+
using Netflow9TemplateRecord = DotNetFlow.Netflow9.TemplateRecord;
8+
9+
namespace Fennec.Services;
10+
11+
/// <summary>
12+
/// Interface for cleanup service for IPFIX templates
13+
/// </summary>
14+
public interface IIpFixCleanupService
15+
{
16+
Dictionary<(IPAddress, ushort), IpFixTemplateRecord> TemplateRecords { get; set; }
17+
}
18+
19+
/// <summary>
20+
/// Interface for cleanup service for NetFlow9 templates
21+
/// </summary>
22+
public interface INetFlow9CleanupService
23+
{
24+
Dictionary<(IPAddress, ushort), Netflow9TemplateRecord> TemplateRecords { get; set; }
25+
}
26+
27+
/// <summary>
28+
/// Cleanup service for IPFIX templates
29+
/// </summary>
30+
public class IpFixTemplateCleanupService : BackgroundService, IIpFixCleanupService
31+
{
32+
public Dictionary<(IPAddress, ushort), IpFixTemplateRecord> TemplateRecords { get; set; }
33+
private readonly TimeSpan _cleanupInterval;
34+
35+
public IpFixTemplateCleanupService(IOptions<TemplateCleanupOptions> options)
36+
{
37+
TemplateRecords = new Dictionary<(IPAddress, ushort), IpFixTemplateRecord>();
38+
_cleanupInterval = options.Value.IpFixCleanupInterval;
39+
}
40+
41+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
42+
{
43+
while (!stoppingToken.IsCancellationRequested)
44+
{
45+
await Task.Delay(_cleanupInterval, stoppingToken);
46+
47+
TemplateRecords.Clear();
48+
}
49+
}
50+
}
51+
52+
/// <summary>
53+
/// Cleanup service for NetFlow9 templates
54+
/// </summary>
55+
public class NetFlow9TemplateCleanupService : BackgroundService, INetFlow9CleanupService
56+
{
57+
public Dictionary<(IPAddress, ushort), Netflow9TemplateRecord> TemplateRecords { get; set; }
58+
private readonly TimeSpan _cleanupInterval;
59+
60+
61+
public NetFlow9TemplateCleanupService(IOptions<TemplateCleanupOptions> options)
62+
{
63+
TemplateRecords = new Dictionary<(IPAddress, ushort), Netflow9TemplateRecord>();
64+
_cleanupInterval = options.Value.NetFlow9CleanupInterval;
65+
}
66+
67+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
68+
{
69+
while (!stoppingToken.IsCancellationRequested)
70+
{
71+
await Task.Delay(_cleanupInterval, stoppingToken);
72+
73+
TemplateRecords.Clear();
74+
}
75+
}
76+
}

Packrat/Fennec/Startup.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,18 @@ public void ConfigureServices(IServiceCollection services, IWebHostEnvironment e
7474
// Metric service
7575
services.AddSingleton<IMetricService, MetricService>();
7676

77+
// Template cleanup services
78+
services.Configure<TemplateCleanupOptions>(Configuration.GetSection("TemplateCleanupOptions"));
79+
80+
services.AddSingleton<IIpFixCleanupService, IpFixTemplateCleanupService>();
81+
services.AddSingleton<INetFlow9CleanupService, NetFlow9TemplateCleanupService>();
82+
83+
// services.AddSingleton<IpFixTemplateCleanupService>();
84+
// services.AddSingleton<NetFlow9TemplateCleanupService>();
85+
86+
// services.AddHostedService<IpFixTemplateCleanupService>();
87+
// services.AddHostedService<NetFlow9TemplateCleanupService>();
88+
7789
// Database services
7890
services.AddSingleton<ITraceRepository, TraceRepository>();
7991
services.AddSingleton<ITimeService, TimeService>();

Packrat/Fennec/appsettings.json

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,17 @@
55
"Microsoft.AspNetCore": "Warning"
66
}
77
},
8-
98
"_Startup": "Settings that will only be used at startup.",
109
"Startup": {
1110
"_EnableSwagger": "Enable or disable the Swagger UI.",
1211
"EnableSwagger": false,
1312
"_AllowCors": "Allow CORS for every request.",
1413
"AllowCors": false
15-
},
16-
14+
},
1715
"ConnectionStrings": {
1816
"_MongoConnection": "The connection string to the MongoDB database.",
1917
"MongoConnection": "mongodb://mongo:27017"
2018
},
21-
2219
"_Multiplexers": "Define ports and for what protocols to listen on these ports.",
2320
"Multiplexers": [
2421
{
@@ -35,15 +32,15 @@
3532
"Enabled": true,
3633
"Name": "Ipfix Multiplexer",
3734
"ListeningPort": 2056,
38-
"Parsers": [ "Ipfix" ]
35+
"Parsers": [
36+
"Ipfix"
37+
]
3938
}
4039
],
41-
4240
"_Security": "Security configuration.",
4341
"Security": {
4442
"_Enabled": "Enable or disable the requirement for authentication to access protected endpoints.",
4543
"Enabled": true,
46-
4744
"_Access": "Configuration for users and how accessing endpoints should behave.",
4845
"Access": {
4946
"_Username": "The username of the default user. Only generated if no users exist in the database.",
@@ -52,49 +49,46 @@
5249
"Password": null
5350
}
5451
},
55-
52+
"_TemplateCleanupOptions": "TemplateCleanup configuration.",
53+
"TemplateCleanupOptions": {
54+
"_IpFixCleanupInterval": "Interval at which the IpFixCleanupService should check for expired templates. Format is dd:hh:mm:ss.",
55+
"IpFixCleanupInterval": "02:00:00:00",
56+
"_NetFlow9CleanupInterval": "Interval at which the NetFlow9CleanupService should check for expired templates. Format is dd:hh:mm:ss.",
57+
"NetFlow9CleanupInterval": "02:00:00:00"
58+
},
5659
"_DnsCache": "DnsCache configuration.",
5760
"DnsCache": {
5861
"_Enabled": "Enable or disable the DnsCacheService.",
5962
"Enabled": true,
60-
6163
"_CleanupInterval": "Interval at which the DnsCacheCleanupService should check for expired DNS records. Format is dd:hh:mm:ss.",
6264
"CleanupInterval": "00:06:00:00",
63-
6465
"_InvalidationDuration": "Duration after which a DNS record is considered invalid. Format is dd:hh:mm:ss.",
6566
"InvalidationDuration": "00:01:00:00"
6667
},
67-
6868
"_DuplicateFlagging": "DuplicateFlagging configuration",
6969
"DuplicateFlagging": {
7070
"_ClaimExpirationLifespan": "How long a flow exporter has the claim to be the origin for traces.",
7171
"ClaimExpirationLifespan": "00:10:00",
72-
7372
"_CleanupInterval": "Interval between cleaning up unused claims.",
7473
"CleanupInterval": "01:00:00"
7574
},
76-
7775
"_TagsRequest": "Configuration for the option to request tag from a VMware machine.",
7876
"TagsRequest": {
7977
"_Enabled": "Enable or disable the tag service.",
8078
"Enabled": false,
81-
8279
"_VmWareRequest": "Configuration for the requesting of the tags.",
8380
"VmWareRequest": {
8481
"_VmWareTargetAddress": "The address of the target.",
8582
"VmWareTargetAddress": "",
86-
87-
"_MaxRequestPerSecond" : "Maximum request per second",
88-
"MaxRequestPerSecond" : "10",
89-
83+
"_MaxRequestPerSecond": "Maximum request per second",
84+
"MaxRequestPerSecond": "10",
9085
"_VmWareCredentials": "Credentials for the VMWare API.",
9186
"VmWareCredentials": {
9287
"_Username": "The username of the vmware client for the API.",
9388
"Username": "",
9489
"_Password": "The password of the vmware client for the API.",
9590
"Password": ""
9691
},
97-
9892
"_VmWareApiPaths": "Paths for getting certain information from the Vmware API.",
9993
"VmWareApiPaths": {
10094
"_SessionPath": "Path for getting a session token.",
@@ -104,18 +98,16 @@
10498
}
10599
}
106100
},
107-
108-
"_TagsCache" : "Configurations for the tag cache service.",
109-
"TagsCache" : {
110-
"_RefreshPeriod" : "The time interval to send request to the target ip to get the tags. Format is dd:hh:mm:ss",
111-
"RefreshPeriod" : "12:00:00"
101+
"_TagsCache": "Configurations for the tag cache service.",
102+
"TagsCache": {
103+
"_RefreshPeriod": "The time interval to send request to the target ip to get the tags. Format is dd:hh:mm:ss",
104+
"RefreshPeriod": "12:00:00"
112105
},
113-
114106
"_FlowImportMetrics": "Configuration for the flowImport",
115107
"FlowImportMetrics": {
116108
"_TraceSummationPeriod": "Defines how often the flow should be saved into a combined flow",
117-
"TraceSummationPeriod": "00:01:00",
118-
"_FlowSavePeriod" : "Defines for what period the flowImports should be saved",
109+
"TraceSummationPeriod": "00:01:00",
110+
"_FlowSavePeriod": "Defines for what period the flowImports should be saved",
119111
"FlowSavePeriod": "12:00:00"
120112
}
121113
}

0 commit comments

Comments
 (0)