From 5f91f42e4a6723377941cb6a7a3d080ab9c8ebc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lengyel=20J=C3=A1nos=20Zolt=C3=A1n?= Date: Fri, 19 Jan 2024 11:31:10 +0100 Subject: [PATCH] IOneStreamProvider, IOneSinkProvider and implementations --- .../Mutators/EpPlusSimpleRowWriterMutator.cs | 2 +- EtLast.EPPlus/Readers/EpPlusExcelReader.cs | 2 +- .../Readers/EpPlusExcelSheetListReader.cs | 2 +- .../Processes/SerializeToJsonFileJobFluent.cs | 2 +- .../Streams/LocalFileSinkProvider.cs | 50 ++-- .../Streams/LocalFileStreamProvider.cs | 19 +- ...pleLocalFilesInDirectoryStreamProvider.cs} | 202 +++++++------- ...cs => MultipleLocalFilesStreamProvider.cs} | 178 ++++++------ .../PartitionedLocalFileSinkProvider.cs | 121 ++++++++ ...reamProvider.cs => IManyStreamProvider.cs} | 14 +- EtLast/Interfaces/IOneSinkProvider.cs | 7 + EtLast/Interfaces/IOneStreamProvider.cs | 7 + ...rovider.cs => IPartitionedSinkProvider.cs} | 14 +- .../Delimited/DelimitedLineReader.cs | 2 +- .../Delimited/WriteToDelimitedMutator.cs | 45 +-- .../WriteToDynamicDelimitedMutator.cs | 47 +--- .../WriteToPartitionedDelimitedFileMutator.cs | 256 +++++++++++++++++ ...oPartitionedDynamicDelimitedFileMutator.cs | 259 ++++++++++++++++++ EtLast/Processes/Http/HttpStreamProvider.cs | 14 +- .../Json/DeserializeFromJsonFileJob.cs | 68 +++-- .../Json/DeserializeFromJsonFilesJob.cs | 71 +++++ EtLast/Processes/Json/JsonArrayReader.cs | 2 +- EtLast/Processes/Json/JsonElementReader.cs | 2 +- .../Processes/Json/SerializeToJsonFileJob.cs | 4 +- .../PartitionKeyGenerator.cs | 6 +- .../SinkProvider/MemorySinkProvider.cs | 16 +- .../PartitionedMemorySinkProvider.cs | 53 ++++ .../StreamProvider/MemoryStreamProvider.cs | 86 +++--- .../StreamProvider/OneMemoryStreamProvider.cs | 33 +++ .../Tests/ReadFromDelimitedTests.cs | 8 +- .../Tests/EpPlusExcelReaderTests.cs | 4 +- .../Tests/EpPlusSimpleRowWriterTests.cs | 8 +- .../Delimited/WriteToDelimitedMutatorTests.cs | 10 +- 33 files changed, 1191 insertions(+), 423 deletions(-) rename 
EtLast.LocalFiles/Streams/{LocalDirectoryStreamProvider.cs => MultipleLocalFilesInDirectoryStreamProvider.cs} (95%) rename EtLast.LocalFiles/Streams/{LocalFileSetStreamProvider.cs => MultipleLocalFilesStreamProvider.cs} (95%) create mode 100644 EtLast.LocalFiles/Streams/PartitionedLocalFileSinkProvider.cs rename EtLast/Interfaces/{IStreamProvider.cs => IManyStreamProvider.cs} (76%) create mode 100644 EtLast/Interfaces/IOneSinkProvider.cs create mode 100644 EtLast/Interfaces/IOneStreamProvider.cs rename EtLast/Interfaces/{ISinkProvider.cs => IPartitionedSinkProvider.cs} (81%) create mode 100644 EtLast/Processes/Delimited/WriteToPartitionedDelimitedFileMutator.cs create mode 100644 EtLast/Processes/Delimited/WriteToPartitionedDynamicDelimitedFileMutator.cs create mode 100644 EtLast/Processes/Json/DeserializeFromJsonFilesJob.cs rename EtLast/Streams/{SinkProvider => }/PartitionKeyGenerator.cs (97%) create mode 100644 EtLast/Streams/SinkProvider/PartitionedMemorySinkProvider.cs rename EtLast/Streams/{SinkProvider => }/StreamProvider/MemoryStreamProvider.cs (92%) create mode 100644 EtLast/Streams/StreamProvider/OneMemoryStreamProvider.cs diff --git a/EtLast.EPPlus/Mutators/EpPlusSimpleRowWriterMutator.cs b/EtLast.EPPlus/Mutators/EpPlusSimpleRowWriterMutator.cs index a88b4c7a..7f402c8b 100644 --- a/EtLast.EPPlus/Mutators/EpPlusSimpleRowWriterMutator.cs +++ b/EtLast.EPPlus/Mutators/EpPlusSimpleRowWriterMutator.cs @@ -3,7 +3,7 @@ public sealed class EpPlusSimpleRowWriterMutator : AbstractMutator, IRowSink { [ProcessParameterMustHaveValue] - public required ISinkProvider SinkProvider { get; init; } + public required IPartitionedSinkProvider SinkProvider { get; init; } [ProcessParameterMustHaveValue] public required string SheetName { get; init; } diff --git a/EtLast.EPPlus/Readers/EpPlusExcelReader.cs b/EtLast.EPPlus/Readers/EpPlusExcelReader.cs index eb2293af..77a2cf37 100644 --- a/EtLast.EPPlus/Readers/EpPlusExcelReader.cs +++ b/EtLast.EPPlus/Readers/EpPlusExcelReader.cs 
@@ -3,7 +3,7 @@ public sealed class EpPlusExcelReader : AbstractEpPlusExcelReader { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required IManyStreamProvider StreamProvider { get; init; } /// /// First stream index is (integer) 0 diff --git a/EtLast.EPPlus/Readers/EpPlusExcelSheetListReader.cs b/EtLast.EPPlus/Readers/EpPlusExcelSheetListReader.cs index 35f3d1c3..b2a90db3 100644 --- a/EtLast.EPPlus/Readers/EpPlusExcelSheetListReader.cs +++ b/EtLast.EPPlus/Readers/EpPlusExcelSheetListReader.cs @@ -3,7 +3,7 @@ public sealed class EpPlusExcelSheetListReader : AbstractRowSource { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required IManyStreamProvider StreamProvider { get; init; } /// /// Default value is "Stream". diff --git a/EtLast.LocalFiles/Processes/SerializeToJsonFileJobFluent.cs b/EtLast.LocalFiles/Processes/SerializeToJsonFileJobFluent.cs index ff3c9e5d..c69b21a3 100644 --- a/EtLast.LocalFiles/Processes/SerializeToJsonFileJobFluent.cs +++ b/EtLast.LocalFiles/Processes/SerializeToJsonFileJobFluent.cs @@ -11,7 +11,7 @@ public static IFlow SerializeToLocalJsonFile(this IFlow builder, string targe Encoding = customEncoding ?? 
Encoding.UTF8, SinkProvider = new LocalFileSinkProvider() { - FileNameGenerator = _ => targetFileName, + FileName = targetFileName, ActionWhenFileExists = LocalSinkFileExistsAction.DeleteAndContinue, FileMode = FileMode.CreateNew, }, diff --git a/EtLast.LocalFiles/Streams/LocalFileSinkProvider.cs b/EtLast.LocalFiles/Streams/LocalFileSinkProvider.cs index 884c2db6..3532339b 100644 --- a/EtLast.LocalFiles/Streams/LocalFileSinkProvider.cs +++ b/EtLast.LocalFiles/Streams/LocalFileSinkProvider.cs @@ -1,15 +1,13 @@ namespace FizzCode.EtLast; -public enum LocalSinkFileExistsAction { Continue, Exception, DeleteAndContinue } - [ContainsProcessParameterValidation] -public class LocalFileSinkProvider : ISinkProvider +public class LocalFileSinkProvider : IOneSinkProvider { /// - /// Generates file name based on a partition key. + /// Generates file name. /// [ProcessParameterMustHaveValue] - public required Func FileNameGenerator { get; init; } + public required string FileName { get; init; } /// /// Default value is . 
@@ -33,26 +31,24 @@ public class LocalFileSinkProvider : ISinkProvider public bool AutomaticallyDispose => true; - public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns) + public NamedSink GetSink(IProcess caller, string sinkFormat, string[] columns) { - var fileName = FileNameGenerator.Invoke(partitionKey); - var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() { Process = caller, Kind = IoCommandKind.fileWrite, - Location = Path.GetDirectoryName(fileName), - Path = Path.GetFileName(fileName), + Location = Path.GetDirectoryName(FileName), + Path = Path.GetFileName(FileName), Message = "writing to local file", }); - if (ActionWhenFileExists != LocalSinkFileExistsAction.Continue && File.Exists(fileName)) + if (ActionWhenFileExists != LocalSinkFileExistsAction.Continue && File.Exists(FileName)) { if (ActionWhenFileExists == LocalSinkFileExistsAction.Exception) { - var exception = new LocalFileWriteException(caller, "local file already exist", fileName); + var exception = new LocalFileWriteException(caller, "local file already exist", FileName); exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file already exist: {0}", - fileName)); + FileName)); ioCommand.AffectedDataCount = 0; ioCommand.Failed(exception); @@ -62,14 +58,14 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat { try { - File.Delete(fileName); + File.Delete(FileName); } catch (Exception ex) { - var exception = new LocalFileWriteException(caller, "error while writing local file / file deletion failed", fileName, ex); + var exception = new LocalFileWriteException(caller, "error while writing local file / file deletion failed", FileName, ex); exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, file deletion failed, message: {1}", - fileName, ex.Message)); - exception.Data["FileName"] = fileName; + FileName, ex.Message)); + 
exception.Data["FileName"] = FileName; ioCommand.AffectedDataCount = 0; ioCommand.Failed(exception); @@ -78,7 +74,7 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat } } - var directory = Path.GetDirectoryName(fileName); + var directory = Path.GetDirectoryName(FileName); if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory)) { try @@ -87,10 +83,10 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat } catch (Exception ex) { - var exception = new LocalFileWriteException(caller, "error while writing local file / directory creation failed", fileName, ex); + var exception = new LocalFileWriteException(caller, "error while writing local file / directory creation failed", FileName, ex); exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, directory creation failed, message: {1}", - fileName, ex.Message)); - exception.Data["FileName"] = fileName; + FileName, ex.Message)); + exception.Data["FileName"] = FileName; exception.Data["Directory"] = directory; ioCommand.AffectedDataCount = 0; @@ -101,17 +97,17 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat try { - var sink = caller.Context.GetSink(Path.GetDirectoryName(fileName), Path.GetFileName(fileName), sinkFormat, caller, columns); + var sink = caller.Context.GetSink(Path.GetDirectoryName(FileName), Path.GetFileName(FileName), sinkFormat, caller, columns); - var stream = new FileStream(fileName, FileMode, FileAccess, FileShare); - return new NamedSink(fileName, stream, ioCommand, sink); + var stream = new FileStream(FileName, FileMode, FileAccess, FileShare); + return new NamedSink(FileName, stream, ioCommand, sink); } catch (Exception ex) { - var exception = new LocalFileWriteException(caller, "error while writing local file", fileName, ex); + var exception = new LocalFileWriteException(caller, "error while writing local file", FileName, ex); 
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, message: {1}", - fileName, ex.Message)); - exception.Data["FileName"] = fileName; + FileName, ex.Message)); + exception.Data["FileName"] = FileName; ioCommand.AffectedDataCount = 0; ioCommand.Failed(exception); diff --git a/EtLast.LocalFiles/Streams/LocalFileStreamProvider.cs b/EtLast.LocalFiles/Streams/LocalFileStreamProvider.cs index db7274d4..8b7de1fe 100644 --- a/EtLast.LocalFiles/Streams/LocalFileStreamProvider.cs +++ b/EtLast.LocalFiles/Streams/LocalFileStreamProvider.cs @@ -1,7 +1,7 @@ namespace FizzCode.EtLast; [ContainsProcessParameterValidation] -public class LocalFileStreamProvider : IStreamProvider +public class LocalFileStreamProvider : IManyStreamProvider, IOneStreamProvider { [ProcessParameterMustHaveValue] public required string FileName { get; init; } @@ -33,7 +33,7 @@ public string GetTopic() : null; } - public IEnumerable GetStreams(IProcess caller) + public NamedStream GetStream(IProcess caller) { var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() { @@ -59,16 +59,13 @@ public IEnumerable GetStreams(IProcess caller) ioCommand.AffectedDataCount = 0; ioCommand.End(); - return Enumerable.Empty(); + return null; } try { var stream = new FileStream(FileName, Options); - return new[] - { - new NamedStream(FileName, stream, ioCommand), - }; + return new NamedStream(FileName, stream, ioCommand); } catch (Exception ex) { @@ -82,4 +79,10 @@ public IEnumerable GetStreams(IProcess caller) throw exception; } } -} + + public IEnumerable GetStreams(IProcess caller) + { + var stream = GetStream(caller); + return new[] { stream }; + } +} \ No newline at end of file diff --git a/EtLast.LocalFiles/Streams/LocalDirectoryStreamProvider.cs b/EtLast.LocalFiles/Streams/MultipleLocalFilesInDirectoryStreamProvider.cs similarity index 95% rename from EtLast.LocalFiles/Streams/LocalDirectoryStreamProvider.cs rename to 
EtLast.LocalFiles/Streams/MultipleLocalFilesInDirectoryStreamProvider.cs index 00c7f9de..caaac9d4 100644 --- a/EtLast.LocalFiles/Streams/LocalDirectoryStreamProvider.cs +++ b/EtLast.LocalFiles/Streams/MultipleLocalFilesInDirectoryStreamProvider.cs @@ -1,101 +1,101 @@ -namespace FizzCode.EtLast; - -[ContainsProcessParameterValidation] -public class LocalDirectoryStreamProvider : IStreamProvider -{ - [ProcessParameterMustHaveValue] - public required string Directory { get; init; } - - /// - /// Default value is "*.*" - /// - [ProcessParameterMustHaveValue] - public required string SearchPattern { get; init; } = "*.*"; - - /// - /// Default value is true. - /// - public bool ThrowExceptionWhenFileNotFound { get; init; } = true; - - public string GetTopic() - { - return Directory != null - ? PathHelpers.GetFriendlyPathName(Directory) - + (SearchPattern != null ? @"\" + SearchPattern : "") - : null; - } - - public IEnumerable GetStreams(IProcess caller) - { - var fileNames = new List(); - - if (System.IO.Directory.Exists(Directory)) - { - fileNames.AddRange(System.IO.Directory.EnumerateFiles(Directory, SearchPattern)); - } - - if (fileNames.Count == 0) - { - if (ThrowExceptionWhenFileNotFound) - { - var exception = new LocalFileReadException(caller, "local directory doesn't contain any matching files", Directory); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local directory doesn't contain any matching files: {0}", - Directory)); - - throw exception; - } - - yield break; - } - - foreach (var fileName in fileNames) - { - yield return GetFileStream(caller, fileName); - } - } - - private NamedStream GetFileStream(IProcess caller, string fileName) - { - var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() - { - Process = caller, - Kind = IoCommandKind.fileRead, - Location = Directory, - Path = fileName.Replace(Directory, "", StringComparison.InvariantCultureIgnoreCase), - Message = "reading from local file", - }); - - if 
(!File.Exists(fileName)) - { - if (ThrowExceptionWhenFileNotFound) - { - var exception = new LocalFileReadException(caller, "local file doesn't exist", fileName); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file doesn't exist: {0}", - fileName)); - - ioCommand.AffectedDataCount = 0; - ioCommand.Failed(exception); - throw exception; - } - - ioCommand.AffectedDataCount = 0; - ioCommand.End(); - return null; - } - - try - { - var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read); - return new NamedStream(fileName, stream, ioCommand); - } - catch (Exception ex) - { - var exception = new LocalFileReadException(caller, "error while opening local file", fileName, ex); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening local file: {0}, message: {1}", fileName, ex.Message)); - exception.Data["FileName"] = fileName; - - ioCommand.Failed(exception); - throw exception; - } - } -} +namespace FizzCode.EtLast; + +[ContainsProcessParameterValidation] +public class MultipleLocalFilesInDirectoryStreamProvider : IManyStreamProvider +{ + [ProcessParameterMustHaveValue] + public required string Directory { get; init; } + + /// + /// Default value is "*.*" + /// + [ProcessParameterMustHaveValue] + public required string SearchPattern { get; init; } = "*.*"; + + /// + /// Default value is true. + /// + public bool ThrowExceptionWhenFileNotFound { get; init; } = true; + + public string GetTopic() + { + return Directory != null + ? PathHelpers.GetFriendlyPathName(Directory) + + (SearchPattern != null ? 
@"\" + SearchPattern : "") + : null; + } + + public IEnumerable GetStreams(IProcess caller) + { + var fileNames = new List(); + + if (System.IO.Directory.Exists(Directory)) + { + fileNames.AddRange(System.IO.Directory.EnumerateFiles(Directory, SearchPattern)); + } + + if (fileNames.Count == 0) + { + if (ThrowExceptionWhenFileNotFound) + { + var exception = new LocalFileReadException(caller, "local directory doesn't contain any matching files", Directory); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local directory doesn't contain any matching files: {0}", + Directory)); + + throw exception; + } + + yield break; + } + + foreach (var fileName in fileNames) + { + yield return GetFileStream(caller, fileName); + } + } + + private NamedStream GetFileStream(IProcess caller, string fileName) + { + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.fileRead, + Location = Directory, + Path = fileName.Replace(Directory, "", StringComparison.InvariantCultureIgnoreCase), + Message = "reading from local file", + }); + + if (!File.Exists(fileName)) + { + if (ThrowExceptionWhenFileNotFound) + { + var exception = new LocalFileReadException(caller, "local file doesn't exist", fileName); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file doesn't exist: {0}", + fileName)); + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + + ioCommand.AffectedDataCount = 0; + ioCommand.End(); + return null; + } + + try + { + var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read); + return new NamedStream(fileName, stream, ioCommand); + } + catch (Exception ex) + { + var exception = new LocalFileReadException(caller, "error while opening local file", fileName, ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening local file: {0}, message: {1}", fileName, ex.Message)); + 
exception.Data["FileName"] = fileName; + + ioCommand.Failed(exception); + throw exception; + } + } +} diff --git a/EtLast.LocalFiles/Streams/LocalFileSetStreamProvider.cs b/EtLast.LocalFiles/Streams/MultipleLocalFilesStreamProvider.cs similarity index 95% rename from EtLast.LocalFiles/Streams/LocalFileSetStreamProvider.cs rename to EtLast.LocalFiles/Streams/MultipleLocalFilesStreamProvider.cs index f38f366e..d5c2614f 100644 --- a/EtLast.LocalFiles/Streams/LocalFileSetStreamProvider.cs +++ b/EtLast.LocalFiles/Streams/MultipleLocalFilesStreamProvider.cs @@ -1,89 +1,89 @@ -namespace FizzCode.EtLast; - -[ContainsProcessParameterValidation] -public class LocalFileSetStreamProvider : IStreamProvider -{ - [ProcessParameterMustHaveValue] - public required string[] FileNames { get; init; } - - public static FileStreamOptions DefaultOptions => new() - { - Mode = FileMode.Open, - Access = FileAccess.Read, - Share = FileShare.Read, - Options = FileOptions.None, - BufferSize = 4096, - PreallocationSize = 0, - }; - - /// - /// Default value is - /// - public FileStreamOptions Options { get; init; } = DefaultOptions; - - /// - /// Default value is true. - /// - public bool ThrowExceptionWhenFileNotFound { get; init; } = true; - - public string GetTopic() - { - return FileNames?.Length > 0 - ? 
PathHelpers.GetFriendlyPathName(FileNames[0]) + "+" + FileNames.Length.ToString("D", CultureInfo.InvariantCulture) + "" - : null; - } - - public IEnumerable GetStreams(IProcess caller) - { - foreach (var fileName in FileNames) - { - yield return GetFileStream(caller, fileName); - } - } - - private NamedStream GetFileStream(IProcess caller, string fileName) - { - var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() - { - Process = caller, - Kind = IoCommandKind.fileRead, - Location = Path.GetDirectoryName(fileName), - Path = Path.GetFileName(fileName), - Message = "reading from local file", - }); - - if (!File.Exists(fileName)) - { - if (ThrowExceptionWhenFileNotFound) - { - var exception = new LocalFileReadException(caller, "local file doesn't exist", fileName); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file doesn't exist: {0}", - fileName)); - - ioCommand.AffectedDataCount = 0; - ioCommand.Failed(exception); - throw exception; - } - - ioCommand.AffectedDataCount = 0; - ioCommand.End(); - return null; - } - - try - { - var stream = new FileStream(fileName, Options); - return new NamedStream(fileName, stream, ioCommand); - } - catch (Exception ex) - { - var exception = new LocalFileReadException(caller, "error while opening local file", fileName, ex); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening local file: {0}, message: {1}", fileName, ex.Message)); - exception.Data["FileName"] = fileName; - - ioCommand.AffectedDataCount = 0; - ioCommand.Failed(exception); - throw exception; - } - } -} +namespace FizzCode.EtLast; + +[ContainsProcessParameterValidation] +public class MultipleLocalFilesStreamProvider : IManyStreamProvider +{ + [ProcessParameterMustHaveValue] + public required string[] FileNames { get; init; } + + public static FileStreamOptions DefaultOptions => new() + { + Mode = FileMode.Open, + Access = FileAccess.Read, + Share = FileShare.Read, + Options = 
FileOptions.None, + BufferSize = 4096, + PreallocationSize = 0, + }; + + /// + /// Default value is + /// + public FileStreamOptions Options { get; init; } = DefaultOptions; + + /// + /// Default value is true. + /// + public bool ThrowExceptionWhenFileNotFound { get; init; } = true; + + public string GetTopic() + { + return FileNames?.Length > 0 + ? PathHelpers.GetFriendlyPathName(FileNames[0]) + "+" + FileNames.Length.ToString("D", CultureInfo.InvariantCulture) + "" + : null; + } + + public IEnumerable GetStreams(IProcess caller) + { + foreach (var fileName in FileNames) + { + yield return GetFileStream(caller, fileName); + } + } + + private NamedStream GetFileStream(IProcess caller, string fileName) + { + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.fileRead, + Location = Path.GetDirectoryName(fileName), + Path = Path.GetFileName(fileName), + Message = "reading from local file", + }); + + if (!File.Exists(fileName)) + { + if (ThrowExceptionWhenFileNotFound) + { + var exception = new LocalFileReadException(caller, "local file doesn't exist", fileName); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file doesn't exist: {0}", + fileName)); + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + + ioCommand.AffectedDataCount = 0; + ioCommand.End(); + return null; + } + + try + { + var stream = new FileStream(fileName, Options); + return new NamedStream(fileName, stream, ioCommand); + } + catch (Exception ex) + { + var exception = new LocalFileReadException(caller, "error while opening local file", fileName, ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening local file: {0}, message: {1}", fileName, ex.Message)); + exception.Data["FileName"] = fileName; + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + } +} diff --git 
a/EtLast.LocalFiles/Streams/PartitionedLocalFileSinkProvider.cs b/EtLast.LocalFiles/Streams/PartitionedLocalFileSinkProvider.cs new file mode 100644 index 00000000..2a44aa2f --- /dev/null +++ b/EtLast.LocalFiles/Streams/PartitionedLocalFileSinkProvider.cs @@ -0,0 +1,121 @@ +namespace FizzCode.EtLast; + +public enum LocalSinkFileExistsAction { Continue, Exception, DeleteAndContinue } + +[ContainsProcessParameterValidation] +public class PartitionedLocalFileSinkProvider : IPartitionedSinkProvider +{ + /// + /// Generates file name based on a partition key. + /// + [ProcessParameterMustHaveValue] + public required Func FileNameGenerator { get; init; } + + /// + /// Default value is . + /// + public required LocalSinkFileExistsAction ActionWhenFileExists { get; init; } = LocalSinkFileExistsAction.Exception; + + /// + /// Default value is . + /// + public required FileMode FileMode { get; init; } = FileMode.Append; + + /// + /// Default value is . + /// + public FileAccess FileAccess { get; init; } = FileAccess.Write; + + /// + /// Default value is . 
+ /// + public FileShare FileShare { get; init; } = FileShare.Read; + + public bool AutomaticallyDispose => true; + + public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns) + { + var fileName = FileNameGenerator.Invoke(partitionKey); + + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.fileWrite, + Location = Path.GetDirectoryName(fileName), + Path = Path.GetFileName(fileName), + Message = "writing to local file", + }); + + if (ActionWhenFileExists != LocalSinkFileExistsAction.Continue && File.Exists(fileName)) + { + if (ActionWhenFileExists == LocalSinkFileExistsAction.Exception) + { + var exception = new LocalFileWriteException(caller, "local file already exist", fileName); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file already exist: {0}", + fileName)); + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + else if (ActionWhenFileExists == LocalSinkFileExistsAction.DeleteAndContinue) + { + try + { + File.Delete(fileName); + } + catch (Exception ex) + { + var exception = new LocalFileWriteException(caller, "error while writing local file / file deletion failed", fileName, ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, file deletion failed, message: {1}", + fileName, ex.Message)); + exception.Data["FileName"] = fileName; + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + } + } + + var directory = Path.GetDirectoryName(fileName); + if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory)) + { + try + { + Directory.CreateDirectory(directory); + } + catch (Exception ex) + { + var exception = new LocalFileWriteException(caller, "error while writing local file / directory creation failed", fileName, ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, 
"error while writing local file: {0}, directory creation failed, message: {1}", + fileName, ex.Message)); + exception.Data["FileName"] = fileName; + exception.Data["Directory"] = directory; + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + } + + try + { + var sink = caller.Context.GetSink(Path.GetDirectoryName(fileName), Path.GetFileName(fileName), sinkFormat, caller, columns); + + var stream = new FileStream(fileName, FileMode, FileAccess, FileShare); + return new NamedSink(fileName, stream, ioCommand, sink); + } + catch (Exception ex) + { + var exception = new LocalFileWriteException(caller, "error while writing local file", fileName, ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, message: {1}", + fileName, ex.Message)); + exception.Data["FileName"] = fileName; + + ioCommand.AffectedDataCount = 0; + ioCommand.Failed(exception); + throw exception; + } + } +} \ No newline at end of file diff --git a/EtLast/Interfaces/IStreamProvider.cs b/EtLast/Interfaces/IManyStreamProvider.cs similarity index 76% rename from EtLast/Interfaces/IStreamProvider.cs rename to EtLast/Interfaces/IManyStreamProvider.cs index e923b7b2..9402f515 100644 --- a/EtLast/Interfaces/IStreamProvider.cs +++ b/EtLast/Interfaces/IManyStreamProvider.cs @@ -1,7 +1,7 @@ -namespace FizzCode.EtLast; - -public interface IStreamProvider -{ - public string GetTopic(); - public IEnumerable GetStreams(IProcess caller); -} +namespace FizzCode.EtLast; + +public interface IManyStreamProvider +{ + public string GetTopic(); + public IEnumerable GetStreams(IProcess caller); +} diff --git a/EtLast/Interfaces/IOneSinkProvider.cs b/EtLast/Interfaces/IOneSinkProvider.cs new file mode 100644 index 00000000..f0ee21a8 --- /dev/null +++ b/EtLast/Interfaces/IOneSinkProvider.cs @@ -0,0 +1,7 @@ +namespace FizzCode.EtLast; + +public interface IOneSinkProvider +{ + public NamedSink GetSink(IProcess caller, string 
sinkFormat, string[] columns); + public bool AutomaticallyDispose { get; } +} \ No newline at end of file diff --git a/EtLast/Interfaces/IOneStreamProvider.cs b/EtLast/Interfaces/IOneStreamProvider.cs new file mode 100644 index 00000000..cca2b848 --- /dev/null +++ b/EtLast/Interfaces/IOneStreamProvider.cs @@ -0,0 +1,7 @@ +namespace FizzCode.EtLast; + +public interface IOneStreamProvider +{ + public string GetTopic(); + public NamedStream GetStream(IProcess caller); +} \ No newline at end of file diff --git a/EtLast/Interfaces/ISinkProvider.cs b/EtLast/Interfaces/IPartitionedSinkProvider.cs similarity index 81% rename from EtLast/Interfaces/ISinkProvider.cs rename to EtLast/Interfaces/IPartitionedSinkProvider.cs index 8e742bdd..bc02408b 100644 --- a/EtLast/Interfaces/ISinkProvider.cs +++ b/EtLast/Interfaces/IPartitionedSinkProvider.cs @@ -1,7 +1,7 @@ -namespace FizzCode.EtLast; - -public interface ISinkProvider -{ - public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns); - public bool AutomaticallyDispose { get; } -} +namespace FizzCode.EtLast; + +public interface IPartitionedSinkProvider +{ + public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns); + public bool AutomaticallyDispose { get; } +} diff --git a/EtLast/Processes/Delimited/DelimitedLineReader.cs b/EtLast/Processes/Delimited/DelimitedLineReader.cs index ac2a7c0c..a195090c 100644 --- a/EtLast/Processes/Delimited/DelimitedLineReader.cs +++ b/EtLast/Processes/Delimited/DelimitedLineReader.cs @@ -5,7 +5,7 @@ public enum DelimitedLineHeader { NoHeader, HasHeader, IgnoreHeader } public sealed class DelimitedLineReader : AbstractRowSource { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required IManyStreamProvider StreamProvider { get; init; } public Dictionary Columns { get; init; } public TextReaderColumn DefaultColumns { get; init; } diff --git 
a/EtLast/Processes/Delimited/WriteToDelimitedMutator.cs b/EtLast/Processes/Delimited/WriteToDelimitedMutator.cs index f7cefac5..84620b61 100644 --- a/EtLast/Processes/Delimited/WriteToDelimitedMutator.cs +++ b/EtLast/Processes/Delimited/WriteToDelimitedMutator.cs @@ -3,7 +3,7 @@ public sealed class WriteToDelimitedMutator : AbstractMutator, IRowSink { [ProcessParameterMustHaveValue] - public required ISinkProvider SinkProvider { get; init; } + public required IOneSinkProvider SinkProvider { get; init; } /// /// Default value is @@ -51,9 +51,7 @@ public sealed class WriteToDelimitedMutator : AbstractMutator, IRowSink /// public int BatchSize { get; init; } = 10000; - public PartitionKeyGenerator PartitionKeyGenerator { get; init; } - - private readonly Dictionary _sinkEntries = []; + private SinkEntry _sinkEntry = null; private byte[] _delimiterBytes; private byte[] _lineEndingBytes; private byte[] _quoteBytes; @@ -61,8 +59,6 @@ public sealed class WriteToDelimitedMutator : AbstractMutator, IRowSink private char[] _quoteRequiredChars; private string _quoteAsString; - private int _rowCounter; - protected override void StartMutator() { _delimiterBytes = Encoding.GetBytes(new[] { Delimiter }); @@ -71,25 +67,19 @@ protected override void StartMutator() _quoteRequiredChars = [Delimiter, Quote, Escape, '\r', '\n']; _quoteAsString = Quote.ToString(); _quoteBytes = Encoding.GetBytes(new[] { Quote }); - - _rowCounter = 0; } - private SinkEntry GetSinkEntry(string partitionKey) + private SinkEntry GetSinkEntry() { - var internalKey = partitionKey ?? "\0__nopartition__\0"; + if (_sinkEntry != null) + return _sinkEntry; - if (_sinkEntries.TryGetValue(internalKey, out var sinkEntry)) - return sinkEntry; - - sinkEntry = new SinkEntry() + var sinkEntry = _sinkEntry = new SinkEntry() { - NamedSink = SinkProvider.GetSink(this, partitionKey, "delimited", Columns.Select(x => x.Value?.SourceColumn ?? 
x.Key).ToArray()), + NamedSink = SinkProvider.GetSink(this, "delimited", Columns.Select(x => x.Value?.SourceColumn ?? x.Key).ToArray()), Buffer = new MemoryStream(), }; - _sinkEntries.Add(internalKey, sinkEntry); - if (WriteHeader) { if (sinkEntry.NamedSink.SafeGetPosition() == 0) @@ -140,30 +130,21 @@ private SinkEntry GetSinkEntry(string partitionKey) protected override void CloseMutator() { - foreach (var sinkEntry in _sinkEntries.Values) - { - WriteBuffer(sinkEntry); - } + if (_sinkEntry != null) WriteBuffer(_sinkEntry); // the sink is created lazily by the first row, so it is null when no row was processed if (SinkProvider.AutomaticallyDispose) { - foreach (var sinkEntry in _sinkEntries.Values) - { - sinkEntry.NamedSink.Stream.Flush(); - sinkEntry.NamedSink.Stream.Close(); - sinkEntry.NamedSink.Stream.Dispose(); - } + _sinkEntry?.NamedSink.Stream.Flush(); // null-conditional: nothing to flush/close/dispose if the sink was never opened + _sinkEntry?.NamedSink.Stream.Close(); + _sinkEntry?.NamedSink.Stream.Dispose(); } - _sinkEntries.Clear(); + _sinkEntry = null; } protected override IEnumerable MutateRow(IRow row, long rowInputIndex) { - var partitionKey = PartitionKeyGenerator?.Invoke(row, _rowCounter); - _rowCounter++; - - var sinkEntry = GetSinkEntry(partitionKey); + var sinkEntry = GetSinkEntry(); sinkEntry.NamedSink.Sink.RegisterWrite(row); try diff --git a/EtLast/Processes/Delimited/WriteToDynamicDelimitedMutator.cs b/EtLast/Processes/Delimited/WriteToDynamicDelimitedMutator.cs index 84d173f3..a49a45f7 100644 --- a/EtLast/Processes/Delimited/WriteToDynamicDelimitedMutator.cs +++ b/EtLast/Processes/Delimited/WriteToDynamicDelimitedMutator.cs @@ -3,7 +3,7 @@ public sealed class WriteToDynamicDelimitedMutator : AbstractMutator, IRowSink { [ProcessParameterMustHaveValue] - public required ISinkProvider SinkProvider { get; init; } + public required IOneSinkProvider SinkProvider { get; init; } /// /// Default value is @@ -47,9 +47,7 @@ public sealed class WriteToDynamicDelimitedMutator : AbstractMutator, IRowSink /// public int BatchSize { get; init; } = 10000; - public PartitionKeyGenerator PartitionKeyGenerator { get; init; } - - private readonly 
Dictionary _sinkEntries = []; + private SinkEntry _sinkEntry; private byte[] _delimiterBytes; private byte[] _lineEndingBytes; private byte[] _quoteBytes; @@ -57,8 +55,6 @@ public sealed class WriteToDynamicDelimitedMutator : AbstractMutator, IRowSink private char[] _quoteRequiredChars; private string _quoteAsString; - private int _rowCounter; - protected override void StartMutator() { _delimiterBytes = Encoding.GetBytes(new[] { Delimiter }); @@ -67,28 +63,22 @@ protected override void StartMutator() _quoteRequiredChars = [Delimiter, Quote, Escape, '\r', '\n']; _quoteAsString = Quote.ToString(); _quoteBytes = Encoding.GetBytes(new[] { Quote }); - - _rowCounter = 0; } - private SinkEntry GetSinkEntry(string partitionKey, IReadOnlySlimRow firstRow) + private SinkEntry GetSinkEntry(IReadOnlySlimRow firstRow) { - var internalKey = partitionKey ?? "\0__nopartition__\0"; - - if (_sinkEntries.TryGetValue(internalKey, out var sinkEntry)) - return sinkEntry; + if (_sinkEntry != null) + return _sinkEntry; var columns = firstRow.Values.Select(x => x.Key).ToArray(); - sinkEntry = new SinkEntry() + var sinkEntry = _sinkEntry = new SinkEntry() { - NamedSink = SinkProvider.GetSink(this, partitionKey, "delimited", columns), + NamedSink = SinkProvider.GetSink(this, "delimited", columns), Buffer = new MemoryStream(), Columns = columns, }; - _sinkEntries.Add(internalKey, sinkEntry); - if (WriteHeader) { if (sinkEntry.NamedSink.SafeGetPosition() == 0) @@ -140,30 +130,21 @@ private SinkEntry GetSinkEntry(string partitionKey, IReadOnlySlimRow firstRow) protected override void CloseMutator() { - foreach (var sinkEntry in _sinkEntries.Values) - { - WriteBuffer(sinkEntry); - } + if (_sinkEntry != null) WriteBuffer(_sinkEntry); // the sink is created lazily by the first row, so it is null when no row was processed if (SinkProvider.AutomaticallyDispose) { - foreach (var sinkEntry in _sinkEntries.Values) - { - sinkEntry.NamedSink.Stream.Flush(); - sinkEntry.NamedSink.Stream.Close(); - sinkEntry.NamedSink.Stream.Dispose(); - } + _sinkEntry?.NamedSink.Stream.Flush(); // null-conditional: nothing to flush/close/dispose if the sink was never opened + _sinkEntry?.NamedSink.Stream.Close(); 
+ _sinkEntry?.NamedSink.Stream.Dispose(); } - _sinkEntries.Clear(); + _sinkEntry = null; } protected override IEnumerable MutateRow(IRow row, long rowInputIndex) { - var partitionKey = PartitionKeyGenerator?.Invoke(row, _rowCounter); - _rowCounter++; - - var sinkEntry = GetSinkEntry(partitionKey, row); + var sinkEntry = GetSinkEntry(row); sinkEntry.NamedSink.Sink.RegisterWrite(row); try @@ -247,7 +228,7 @@ private class SinkEntry } [EditorBrowsable(EditorBrowsableState.Never)] -public static class WriteToUnstructuredDelimitedMutatorFluent +public static class WriteToDynamicDelimitedMutatorFluent { /// /// Write rows to a delimited stream. The first row if each partition is used to determine the columns of the delimited output. diff --git a/EtLast/Processes/Delimited/WriteToPartitionedDelimitedFileMutator.cs b/EtLast/Processes/Delimited/WriteToPartitionedDelimitedFileMutator.cs new file mode 100644 index 00000000..8b305195 --- /dev/null +++ b/EtLast/Processes/Delimited/WriteToPartitionedDelimitedFileMutator.cs @@ -0,0 +1,256 @@ +namespace FizzCode.EtLast; + +public sealed class WriteToPartitionedDelimitedFileMutator : AbstractMutator, IRowSink +{ + [ProcessParameterMustHaveValue] + public required IPartitionedSinkProvider SinkProvider { get; init; } + + /// + /// Default value is + /// + public Encoding Encoding { get; init; } = Encoding.UTF8; + + /// + /// Default value is ; + /// + public IFormatProvider FormatProvider { get; init; } = CultureInfo.InvariantCulture; + + /// + /// Default value is \r\n + /// + public string LineEnding { get; init; } = "\r\n"; + + /// + /// Default value is " + /// + public char Quote { get; init; } = '\"'; + + /// + /// Default value is " + /// + public char Escape { get; init; } = '\"'; + + /// + /// Default value is ';'. 
+ /// + public char Delimiter { get; init; } = ';'; + + /// + /// Default value is true + /// + public bool WriteHeader { get; init; } = true; + + /// + /// Key is the output column title AND the column in the row (later can be customized by setting a ). + /// + [ProcessParameterMustHaveValue] + public required Dictionary Columns { get; init; } + + /// + /// Default value is 10000 + /// + public int BatchSize { get; init; } = 10000; + + public PartitionKeyGenerator PartitionKeyGenerator { get; init; } + + private readonly Dictionary _sinkEntries = []; + private byte[] _delimiterBytes; + private byte[] _lineEndingBytes; + private byte[] _quoteBytes; + private string _escapedQuote; + private char[] _quoteRequiredChars; + private string _quoteAsString; + + private int _rowCounter; + + protected override void StartMutator() + { + _delimiterBytes = Encoding.GetBytes(new[] { Delimiter }); + _lineEndingBytes = Encoding.GetBytes(LineEnding); + _escapedQuote = new string(new[] { Escape, Quote }); + _quoteRequiredChars = [Delimiter, Quote, Escape, '\r', '\n']; + _quoteAsString = Quote.ToString(); + _quoteBytes = Encoding.GetBytes(new[] { Quote }); + + _rowCounter = 0; + } + + private SinkEntry GetSinkEntry(string partitionKey) + { + var internalKey = partitionKey ?? "\0__nopartition__\0"; + + if (_sinkEntries.TryGetValue(internalKey, out var sinkEntry)) + return sinkEntry; + + sinkEntry = new SinkEntry() + { + NamedSink = SinkProvider.GetSink(this, partitionKey, "delimited", Columns.Select(x => x.Value?.SourceColumn ?? 
x.Key).ToArray()), + Buffer = new MemoryStream(), + }; + + _sinkEntries.Add(internalKey, sinkEntry); + + if (WriteHeader) + { + if (sinkEntry.NamedSink.SafeGetPosition() == 0) + { + var first = true; + foreach (var (columnName, _) in Columns) + { + if (!first) + sinkEntry.NamedSink.Stream.Write(_delimiterBytes); + + if (!string.IsNullOrEmpty(columnName)) + { + var quoteRequired = columnName.IndexOfAny(_quoteRequiredChars) > -1 + || columnName[0] == ' ' + || columnName[^1] == ' '; + + if (quoteRequired) + { + sinkEntry.NamedSink.Stream.Write(_quoteBytes); + + if (columnName.Contains(Quote)) + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName.Replace(_quoteAsString, _escapedQuote, StringComparison.Ordinal))); + } + else + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName)); + } + + sinkEntry.NamedSink.Stream.Write(_quoteBytes); + } + else + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName)); + } + } + + first = false; + } + + sinkEntry.NamedSink.Stream.Write(_lineEndingBytes); + sinkEntry.NamedSink.IncreaseRowsWritten(); + } + } + + return sinkEntry; + } + + protected override void CloseMutator() + { + foreach (var sinkEntry in _sinkEntries.Values) + { + WriteBuffer(sinkEntry); + } + + if (SinkProvider.AutomaticallyDispose) + { + foreach (var sinkEntry in _sinkEntries.Values) + { + sinkEntry.NamedSink.Stream.Flush(); + sinkEntry.NamedSink.Stream.Close(); + sinkEntry.NamedSink.Stream.Dispose(); + } + } + + _sinkEntries.Clear(); + } + + protected override IEnumerable MutateRow(IRow row, long rowInputIndex) + { + var partitionKey = PartitionKeyGenerator?.Invoke(row, _rowCounter); + _rowCounter++; + + var sinkEntry = GetSinkEntry(partitionKey); + sinkEntry.NamedSink.Sink.RegisterWrite(row); + + try + { + var first = true; + foreach (var kvp in Columns) + { + if (!first) + sinkEntry.Buffer.Write(_delimiterBytes); + + var value = row[kvp.Value?.SourceColumn ?? kvp.Key]; + + var str = (kvp.Value?.CustomFormatter ?? 
DelimitedValueFormatter.Default).Format(value, FormatProvider); + if (!string.IsNullOrEmpty(str)) + { + var quoteRequired = str.IndexOfAny(_quoteRequiredChars) > -1 + || str[0] == ' ' + || str[^1] == ' '; + + if (quoteRequired) + { + sinkEntry.Buffer.Write(_quoteBytes); + + if (str.Contains(Quote)) + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str.Replace(_quoteAsString, _escapedQuote, StringComparison.Ordinal))); + } + else + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str)); + } + + sinkEntry.Buffer.Write(_quoteBytes); + } + else + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str)); + } + } + + first = false; + } + + sinkEntry.Buffer.Write(_lineEndingBytes); + sinkEntry.RowCount++; + + if (sinkEntry.RowCount >= BatchSize) + WriteBuffer(sinkEntry); + } + catch (Exception ex) + { + sinkEntry.NamedSink.IoCommand.AffectedDataCount += sinkEntry.NamedSink.RowsWritten; + sinkEntry.NamedSink.IoCommand.Failed(ex); + throw; + } + + yield return row; + } + + private void WriteBuffer(SinkEntry sinkEntry) + { + if (sinkEntry.RowCount == 0) + return; + + var data = sinkEntry.Buffer.ToArray(); + sinkEntry.NamedSink.Stream.Write(data, 0, data.Length); + sinkEntry.NamedSink.IncreaseRowsWritten(sinkEntry.RowCount); + sinkEntry.RowCount = 0; + sinkEntry.Buffer.SetLength(0); + } + + private class SinkEntry + { + public required NamedSink NamedSink { get; init; } + public required MemoryStream Buffer { get; init; } + public int RowCount = 0; + } +} + +[EditorBrowsable(EditorBrowsableState.Never)] +public static class WriteToPartitionedDelimitedFileMutatorFluent +{ + /// + /// Write rows to a delimited stream. 
+ /// + public static IFluentSequenceMutatorBuilder WriteToPartitionedDelimitedFile(this IFluentSequenceMutatorBuilder builder, WriteToPartitionedDelimitedFileMutator mutator) + { + return builder.AddMutator(mutator); + } +} \ No newline at end of file diff --git a/EtLast/Processes/Delimited/WriteToPartitionedDynamicDelimitedFileMutator.cs b/EtLast/Processes/Delimited/WriteToPartitionedDynamicDelimitedFileMutator.cs new file mode 100644 index 00000000..d49ccb22 --- /dev/null +++ b/EtLast/Processes/Delimited/WriteToPartitionedDynamicDelimitedFileMutator.cs @@ -0,0 +1,259 @@ +namespace FizzCode.EtLast; + +public sealed class WriteToPartitionedDynamicDelimitedFileMutator : AbstractMutator, IRowSink +{ + [ProcessParameterMustHaveValue] + public required IPartitionedSinkProvider SinkProvider { get; init; } + + /// + /// Default value is + /// + public Encoding Encoding { get; init; } = Encoding.UTF8; + + public IValueFormatter ValueFormatter { get; init; } = DelimitedValueFormatter.Default; + + /// + /// Default value is ; + /// + public IFormatProvider FormatProvider { get; init; } = CultureInfo.InvariantCulture; + + /// + /// Default value is \r\n + /// + public string LineEnding { get; init; } = "\r\n"; + + /// + /// Default value is " + /// + public char Quote { get; init; } = '\"'; + + /// + /// Default value is " + /// + public char Escape { get; init; } = '\"'; + + /// + /// Default value is ';'. 
+ /// + public char Delimiter { get; init; } = ';'; + + /// + /// Default value is true + /// + public bool WriteHeader { get; init; } = true; + + /// + /// Default value is 10000 + /// + public int BatchSize { get; init; } = 10000; + + public PartitionKeyGenerator PartitionKeyGenerator { get; init; } + + private readonly Dictionary _sinkEntries = []; + private byte[] _delimiterBytes; + private byte[] _lineEndingBytes; + private byte[] _quoteBytes; + private string _escapedQuote; + private char[] _quoteRequiredChars; + private string _quoteAsString; + + private int _rowCounter; + + protected override void StartMutator() + { + _delimiterBytes = Encoding.GetBytes(new[] { Delimiter }); + _lineEndingBytes = Encoding.GetBytes(LineEnding); + _escapedQuote = new string(new[] { Escape, Quote }); + _quoteRequiredChars = [Delimiter, Quote, Escape, '\r', '\n']; + _quoteAsString = Quote.ToString(); + _quoteBytes = Encoding.GetBytes(new[] { Quote }); + + _rowCounter = 0; + } + + private SinkEntry GetSinkEntry(string partitionKey, IReadOnlySlimRow firstRow) + { + var internalKey = partitionKey ?? 
"\0__nopartition__\0"; + + if (_sinkEntries.TryGetValue(internalKey, out var sinkEntry)) + return sinkEntry; + + var columns = firstRow.Values.Select(x => x.Key).ToArray(); + + sinkEntry = new SinkEntry() + { + NamedSink = SinkProvider.GetSink(this, partitionKey, "delimited", columns), + Buffer = new MemoryStream(), + Columns = columns, + }; + + _sinkEntries.Add(internalKey, sinkEntry); + + if (WriteHeader) + { + if (sinkEntry.NamedSink.SafeGetPosition() == 0) + { + var first = true; + foreach (var columnName in sinkEntry.Columns) + { + if (!first) + sinkEntry.NamedSink.Stream.Write(_delimiterBytes); + + if (!string.IsNullOrEmpty(columnName)) + { + var quoteRequired = columnName.IndexOfAny(_quoteRequiredChars) > -1 + || columnName[0] == ' ' + || columnName[^1] == ' ' + || columnName.Contains(LineEnding, StringComparison.Ordinal); + + if (quoteRequired) + { + sinkEntry.NamedSink.Stream.Write(_quoteBytes); + + if (columnName.Contains(Quote)) + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName.Replace(_quoteAsString, _escapedQuote, StringComparison.Ordinal))); + } + else + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName)); + } + + sinkEntry.NamedSink.Stream.Write(_quoteBytes); + } + else + { + sinkEntry.NamedSink.Stream.Write(Encoding.GetBytes(columnName)); + } + } + + first = false; + } + + sinkEntry.NamedSink.Stream.Write(_lineEndingBytes); + sinkEntry.NamedSink.IncreaseRowsWritten(); + } + } + + return sinkEntry; + } + + protected override void CloseMutator() + { + foreach (var sinkEntry in _sinkEntries.Values) + { + WriteBuffer(sinkEntry); + } + + if (SinkProvider.AutomaticallyDispose) + { + foreach (var sinkEntry in _sinkEntries.Values) + { + sinkEntry.NamedSink.Stream.Flush(); + sinkEntry.NamedSink.Stream.Close(); + sinkEntry.NamedSink.Stream.Dispose(); + } + } + + _sinkEntries.Clear(); + } + + protected override IEnumerable MutateRow(IRow row, long rowInputIndex) + { + var partitionKey = PartitionKeyGenerator?.Invoke(row, 
_rowCounter); + _rowCounter++; + + var sinkEntry = GetSinkEntry(partitionKey, row); + sinkEntry.NamedSink.Sink.RegisterWrite(row); + + try + { + var first = true; + foreach (var columnName in sinkEntry.Columns) + { + if (!first) + sinkEntry.Buffer.Write(_delimiterBytes); + + var value = row[columnName]; + + var str = ValueFormatter.Format(value, FormatProvider); + + if (!string.IsNullOrEmpty(str)) + { + var quoteRequired = str.IndexOfAny(_quoteRequiredChars) > -1 + || str[0] == ' ' + || str[^1] == ' ' + || str.Contains(LineEnding, StringComparison.Ordinal); + + if (quoteRequired) + { + sinkEntry.Buffer.Write(_quoteBytes); + + if (str.Contains(Quote)) + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str.Replace(_quoteAsString, _escapedQuote, StringComparison.Ordinal))); + } + else + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str)); + } + + sinkEntry.Buffer.Write(_quoteBytes); + } + else + { + sinkEntry.Buffer.Write(Encoding.GetBytes(str)); + } + } + + first = false; + } + + sinkEntry.Buffer.Write(_lineEndingBytes); + sinkEntry.RowCount++; + + if (sinkEntry.RowCount >= BatchSize) + WriteBuffer(sinkEntry); + } + catch (Exception ex) + { + sinkEntry.NamedSink.IoCommand.AffectedDataCount += sinkEntry.NamedSink.RowsWritten; + sinkEntry.NamedSink.IoCommand.Failed(ex); + throw; + } + + yield return row; + } + + private void WriteBuffer(SinkEntry sinkEntry) + { + if (sinkEntry.RowCount == 0) + return; + + var data = sinkEntry.Buffer.ToArray(); + sinkEntry.NamedSink.Stream.Write(data, 0, data.Length); + sinkEntry.NamedSink.IncreaseRowsWritten(sinkEntry.RowCount); + sinkEntry.RowCount = 0; + sinkEntry.Buffer.SetLength(0); + } + + private class SinkEntry + { + public required NamedSink NamedSink { get; init; } + public required MemoryStream Buffer { get; init; } + public int RowCount = 0; + public required string[] Columns { get; init; } + } +} + +[EditorBrowsable(EditorBrowsableState.Never)] +public static class WriteToPartitionedDynamicDelimitedFileMutatorFluent +{ + /// + 
/// Write rows to a delimited stream. The first row of each partition is used to determine the columns of the delimited output. + /// + public static IFluentSequenceMutatorBuilder WriteToPartitionedDynamicDelimitedFile(this IFluentSequenceMutatorBuilder builder, WriteToPartitionedDynamicDelimitedFileMutator mutator) + { + return builder.AddMutator(mutator); + } +} \ No newline at end of file diff --git a/EtLast/Processes/Http/HttpStreamProvider.cs b/EtLast/Processes/Http/HttpStreamProvider.cs index 9049b877..72f8ae25 100644 --- a/EtLast/Processes/Http/HttpStreamProvider.cs +++ b/EtLast/Processes/Http/HttpStreamProvider.cs @@ -3,7 +3,7 @@ namespace FizzCode.EtLast; [ContainsProcessParameterValidation] -public class HttpStreamProvider : IStreamProvider +public class HttpStreamProvider : IManyStreamProvider, IOneStreamProvider { /// /// According to MSDN, it is recommended to reuse HttpClient instances if possible. @@ -22,7 +22,7 @@ public class HttpStreamProvider : IStreamProvider public string GetTopic() => Url; - public IEnumerable GetStreams(IProcess caller) + public NamedStream GetStream(IProcess caller) { var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() { @@ -40,7 +40,7 @@ public IEnumerable GetStreams(IProcess caller) var stream = Client.GetStreamAsync(Url).Result; var namedStream = new NamedStream(Url, stream, ioCommand); namedStream.OnDispose += (sender, args) => cancellationTokenRegistration.Dispose(); - return new[] { namedStream }; + return namedStream; } catch (Exception ex) { @@ -57,7 +57,13 @@ public IEnumerable GetStreams(IProcess caller) ioCommand.AffectedDataCount = 0; ioCommand.End(); - return Enumerable.Empty(); + return null; } } + + public IEnumerable GetStreams(IProcess caller) + { + var stream = GetStream(caller); + return stream != null ? new[] { stream } : Enumerable.Empty<NamedStream>(); // GetStream returns null on a handled failure; yield no streams instead of a null element + } } \ No newline at end of file diff --git a/EtLast/Processes/Json/DeserializeFromJsonFileJob.cs b/EtLast/Processes/Json/DeserializeFromJsonFileJob.cs index d0374735..946389e5 100644 --- 
a/EtLast/Processes/Json/DeserializeFromJsonFileJob.cs +++ b/EtLast/Processes/Json/DeserializeFromJsonFileJob.cs @@ -1,9 +1,9 @@ namespace FizzCode.EtLast; -public sealed class DeserializeFromJsonFileJob : AbstractProcessWithResult> +public sealed class DeserializeFromJsonFileJob : AbstractProcessWithResult { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required IOneStreamProvider StreamProvider { get; init; } public Encoding Encoding { get; init; } = Encoding.UTF8; @@ -16,55 +16,49 @@ protected override void ValidateImpl() { } - protected override IEnumerable ExecuteImpl() + protected override T ExecuteImpl() { - var streams = StreamProvider.GetStreams(this); - if (streams == null) - yield break; + var stream = StreamProvider.GetStream(this); + if (stream == null) + return default; - var streamIndex = 0; - foreach (var stream in streams) - { - T result = default; + T result = default; - try + try + { + using (var reader = new StreamReader(stream.Stream, Encoding)) { - using (var reader = new StreamReader(stream.Stream, Encoding)) - { - var content = reader.ReadToEnd(); - result = JsonSerializer.Deserialize(content, SerializerOptions); - } - - stream.IoCommand.AffectedDataCount += 1; - stream.IoCommand.End(); + var content = reader.ReadToEnd(); + result = JsonSerializer.Deserialize(content, SerializerOptions); } - catch (Exception ex) - { - var exception = new JsonDeserializerException(this, "error while deserializing a json file", ex); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while deserializing a json file: {0}", stream.Name)); - exception.Data["StreamName"] = stream.Name; - exception.Data["StreamIndex"] = streamIndex; - stream.IoCommand.Failed(exception); - throw exception; - } - finally - { - stream.Stream.Flush(); - stream.Stream.Close(); - stream.Stream.Dispose(); - } + stream.IoCommand.AffectedDataCount += 1; + stream.IoCommand.End(); + } + catch (Exception ex) + { 
+ var exception = new JsonDeserializerException(this, "error while deserializing a json file", ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while deserializing a json file: {0}", stream.Name)); + exception.Data["StreamName"] = stream.Name; - yield return result; - streamIndex++; + stream.IoCommand.Failed(exception); + throw exception; } + finally + { + stream.Stream.Flush(); + stream.Stream.Close(); + stream.Stream.Dispose(); + } + + return result; } } [EditorBrowsable(EditorBrowsableState.Never)] public static class DeserializeFromJsonFileJobFluent { - public static IFlow DeserializeFromJsonFile(this IFlow builder, out IEnumerable result, Func> processCreator) + public static IFlow DeserializeFromJsonFile(this IFlow builder, out TResult result, Func> processCreator) { return builder.ExecuteProcessWithResult(out result, processCreator); } diff --git a/EtLast/Processes/Json/DeserializeFromJsonFilesJob.cs b/EtLast/Processes/Json/DeserializeFromJsonFilesJob.cs new file mode 100644 index 00000000..99a7627f --- /dev/null +++ b/EtLast/Processes/Json/DeserializeFromJsonFilesJob.cs @@ -0,0 +1,71 @@ +namespace FizzCode.EtLast; + +public sealed class DeserializeFromJsonFilesJob : AbstractProcessWithResult> +{ + [ProcessParameterMustHaveValue] + public required IManyStreamProvider StreamProvider { get; init; } + + public Encoding Encoding { get; init; } = Encoding.UTF8; + + public JsonSerializerOptions SerializerOptions { get; init; } = new JsonSerializerOptions() + { + WriteIndented = true, + }; + + protected override void ValidateImpl() + { + } + + protected override IEnumerable ExecuteImpl() + { + var streams = StreamProvider.GetStreams(this); + if (streams == null) + yield break; + + var streamIndex = 0; + foreach (var stream in streams) + { + T result = default; + + try + { + using (var reader = new StreamReader(stream.Stream, Encoding)) + { + var content = reader.ReadToEnd(); + result = JsonSerializer.Deserialize(content, 
SerializerOptions); + } + + stream.IoCommand.AffectedDataCount += 1; + stream.IoCommand.End(); + } + catch (Exception ex) + { + var exception = new JsonDeserializerException(this, "error while deserializing a json file", ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while deserializing a json file: {0}", stream.Name)); + exception.Data["StreamName"] = stream.Name; + exception.Data["StreamIndex"] = streamIndex; + + stream.IoCommand.Failed(exception); + throw exception; + } + finally + { + stream.Stream.Flush(); + stream.Stream.Close(); + stream.Stream.Dispose(); + } + + yield return result; + streamIndex++; + } + } +} + +[EditorBrowsable(EditorBrowsableState.Never)] +public static class DeserializeFromJsonFilesJobFluent +{ + public static IFlow DeserializeFromJsonFiles(this IFlow builder, out IEnumerable result, Func> processCreator) + { + return builder.ExecuteProcessWithResult(out result, processCreator); + } +} \ No newline at end of file diff --git a/EtLast/Processes/Json/JsonArrayReader.cs b/EtLast/Processes/Json/JsonArrayReader.cs index ab0ea656..cdad02a3 100644 --- a/EtLast/Processes/Json/JsonArrayReader.cs +++ b/EtLast/Processes/Json/JsonArrayReader.cs @@ -5,7 +5,7 @@ namespace FizzCode.EtLast; public sealed class JsonArrayReader : AbstractRowSource { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required IManyStreamProvider StreamProvider { get; init; } [ProcessParameterMustHaveValue] public required string ColumnName { get; init; } diff --git a/EtLast/Processes/Json/JsonElementReader.cs b/EtLast/Processes/Json/JsonElementReader.cs index 7f1ca72e..2111a865 100644 --- a/EtLast/Processes/Json/JsonElementReader.cs +++ b/EtLast/Processes/Json/JsonElementReader.cs @@ -3,7 +3,7 @@ public sealed class JsonElementReader : AbstractRowSource { [ProcessParameterMustHaveValue] - public required IStreamProvider StreamProvider { get; init; } + public required 
IManyStreamProvider StreamProvider { get; init; } [ProcessParameterMustHaveValue] public required string ColumnName { get; init; } diff --git a/EtLast/Processes/Json/SerializeToJsonFileJob.cs b/EtLast/Processes/Json/SerializeToJsonFileJob.cs index 71abfa53..12cb4fc2 100644 --- a/EtLast/Processes/Json/SerializeToJsonFileJob.cs +++ b/EtLast/Processes/Json/SerializeToJsonFileJob.cs @@ -3,7 +3,7 @@ public sealed class SerializeToJsonFileJob : AbstractJob { [ProcessParameterMustHaveValue] - public required ISinkProvider SinkProvider { get; init; } + public required IOneSinkProvider SinkProvider { get; init; } [ProcessParameterMustHaveValue] public T Data { get; init; } @@ -17,7 +17,7 @@ public sealed class SerializeToJsonFileJob : AbstractJob protected override void ExecuteImpl(Stopwatch netTimeStopwatch) { - var namedSink = SinkProvider.GetSink(this, null, "json", []); + var namedSink = SinkProvider.GetSink(this, "json", []); try { var content = JsonSerializer.Serialize(Data, SerializerOptions); diff --git a/EtLast/Streams/SinkProvider/PartitionKeyGenerator.cs b/EtLast/Streams/PartitionKeyGenerator.cs similarity index 97% rename from EtLast/Streams/SinkProvider/PartitionKeyGenerator.cs rename to EtLast/Streams/PartitionKeyGenerator.cs index 294e3ccf..9ae123a4 100644 --- a/EtLast/Streams/SinkProvider/PartitionKeyGenerator.cs +++ b/EtLast/Streams/PartitionKeyGenerator.cs @@ -1,3 +1,3 @@ -namespace FizzCode.EtLast; - -public delegate string PartitionKeyGenerator(IRow row, long index); +namespace FizzCode.EtLast; + +public delegate string PartitionKeyGenerator(IRow row, long index); diff --git a/EtLast/Streams/SinkProvider/MemorySinkProvider.cs b/EtLast/Streams/SinkProvider/MemorySinkProvider.cs index b4732b34..d470fa31 100644 --- a/EtLast/Streams/SinkProvider/MemorySinkProvider.cs +++ b/EtLast/Streams/SinkProvider/MemorySinkProvider.cs @@ -1,12 +1,13 @@ namespace FizzCode.EtLast; [ContainsProcessParameterValidation] -public class MemorySinkProvider : ISinkProvider +public 
class MemorySinkProvider : IOneSinkProvider { [ProcessParameterMustHaveValue] - public required Func StreamCreator { get; init; } + public required MemoryStream Stream { get; init; } + + public string Name { get; init; } = "MemorySink"; - private readonly string _sinkName = "MemorySink"; private readonly string _sinkLocation = "memory"; private readonly string _sinkPath = "memory"; @@ -15,7 +16,7 @@ public class MemorySinkProvider : ISinkProvider /// public required bool AutomaticallyDispose { get; init; } - public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns) + public NamedSink GetSink(IProcess caller, string sinkFormat, string[] columns) { var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() { @@ -30,16 +31,15 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat { var sink = caller.Context.GetSink(_sinkLocation, _sinkPath, sinkFormat, caller, columns); - var stream = StreamCreator.Invoke(); - return new NamedSink(_sinkName, stream, ioCommand, sink); + return new NamedSink(Name, Stream, ioCommand, sink); } catch (Exception ex) { var exception = new EtlException(caller, "error while writing memory stream", ex); exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing memory stream: {0}, message: {1}", - _sinkName, ex.Message)); + Name, ex.Message)); - exception.Data["SinkName"] = _sinkName; + exception.Data["SinkName"] = Name; ioCommand.Failed(exception); throw exception; diff --git a/EtLast/Streams/SinkProvider/PartitionedMemorySinkProvider.cs b/EtLast/Streams/SinkProvider/PartitionedMemorySinkProvider.cs new file mode 100644 index 00000000..3d1497a8 --- /dev/null +++ b/EtLast/Streams/SinkProvider/PartitionedMemorySinkProvider.cs @@ -0,0 +1,53 @@ +namespace FizzCode.EtLast; + +[ContainsProcessParameterValidation] +public class PartitionedMemorySinkProvider : IOneSinkProvider, IPartitionedSinkProvider +{ + [ProcessParameterMustHaveValue] + 
public required Func StreamCreator { get; init; } + + private readonly string _sinkName = "MemorySink"; + private readonly string _sinkLocation = "memory"; + private readonly string _sinkPath = "memory"; + + /// + /// Default value is false + /// + public required bool AutomaticallyDispose { get; init; } + + public NamedSink GetSink(IProcess caller, string sinkFormat, string[] columns) + { + return GetSink(caller, null, sinkFormat, columns); + } + + public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns) + { + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.memoryWrite, + Location = _sinkLocation, + Path = _sinkPath, + Message = "writing to memory stream", + }); + + try + { + var sink = caller.Context.GetSink(_sinkLocation, _sinkPath, sinkFormat, caller, columns); + + var stream = StreamCreator.Invoke(partitionKey); + return new NamedSink(_sinkName, stream, ioCommand, sink); + } + catch (Exception ex) + { + var exception = new EtlException(caller, "error while writing memory stream", ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing memory stream: {0}, message: {1}", + _sinkName, ex.Message)); + + exception.Data["SinkName"] = _sinkName; + + ioCommand.Failed(exception); + throw exception; + } + } +} \ No newline at end of file diff --git a/EtLast/Streams/SinkProvider/StreamProvider/MemoryStreamProvider.cs b/EtLast/Streams/StreamProvider/MemoryStreamProvider.cs similarity index 92% rename from EtLast/Streams/SinkProvider/StreamProvider/MemoryStreamProvider.cs rename to EtLast/Streams/StreamProvider/MemoryStreamProvider.cs index 24c2d306..5a1a6ecc 100644 --- a/EtLast/Streams/SinkProvider/StreamProvider/MemoryStreamProvider.cs +++ b/EtLast/Streams/StreamProvider/MemoryStreamProvider.cs @@ -1,44 +1,44 @@ -namespace FizzCode.EtLast; - -[ContainsProcessParameterValidation] -public class MemoryStreamProvider : 
IStreamProvider -{ - [ProcessParameterMustHaveValue] - public required Func StreamCreator { get; init; } - - private readonly string _streamName = "MemoryStream"; - - public string GetTopic() - { - return _streamName; - } - - public IEnumerable GetStreams(IProcess caller) - { - var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() - { - Process = caller, - Kind = IoCommandKind.streamRead, - Message = "reading from memory stream" - }); - - try - { - var stream = StreamCreator.Invoke(); - return new[] - { - new NamedStream(_streamName, stream, ioCommand), - }; - } - catch (Exception ex) - { - var exception = new EtlException(caller, "error while opening memory stream", ex); - exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening memory stream: {0}, message: {1}", - _streamName, ex.Message)); - exception.Data["StreamName"] = _streamName; - - ioCommand.Failed(exception); - throw exception; - } - } +namespace FizzCode.EtLast; + +[ContainsProcessParameterValidation] +public class MemoryStreamProvider : IManyStreamProvider +{ + [ProcessParameterMustHaveValue] + public required Func StreamCreator { get; init; } + + private readonly string _streamName = "MemoryStream"; + + public string GetTopic() + { + return _streamName; + } + + public IEnumerable GetStreams(IProcess caller) + { + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.streamRead, + Message = "reading from memory stream" + }); + + try + { + var stream = StreamCreator.Invoke(); + return new[] + { + new NamedStream(_streamName, stream, ioCommand), + }; + } + catch (Exception ex) + { + var exception = new EtlException(caller, "error while opening memory stream", ex); + exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while opening memory stream: {0}, message: {1}", + _streamName, ex.Message)); + exception.Data["StreamName"] = _streamName; + + ioCommand.Failed(exception); + throw 
exception; + } + } } \ No newline at end of file diff --git a/EtLast/Streams/StreamProvider/OneMemoryStreamProvider.cs b/EtLast/Streams/StreamProvider/OneMemoryStreamProvider.cs new file mode 100644 index 00000000..5da2b401 --- /dev/null +++ b/EtLast/Streams/StreamProvider/OneMemoryStreamProvider.cs @@ -0,0 +1,33 @@ +namespace FizzCode.EtLast; + +[ContainsProcessParameterValidation] +public class OneMemoryStreamProvider : IOneStreamProvider, IManyStreamProvider +{ + [ProcessParameterMustHaveValue] + public required MemoryStream Stream { get; init; } + + public string Name { get; init; } = "MemoryStream"; + + public string GetTopic() + { + return Name; + } + + public NamedStream GetStream(IProcess caller) + { + var ioCommand = caller.Context.RegisterIoCommand(new IoCommand() + { + Process = caller, + Kind = IoCommandKind.streamRead, + Message = "reading from memory stream" + }); + + return new NamedStream(Name, Stream, ioCommand); + } + + public IEnumerable GetStreams(IProcess caller) + { + var stream = GetStream(caller); + return new[] { stream }; + } +} \ No newline at end of file diff --git a/Tests/EtLast.Tests.Benchmarks/Tests/ReadFromDelimitedTests.cs b/Tests/EtLast.Tests.Benchmarks/Tests/ReadFromDelimitedTests.cs index dc28986a..124e77a6 100644 --- a/Tests/EtLast.Tests.Benchmarks/Tests/ReadFromDelimitedTests.cs +++ b/Tests/EtLast.Tests.Benchmarks/Tests/ReadFromDelimitedTests.cs @@ -34,7 +34,7 @@ public void SetupDelimited() Quote = '"', SinkProvider = new MemorySinkProvider() { - StreamCreator = () => _stream, + Stream = _stream, AutomaticallyDispose = true, }, Columns = new() @@ -52,7 +52,7 @@ public void SetupDelimited() Quote = '"', SinkProvider = new LocalFileSinkProvider() { - FileNameGenerator = _ => _file, + FileName = _file, ActionWhenFileExists = LocalSinkFileExistsAction.DeleteAndContinue, FileMode = FileMode.CreateNew, }, @@ -87,9 +87,9 @@ public void ReadFromDelimitedStream() var context = new EtlContext(null); var process = new 
DelimitedLineReader() { - StreamProvider = new MemoryStreamProvider() + StreamProvider = new OneMemoryStreamProvider() { - StreamCreator = () => stream, + Stream = stream, }, Columns = new() { diff --git a/Tests/EtLast.Tests.EPPlus/Tests/EpPlusExcelReaderTests.cs b/Tests/EtLast.Tests.EPPlus/Tests/EpPlusExcelReaderTests.cs index 09c0aa63..e593ee08 100644 --- a/Tests/EtLast.Tests.EPPlus/Tests/EpPlusExcelReaderTests.cs +++ b/Tests/EtLast.Tests.EPPlus/Tests/EpPlusExcelReaderTests.cs @@ -3,7 +3,7 @@ [TestClass] public class EpPlusExcelReaderTests { - private static EpPlusExcelReader GetReader(IEtlContext context, IStreamProvider streamProvider, string sheetName = null, int sheetIndex = -1, bool automaticallyTrimAllStringValues = true) + private static EpPlusExcelReader GetReader(IEtlContext context, IManyStreamProvider streamProvider, string sheetName = null, int sheetIndex = -1, bool automaticallyTrimAllStringValues = true) { return new EpPlusExcelReader() { @@ -65,7 +65,7 @@ public void PartitionedContentBySheetIndex() var context = TestExecuter.GetContext(); var reader = new EpPlusExcelReader() { - StreamProvider = new LocalDirectoryStreamProvider() + StreamProvider = new MultipleLocalFilesInDirectoryStreamProvider() { Directory = @".\TestData\", SearchPattern = "Partition*.xlsx" diff --git a/Tests/EtLast.Tests.EPPlus/Tests/EpPlusSimpleRowWriterTests.cs b/Tests/EtLast.Tests.EPPlus/Tests/EpPlusSimpleRowWriterTests.cs index 9ece7da0..34c10b10 100644 --- a/Tests/EtLast.Tests.EPPlus/Tests/EpPlusSimpleRowWriterTests.cs +++ b/Tests/EtLast.Tests.EPPlus/Tests/EpPlusSimpleRowWriterTests.cs @@ -43,7 +43,7 @@ public void OnePartitionWriteTest() ["lastChangedTime"] = new ExcelColumn().SetNumberFormat("yyyy-mm-dd hh:mm"), }, PartitionKeyGenerator = null, - SinkProvider = new LocalFileSinkProvider() + SinkProvider = new PartitionedLocalFileSinkProvider() { FileNameGenerator = partition => directory + "\\test.xlsx", ActionWhenFileExists = 
LocalSinkFileExistsAction.DeleteAndContinue, @@ -74,7 +74,7 @@ public void OnePartitionWriteTest() .ReadFromExcel(new EpPlusExcelReader() { SheetName = "person", - StreamProvider = new LocalDirectoryStreamProvider() + StreamProvider = new MultipleLocalFilesInDirectoryStreamProvider() { Directory = directory, SearchPattern = "*.xlsx", @@ -142,7 +142,7 @@ public void ManyPartitionWriteTest() ["lastChangedTime"] = new ExcelColumn().SetNumberFormat("yyyy-mm-dd hh:mm"), }, PartitionKeyGenerator = (row, index) => (index % 2).ToString("D", CultureInfo.InvariantCulture), - SinkProvider = new LocalFileSinkProvider() + SinkProvider = new PartitionedLocalFileSinkProvider() { FileNameGenerator = partition => directory + "\\test-" + partition + ".xlsx", ActionWhenFileExists = LocalSinkFileExistsAction.DeleteAndContinue, @@ -173,7 +173,7 @@ public void ManyPartitionWriteTest() .ReadFromExcel(new EpPlusExcelReader() { SheetName = "person", - StreamProvider = new LocalDirectoryStreamProvider() + StreamProvider = new MultipleLocalFilesInDirectoryStreamProvider() { Directory = directory, SearchPattern = "*.xlsx", diff --git a/Tests/EtLast.Tests.Unit/Tests/Delimited/WriteToDelimitedMutatorTests.cs b/Tests/EtLast.Tests.Unit/Tests/Delimited/WriteToDelimitedMutatorTests.cs index d2e5fc16..fd2f426d 100644 --- a/Tests/EtLast.Tests.Unit/Tests/Delimited/WriteToDelimitedMutatorTests.cs +++ b/Tests/EtLast.Tests.Unit/Tests/Delimited/WriteToDelimitedMutatorTests.cs @@ -37,7 +37,7 @@ public void TestSinkValidator() Columns = [], SinkProvider = new LocalFileSinkProvider() { - FileNameGenerator = null, // should throw an exception + FileName = null, // should throw an exception ActionWhenFileExists = LocalSinkFileExistsAction.Continue, FileMode = FileMode.Append, }, @@ -65,7 +65,7 @@ public void PersonWriterWithReaderTest() FormatProvider = CultureInfo.InvariantCulture, SinkProvider = new MemorySinkProvider() { - StreamCreator = () => outputStream, + Stream = outputStream, AutomaticallyDispose = 
false, }, Columns = new() @@ -101,9 +101,9 @@ public void PersonWriterWithReaderTest() builder = SequenceBuilder.Fluent .ReadDelimitedLines(new DelimitedLineReader() { - StreamProvider = new MemoryStreamProvider() + StreamProvider = new OneMemoryStreamProvider() { - StreamCreator = () => outputStream, + Stream = outputStream, }, Columns = new() { @@ -155,7 +155,7 @@ public void NewLineTest() FormatProvider = CultureInfo.InvariantCulture, SinkProvider = new MemorySinkProvider() { - StreamCreator = () => outputStream, + Stream = outputStream, AutomaticallyDispose = false, }, Columns = new()