From 52c10eb4373bd68fa4029375dc563e2da0819b34 Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Mon, 19 Feb 2018 14:31:18 -0600 Subject: [PATCH 1/6] first passs at DynamoDBTransactionLogStorage --- Orleans.sln | 15 + .../Membership/DynamoDBMembershipTable.cs | 4 +- .../Hosting/SiloBuilderExtensions.cs | 46 ++ .../Orleans.Transactions.DynamoDB.csproj | 27 ++ .../Properties/AssemblyInfo.cs | 3 + .../Storage/DynamoDBTransactionLogOptions.cs | 34 ++ .../Storage/DynamoDBTransactionLogStorage.cs | 412 ++++++++++++++++++ src/AWS/Shared/AWSUtils.cs | 2 + src/AWS/Shared/Storage/DynamoDBStorage.cs | 9 +- src/Orleans.Core/Properties/AssemblyInfo.cs | 1 + .../DynamoDBStorageStressTests.cs | 2 +- .../StorageTests/DynamoDBStorageTests.cs | 4 +- 12 files changed, 551 insertions(+), 8 deletions(-) create mode 100644 src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs create mode 100644 src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj create mode 100644 src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs create mode 100644 src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs create mode 100644 src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs diff --git a/Orleans.sln b/Orleans.sln index a00859029b3..066c971d577 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -194,6 +194,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.TestingHost.Legacy" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.TestingHost.AppDomain", "test\Orleans.TestingHost.AppDomain\Orleans.TestingHost.AppDomain.csproj", "{DF911257-3617-4B5C-9B78-AED17BA6DC9C}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Transactions.DynamoDB", "src\AWS\Orleans.Transactions.DynamoDB\Orleans.Transactions.DynamoDB.csproj", "{17A7F27C-DBDE-4339-A7F5-18BD80C3F205}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1236,6 +1238,18 @@ Global {DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x64.Build.0 = Release|Any CPU {DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x86.ActiveCfg = Release|Any CPU {DF911257-3617-4B5C-9B78-AED17BA6DC9C}.Release|x86.Build.0 = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|Any CPU.Build.0 = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x64.ActiveCfg = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x64.Build.0 = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x86.ActiveCfg = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Debug|x86.Build.0 = Debug|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|Any CPU.ActiveCfg = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|Any CPU.Build.0 = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x64.ActiveCfg = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x64.Build.0 = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.ActiveCfg = Release|Any CPU + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1334,6 +1348,7 @@ Global {262A898E-0EED-4235-908B-322A699CDD01} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} {AC640F05-E013-4D80-A1A4-E577D6883D16} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} {DF911257-3617-4B5C-9B78-AED17BA6DC9C} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} + {17A7F27C-DBDE-4339-A7F5-18BD80C3F205} = {DA8E126B-BCDB-4E8F-BFB9-2DBFD41F8F70} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952} diff --git a/src/AWS/Orleans.Clustering.DynamoDB/Membership/DynamoDBMembershipTable.cs b/src/AWS/Orleans.Clustering.DynamoDB/Membership/DynamoDBMembershipTable.cs index c4d9f52b673..c54f5fbe26b 100644 --- a/src/AWS/Orleans.Clustering.DynamoDB/Membership/DynamoDBMembershipTable.cs +++ b/src/AWS/Orleans.Clustering.DynamoDB/Membership/DynamoDBMembershipTable.cs @@ -65,7 +65,7 @@ public async Task DeleteMembershipTableEntries(string clusterId) var records = await storage.QueryAsync(this.options.TableName, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item)); var toDelete = new List>(); - foreach (var record in records) + foreach (var record in records.results) { toDelete.Add(record.GetKeys()); } @@ -114,7 +114,7 @@ public async Task ReadAll() var keys = new Dictionary { { $":{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", new AttributeValue(this.clusterId) } }; var records = await this.storage.QueryAsync(this.options.TableName, keys, $"{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME} = :{SiloInstanceRecord.DEPLOYMENT_ID_PROPERTY_NAME}", item => new SiloInstanceRecord(item)); - MembershipTableData data = Convert(records); + MembershipTableData data = Convert(records.results); if (this.logger.IsEnabled(LogLevel.Trace)) this.logger.Trace("ReadAll Table=" + Environment.NewLine + "{0}", data.ToString()); return data; diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs b/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs new file mode 100644 index 00000000000..355857e4284 --- /dev/null +++ b/src/AWS/Orleans.Transactions.DynamoDB/Hosting/SiloBuilderExtensions.cs @@ -0,0 +1,46 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Transactions.DynamoDB; + +namespace Orleans.Hosting +{ + public static class SiloBuilderExtensions + { + /// + /// Configure cluster to use dynamoDB transaction log using configure action. + /// + public static ISiloHostBuilder UseDynamoDBTransactionLog(this ISiloHostBuilder builder, Action configureOptions) + { + return builder.UseDynamoDBTransactionLog(ob => ob.Configure(configureOptions)); + } + + /// + /// Configure cluster to use dynamoDB transaction log using configuration builder. + /// + public static ISiloHostBuilder UseDynamoDBTransactionLog(this ISiloHostBuilder builder, Action> configureOptions) + { + return builder.ConfigureServices(services => services.UseDynamoDBTransactionLog(configureOptions)); + } + + /// + /// Configure cluster service to use dynamoDB transaction log using configure action. + /// + public static IServiceCollection UseDynamoDBTransactionLog(this IServiceCollection services, Action configureOptions) + { + return services.UseDynamoDBTransactionLog(ob => ob.Configure(configureOptions)); + } + + /// + /// Configure cluster service to use dynamoDB transaction log using configuration builder. + /// + public static IServiceCollection UseDynamoDBTransactionLog(this IServiceCollection services, + Action> configureOptions) + { + configureOptions?.Invoke(services.AddOptions()); + services.AddTransient(); + services.AddTransient(DynamoDBTransactionLogStorage.Create); + return services; + } + } +} diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj b/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj new file mode 100644 index 00000000000..9040e20df7f --- /dev/null +++ b/src/AWS/Orleans.Transactions.DynamoDB/Orleans.Transactions.DynamoDB.csproj @@ -0,0 +1,27 @@ + + + Microsoft.Orleans.Transactions.DynamoDB + Microsoft Orleans Transactions on DynamoDB + DynamoDB Transaction library of Microsoft Orleans used on the server. + $(PackageTags) DynamoDB Transactions + + + + Orleans.Transactions.DynamoDB + Orleans.Transactions.DynamoDB + $(DefineConstants);TRANSACTIONS_DYNAMODB + + + + + + + + + + + + + + + diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs b/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..bf170eabdf9 --- /dev/null +++ b/src/AWS/Orleans.Transactions.DynamoDB/Properties/AssemblyInfo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("AWSUtils.Tests")] diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs new file mode 100644 index 00000000000..b9fb2e33dd0 --- /dev/null +++ b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs @@ -0,0 +1,34 @@ +using Microsoft.Extensions.Options; +using Orleans.Runtime; + +namespace Orleans.Configuration +{ + public class DynamoDBTransactionLogOptions + { + public string ConnectionString { get; set; } + + public string TableName { get; set; } = "TransactionLog"; + } + + public class DynamoDBTransactionLogOptionsValidator : IConfigurationValidator + { + private readonly DynamoDBTransactionLogOptions options; + + public DynamoDBTransactionLogOptionsValidator(IOptions configurationOptions) + { + this.options = configurationOptions.Value; + } + + public void ValidateConfiguration() + { + if (string.IsNullOrWhiteSpace(this.options.ConnectionString)) + { + throw new OrleansConfigurationException($"Invalid DynamoDBTransactionLogOptions. ConnectionString is required."); + } + if (string.IsNullOrWhiteSpace(this.options.TableName)) + { + throw new OrleansConfigurationException($"Invalid DynamoDBTransactionLogOptions. TableName is required."); + } + } + } +} diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs new file mode 100644 index 00000000000..943d049ae92 --- /dev/null +++ b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs @@ -0,0 +1,412 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Configuration; +using Orleans.Serialization; +using Orleans.Transactions.Abstractions; +using Microsoft.Extensions.Logging; +using Amazon.DynamoDBv2.Model; +using Amazon.DynamoDBv2; +using System.Globalization; +using System.IO; + +namespace Orleans.Transactions.DynamoDB +{ + public class DynamoDBTransactionLogStorage : ITransactionLogStorage + { + + private const string RowKey = "RowKey"; + private const string PartitionKey = "PartitionKey"; + private const string AllocatedTransactionIdsKey = "AllocatedTransactionIds"; + private const string FirstLSNKey = "FirstLSN"; + private const string TransactionsKey = "Transactions"; + private const string RowKeyAlias = ":RowKey"; + private const string PartitionKeyAlias = ":PartitionKey"; + + private const int BatchOperationLimit = 25; + private const int CommitRecordsPerRow = 40; + private const string CommitRecordPartitionKey = "0"; + + private const string StartRowPartitionKey = "1"; + private static readonly AttributeValue StartRowRowKey = new AttributeValue { N = "0" }; + + //TODO: jbragg - Do not use serializationManager for persistent data!! + private readonly SerializationManager serializationManager; + private readonly DynamoDBTransactionLogOptions options; + private readonly ILoggerFactory loggerFactory; + + private DynamoDBStorage storage; + + private long startRecordValue; + private long nextLogSequenceNumber; + + // Log iteration indexes, reused between operations + private Dictionary currentLastEvaluatedKey; + private List currentQueryResult; + private int currentQueryResultIndex; + private List currentRowTransactions; + private int currentRowTransactionsIndex; + + public DynamoDBTransactionLogStorage(SerializationManager serializationManager, IOptions configurationOptions, ILoggerFactory loggerFactory) + { + this.serializationManager = serializationManager; + this.options = configurationOptions.Value; + this.loggerFactory = loggerFactory; + } + + public async Task Initialize() + { + if (string.IsNullOrWhiteSpace(this.options.ConnectionString)) + { + throw new ArgumentNullException(nameof(this.options.ConnectionString)); + } + + storage = new DynamoDBStorage(this.options.ConnectionString, loggerFactory); + await storage.InitializeTable(this.options.TableName, + new List + { + new KeySchemaElement { AttributeName = PartitionKey, KeyType = KeyType.HASH }, + new KeySchemaElement { AttributeName = RowKey, KeyType = KeyType.RANGE } + }, + new List + { + new AttributeDefinition { AttributeName = PartitionKey, AttributeType = ScalarAttributeType.S }, + new AttributeDefinition { AttributeName = RowKey, AttributeType = ScalarAttributeType.N } + }).ConfigureAwait(false); + + var (results, lastEvaluatedKey) = await storage.QueryAsync(this.options.TableName, + new Dictionary + { + { PartitionKeyAlias, new AttributeValue(StartRowPartitionKey) }, + { RowKeyAlias, StartRowRowKey } + }, + $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} <= {RowKeyAlias}", + (fields) => + { + return new StartRow(AttributeToLong(fields[AllocatedTransactionIdsKey])); + }).ConfigureAwait(false); + + if (results.Count == 0) + { + // This is a fresh deployment, the StartRecord isn't created yet. + // Create it here. + await storage.PutEntryAsync(this.options.TableName, + new Dictionary + { + { PartitionKey, new AttributeValue(StartRowPartitionKey) }, + { RowKey, StartRowRowKey }, + { AllocatedTransactionIdsKey, new AttributeValue { N = "0" } } + }).ConfigureAwait(false); + + startRecordValue = 0; + } + else + { + startRecordValue = results[0].AllocatedTransactionIds; + } + } + + public static Factory> Create(IServiceProvider serviceProvider) + { + return async () => + { + DynamoDBTransactionLogStorage logStorage = ActivatorUtilities.CreateInstance(serviceProvider, new object[0]); + await logStorage.Initialize(); + return logStorage; + }; + } + + public async Task GetFirstCommitRecord() + { + currentLastEvaluatedKey = null; + + await ReadRowsFromTable(0); + + if (currentQueryResult.Count == 0) + { + // The log has no log entries + currentQueryResult = null; + + nextLogSequenceNumber = 1; + + return null; + } + + currentRowTransactions = DeserializeCommitRecords(currentQueryResult[0].Transactions); + + // TODO: Assert not empty? + + nextLogSequenceNumber = currentRowTransactions[currentRowTransactionsIndex].LSN + 1; + + return currentRowTransactions[currentRowTransactionsIndex++]; + } + + public async Task GetNextCommitRecord() + { + // Based on the current implementation logic in TransactionManager, this must be not be null, since + // GetFirstCommitRecord sets a query or if no start record, the TransactionManager exits its loop. + if (currentQueryResult == null) + { + throw new InvalidOperationException("GetNextCommitRecord called but currentQueryResult is null."); + } + + if (currentRowTransactionsIndex == currentRowTransactions.Count) + { + currentQueryResultIndex++; + currentRowTransactionsIndex = 0; + currentRowTransactions = null; + } + + if (currentQueryResultIndex == currentQueryResult.Count) + { + // No more rows in our current segment, retrieve the next segment from the Table. + if (currentLastEvaluatedKey == null) + { + currentQueryResult = null; + return null; + } + + await ReadRowsFromTable(0); + } + + if (currentRowTransactions == null) + { + // TODO: assert currentRowTransactionsIndex = 0? + currentRowTransactions = DeserializeCommitRecords(currentQueryResult[currentQueryResultIndex].Transactions); + } + + var currentTransaction = currentRowTransactions[currentRowTransactionsIndex++]; + + nextLogSequenceNumber = currentTransaction.LSN + 1; + + return currentTransaction; + } + + public Task GetStartRecord() + { + return Task.FromResult(startRecordValue); + } + + public async Task UpdateStartRecord(long transactionId) + { + await storage.UpsertEntryAsync(this.options.TableName, + new Dictionary + { + { PartitionKey, new AttributeValue(StartRowPartitionKey) }, + { RowKey, StartRowRowKey } + }, + new Dictionary + { + { AllocatedTransactionIdsKey, LongToAttribute(transactionId) } + }).ConfigureAwait(false); + + startRecordValue = transactionId; + } + + public async Task Append(IEnumerable commitRecords) + { + var batchOperation = new List>(); + + // TODO modify this to be able to use IEnumerable, fixed size array for serialization in the size of CommitRecordsPerRow, list is temporary + var transactionList = new List(commitRecords); + + for (int nextRecord = 0; nextRecord < transactionList.Count; nextRecord += CommitRecordsPerRow) + { + var recordCount = Math.Min(transactionList.Count - nextRecord, CommitRecordsPerRow); + var transactionSegment = transactionList.GetRange(nextRecord, recordCount); + var commitRow = new CommitRow(nextLogSequenceNumber); + + foreach (var transaction in transactionSegment) + { + transaction.LSN = nextLogSequenceNumber++; + } + + commitRow.Transactions = SerializeCommitRecords(transactionSegment); + + batchOperation.Add( + new Dictionary + { + { PartitionKey, new AttributeValue(CommitRecordPartitionKey) }, + { RowKey, commitRow.FirstLSNAttribute }, + { TransactionsKey, new AttributeValue { B = new MemoryStream(commitRow.Transactions.Value.Array) } } + }); + + if (batchOperation.Count == BatchOperationLimit) + { + await storage.PutEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false); + + batchOperation = new List>(); + } + } + + if (batchOperation.Count > 0) + { + await storage.PutEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false); + } + } + + public async Task TruncateLog(long lsn) + { + var keyValues = new Dictionary + { + { PartitionKeyAlias, new AttributeValue(CommitRecordPartitionKey) }, + { RowKeyAlias, LongToAttribute(lsn) } + }; + string query = $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} <= {RowKeyAlias}"; + Dictionary lastEvaluatedKey = null; + var batchOperation = new List>(); + + do + { + var result = await storage.QueryAsync(this.options.TableName, keyValues, query, + CommitRowResolver, lastEvaluatedKey: lastEvaluatedKey).ConfigureAwait(false); + lastEvaluatedKey = result.lastEvaluatedKey; + + foreach (var row in result.results) + { + var transactions = DeserializeCommitRecords(row.Transactions); + + if (transactions.Count > 0 && transactions[transactions.Count - 1].LSN <= lsn) + { + batchOperation.Add( + new Dictionary + { + { PartitionKey, new AttributeValue(CommitRecordPartitionKey) }, + { RowKey, row.FirstLSNAttribute } + }); + + if (batchOperation.Count == BatchOperationLimit) + { + await storage.DeleteEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false); + + batchOperation = new List>(); + } + } + else + { + break; + } + } + + } while (lastEvaluatedKey.Count != 0); + + if (batchOperation.Count > 0) + { + await storage.DeleteEntriesAsync(this.options.TableName, batchOperation).ConfigureAwait(false); + } + } + + private async Task ReadRowsFromTable(long keyLowerBound) + { + var (results, lastEvaluatedKey) = await storage.QueryAsync(this.options.TableName, + new Dictionary + { + { PartitionKeyAlias, new AttributeValue(CommitRecordPartitionKey) }, + { RowKeyAlias, LongToAttribute(keyLowerBound) } + }, + $"{PartitionKey} = {PartitionKeyAlias} AND {RowKey} >= {RowKeyAlias}", + CommitRowResolver, + lastEvaluatedKey: currentLastEvaluatedKey).ConfigureAwait(false); + currentQueryResult = results; + + // Reset the indexes + currentQueryResultIndex = 0; + currentRowTransactionsIndex = 0; + currentLastEvaluatedKey = lastEvaluatedKey; + } + + private ArraySegment SerializeCommitRecords(List commitRecords) + { + var serializableList = new List>>(commitRecords.Count); + + foreach (var commitRecord in commitRecords) + { + serializableList.Add(new Tuple>(commitRecord.LSN, commitRecord.TransactionId, commitRecord.Resources)); + } + + var streamWriter = new BinaryTokenStreamWriter(); + + serializationManager.Serialize(serializableList, streamWriter); + + return new ArraySegment(streamWriter.ToByteArray()); + } + + private List DeserializeCommitRecords(ArraySegment? serializerCommitRecords) + { + if (!serializerCommitRecords.HasValue) + { + return new List(); + } + + var streamReader = new BinaryTokenStreamReader(serializerCommitRecords.Value); + + var deserializedList = serializationManager.Deserialize>>>(streamReader); + + var commitRecords = new List(deserializedList.Count); + + foreach (var item in deserializedList) + { + commitRecords.Add(new CommitRecord { LSN = item.Item1, TransactionId = item.Item2, Resources = item.Item3 }); + } + + return commitRecords; + } + + private static AttributeValue LongToAttribute(long value) + { + return new AttributeValue { N = value.ToString("d", CultureInfo.InvariantCulture) }; + } + + private static long AttributeToLong(AttributeValue value) + { + return long.Parse(value.N, CultureInfo.InvariantCulture); + } + + private static Func, CommitRow> CommitRowResolver => (fields) => + { + var commitRow = new CommitRow(AttributeToLong(fields[FirstLSNKey])); + var stream = fields[TransactionsKey].B; + if (stream.TryGetBuffer(out ArraySegment buffer)) + { + commitRow.Transactions = buffer; + } + else + { + commitRow.Transactions = new ArraySegment(stream.ToArray()); + } + return commitRow; + }; + + private class CommitRow + { + public CommitRow(long firstLSN) + { + FirstLSN = firstLSN; + } + + public ArraySegment? Transactions { get; set; } + + public long FirstLSN { get; set; } + + public AttributeValue FirstLSNAttribute + { + get + { + return LongToAttribute(FirstLSN); + } + } + } + + private class StartRow + { + public StartRow(long transactionId) + { + AllocatedTransactionIds = transactionId; + } + + public long AllocatedTransactionIds { get; set; } + } + } +} diff --git a/src/AWS/Shared/AWSUtils.cs b/src/AWS/Shared/AWSUtils.cs index adb42d0636a..4f747deda93 100644 --- a/src/AWS/Shared/AWSUtils.cs +++ b/src/AWS/Shared/AWSUtils.cs @@ -11,6 +11,8 @@ namespace Orleans.Reminders.DynamoDB namespace Orleans.Streaming.SQS #elif AWSUTILS_TESTS namespace Orleans.AWSUtils.Tests +#elif TRANSACTIONS_DYNAMODB +namespace Orleans.Transactions.DynamoDB #else // No default namespace intentionally to cause compile errors if something is not defined #endif diff --git a/src/AWS/Shared/Storage/DynamoDBStorage.cs b/src/AWS/Shared/Storage/DynamoDBStorage.cs index 1a3c54c0fbd..875c7a13a9c 100644 --- a/src/AWS/Shared/Storage/DynamoDBStorage.cs +++ b/src/AWS/Shared/Storage/DynamoDBStorage.cs @@ -18,6 +18,8 @@ namespace Orleans.Persistence.DynamoDB namespace Orleans.Reminders.DynamoDB #elif AWSUTILS_TESTS namespace Orleans.AWSUtils.Tests +#elif TRANSACTIONS_DYNAMODB +namespace Orleans.Transactions.DynamoDB #else // No default namespace intentionally to cause compile errors if something is not defined #endif @@ -433,7 +435,7 @@ public async Task ReadSingleEntryAsync(string tableName, Dicti /// In case a secondary index is used in the keyConditionExpression /// In case an index is used, show if the seek order is ascending (true) or descending (false) /// The collection containing a list of objects translated by the resolver function - public async Task> QueryAsync(string tableName, Dictionary keys, string keyConditionExpression, Func, TResult> resolver, string indexName = "", bool scanIndexForward = true) where TResult : class + public async Task<(List results, Dictionary lastEvaluatedKey)> QueryAsync(string tableName, Dictionary keys, string keyConditionExpression, Func, TResult> resolver, string indexName = "", bool scanIndexForward = true, Dictionary lastEvaluatedKey = null) where TResult : class { try { @@ -443,7 +445,8 @@ public async Task> QueryAsync(string tableName, Dictionar ExpressionAttributeValues = keys, ConsistentRead = true, KeyConditionExpression = keyConditionExpression, - Select = Select.ALL_ATTRIBUTES + Select = Select.ALL_ATTRIBUTES, + ExclusiveStartKey = lastEvaluatedKey }; if (!string.IsNullOrWhiteSpace(indexName)) @@ -459,7 +462,7 @@ public async Task> QueryAsync(string tableName, Dictionar { resultList.Add(resolver(item)); } - return resultList; + return (resultList, response.LastEvaluatedKey); } catch (Exception) { diff --git a/src/Orleans.Core/Properties/AssemblyInfo.cs b/src/Orleans.Core/Properties/AssemblyInfo.cs index 3287fae157d..fb287ec22e6 100644 --- a/src/Orleans.Core/Properties/AssemblyInfo.cs +++ b/src/Orleans.Core/Properties/AssemblyInfo.cs @@ -20,6 +20,7 @@ [assembly: InternalsVisibleTo("Orleans.TestingHost")] [assembly: InternalsVisibleTo("Orleans.TestingHost.AppDomain")] [assembly: InternalsVisibleTo("Orleans.TestingHost.Legacy")] +[assembly: InternalsVisibleTo("Orleans.Transactions.DynamoDB")] [assembly: InternalsVisibleTo("OrleansCounterControl")] [assembly: InternalsVisibleTo("OrleansManager")] [assembly: InternalsVisibleTo("OrleansProviders")] diff --git a/test/AWSUtils.Tests/StorageTests/DynamoDBStorageStressTests.cs b/test/AWSUtils.Tests/StorageTests/DynamoDBStorageStressTests.cs index 16d8154dbed..208441cfca0 100644 --- a/test/AWSUtils.Tests/StorageTests/DynamoDBStorageStressTests.cs +++ b/test/AWSUtils.Tests/StorageTests/DynamoDBStorageStressTests.cs @@ -70,7 +70,7 @@ public void DynamoDBDataManagerStressTests_ReadAll_SinglePartition() var data = manager.QueryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, keys, $"PartitionKey = :PK", item => new UnitTestDynamoDBTableData(item)).Result; sw.Stop(); - int count = data.Count(); + int count = data.results.Count(); output.WriteLine("DynamoDBDataManagerStressTests_ReadAll completed. ReadAll {0} entries in {1} at {2} RPS", count, sw.Elapsed, count / sw.Elapsed.TotalSeconds); //Assert.True(count >= iterations, $"ReadAllshould return some data: Found={count}"); diff --git a/test/AWSUtils.Tests/StorageTests/DynamoDBStorageTests.cs b/test/AWSUtils.Tests/StorageTests/DynamoDBStorageTests.cs index dba10b9aa00..89ac447adc9 100644 --- a/test/AWSUtils.Tests/StorageTests/DynamoDBStorageTests.cs +++ b/test/AWSUtils.Tests/StorageTests/DynamoDBStorageTests.cs @@ -109,8 +109,8 @@ public async Task DynamoDBDataManager_ReadAllTableEntryByPartitionAsync() await manager.PutEntryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, GetValues(toPersist2, true)); var keys = new Dictionary { { ":PK", new AttributeValue(toPersist.PartitionKey) } }; var found = await manager.QueryAsync(UnitTestDynamoDBStorage.INSTANCE_TABLE_NAME, keys, $"PartitionKey = :PK", item => new UnitTestDynamoDBTableData(item)); - Assert.NotNull(found); - Assert.True(found.Count == 2); + Assert.NotNull(found.results); + Assert.True(found.results.Count == 2); } internal static Dictionary GetKeys(UnitTestDynamoDBTableData data) From 313f5ff5fe723267a81f293b76cdb946039be622 Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Mon, 19 Feb 2018 15:26:17 -0600 Subject: [PATCH 2/6] dynamodb transaction log tests --- Orleans.sln | 15 ++++++ .../GoldenPathTests.cs | 16 ++++++ .../GoldenPathTransactionManagerTest.cs | 53 +++++++++++++++++++ .../GrainFaultTests.cs | 16 ++++++ .../OrchestrationTests.cs | 16 ++++++ .../Orleans.Transactions.DynamoDB.Test.csproj | 26 +++++++++ .../TestFixture.cs | 45 ++++++++++++++++ 7 files changed, 187 insertions(+) create mode 100644 test/Orleans.Transactions.DynamoDB.Test/GoldenPathTests.cs create mode 100644 test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs create mode 100644 test/Orleans.Transactions.DynamoDB.Test/GrainFaultTests.cs create mode 100644 test/Orleans.Transactions.DynamoDB.Test/OrchestrationTests.cs create mode 100644 test/Orleans.Transactions.DynamoDB.Test/Orleans.Transactions.DynamoDB.Test.csproj create mode 100644 test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs diff --git a/Orleans.sln b/Orleans.sln index 066c971d577..8d326946cea 100644 --- a/Orleans.sln +++ b/Orleans.sln @@ -196,6 +196,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.TestingHost.AppDoma EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Transactions.DynamoDB", "src\AWS\Orleans.Transactions.DynamoDB\Orleans.Transactions.DynamoDB.csproj", "{17A7F27C-DBDE-4339-A7F5-18BD80C3F205}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Transactions.DynamoDB.Test", "test\Orleans.Transactions.DynamoDB.Test\Orleans.Transactions.DynamoDB.Test.csproj", "{8B3EEA6B-BE00-482D-9625-15A70B534183}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1250,6 +1252,18 @@ Global {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x64.Build.0 = Release|Any CPU {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.ActiveCfg = Release|Any CPU {17A7F27C-DBDE-4339-A7F5-18BD80C3F205}.Release|x86.Build.0 = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x64.ActiveCfg = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x64.Build.0 = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x86.ActiveCfg = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Debug|x86.Build.0 = Debug|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|Any CPU.Build.0 = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x64.ActiveCfg = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x64.Build.0 = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x86.ActiveCfg = Release|Any CPU + {8B3EEA6B-BE00-482D-9625-15A70B534183}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1321,6 +1335,7 @@ Global {CFD22413-CB67-40D6-B389-F038C8C38365} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} {BE7FB595-FA25-4D88-8504-E9D4F9D8183D} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34} {651C7B8E-6EEB-4A6B-84A3-B5D7E4554B99} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} + {8B3EEA6B-BE00-482D-9625-15A70B534183} = {A6573187-FD0D-4DF7-91D1-03E07E470C0A} {E9A8A1F9-1943-45E2-9B9C-1E51A9154D76} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34} {B090606B-651C-42CA-B459-1E8CBDE987EA} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34} {A47E75C9-283F-4530-90D1-FA57AEFB6F65} = {4C5D66BF-EE1C-4DD8-8551-D1B7F3768A34} diff --git a/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTests.cs b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTests.cs new file mode 100644 index 00000000000..62631beb955 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTests.cs @@ -0,0 +1,16 @@ +using Xunit; +using Xunit.Abstractions; +using Orleans.Transactions.Tests; + +namespace Orleans.Transactions.DynamoDB.Tests +{ + [TestCategory("DynamoDb"), TestCategory("Transactions"), TestCategory("Functional")] + public class GoldenPathTests : GoldenPathTransactionTestRunner, IClassFixture + { + public GoldenPathTests(TestFixture fixture, ITestOutputHelper output) + : base(fixture.GrainFactory, output) + { + fixture.EnsurePreconditionsMet(); + } + } +} diff --git a/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs new file mode 100644 index 00000000000..9e0973e2568 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs @@ -0,0 +1,53 @@ +using System; +using System.Threading.Tasks; +using Xunit.Abstractions; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.Runtime.Configuration; +using Orleans.Transactions.Abstractions; +using Orleans.Transactions.Tests; +using TestExtensions; +using Orleans.TestingHost.Utils; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; +using AWSUtils.Tests.StorageTests; + +namespace Orleans.Transactions.DynamoDB.Tests +{ + [TestCategory("DynamoDb"), TestCategory("Transactions"), TestCategory("Functional")] + public class GoldenPathTransactionManagerTest : GoldenPathTransactionManagerTestRunner + { + private static readonly TimeSpan LogMaintenanceInterval = TimeSpan.FromMilliseconds(10); + private static readonly TimeSpan StorageDelay = TimeSpan.FromSeconds(30); + + public GoldenPathTransactionManagerTest(ITestOutputHelper output) + : base(MakeTransactionManager(), LogMaintenanceInterval, StorageDelay, output) + { + } + + private static ITransactionManager MakeTransactionManager() + { + TestFixture.CheckForDynamoDBStorage(); + ITransactionManager tm = new TransactionManager(new TransactionLog(StorageFactory), Options.Create(new TransactionsOptions()), NullLoggerFactory.Instance, NullTelemetryProducer.Instance, Options.Create(new SiloStatisticsOptions()), LogMaintenanceInterval); + tm.StartAsync().GetAwaiter().GetResult(); + return tm; + } + + private static async Task StorageFactory() + { + TestFixture.CheckForDynamoDBStorage(); + var config = new ClientConfiguration(); + var environment = SerializationTestEnvironment.InitializeWithDefaults(config); + var dynamoConfig = Options.Create(new DynamoDBTransactionLogOptions() + { + // TODO: Find better way for test isolation. + TableName = $"TransactionLog{((uint)Guid.NewGuid().GetHashCode()) % 100000}", + ConnectionString = $"Service={AWSTestConstants.Service}" + }); + DynamoDBTransactionLogStorage storage = new DynamoDBTransactionLogStorage(environment.SerializationManager, dynamoConfig, environment.Client.ServiceProvider.GetRequiredService()); + await storage.Initialize(); + return storage; + } + } +} diff --git a/test/Orleans.Transactions.DynamoDB.Test/GrainFaultTests.cs b/test/Orleans.Transactions.DynamoDB.Test/GrainFaultTests.cs new file mode 100644 index 00000000000..9c4df0bb884 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/GrainFaultTests.cs @@ -0,0 +1,16 @@ +using Xunit; +using Xunit.Abstractions; +using Orleans.Transactions.Tests; + +namespace Orleans.Transactions.DynamoDB.Tests +{ + [TestCategory("DynamoDb"), TestCategory("Transactions"), TestCategory("Functional")] + public class GrainFaultTests : GrainFaultTransactionTestRunner, IClassFixture + { + public GrainFaultTests(TestFixture fixture, ITestOutputHelper output) + : base(fixture.GrainFactory, output) + { + fixture.EnsurePreconditionsMet(); + } + } +} diff --git a/test/Orleans.Transactions.DynamoDB.Test/OrchestrationTests.cs b/test/Orleans.Transactions.DynamoDB.Test/OrchestrationTests.cs new file mode 100644 index 00000000000..76db7520072 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/OrchestrationTests.cs @@ -0,0 +1,16 @@ +using Xunit; +using Xunit.Abstractions; +using Orleans.Transactions.Tests; + +namespace Orleans.Transactions.DynamoDB.Tests +{ + [TestCategory("DynamoDb"), TestCategory("Transactions"), TestCategory("Functional")] + public class OrchestrationTests : OrchestrationsTransactionsTestRunner, IClassFixture + { + public OrchestrationTests(TestFixture fixture, ITestOutputHelper output) + : base(fixture.GrainFactory, output) + { + fixture.EnsurePreconditionsMet(); + } + } +} diff --git a/test/Orleans.Transactions.DynamoDB.Test/Orleans.Transactions.DynamoDB.Test.csproj b/test/Orleans.Transactions.DynamoDB.Test/Orleans.Transactions.DynamoDB.Test.csproj new file mode 100644 index 00000000000..dea76c65652 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/Orleans.Transactions.DynamoDB.Test.csproj @@ -0,0 +1,26 @@ + + + Orleans.Transactions.DynamoDB.Tests + Orleans.Transactions.DynamoDB.Tests + true + + + + + + + + + + + + + + + + + + + + + diff --git a/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs new file mode 100644 index 00000000000..5029abb71f4 --- /dev/null +++ b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs @@ -0,0 +1,45 @@ +using System; +using Xunit; +using Orleans.Hosting; +using Orleans.TestingHost; +using TestExtensions; +using AWSUtils.Tests.StorageTests; + +namespace Orleans.Transactions.DynamoDB.Tests +{ + public class TestFixture : BaseTestClusterFixture + { + protected override void CheckPreconditionsOrThrow() + { + base.CheckPreconditionsOrThrow(); + CheckForDynamoDBStorage(); + } + + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + builder.AddSiloBuilderConfigurator(); + } + + private class SiloBuilderConfigurator : ISiloBuilderConfigurator + { + public void Configure(ISiloHostBuilder hostBuilder) + { + var id = (uint) Guid.NewGuid().GetHashCode() % 100000; + hostBuilder + .UseInClusterTransactionManager() + .UseDynamoDBTransactionLog(options => { + // TODO: Find better way for test isolation. Possibly different partition keys. + options.TableName = $"TransactionLog{id:X}"; + options.ConnectionString = $"Service={AWSTestConstants.Service}"; + }) + .UseTransactionalState(); + } + } + + public static void CheckForDynamoDBStorage() + { + if (!AWSTestConstants.IsDynamoDbAvailable) + throw new SkipException("Unable to connect to AWS DynamoDB simulator"); + } + } +} From 23e6b0ea49b633b1b48df972e8f64dd0f047cd5f Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Tue, 20 Feb 2018 11:42:25 -0600 Subject: [PATCH 3/6] dynamodb transaction log tests --- Test.cmd | 1 + test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/Test.cmd b/Test.cmd index 5eeb9852be7..50d145454ad 100644 --- a/Test.cmd +++ b/Test.cmd @@ -42,6 +42,7 @@ set TESTS=^ %CMDHOME%\test\RuntimeCodeGen.Tests,^ %CMDHOME%\test\Orleans.Transactions.Tests,^ %CMDHOME%\test\Orleans.Transactions.Azure.Test,^ +%CMDHOME%\test\Orleans.Transactions.DynamoDB.Test,^ %CMDHOME%\test\Orleans.TestingHost.Tests if []==[%TEST_FILTERS%] set TEST_FILTERS=-trait Category=BVT -trait Category=SlowBVT diff --git a/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs index 5029abb71f4..9105b69f203 100644 --- a/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs +++ b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs @@ -1,9 +1,12 @@ using System; +using System.Collections.Generic; using Xunit; using Orleans.Hosting; using Orleans.TestingHost; using TestExtensions; using AWSUtils.Tests.StorageTests; +using Orleans.Transactions.Tests; +using Orleans.Storage; namespace Orleans.Transactions.DynamoDB.Tests { @@ -18,6 +21,13 @@ protected override void CheckPreconditionsOrThrow() protected override void ConfigureTestCluster(TestClusterBuilder builder) { builder.AddSiloBuilderConfigurator(); + builder.ConfigureLegacyConfiguration((legacy) => + { + legacy.ClusterConfiguration.Globals.RegisterStorageProvider(TransactionTestConstants.TransactionStore, new Dictionary + { + { "DataConnectionString", $"Service={AWSTestConstants.Service}" } + }); + }); } private class SiloBuilderConfigurator : ISiloBuilderConfigurator From 5747b8f185ebf36a6f48b7dda60bd6af8ddc5df2 Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Tue, 20 Feb 2018 13:54:06 -0600 Subject: [PATCH 4/6] a fix a couple issues found in testing --- .../Storage/DynamoDBTransactionLogStorage.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs index 943d049ae92..5a807761940 100644 --- a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs +++ b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogStorage.cs @@ -20,7 +20,6 @@ public class DynamoDBTransactionLogStorage : ITransactionLogStorage private const string RowKey = "RowKey"; private const string PartitionKey = "PartitionKey"; private const string AllocatedTransactionIdsKey = "AllocatedTransactionIds"; - private const string FirstLSNKey = "FirstLSN"; private const string TransactionsKey = "Transactions"; private const string RowKeyAlias = ":RowKey"; private const string PartitionKeyAlias = ":PartitionKey"; @@ -162,7 +161,7 @@ public async Task GetNextCommitRecord() if (currentQueryResultIndex == currentQueryResult.Count) { // No more rows in our current segment, retrieve the next segment from the Table. - if (currentLastEvaluatedKey == null) + if (currentLastEvaluatedKey == null || currentLastEvaluatedKey.Count == 0) { currentQueryResult = null; return null; @@ -366,7 +365,7 @@ private static long AttributeToLong(AttributeValue value) private static Func, CommitRow> CommitRowResolver => (fields) => { - var commitRow = new CommitRow(AttributeToLong(fields[FirstLSNKey])); + var commitRow = new CommitRow(AttributeToLong(fields[RowKey])); var stream = fields[TransactionsKey].B; if (stream.TryGetBuffer(out ArraySegment buffer)) { From 9a7fb9c32b75b75c941dd9ef6a728518bf0ac4ba Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Thu, 22 Feb 2018 09:48:14 -0600 Subject: [PATCH 5/6] update for DynamoDB storage migrating to options --- .../Storage/DynamoDBTransactionLogOptions.cs | 49 ++++++++++++++++--- .../Storage/DynamoDBTransactionLogStorage.cs | 8 +-- .../StorageTests/AWSTestConstants.cs | 2 +- .../GoldenPathTransactionManagerTest.cs | 2 +- .../TestFixture.cs | 24 ++++----- 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs index b9fb2e33dd0..4b66e487945 100644 --- a/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs +++ b/src/AWS/Orleans.Transactions.DynamoDB/Storage/DynamoDBTransactionLogOptions.cs @@ -1,12 +1,42 @@ using Microsoft.Extensions.Options; using Orleans.Runtime; +using Orleans.Transactions.DynamoDB; namespace Orleans.Configuration { public class DynamoDBTransactionLogOptions { - public string ConnectionString { get; set; } + /// + /// AccessKey string for DynamoDB Storage + /// + [Redact] + public string AccessKey { get; set; } + /// + /// Secret key for DynamoDB storage + /// + [Redact] + public string SecretKey { get; set; } + + /// + /// DynamoDB Service name + /// + public string Service { get; set; } + + /// + /// Read capacity unit for DynamoDB storage + /// + public int ReadCapacityUnits { get; set; } = DynamoDBStorage.DefaultReadCapacityUnits; + + /// + /// Write capacity unit for DynamoDB storage + /// + public int WriteCapacityUnits { get; set; } = DynamoDBStorage.DefaultWriteCapacityUnits; + + /// + /// DynamoDB table name. + /// Defaults to 'TransactionLog'. + /// public string TableName { get; set; } = "TransactionLog"; } @@ -21,14 +51,17 @@ public DynamoDBTransactionLogOptionsValidator(IOptions { diff --git a/test/AWSUtils.Tests/StorageTests/AWSTestConstants.cs b/test/AWSUtils.Tests/StorageTests/AWSTestConstants.cs index 91d33aa326b..803305d6d11 100644 --- a/test/AWSUtils.Tests/StorageTests/AWSTestConstants.cs +++ b/test/AWSUtils.Tests/StorageTests/AWSTestConstants.cs @@ -18,7 +18,7 @@ public class AWSTestConstants DynamoDBStorage storage; try { - storage = new DynamoDBStorage(NullLoggerFactory.Instance, "Service"); + storage = new DynamoDBStorage(NullLoggerFactory.Instance, Service); } catch (AmazonServiceException) { diff --git a/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs index 9e0973e2568..d0e0dbbcf89 100644 --- a/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs +++ b/test/Orleans.Transactions.DynamoDB.Test/GoldenPathTransactionManagerTest.cs @@ -43,7 +43,7 @@ private static async Task StorageFactory() { // TODO: Find better way for test isolation. TableName = $"TransactionLog{((uint)Guid.NewGuid().GetHashCode()) % 100000}", - ConnectionString = $"Service={AWSTestConstants.Service}" + Service = AWSTestConstants.Service }); DynamoDBTransactionLogStorage storage = new DynamoDBTransactionLogStorage(environment.SerializationManager, dynamoConfig, environment.Client.ServiceProvider.GetRequiredService()); await storage.Initialize(); diff --git a/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs index 9105b69f203..c878ce5a91e 100644 --- a/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs +++ b/test/Orleans.Transactions.DynamoDB.Test/TestFixture.cs @@ -1,12 +1,10 @@ using System; -using System.Collections.Generic; using Xunit; using Orleans.Hosting; using Orleans.TestingHost; using TestExtensions; using AWSUtils.Tests.StorageTests; using Orleans.Transactions.Tests; -using Orleans.Storage; namespace Orleans.Transactions.DynamoDB.Tests { @@ -21,13 +19,6 @@ protected override void CheckPreconditionsOrThrow() protected override void ConfigureTestCluster(TestClusterBuilder builder) { builder.AddSiloBuilderConfigurator(); - builder.ConfigureLegacyConfiguration((legacy) => - { - legacy.ClusterConfiguration.Globals.RegisterStorageProvider(TransactionTestConstants.TransactionStore, new Dictionary - { - { "DataConnectionString", $"Service={AWSTestConstants.Service}" } - }); - }); } private class SiloBuilderConfigurator : ISiloBuilderConfigurator @@ -37,11 +28,16 @@ public void Configure(ISiloHostBuilder hostBuilder) var id = (uint) Guid.NewGuid().GetHashCode() % 100000; hostBuilder .UseInClusterTransactionManager() - .UseDynamoDBTransactionLog(options => { - // TODO: Find better way for test isolation. Possibly different partition keys. - options.TableName = $"TransactionLog{id:X}"; - options.ConnectionString = $"Service={AWSTestConstants.Service}"; - }) + .AddDynamoDBGrainStorage(TransactionTestConstants.TransactionStore, options => + { + options.Service = AWSTestConstants.Service; + }) + .UseDynamoDBTransactionLog(options => + { + // TODO: Find better way for test isolation. Possibly different partition keys. + options.TableName = $"TransactionLog{id:X}"; + options.Service = AWSTestConstants.Service; + }) .UseTransactionalState(); } } From c1128a514116fedcdc29fe23b20b1a821dd71b12 Mon Sep 17 00:00:00 2001 From: Jon Steinich Date: Fri, 23 Mar 2018 09:02:32 -0500 Subject: [PATCH 6/6] Fix XML Comment --- src/AWS/Shared/Storage/DynamoDBStorage.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/AWS/Shared/Storage/DynamoDBStorage.cs b/src/AWS/Shared/Storage/DynamoDBStorage.cs index 875c7a13a9c..472a845c556 100644 --- a/src/AWS/Shared/Storage/DynamoDBStorage.cs +++ b/src/AWS/Shared/Storage/DynamoDBStorage.cs @@ -434,7 +434,8 @@ public async Task ReadSingleEntryAsync(string tableName, Dicti /// Function that will be called to translate the returned fields into a concrete type. This Function is only called if the result is != null and will be called for each entry that match the query and added to the results list /// In case a secondary index is used in the keyConditionExpression /// In case an index is used, show if the seek order is ascending (true) or descending (false) - /// The collection containing a list of objects translated by the resolver function + /// The primary key of the first item that this operation will evaluate. Use the value that was returned for LastEvaluatedKey in the previous operation + /// The collection containing a list of objects translated by the resolver function and the LastEvaluatedKey for paged results public async Task<(List results, Dictionary lastEvaluatedKey)> QueryAsync(string tableName, Dictionary keys, string keyConditionExpression, Func, TResult> resolver, string indexName = "", bool scanIndexForward = true, Dictionary lastEvaluatedKey = null) where TResult : class { try