Skip to content

Commit

Permalink
IOneStreamProvider, IOneSinkProvider and implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
wickedmachinator committed Jan 19, 2024
1 parent 206a081 commit 5f91f42
Show file tree
Hide file tree
Showing 33 changed files with 1,191 additions and 423 deletions.
2 changes: 1 addition & 1 deletion EtLast.EPPlus/Mutators/EpPlusSimpleRowWriterMutator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public sealed class EpPlusSimpleRowWriterMutator : AbstractMutator, IRowSink
{
[ProcessParameterMustHaveValue]
public required ISinkProvider SinkProvider { get; init; }
public required IPartitionedSinkProvider SinkProvider { get; init; }

[ProcessParameterMustHaveValue]
public required string SheetName { get; init; }
Expand Down
2 changes: 1 addition & 1 deletion EtLast.EPPlus/Readers/EpPlusExcelReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public sealed class EpPlusExcelReader : AbstractEpPlusExcelReader
{
[ProcessParameterMustHaveValue]
public required IStreamProvider StreamProvider { get; init; }
public required IManyStreamProvider StreamProvider { get; init; }

/// <summary>
/// First stream index is (integer) 0
Expand Down
2 changes: 1 addition & 1 deletion EtLast.EPPlus/Readers/EpPlusExcelSheetListReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public sealed class EpPlusExcelSheetListReader : AbstractRowSource
{
[ProcessParameterMustHaveValue]
public required IStreamProvider StreamProvider { get; init; }
public required IManyStreamProvider StreamProvider { get; init; }

/// <summary>
/// Default value is "Stream".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static IFlow SerializeToLocalJsonFile<T>(this IFlow builder, string targe
Encoding = customEncoding ?? Encoding.UTF8,
SinkProvider = new LocalFileSinkProvider()
{
FileNameGenerator = _ => targetFileName,
FileName = targetFileName,
ActionWhenFileExists = LocalSinkFileExistsAction.DeleteAndContinue,
FileMode = FileMode.CreateNew,
},
Expand Down
50 changes: 23 additions & 27 deletions EtLast.LocalFiles/Streams/LocalFileSinkProvider.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
namespace FizzCode.EtLast;

public enum LocalSinkFileExistsAction { Continue, Exception, DeleteAndContinue }

[ContainsProcessParameterValidation]
public class LocalFileSinkProvider : ISinkProvider
public class LocalFileSinkProvider : IOneSinkProvider
{
/// <summary>
/// Generates file name based on a partition key.
/// Generates file name.
/// </summary>
[ProcessParameterMustHaveValue]
public required Func<string, string> FileNameGenerator { get; init; }
public required string FileName { get; init; }

/// <summary>
/// Default value is <see cref="LocalSinkFileExistsAction.Exception"/>.
Expand All @@ -33,26 +31,24 @@ public class LocalFileSinkProvider : ISinkProvider

public bool AutomaticallyDispose => true;

public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat, string[] columns)
public NamedSink GetSink(IProcess caller, string sinkFormat, string[] columns)
{
var fileName = FileNameGenerator.Invoke(partitionKey);

var ioCommand = caller.Context.RegisterIoCommand(new IoCommand()
{
Process = caller,
Kind = IoCommandKind.fileWrite,
Location = Path.GetDirectoryName(fileName),
Path = Path.GetFileName(fileName),
Location = Path.GetDirectoryName(FileName),
Path = Path.GetFileName(FileName),
Message = "writing to local file",
});

if (ActionWhenFileExists != LocalSinkFileExistsAction.Continue && File.Exists(fileName))
if (ActionWhenFileExists != LocalSinkFileExistsAction.Continue && File.Exists(FileName))
{
if (ActionWhenFileExists == LocalSinkFileExistsAction.Exception)
{
var exception = new LocalFileWriteException(caller, "local file already exist", fileName);
var exception = new LocalFileWriteException(caller, "local file already exist", FileName);
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "local file already exist: {0}",
fileName));
FileName));

ioCommand.AffectedDataCount = 0;
ioCommand.Failed(exception);
Expand All @@ -62,14 +58,14 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat
{
try
{
File.Delete(fileName);
File.Delete(FileName);
}
catch (Exception ex)
{
var exception = new LocalFileWriteException(caller, "error while writing local file / file deletion failed", fileName, ex);
var exception = new LocalFileWriteException(caller, "error while writing local file / file deletion failed", FileName, ex);
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, file deletion failed, message: {1}",
fileName, ex.Message));
exception.Data["FileName"] = fileName;
FileName, ex.Message));
exception.Data["FileName"] = FileName;

