diff --git a/Directory.Packages.props b/Directory.Packages.props index 19ce069eaf..44eeec7a94 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -54,6 +54,7 @@ + diff --git a/Orleans.sln b/Orleans.sln index 4df07f0c88..3c32abfb0d 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -223,6 +223,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.AdoNet", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.Kinesis", "src\AWS\Orleans.Streaming.Kinesis\Orleans.Streaming.Kinesis.csproj", "{BD85A85E-3BDE-4FAE-A705-E68A5056A828}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -593,6 +595,10 @@ Global {B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.Build.0 = Debug|Any CPU {B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.ActiveCfg = Release|Any CPU {B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.Build.0 = Release|Any CPU + {BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BD85A85E-3BDE-4FAE-A705-E68A5056A828}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -702,6 +708,7 @@ Global {A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23} {2B994F33-16CF-4679-936A-5AEABC529D2C} = {EB2EDE59-5021-42EE-A97A-D59939B39C66} {B8F43537-2D2E-42A0-BE67-5E07E4313AEA} = {2CAB7894-777C-42B1-8B1E-322868CE92C7} + {BD85A85E-3BDE-4FAE-A705-E68A5056A828} = {DA8E126B-BCDB-4E8F-BFB9-2DBFD41F8F70} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClientBuilderExtensions.cs b/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClientBuilderExtensions.cs new file mode 100644 index 0000000000..0f3f0b5ec5 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClientBuilderExtensions.cs @@ -0,0 +1,28 @@ +using System; +using Orleans.Streaming.Kinesis; + +namespace Orleans.Hosting +{ + public static class ClientBuilderExtensions + { + /// + /// Configure cluster client to use Kinesis Data Stream persistent streams with default settings + /// + public static IClientBuilder AddKinesisStreams(this IClientBuilder builder, string name, Action configureOptions) + { + builder.AddKinesisStreams(name, b => + b.ConfigureKinesis(ob => ob.Configure(configureOptions))); + return builder; + } + + /// + /// Configure cluster client to use Kinesis Data Stream persistent streams. + /// + public static IClientBuilder AddKinesisStreams(this IClientBuilder builder, string name, Action configure) + { + var configurator = new ClusterClientKinesisStreamConfigurator(name, builder); + configure?.Invoke(configurator); + return builder; + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClusterClientKinesisStreamConfigurator.cs b/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClusterClientKinesisStreamConfigurator.cs new file mode 100644 index 0000000000..aa4de2b7fa --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Hosting/ClusterClientKinesisStreamConfigurator.cs @@ -0,0 +1,33 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Streaming.Kinesis; + +namespace Orleans.Hosting +{ + public class ClusterClientKinesisStreamConfigurator : ClusterClientPersistentStreamConfigurator + { + public ClusterClientKinesisStreamConfigurator(string name, IClientBuilder builder) + : base(name, builder, KinesisAdapterFactory.Create) + { + this.ConfigureDelegate(services => + { + services.ConfigureNamedOptionForLogging(name) + .ConfigureNamedOptionForLogging(name); + }); + } + + public ClusterClientKinesisStreamConfigurator ConfigureKinesis(Action> configureOptions) + { + this.Configure(configureOptions); + return this; + } + + public ClusterClientKinesisStreamConfigurator ConfigureKinesis(Action configureOptions) + { + this.ConfigureKinesis(ob => ob.Configure(configureOptions)); + return this; + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloBuilderExtensions.cs b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloBuilderExtensions.cs new file mode 100644 index 0000000000..bd10922723 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloBuilderExtensions.cs @@ -0,0 +1,29 @@ +using System; +using Orleans.Streaming.Kinesis; + +namespace Orleans.Hosting +{ + public static class SiloBuilderExtensions + { + /// + /// Configure silo to use Kinesis Data Stream streaming with default settings. + /// + public static ISiloBuilder AddKinesisStreams(this ISiloBuilder builder, string name, Action configureOptions) + { + builder.AddKinesisStreams(name, b => + b.ConfigureKinesis(ob => ob.Configure(configureOptions))); + return builder; + } + + /// + /// Configure silo to use Kinesis Data Stream streaming. + /// + public static ISiloBuilder AddKinesisStreams(this ISiloBuilder builder, string name, Action configure) + { + var configurator = new SiloKinesisStreamConfigurator(name, + configureServicesDelegate => builder.ConfigureServices(configureServicesDelegate)); + configure?.Invoke(configurator); + return builder; + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs new file mode 100644 index 0000000000..ddb14bf438 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Hosting/SiloKinesisStreamConfigurator.cs @@ -0,0 +1,45 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Streaming.Kinesis; +using Orleans.Streams; + +namespace Orleans.Hosting +{ + public class SiloKinesisStreamConfigurator : SiloPersistentStreamConfigurator + { + public SiloKinesisStreamConfigurator(string name, Action> configureServicesDelegate) + : base(name, configureServicesDelegate, KinesisAdapterFactory.Create) + { + this.ConfigureDelegate(services => + { + services.ConfigureNamedOptionForLogging(name) + .ConfigureNamedOptionForLogging(name) + .ConfigureNamedOptionForLogging(name) + .AddTransient(sp => new StreamCheckpointerConfigurationValidator(sp, name)); + }); + } + + public SiloKinesisStreamConfigurator ConfigureKinesis(Action> configureOptions) + { + this.Configure(configureOptions); + return this; + } + + public SiloKinesisStreamConfigurator ConfigureKinesis(Action configureOptions) + { + this.ConfigureKinesis(ob => ob.Configure(configureOptions)); + return this; + } + + public SiloKinesisStreamConfigurator ConfigureCheckpointer( + Func checkpointerFactoryBuilder, + Action> configureOptions) + where TOptions : class, new() + { + this.ConfigureComponent(checkpointerFactoryBuilder, configureOptions); + return this; + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Orleans.Streaming.Kinesis.csproj b/src/AWS/Orleans.Streaming.Kinesis/Orleans.Streaming.Kinesis.csproj new file mode 100644 index 0000000000..745b50b8d3 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Orleans.Streaming.Kinesis.csproj @@ -0,0 +1,21 @@ + + + Microsoft.Orleans.Streaming.Kinesis + Microsoft Orleans AWS Kinesis Streaming Provider + Microsoft Orleans streaming provider backed by AWS Kinesis + $(PackageTags) AWS Kinesis + $(DefaultTargetFrameworks) + $(DefineConstants);STREAMING_KINESIS + true + + + + + + + + + + + + diff --git a/src/AWS/Orleans.Streaming.Kinesis/Properties/AssemblyInfo.cs b/src/AWS/Orleans.Streaming.Kinesis/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..aad501b99e --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("AWSUtils.Tests")] diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs new file mode 100644 index 0000000000..5c66811a63 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterFactory.cs @@ -0,0 +1,175 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Amazon; +using Amazon.Kinesis; +using Amazon.Kinesis.Model; +using Amazon.Runtime; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Orleans.Configuration; +using Orleans.Providers.Streams.Common; +using Orleans.Runtime; +using Orleans.Serialization; +using Orleans.Streams; + +namespace Orleans.Streaming.Kinesis +{ + /// + /// Queue adapter factory which allows the PersistentStreamProvider to use AWS Kinesis Data Streams as its backend persistent event queue. + /// + internal class KinesisAdapterFactory : IQueueAdapterFactory, IQueueAdapter + { + private readonly KinesisStreamOptions _options; + private readonly Serializer _serializer; + private readonly IStreamQueueCheckpointerFactory _checkpointerFactory; + private readonly ILoggerFactory _loggerFactory; + private readonly IQueueAdapterCache _adapterCache; + private readonly ILogger _logger; + private readonly Func _queueMapperFactory; + private readonly AmazonKinesisClient _client; + + private HashRingBasedPartitionedStreamQueueMapper _streamQueueMapper; + + public KinesisAdapterFactory( + string name, + KinesisStreamOptions options, + SimpleQueueCacheOptions cacheOptions, + Serializer serializer, + IStreamQueueCheckpointerFactory checkpointerFactory, + ILoggerFactory loggerFactory + ) + { + _options = options ?? throw new ArgumentNullException(nameof(options)); + + Name = name; + _serializer = serializer; + _checkpointerFactory = checkpointerFactory; + _loggerFactory = loggerFactory; + _logger = loggerFactory.CreateLogger(); + + _adapterCache = new SimpleQueueAdapterCache( + cacheOptions, + name, + loggerFactory + ); + + _queueMapperFactory = partitions => new HashRingBasedPartitionedStreamQueueMapper(partitions, Name); + _client = CreateClient(); + } + + public string Name { get; } + + public bool IsRewindable => false; + + public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite; + + public static KinesisAdapterFactory Create(IServiceProvider services, string name) + { + var streamsConfig = services.GetOptionsByName(name); + var cacheOptions = services.GetOptionsByName(name); + var serializer = services.GetRequiredService>(); + var checkpointerFactory = services.GetRequiredKeyedService(name); + var logger = services.GetRequiredService(); + + var factory = ActivatorUtilities.CreateInstance( + services, + name, + streamsConfig, + cacheOptions, + serializer, + checkpointerFactory, + logger + ); + + return factory; + } + + public async Task CreateAdapter() + { + if (_streamQueueMapper is null) + { + var kinesisStreams = await GetPartitionIdsAsync(); + _streamQueueMapper = _queueMapperFactory(kinesisStreams); + } + + return this; + } + + public IQueueAdapterCache GetQueueAdapterCache() + => _adapterCache; + + public IStreamQueueMapper GetStreamQueueMapper() + => _streamQueueMapper; + + public Task GetDeliveryFailureHandler(QueueId queueId) + => Task.FromResult(new NoOpStreamDeliveryFailureHandler(false)); + + public async Task QueueMessageBatchAsync(StreamId streamId, IEnumerable events, StreamSequenceToken token, Dictionary requestContext) + { + var data = KinesisBatchContainer.ToKinesisPayload(_serializer, streamId, events, requestContext); + + var putRecordRequest = new PutRecordRequest + { + StreamName = _options.StreamName, + Data = new MemoryStream(data), + PartitionKey = streamId.GetKeyAsString(), + }; + + _ = await _client.PutRecordAsync(putRecordRequest); + } + + public IQueueAdapterReceiver CreateReceiver(QueueId queueId) + { + var partition = _streamQueueMapper.QueueToPartition(queueId); + + return new KinesisAdapterReceiver( + CreateClient(), + _options.StreamName, + partition, + _checkpointerFactory, + _serializer, + _loggerFactory + ); + } + + internal AmazonKinesisClient CreateClient() + { + if (_options.Service.StartsWith("http://", StringComparison.OrdinalIgnoreCase) || + _options.Service.StartsWith("https://", StringComparison.OrdinalIgnoreCase)) + { + // Local Kinesis instance (for testing) + var credentials = !string.IsNullOrEmpty(_options.AccessKey) && !string.IsNullOrEmpty(_options.SecretKey) ? + new BasicAWSCredentials(_options.AccessKey, _options.SecretKey) : + new BasicAWSCredentials("dummy", "dummyKey"); + + return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { ServiceURL = _options.Service }); + } + else if (!string.IsNullOrEmpty(_options.AccessKey) && !string.IsNullOrEmpty(_options.SecretKey)) + { + // AWS Kinesis instance (auth via explicit credentials) + var credentials = new BasicAWSCredentials(_options.AccessKey, _options.SecretKey); + return new AmazonKinesisClient(credentials, new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) }); + } + else + { + // AWS Kinesis instance (implicit auth - EC2 IAM Roles etc) + return new AmazonKinesisClient(new AmazonKinesisConfig { RegionEndpoint = RegionEndpoint.GetBySystemName(_options.Service) }); + } + } + + internal async Task GetPartitionIdsAsync() + { + var request = new ListShardsRequest + { + StreamName = _options.StreamName, + }; + + var response = await _client.ListShardsAsync(request); + + return response.Shards.Select(s => s.ShardId).ToArray(); + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs new file mode 100644 index 0000000000..4ce3654716 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisAdapterReceiver.cs @@ -0,0 +1,124 @@ +using Amazon.Kinesis; +using Amazon.Kinesis.Model; +using Microsoft.Extensions.Logging; +using Orleans.Serialization; +using Orleans.Streams; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Orleans.Streaming.Kinesis +{ + internal class KinesisAdapterReceiver : IQueueAdapterReceiver + { + private readonly ILogger _logger; + private readonly AmazonKinesisClient _client; + private readonly string _streamName; + private readonly string _partition; + private readonly IStreamQueueCheckpointerFactory _checkpointerFactory; + private readonly Serializer _serializer; + + private IStreamQueueCheckpointer _checkpointer; + private string _shardIterator; + private long _lastReadMessage; + + internal KinesisAdapterReceiver( + AmazonKinesisClient client, + string streamName, + string partition, + IStreamQueueCheckpointerFactory checkpointerFactory, + Serializer serializer, + ILoggerFactory loggerFactory + ) + { + _client = client; + _streamName = streamName; + _partition = partition; + _checkpointerFactory = checkpointerFactory; + _serializer = serializer; + _logger = loggerFactory.CreateLogger(); + } + + public async Task Initialize(TimeSpan timeout) + { + _checkpointer = await _checkpointerFactory.Create(_partition); + var checkpointOffset = await _checkpointer.Load(); + + var getShardIteratorRequest = new GetShardIteratorRequest + { + StreamName = _streamName, + ShardId = _partition, + }; + + if (string.IsNullOrEmpty(checkpointOffset)) + { + getShardIteratorRequest.ShardIteratorType = ShardIteratorType.TRIM_HORIZON; + } + else + { + getShardIteratorRequest.ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; + getShardIteratorRequest.StartingSequenceNumber = checkpointOffset; + } + + var getShardIteratorResponse = await _client.GetShardIteratorAsync(getShardIteratorRequest); + _shardIterator = getShardIteratorResponse.ShardIterator; + } + + public async Task> GetQueueMessagesAsync(int maxCount) + { + var getRecordsRequest = new GetRecordsRequest + { + Limit = maxCount, + ShardIterator = _shardIterator, + }; + + var getRecordsResponse = await _client.GetRecordsAsync(getRecordsRequest); + _shardIterator = getRecordsResponse.NextShardIterator; + + if (getRecordsResponse.Records.Count == 0) + { + return Array.Empty(); + } + + var batch = new List(); + + foreach (var record in getRecordsResponse.Records) + { + // Kinesis only has a long string sequence ID, so we fake one based on the order we read from the partition. + batch.Add(KinesisBatchContainer.FromKinesisRecord(_serializer, record, _lastReadMessage++)); + } + + return batch; + } + + public Task MessagesDeliveredAsync(IList messages) + { + KinesisBatchContainer batchWithHighestOffset = null; + + try + { + if (!messages.Any()) + return Task.CompletedTask; + + batchWithHighestOffset = messages + .Cast() + .Max(); + + _checkpointer.Update(batchWithHighestOffset.Token.ShardSequence, DateTime.UtcNow); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to commit message offset {@offset} to shard {shardId}", batchWithHighestOffset?.Token?.ShardSequence, _partition); + throw; + } + + return Task.CompletedTask; + } + + public Task Shutdown(TimeSpan timeout) + { + return Task.CompletedTask; + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs new file mode 100644 index 0000000000..86e13327a7 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisBatchContainer.cs @@ -0,0 +1,117 @@ +using Amazon.Kinesis.Model; +using Newtonsoft.Json; +using Orleans.Runtime; +using Orleans.Serialization; +using Orleans.Streams; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Orleans.Streaming.Kinesis +{ + [Serializable] + [Orleans.GenerateSerializer] + internal class KinesisBatchContainer : IBatchContainer, IComparable + { + [JsonProperty] + [Id(0)] + private readonly byte[] _rawRecord; + + // Payload is local cache of deserialized payloadBytes. Should never be serialized as part of batch container. During batch container serialization raw payloadBytes will always be used. + [NonSerialized] + private Body _payload; + + [JsonIgnore] + [field: NonSerialized] + internal Serializer Serializer { get; set; } + + [JsonProperty] + [Id(1)] + internal KinesisSequenceToken Token { get; } + + private KinesisBatchContainer(Record record, Serializer serializer, long sequenceId) + { + this.Serializer = serializer; + this._rawRecord = record.Data.ToArray(); + + Token = new KinesisSequenceToken(record.SequenceNumber, sequenceId, 0); + } + + [GeneratedActivatorConstructor] + internal KinesisBatchContainer(Serializer serializer) + { + this.Serializer = serializer; + } + + /// + /// Stream identifier for the stream this batch is part of. + /// + public StreamId StreamId => GetPayload().StreamId; + + /// + /// Stream Sequence Token for the start of this batch. + /// + public StreamSequenceToken SequenceToken => Token; + + private Body GetPayload() => _payload ?? (_payload = this.Serializer.Deserialize(_rawRecord)); + + /// + /// Gets events of a specific type from the batch. + /// + /// + /// + public IEnumerable> GetEvents() + { + return GetPayload().Events.Cast().Select((e, i) => Tuple.Create(e, new KinesisSequenceToken(Token.ShardSequence, Token.SequenceNumber, i))); + } + + /// + /// Gives an opportunity to IBatchContainer to set any data in the RequestContext before this IBatchContainer is sent to consumers. + /// It can be the data that was set at the time event was generated and enqueued into the persistent provider or any other data. + /// + /// True if the RequestContext was indeed modified, false otherwise. + public bool ImportRequestContext() + { + if (GetPayload().RequestContext != null) + { + RequestContextExtensions.Import(GetPayload().RequestContext); + return true; + } + return false; + } + + public int CompareTo(KinesisBatchContainer other) + => Token.SequenceNumber.CompareTo(other.SequenceToken.SequenceNumber); + + [Serializable] + [GenerateSerializer] + internal class Body + { + [Id(0)] + public List Events { get; set; } + + [Id(1)] + public Dictionary RequestContext { get; set; } + + [Id(2)] + public StreamId StreamId { get; set; } + } + + internal static byte[] ToKinesisPayload(Serializer serializer, StreamId streamId, IEnumerable events, Dictionary requestContext) + { + var payload = new Body + { + Events = events.Cast().ToList(), + RequestContext = requestContext, + StreamId = streamId, + }; + + return serializer.SerializeToArray(payload); + } + + internal static KinesisBatchContainer FromKinesisRecord(Serializer serializer, Record record, long sequenceId) + { + return new KinesisBatchContainer(record, serializer, sequenceId); + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs new file mode 100644 index 0000000000..cdfe63b516 --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisSequenceToken.cs @@ -0,0 +1,49 @@ +using Newtonsoft.Json; +using Orleans.Providers.Streams.Common; +using System; +using System.Globalization; + +namespace Orleans.Streaming.Kinesis +{ + [Serializable] + [GenerateSerializer] + internal class KinesisSequenceToken : EventSequenceTokenV2 + { + /// + /// Initializes a new instance of the class. + /// + /// Kinesis offset within the shard (partition) from which this message came. + /// Receiver-generated sequenceNumber for this message. + /// Index into a batch of events, if multiple events were delivered within a single Kinesis record. + public KinesisSequenceToken(string shardSequence, long sequenceNumber, int eventIndex) + : base(sequenceNumber, eventIndex) + { + ShardSequence = shardSequence; + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// This constructor is exposed for serializer use only. + /// + public KinesisSequenceToken() : base() + { + } + + /// + /// Offset of the message within an Kinesis shard. + /// + [Id(0)] + [JsonProperty] + public string ShardSequence { get; } + + /// Returns a string that represents the current object. + /// A string that represents the current object. + /// 2 + public override string ToString() + { + return string.Format(CultureInfo.InvariantCulture, "KinesisSequenceToken(ShardSequence: {0}, SequenceNumber: {1}, EventIndex: {2})", ShardSequence, SequenceNumber, EventIndex); + } + } +} diff --git a/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisStreamOptions.cs b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisStreamOptions.cs new file mode 100644 index 0000000000..4492c71bce --- /dev/null +++ b/src/AWS/Orleans.Streaming.Kinesis/Streams/KinesisStreamOptions.cs @@ -0,0 +1,27 @@ +namespace Orleans.Streaming.Kinesis +{ + public class KinesisStreamOptions + { + /// + /// Optional Access Key string for Kinesis. + /// + [Redact] + public string AccessKey { get; set; } + + /// + /// Optional Secret key for Kinesis. + /// + [Redact] + public string SecretKey { get; set; } + + /// + /// Kinesis region name, such as "us-west-2", or a URL for the development endpoint. + /// + public string Service { get; set; } + + /// + /// Name of the Kinesis Stream. + /// + public string StreamName { get; set; } + } +} diff --git a/src/AWS/Shared/AWSUtils.cs b/src/AWS/Shared/AWSUtils.cs index 269935ce81..54bc9098bd 100644 --- a/src/AWS/Shared/AWSUtils.cs +++ b/src/AWS/Shared/AWSUtils.cs @@ -9,6 +9,8 @@ namespace Orleans.Persistence.DynamoDB namespace Orleans.Reminders.DynamoDB #elif STREAMING_SQS namespace Orleans.Streaming.SQS +#elif STREAMING_KINESIS +namespace Orleans.Streaming.Kinesis #elif AWSUTILS_TESTS namespace Orleans.AWSUtils.Tests #elif TRANSACTIONS_DYNAMODB diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventDataGeneratorStreamConfigurator.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventDataGeneratorStreamConfigurator.cs index 5d73c5be90..db1b6aa510 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventDataGeneratorStreamConfigurator.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventDataGeneratorStreamConfigurator.cs @@ -5,6 +5,7 @@ using Orleans.Configuration; using Orleans.Streaming.EventHubs; using Orleans.Streaming.EventHubs.Testing; +using Orleans.Streams; namespace Orleans.Hosting.Developer { diff --git a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs index 915d6434c7..b846e9cbb9 100644 --- a/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs +++ b/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubStreamOptions.cs @@ -190,23 +190,6 @@ public void ValidateConfiguration() } } - public class StreamCheckpointerConfigurationValidator : IConfigurationValidator - { - private readonly IServiceProvider services; - private readonly string name; - public StreamCheckpointerConfigurationValidator(IServiceProvider services, string name) - { - this.services = services; - this.name = name; - } - public void ValidateConfiguration() - { - var checkpointerFactory = services.GetKeyedService(this.name); - if (checkpointerFactory == null) - throw new OrleansConfigurationException($"No IStreamQueueCheckpointer is configured with PersistentStreamProvider {this.name}. Please configure one."); - } - } - public class EventHubReceiverOptions { /// diff --git a/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointer.cs b/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointer.cs new file mode 100644 index 0000000000..a46d97667a --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointer.cs @@ -0,0 +1,60 @@ +using System; +using System.Threading.Tasks; + +namespace Orleans.Streams +{ + public class GrainStreamQueueCheckpointer : IStreamQueueCheckpointer + { + // TODO Make this configurable + private static readonly TimeSpan DEFAULT_CHECKPOINT_PERSIST_INTERVAL = TimeSpan.FromMinutes(1); + + private readonly IStreamCheckpointerGrain _grain; + + private string _checkpoint; + private Task _inProgressSave; + private DateTime? _throttleSavesUntilUtc; + + public GrainStreamQueueCheckpointer(IStreamCheckpointerGrain grain) + { + _grain = grain; + } + + public bool CheckpointExists => _checkpoint is not null; + + public static async Task> Create(string providerName, string partition, string serviceId, IClusterClient clusterClient) + { + var grain = clusterClient.GetGrain($"{providerName}_{serviceId}_{partition}"); + + var checkpoint = new GrainStreamQueueCheckpointer(grain); + _ = await checkpoint.Load(); + + return checkpoint; + } + + public async Task Load() + { + _checkpoint = await _grain.Load(); + return _checkpoint; + } + + public void Update(string offset, DateTime utcNow) + { + // if offset has not changed, do nothing + if (string.Compare(_checkpoint, offset, StringComparison.Ordinal) == 0) + { + return; + } + + // if we've saved before but it's not time for another save or the last save operation has not completed, do nothing + if (_throttleSavesUntilUtc.HasValue && (_throttleSavesUntilUtc.Value > utcNow || !_inProgressSave.IsCompleted)) + { + return; + } + + _checkpoint = offset; + _throttleSavesUntilUtc = utcNow + DEFAULT_CHECKPOINT_PERSIST_INTERVAL; + _inProgressSave = _grain.Update(_checkpoint); + _inProgressSave.Ignore(); + } + } +} diff --git a/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointerFactory.cs b/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointerFactory.cs new file mode 100644 index 0000000000..fbb0885ef0 --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/GrainStreamQueueCheckpointerFactory.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using System; +using System.Threading.Tasks; + +namespace Orleans.Streams +{ + public class GrainStreamQueueCheckpointerFactory : IStreamQueueCheckpointerFactory + { + private readonly string _providerName; + private readonly IClusterClient _clusterClient; + private readonly ClusterOptions _clusterOptions; + + public GrainStreamQueueCheckpointerFactory(string providerName, IOptions clusterOptions, IClusterClient clusterClient) + { + _providerName = providerName; + _clusterClient = clusterClient; + _clusterOptions = clusterOptions.Value; + } + + public static IStreamQueueCheckpointerFactory CreateFactory(IServiceProvider services, string providerName) + { + return ActivatorUtilities.CreateInstance(services, providerName); + } + + public Task> Create(string partition) + { + return GrainStreamQueueCheckpointer.Create(_providerName, partition, _clusterOptions.ServiceId.ToString(), _clusterClient); + } + } +} diff --git a/src/Orleans.Streaming/Checkpointers/IStreamCheckpointerGrain.cs b/src/Orleans.Streaming/Checkpointers/IStreamCheckpointerGrain.cs new file mode 100644 index 0000000000..f531ea958d --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/IStreamCheckpointerGrain.cs @@ -0,0 +1,19 @@ +using System.Threading.Tasks; + +namespace Orleans.Streams +{ + public interface IStreamCheckpointerGrain : IGrainWithStringKey + { + /// + /// Loads the checkpoint. + /// + /// The checkpoint. + Task Load(); + + /// + /// Updates the checkpoint. + /// + /// The offset. + Task Update(string offset); + } +} diff --git a/src/Orleans.Streaming/Checkpointers/StreamCheckpointerConfigurationValidator.cs b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerConfigurationValidator.cs new file mode 100644 index 0000000000..fa6deb1077 --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerConfigurationValidator.cs @@ -0,0 +1,27 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Runtime; +using System; + +namespace Orleans.Streams +{ + public class StreamCheckpointerConfigurationValidator : IConfigurationValidator + { + private readonly IServiceProvider _services; + private readonly string _name; + + public StreamCheckpointerConfigurationValidator(IServiceProvider services, string name) + { + _services = services; + _name = name; + } + + public void ValidateConfiguration() + { + var checkpointerFactory = _services.GetKeyedService(_name); + if (checkpointerFactory == null) + { + throw new OrleansConfigurationException($"No IStreamQueueCheckpointer is configured with PersistentStreamProvider {_name}. Please configure one."); + } + } + } +} diff --git a/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrain.cs b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrain.cs new file mode 100644 index 0000000000..4c884a2b56 --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrain.cs @@ -0,0 +1,29 @@ +using Orleans.Placement; +using Orleans.Providers; +using Orleans.Runtime; +using System.Threading.Tasks; + +namespace Orleans.Streams +{ + [PreferLocalPlacement] + public class StreamCheckpointerGrainGrain : Grain, IStreamCheckpointerGrain + { + private readonly IPersistentState _state; + + // TODO Expose the provider name as an option for the GrainStreamQueueCheckpointer + public StreamCheckpointerGrainGrain( + [PersistentState("streamcheckpointer", ProviderConstants.DEFAULT_PUBSUB_PROVIDER_NAME)] + IPersistentState state) + { + _state = state; + } + + public Task Load() => Task.FromResult(_state.State.Checkpoint); + + public async Task Update(string offset) + { + _state.State.Checkpoint = offset; + await _state.WriteStateAsync(); + } + } +} diff --git a/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrainState.cs b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrainState.cs new file mode 100644 index 0000000000..69ccce9b4e --- /dev/null +++ b/src/Orleans.Streaming/Checkpointers/StreamCheckpointerGrainState.cs @@ -0,0 +1,10 @@ +using System; + +namespace Orleans.Streams +{ + [Serializable] + public class StreamCheckpointerGrainState + { + public string Checkpoint { get; set; } + } +} diff --git a/src/Orleans.Streaming/QueueBalancer/PersistentStreamConfiguratorExtension.cs b/src/Orleans.Streaming/QueueBalancer/PersistentStreamConfiguratorExtension.cs index 1909c13f07..32c9df63ac 100644 --- a/src/Orleans.Streaming/QueueBalancer/PersistentStreamConfiguratorExtension.cs +++ b/src/Orleans.Streaming/QueueBalancer/PersistentStreamConfiguratorExtension.cs @@ -69,5 +69,14 @@ public static void UseLeaseBasedQueueBalancer(this ISiloPersistentStreamConfigur configurator.ConfigurePartitionBalancing((s, n) => LeaseBasedQueueBalancer.Create(s, n), configureOptions); } + + /// + /// Configures the stream provider to use grain-based checkpointer. + /// + /// The configuration builder. + public static void UseGrainCheckpointer(this ISiloPersistentStreamConfigurator configurator) + { + configurator.ConfigureComponent(GrainStreamQueueCheckpointerFactory.CreateFactory); + } } }