Skip to content

Commit 72afe39

Browse files
Merge pull request #201 from FizzCodeSoftware/dev
page compression in SBT
2 parents 3a62607 + b13ed85 commit 72afe39

File tree

3 files changed

+174
-42
lines changed

3 files changed

+174
-42
lines changed

EtLast/Processes/StructuredBinaryTable/StructuredBinaryTableReader.cs

Lines changed: 86 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
namespace FizzCode.EtLast;
1+
using System.Buffers.Binary;
2+
using System.IO.Compression;
3+
4+
namespace FizzCode.EtLast;
25

36
public sealed class StructuredBinaryTableReader : AbstractRowSource
47
{
@@ -37,6 +40,8 @@ protected override IEnumerable<IRow> Produce()
3740
? AddRowIndexToColumn
3841
: null;
3942

43+
var lenBuffer = new byte[4];
44+
4045
var streamIndex = 0;
4146
foreach (var stream in streams)
4247
{
@@ -46,52 +51,101 @@ protected override IEnumerable<IRow> Produce()
4651
if (FlowState.IsTerminating)
4752
break;
4853

49-
BinaryReader reader = null;
5054
try
5155
{
52-
reader = new BinaryReader(stream.Stream, Encoding.UTF8, leaveOpen: true);
56+
int columnCount;
57+
string[] columnNames;
58+
string[] columnTypeNames;
59+
BinaryTypeCode[] columnTypeCodes;
5360

54-
var formatVersion = reader.Read7BitEncodedInt();
61+
stream.Stream.ReadExactly(lenBuffer, 0, 4);
62+
var len = BinaryPrimitives.ReadInt32LittleEndian(lenBuffer);
63+
var buffer = new byte[len];
64+
stream.Stream.ReadExactly(buffer, 0, len);
5565

56-
var columnCount = reader.Read7BitEncodedInt();
57-
var columnNames = new string[columnCount];
58-
var columnTypeNames = new string[columnCount];
59-
var columnTypeCodes = new BinaryTypeCode[columnCount];
60-
for (var i = 0; i < columnCount; i++)
66+
using (var headerUncompressed = new MemoryStream())
6167
{
62-
columnNames[i] = reader.ReadString();
63-
columnTypeNames[i] = reader.ReadString();
64-
columnTypeCodes[i] = (BinaryTypeCode)reader.ReadByte();
68+
using (var bufferMs = new MemoryStream(buffer))
69+
{
70+
using (var ds = new DeflateStream(bufferMs, CompressionMode.Decompress, leaveOpen: true))
71+
{
72+
ds.CopyTo(headerUncompressed);
73+
}
74+
}
75+
76+
headerUncompressed.Position = 0;
77+
78+
var headerReader = new BinaryReader(headerUncompressed, Encoding.UTF8, leaveOpen: false);
79+
80+
var formatVersion = headerReader.Read7BitEncodedInt();
81+
82+
columnCount = headerReader.Read7BitEncodedInt();
83+
columnNames = new string[columnCount];
84+
columnTypeNames = new string[columnCount];
85+
columnTypeCodes = new BinaryTypeCode[columnCount];
86+
for (var i = 0; i < columnCount; i++)
87+
{
88+
columnNames[i] = headerReader.ReadString();
89+
columnTypeNames[i] = headerReader.ReadString();
90+
columnTypeCodes[i] = (BinaryTypeCode)headerReader.ReadByte();
91+
}
92+
93+
headerReader.Dispose();
6594
}
6695

6796
while (stream.Stream.Position < stream.Stream.Length)
6897
{
69-
for (var i = 0; i < columnCount; i++)
98+
stream.Stream.ReadExactly(lenBuffer, 0, 4);
99+
len = BinaryPrimitives.ReadInt32LittleEndian(lenBuffer);
100+
buffer = new byte[len];
101+
stream.Stream.ReadExactly(buffer, 0, len);
102+
103+
using (var ums = new MemoryStream())
70104
{
71-
var typeCode = (BinaryTypeCode)reader.ReadByte();
72-
if (typeCode != BinaryTypeCode._null)
105+
using (var bufferMs = new MemoryStream(buffer))
106+
using (var ds = new DeflateStream(bufferMs, CompressionMode.Decompress, leaveOpen: true))
107+
ds.CopyTo(ums);
108+
109+
ums.Position = 0;
110+
111+
var reader = new BinaryReader(ums, Encoding.UTF8, leaveOpen: false);
112+
try
73113
{
74-
var value = BinaryTypeCodeEncoder.Read(reader, typeCode);
75-
initialValues[columnNames[i]] = value;
114+
while (ums.Position < ums.Length)
115+
{
116+
for (var i = 0; i < columnCount; i++)
117+
{
118+
var typeCode = (BinaryTypeCode)reader.ReadByte();
119+
if (typeCode != BinaryTypeCode._null)
120+
{
121+
var value = BinaryTypeCodeEncoder.Read(reader, typeCode);
122+
initialValues[columnNames[i]] = value;
123+
}
124+
else
125+
{
126+
initialValues[columnNames[i]] = null;
127+
}
128+
}
129+
130+
if (addStreamIndex != null)
131+
initialValues[addStreamIndex] = streamIndex;
132+
133+
if (addRowIndex != null)
134+
initialValues[addRowIndex] = rowCount;
135+
136+
rowCount++;
137+
yield return Context.CreateRow(this, initialValues);
138+
initialValues.Clear();
139+
140+
if (FlowState.IsTerminating)
141+
break;
142+
}
76143
}
77-
else
144+
finally
78145
{
79-
initialValues[columnNames[i]] = null;
146+
reader.Dispose();
80147
}
81148
}
82-
83-
if (addStreamIndex != null)
84-
initialValues[addStreamIndex] = streamIndex;
85-
86-
if (addRowIndex != null)
87-
initialValues[addRowIndex] = rowCount;
88-
89-
rowCount++;
90-
yield return Context.CreateRow(this, initialValues);
91-
initialValues.Clear();
92-
93-
if (FlowState.IsTerminating)
94-
break;
95149
}
96150
}
97151
finally
@@ -101,7 +155,6 @@ protected override IEnumerable<IRow> Produce()
101155
stream.IoCommand.AffectedDataCount += rowCount;
102156
stream.IoCommand.End();
103157
stream.Close();
104-
reader?.Dispose();
105158
}
106159
}
107160

