Skip to content

Commit f97e7f7

Browse files
Merge pull request #19 from FizzcodeSoftware/dev
Dev
2 parents b67a6bd + 206a081 commit f97e7f7

File tree

7 files changed

+118
-4
lines changed

7 files changed

+118
-4
lines changed

EtLast.AdoNet/AdoNetDbReader/AbstractAdoNetDbReader.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ public abstract class AbstractAdoNetDbReader : AbstractRowSource
3636
/// </summary>
3737
public bool InlineArrayParameters { get; init; } = true;
3838

39+
/// <summary>
40+
/// If initialized with a dictionary, then all the columns returned by the ADO.NET connector based on the given query will be stored in it.
41+
/// Key is the column name in the produced row, value is the exact data type of the field.
42+
/// </summary>
43+
public Dictionary<string, Type> ColumnTypeMap = null;
44+
3945
protected abstract CommandType GetCommandType();
4046

4147
protected AbstractAdoNetDbReader()
@@ -155,6 +161,9 @@ protected override IEnumerable<IRow> Produce()
155161
Config = null,
156162
};
157163
}
164+
165+
if (ColumnTypeMap != null && columns[i] != null)
166+
ColumnTypeMap[columns[i].NameInRow] = reader.GetFieldType(i);
158167
}
159168

160169
while (!FlowState.IsTerminating)

EtLast.AdoNet/Jobs/AbstractSqlStatementWithResult.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
[EditorBrowsable(EditorBrowsableState.Never)]
44
public abstract class AbstractSqlStatementWithResult<T> : AbstractProcessWithResult<T>
55
{
6+
[ProcessParameterMustHaveValue]
67
public required NamedConnectionString ConnectionString { get; set; }
78

89
/// <summary>
@@ -22,8 +23,6 @@ protected AbstractSqlStatementWithResult()
2223

2324
protected override void ValidateImpl()
2425
{
25-
if (ConnectionString == null)
26-
throw new ProcessParameterNullException(this, nameof(ConnectionString));
2726
}
2827

2928
protected sealed override T ExecuteImpl()
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace FizzCode.EtLast;
2+
3+
[ComVisible(true)]
4+
[Serializable]
5+
public class JsonDeserializerException(IProcess process, string message, Exception innerException)
6+
: EtlException(process, message, innerException)
7+
{
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace FizzCode.EtLast;
2+
3+
[ComVisible(true)]
4+
[Serializable]
5+
public class JsonSerializerException(IProcess process, string message, Exception innerException)
6+
: EtlException(process, message, innerException)
7+
{
8+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
namespace FizzCode.EtLast;
2+
3+
public sealed class DeserializeFromJsonFileJob<T> : AbstractProcessWithResult<IEnumerable<T>>
4+
{
5+
[ProcessParameterMustHaveValue]
6+
public required IStreamProvider StreamProvider { get; init; }
7+
8+
public Encoding Encoding { get; init; } = Encoding.UTF8;
9+
10+
public JsonSerializerOptions SerializerOptions { get; init; } = new JsonSerializerOptions()
11+
{
12+
WriteIndented = true,
13+
};
14+
15+
protected override void ValidateImpl()
16+
{
17+
}
18+
19+
protected override IEnumerable<T> ExecuteImpl()
20+
{
21+
var streams = StreamProvider.GetStreams(this);
22+
if (streams == null)
23+
yield break;
24+
25+
var streamIndex = 0;
26+
foreach (var stream in streams)
27+
{
28+
T result = default;
29+
30+
try
31+
{
32+
using (var reader = new StreamReader(stream.Stream, Encoding))
33+
{
34+
var content = reader.ReadToEnd();
35+
result = JsonSerializer.Deserialize<T>(content, SerializerOptions);
36+
}
37+
38+
stream.IoCommand.AffectedDataCount += 1;
39+
stream.IoCommand.End();
40+
}
41+
catch (Exception ex)
42+
{
43+
var exception = new JsonDeserializerException(this, "error while deserializing a json file", ex);
44+
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while deserializing a json file: {0}", stream.Name));
45+
exception.Data["StreamName"] = stream.Name;
46+
exception.Data["StreamIndex"] = streamIndex;
47+
48+
stream.IoCommand.Failed(exception);
49+
throw exception;
50+
}
51+
finally
52+
{
53+
stream.Stream.Flush();
54+
stream.Stream.Close();
55+
stream.Stream.Dispose();
56+
}
57+
58+
yield return result;
59+
streamIndex++;
60+
}
61+
}
62+
}
63+
64+
[EditorBrowsable(EditorBrowsableState.Never)]
65+
public static class DeserializeFromJsonFileJobFluent
66+
{
67+
public static IFlow DeserializeFromJsonFile<TResult>(this IFlow builder, out IEnumerable<TResult> result, Func<DeserializeFromJsonFileJob<TResult>> processCreator)
68+
{
69+
return builder.ExecuteProcessWithResult(out result, processCreator);
70+
}
71+
}

EtLast/Processes/Json/SerializeToJsonFileJob.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ protected override void ExecuteImpl(Stopwatch netTimeStopwatch)
2424

2525
namedSink.Stream.Write(Encoding.GetBytes(content));
2626
namedSink.IncreaseRowsWritten();
27+
28+
namedSink.IoCommand.AffectedDataCount += 1;
29+
namedSink.IoCommand.End();
30+
}
31+
catch (Exception ex)
32+
{
33+
var exception = new JsonSerializerException(this, "error while serializing into a json file", ex);
34+
exception.AddOpsMessage(string.Format(CultureInfo.InvariantCulture, "error while serializing into a json file: {0}", namedSink.Name));
35+
exception.Data["StreamName"] = namedSink.Name;
36+
37+
namedSink.IoCommand.Failed(exception);
38+
throw exception;
2739
}
2840
finally
2941
{
@@ -32,4 +44,13 @@ protected override void ExecuteImpl(Stopwatch netTimeStopwatch)
3244
namedSink.Stream.Dispose();
3345
}
3446
}
47+
}
48+
49+
[EditorBrowsable(EditorBrowsableState.Never)]
50+
public static class SerializeToJsonFileJobFluent
51+
{
52+
public static IFlow SerializeToJsonFile<T>(this IFlow builder, Func<SerializeToJsonFileJob<T>> processCreator)
53+
{
54+
return builder.ExecuteProcess(processCreator);
55+
}
3556
}

Samples/Sample.Etl.Modules/Modules/SampleModule/Startup.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,4 @@ public void BuildSession(ISessionBuilder session, IArgumentCollection arguments)
1111
.LogOpsToFile()
1212
.LogIoToFile();
1313
}
14-
15-
public Dictionary<string, Func<IArgumentCollection, IEtlTask>> CustomTasks => [];
1614
}

0 commit comments

Comments
 (0)