ioCommand.AffectedDataCount = 0;
ioCommand.Failed(exception);
Expand All @@ -78,7 +74,7 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat
}
}

var directory = Path.GetDirectoryName(fileName);
var directory = Path.GetDirectoryName(FileName);
if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory))
{
try
Expand All @@ -87,10 +83,10 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat
}
catch (Exception ex)
{
var exception = new LocalFileWriteException(caller, "error while writing local file / directory creation failed", fileName, ex);
var exception = new LocalFileWriteException(caller, "error while writing local file / directory creation failed", FileName, ex);
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, directory creation failed, message: {1}",
fileName, ex.Message));
exception.Data["FileName"] = fileName;
FileName, ex.Message));
exception.Data["FileName"] = FileName;
exception.Data["Directory"] = directory;

ioCommand.AffectedDataCount = 0;
Expand All @@ -101,17 +97,17 @@ public NamedSink GetSink(IProcess caller, string partitionKey, string sinkFormat

try
{
var sink = caller.Context.GetSink(Path.GetDirectoryName(fileName), Path.GetFileName(fileName), sinkFormat, caller, columns);
var sink = caller.Context.GetSink(Path.GetDirectoryName(FileName), Path.GetFileName(FileName), sinkFormat, caller, columns);

var stream = new FileStream(fileName, FileMode, FileAccess, FileShare);
return new NamedSink(fileName, stream, ioCommand, sink);
var stream = new FileStream(FileName, FileMode, FileAccess, FileShare);
return new NamedSink(FileName, stream, ioCommand, sink);
}
catch (Exception ex)
{
var exception = new LocalFileWriteException(caller, "error while writing local file", fileName, ex);
var exception = new LocalFileWriteException(caller, "error while writing local file", FileName, ex);
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while writing local file: {0}, message: {1}",
fileName, ex.Message));
exception.Data["FileName"] = fileName;
FileName, ex.Message));
exception.Data["FileName"] = FileName;

ioCommand.AffectedDataCount = 0;
ioCommand.Failed(exception);
Expand Down
19 changes: 11 additions & 8 deletions EtLast.LocalFiles/Streams/LocalFileStreamProvider.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
namespace FizzCode.EtLast;

[ContainsProcessParameterValidation]
public class LocalFileStreamProvider : IStreamProvider
public class LocalFileStreamProvider : IManyStreamProvider, IOneStreamProvider
{
[ProcessParameterMustHaveValue]
public required string FileName { get; init; }
Expand Down Expand Up @@ -33,7 +33,7 @@ public string GetTopic()
: null;
}

public IEnumerable<NamedStream> GetStreams(IProcess caller)
public NamedStream GetStream(IProcess caller)
{
var ioCommand = caller.Context.RegisterIoCommand(new IoCommand()
{
Expand All @@ -59,16 +59,13 @@ public IEnumerable<NamedStream> GetStreams(IProcess caller)

ioCommand.AffectedDataCount = 0;
ioCommand.End();
return Enumerable.Empty<NamedStream>();
return null;
}

try
{
var stream = new FileStream(FileName, Options);
return new[]
{
new NamedStream(FileName, stream, ioCommand),
};
return new NamedStream(FileName, stream, ioCommand);
}
catch (Exception ex)
{
Expand All @@ -82,4 +79,10 @@ public IEnumerable<NamedStream> GetStreams(IProcess caller)
throw exception;
}
}
}

public IEnumerable<NamedStream> GetStreams(IProcess caller)
{
var stream = GetStream(caller);
return new[] { stream };
}
}
Loading

0 comments on commit 5f91f42

Please sign in to comment.