EtLast/Processes/StructuredBinaryTable/WriteToStructuredBinaryTableMutator.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
namespace FizzCode.EtLast;
1+
using System.Buffers.Binary;
2+
using System.IO.Compression;
3+
4+
namespace FizzCode.EtLast;
25

36
public sealed class WriteToStructuredBinaryTableMutator : AbstractMutator, IRowSink
47
{
@@ -8,7 +11,7 @@ public sealed class WriteToStructuredBinaryTableMutator : AbstractMutator, IRowS
811
/// <summary>
912
/// Default value is 10000
1013
/// </summary>
11-
public int BatchSize { get; init; } = 10000;
14+
public int BufferSizeBytes { get; init; } = 10000;
1215

1316
private SinkEntry _sinkEntry;
1417

@@ -114,7 +117,7 @@ protected override IEnumerable<IRow> MutateRow(IRow row, long rowInputIndex)
114117

115118
sinkEntry.RowCount++;
116119

117-
if (sinkEntry.RowCount >= BatchSize)
120+
if (sinkEntry.Buffer.Length >= BufferSizeBytes)
118121
WriteBuffer();
119122
}
120123
catch (Exception ex)
@@ -129,14 +132,29 @@ protected override IEnumerable<IRow> MutateRow(IRow row, long rowInputIndex)
129132

130133
private void WriteBuffer()
131134
{
132-
if (_sinkEntry.RowCount == 0)
135+
if (_sinkEntry.Buffer.Position == 0)
133136
return;
134137

135138
_sinkEntry.BufferWriter.Flush();
136139

137-
var data = _sinkEntry.Buffer.ToArray();
138-
_sinkEntry.NamedSink.Stream.Write(data, 0, data.Length);
139-
_sinkEntry.NamedSink.Sink.IncreaseBytes(data.Length);
140+
using (var targetMs = new MemoryStream())
141+
{
142+
using (var ds = new DeflateStream(targetMs, CompressionLevel.Fastest))
143+
{
144+
_sinkEntry.Buffer.Position = 0;
145+
_sinkEntry.Buffer.CopyTo(ds);
146+
}
147+
148+
var data = targetMs.ToArray();
149+
150+
var buffer = new byte[4];
151+
BinaryPrimitives.WriteInt32LittleEndian(buffer, data.Length);
152+
_sinkEntry.NamedSink.Stream.Write(buffer);
153+
154+
_sinkEntry.NamedSink.Stream.Write(data, 0, data.Length);
155+
_sinkEntry.NamedSink.Sink.IncreaseBytes(data.Length);
156+
}
157+
140158
_sinkEntry.RowCount = 0;
141159
_sinkEntry.Buffer.SetLength(0);
142160
}

