Skip to content

Commit

Permalink
Implements support for the stream metadata (#53)
Browse files Browse the repository at this point in the history
* Implements support for the stream metadata

Part of SneaksAndData/arcane-framework#113

* Fix incorrect usage of incoming contract

* Build fix

* Use the stream metadata interface from Arcane.Framework v0.0.37
  • Loading branch information
s-vitaliy authored Oct 16, 2024
1 parent f978fb4 commit ab3f435
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
16 changes: 16 additions & 0 deletions .helm/templates/crd-sql-server-change-tracking.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ spec:
description: How long to wait before polling for next result set. Can be from 1 to 60 seconds.
minimum: 1
maximum: 60
streamMetadata:
type: object
optional: true
default: null
properties:
partitions:
type: array
items:
type: object
properties:
name:
type: string
fieldName:
type: string
fieldFormat:
type: string
status:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion src/Arcane.Stream.SqlServerChangeTracking.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Arcane.Framework" Version="0.0.36" />
<PackageReference Include="Arcane.Framework" Version="0.0.37" />
</ItemGroup>

</Project>
33 changes: 33 additions & 0 deletions src/Models/SqlServerChangeTrackingStreamContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using System;
using System.Linq;
using System.Text.Json.Serialization;
using Akka.Util;
using Arcane.Framework.Configuration;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Services.Models;
using Arcane.Framework.Sinks.Models;

namespace Arcane.Stream.SqlServerChangeTracking.Models;

Expand Down Expand Up @@ -69,6 +73,35 @@ public class SqlServerChangeTrackingStreamContext : IStreamContext, IStreamConte
/// <inheritdoc cref="IStreamContext.StreamKind"/>
public string StreamKind { get; private set; }

/// <summary>
/// Property to hold stream metadata received from the stream context.
/// </summary>
[JsonPropertyName("streamMetadata")]
public StreamMetadataDefinition StreamMetadataProperty { get; set; }

/// <inheritdoc cref="IStreamContext.StreamMetadata"/>
[JsonIgnore]
public Option<StreamMetadata> StreamMetadata
{
get
{
if (this.StreamMetadataProperty is null)
{
return Option<StreamMetadata>.None;
}

var partitions = this.StreamMetadataProperty
.Partitions
.Select(partition => new StreamPartition
{
Name = partition.Name,
FieldName = partition.FieldName,
FieldFormat = partition.FieldFormat
}).ToArray();
return new StreamMetadata(partitions);
}
}

/// <inheritdoc cref="IStreamContextWriter.SetStreamId"/>
public void SetStreamId(string streamId)
{
Expand Down
5 changes: 4 additions & 1 deletion src/Services/SqlServerChangeTrackgingGraphBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Framework.Contracts;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Parquet;
using Arcane.Framework.Sources.Exceptions;
using Arcane.Framework.Sources.SqlServer;
Expand Down Expand Up @@ -51,7 +53,8 @@ ILogger<SqlServerChangeTrackingGraphBuilder> logger
rowGroupsPerFile: context.GroupsPerFile,
createSchemaFile: true,
dataSinkPathSegment: context.IsBackfilling ? "backfill" : "data",
dropCompletionToken: context.IsBackfilling);
dropCompletionToken: context.IsBackfilling,
streamMetadata: context.StreamMetadata.GetOrElse(new StreamMetadata(Option<StreamPartition[]>.None)));

return Source
.FromGraph(source)
Expand Down

0 comments on commit ab3f435

Please sign in to comment.