Tests/EtLast.Tests.Unit/Tests/StructuredBinaryTable/StructuredBinaryTableTests.cs

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,6 @@ public void WriteThenReadBackTest_RandomValuesTest()
146146
[TestMethod]
147147
public void StreamDisposeTest()
148148
{
149-
var memoryStream = new MemoryStream();
150-
151149
var context = TestExecuter.GetContext();
152150

153151
var rows = TestData.Person().TakeRowsAndReleaseOwnership(null);
@@ -199,4 +197,67 @@ public void StreamDisposeTest()
199197

200198
File.Delete(fileName);
201199
}
200+
201+
[TestMethod]
202+
public void EndToEndTest()
203+
{
204+
var context = TestExecuter.GetContext();
205+
206+
var rows = TestData.Person().TakeRowsAndReleaseOwnership(null);
207+
208+
var rowCache = SequenceBuilder.Fluent
209+
.ReadFrom(TestData.Person())
210+
.BuildToInMemoryRowCache();
211+
212+
var fileName = Path.GetTempFileName();
213+
214+
SequenceBuilder.Fluent
215+
.ReadFromInMemoryRowCache(rowCache)
216+
.WriteToStructuredBinaryTable(new WriteToStructuredBinaryTableMutator()
217+
{
218+
DynamicColumns = () => new()
219+
{
220+
["id"] = typeof(int),
221+
["name"] = typeof(string),
222+
["age"] = typeof(int),
223+
["height"] = typeof(int),
224+
["eyeColor"] = typeof(string),
225+
["countryId"] = typeof(int),
226+
["birthDate"] = typeof(DateTime),
227+
["lastChangedTime"] = typeof(DateTime),
228+
},
229+
SinkProvider = new LocalFileSinkProvider()
230+
{
231+
Path = fileName,
232+
ActionWhenFileExists = LocalSinkFileExistsAction.Overwrite,
233+
FileMode = FileMode.Create,
234+
},
235+
})
236+
.Build()
237+
.Execute(context);
238+
239+
context = TestExecuter.GetContext();
240+
var builder = SequenceBuilder.Fluent
241+
.ReadStructuredBinaryTable(new StructuredBinaryTableReader()
242+
{
243+
StreamProvider = new LocalFileStreamProvider()
244+
{
245+
Path = fileName,
246+
},
247+
});
248+
249+
var result = TestExecuter.Execute(context, builder);
250+
Assert.AreEqual(7, result.MutatedRows.Count);
251+
Assert.That.ExactMatch(result.MutatedRows, [
252+
new() { ["id"] = 0, ["name"] = "A", ["age"] = 17, ["height"] = 160, ["eyeColor"] = "brown", ["countryId"] = 1, ["birthDate"] = new DateTime(2010, 12, 9, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 12, 19, 12, 0, 1, 0) },
253+
new() { ["id"] = 1, ["name"] = "B", ["age"] = 8, ["height"] = 190, ["eyeColor"] = null, ["countryId"] = 1, ["birthDate"] = new DateTime(2011, 2, 1, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 12, 19, 13, 2, 0, 0) },
254+
new() { ["id"] = 2, ["name"] = "C", ["age"] = 27, ["height"] = 170, ["eyeColor"] = "green", ["countryId"] = 2, ["birthDate"] = new DateTime(2014, 1, 21, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2015, 11, 21, 17, 11, 58, 0) },
255+
new() { ["id"] = 3, ["name"] = "D", ["age"] = 39, ["height"] = 160, ["eyeColor"] = "fake", ["countryId"] = null, ["birthDate"] = "2018.07.11", ["lastChangedTime"] = new DateTime(2017, 8, 1, 4, 9, 1, 0) },
256+
new() { ["id"] = 4, ["name"] = "E", ["age"] = -3, ["height"] = 160, ["eyeColor"] = null, ["countryId"] = 1, ["birthDate"] = null, ["lastChangedTime"] = new DateTime(2019, 1, 1, 23, 59, 59, 0) },
257+
new() { ["id"] = 5, ["name"] = "A", ["age"] = 11, ["height"] = 140, ["eyeColor"] = null, ["countryId"] = null, ["birthDate"] = new DateTime(2013, 5, 15, 0, 0, 0, 0), ["lastChangedTime"] = new DateTime(2018, 1, 1, 0, 0, 0, 0) },
258+
new() { ["id"] = 6, ["name"] = "fake", ["age"] = null, ["height"] = 140, ["eyeColor"] = null, ["countryId"] = 5, ["birthDate"] = new DateTime(2018, 1, 9, 0, 0, 0, 0), ["lastChangedTime"] = null }]);
259+
Assert.AreEqual(0, result.Process.FlowState.Exceptions.Count);
260+
261+
File.Delete(fileName);
262+
}
202263
}

0 commit comments

Comments
 (0)