Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor XML parsing classes to support correlation paths and update … #511

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions src/Paillave.Etl.XmlFile/Core/XmlNodeParsed.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

namespace Paillave.Etl.XmlFile.Core
namespace Paillave.Etl.XmlFile.Core;

public class XmlNodeParsed
{
public class XmlNodeParsed
public XmlNodeParsed(string sourceName, string nodeDefinitionName, string nodePath, Type type, object value, IList<XmlNodeLevel> correlationKeys)
{
public XmlNodeParsed(string sourceName, string nodeDefinitionName, string nodePath, Type type, object value, IDictionary<Type, Guid> correlationKeys)
{
SourceName = sourceName;
NodeDefinitionName = nodeDefinitionName;
NodePath = nodePath;
Type = type;
Value = value;
CorrelationKeys = new ReadOnlyDictionary<Type, Guid>(correlationKeys);
}
public string SourceName { get; }
public string NodeDefinitionName { get; }
public string NodePath { get; }
public Type Type { get; }
public object Value { get; }
public T GetValue<T>() => (T)Value;
// public object[] ParentValues { get; internal set; }
// public T GetValue<T>(int level = 0) => (T)(level == 0 ? Value : ParentValues[level - 1]);
public ReadOnlyDictionary<Type, Guid> CorrelationKeys { get; }
SourceName = sourceName;
NodeDefinitionName = nodeDefinitionName;
NodePath = nodePath;
Type = type;
Value = value;
CorrelationKeys = new ReadOnlyDictionary<string, XmlNodeLevel>(correlationKeys.ToDictionary(ck => ck.Path));
}
public string SourceName { get; }
public string NodeDefinitionName { get; }
public string NodePath { get; }
public Type Type { get; }
public object Value { get; }
public T GetValue<T>() => (T)Value;
// public object[] ParentValues { get; internal set; }
// public T GetValue<T>(int level = 0) => (T)(level == 0 ? Value : ParentValues[level - 1]);
public IReadOnlyDictionary<string, XmlNodeLevel> CorrelationKeys { get; }
}
2 changes: 1 addition & 1 deletion src/Paillave.Etl.XmlFile/Core/XmlObjectReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void ProcessEndOfNode(Stack<NodeLevel> nodes, string text, Action<XmlNod
else if (_xmlNodesDefinitionSearch.Contains(key))
{
var (value, nd) = CreateValue(sourceName, key);
pushResult(new XmlNodeParsed(sourceName, nd.Name, nd.NodePath, nd.Type, value, new Dictionary<Type, Guid>()));
pushResult(new XmlNodeParsed(sourceName, nd.Name, nd.NodePath, nd.Type, value, new List<XmlNodeLevel>()));
}
ProcessEndOfAnyNode(nodes);
}
Expand Down
31 changes: 16 additions & 15 deletions src/Paillave.Etl.XmlFile/Core/XmlObjectReaderV2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,14 @@ public XmlObjectReaderV2(XmlFileDefinition xmlFileDefinition, string sourceName,
}
private class XmlPath
{
private readonly struct NodeLevel
{
public NodeLevel(string node, Guid correlationId)
=> (Node, CorrelationId) = (node, correlationId);
public string Node { get; }
public Guid CorrelationId { get; }
}
private readonly Stack<NodeLevel> _nodes = new();
private readonly Stack<XmlNodeLevel> _nodes = new();
private string? _attribute = null;
public void UnStackAttribute() => _attribute = null;
public void StackAttribute(string attribute) => _attribute = attribute;
public void StackNode(string node) => _nodes.Push(new NodeLevel(node, Guid.NewGuid()));
public void StackNode(string node) => _nodes.Push(new XmlNodeLevel(node, Guid.NewGuid(), $"{GetPath()}/{node}"));
public void UnStackNode() => _nodes.Pop();
public string GetPath() => $"/{string.Join("/", _nodes.Select((i) => i.Node).Reverse())}{(_attribute == null ? "" : $"/@{_attribute}")}";
public HashSet<Guid> GetCorrelationKeys() => _nodes.Select(i => i.CorrelationId).ToHashSet();
public IList<XmlNodeLevel> GetCorrelationKeys() => _nodes.Reverse().ToList();
public override string ToString() => GetPath();
}
private class NodePropertyBags
Expand All @@ -56,9 +49,9 @@ public void StartNewNode(string key)
if (_propertyBags.TryGetValue(key, out var propertyBag))
propertyBag.ResetValues();
}
public void EndNode(string key)
public void EndNode(XmlPath xmlPath)
{
if (_propertyBags.TryGetValue(key, out var propertyBag))
if (_propertyBags.TryGetValue(xmlPath.ToString(), out var propertyBag))
{
var value = propertyBag.CreateRow();
_pushResult(new XmlNodeParsed(
Expand All @@ -67,7 +60,7 @@ public void EndNode(string key)
propertyBag.XmlNodeDefinition.NodePath,
propertyBag.XmlNodeDefinition.Type,
value,
new Dictionary<Type, Guid>()));
xmlPath.GetCorrelationKeys()));
}
}
}
Expand Down Expand Up @@ -146,13 +139,13 @@ public void Read(Stream fileStream, CancellationToken cancellationToken)
}
if (isEmptyElement)
{
_nodePropertyBags.EndNode(xmlPath.ToString());
_nodePropertyBags.EndNode(xmlPath);
xmlPath.UnStackNode();
}
break;
case XmlNodeType.EndElement:
_nodePropertyBags.SetValue(xmlPath.ToString(), lastTextValue);
_nodePropertyBags.EndNode(xmlPath.ToString());
_nodePropertyBags.EndNode(xmlPath);
lastTextValue = null;
xmlPath.UnStackNode();
break;
Expand All @@ -163,3 +156,11 @@ public void Read(Stream fileStream, CancellationToken cancellationToken)
}
}
}
public readonly struct XmlNodeLevel
{
public XmlNodeLevel(string node, Guid correlationId, string path)
=> (Node, CorrelationId, Path) = (node, correlationId, path);
public string Node { get; }
public Guid CorrelationId { get; }
public string Path { get; }
}
7 changes: 4 additions & 3 deletions src/Paillave.Etl.XmlFile/XmlFile.Stream.ex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ public static IStream<T> XmlNodeOfType<T>(this IStream<XmlNodeParsed> stream, st
NodeDefinitionName = nodeDefinitionName
}).Output;
}
public static IStream<Correlated<T>> XmlNodeOfTypeCorrelated<T>(this IStream<XmlNodeParsed> stream, string name, string nodeDefinitionName = null)
public static IStream<Correlated<T>> XmlNodeOfTypeCorrelated<T>(this IStream<XmlNodeParsed> stream, string name, string correlationPath, string? nodeDefinitionName = null)
{
return new XmlNodeOfTypeCorrelatedStreamNode<T>(name, new XmlNodeOfTypeFileArgs<T>
return new XmlNodeOfTypeCorrelatedStreamNode<T>(name, new XmlNodeOfTypeCorrelatedFileArgs<T>
{
MainStream = stream,
NodeDefinitionName = nodeDefinitionName
NodeDefinitionName = nodeDefinitionName,
CorrelationPath = correlationPath
}).Output;
}
}
Expand Down
27 changes: 19 additions & 8 deletions src/Paillave.Etl.XmlFile/XmlNodeOfTypeStreamNode.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Paillave.Etl.Core;
using System.Linq;
using Paillave.Etl.Core;
using Paillave.Etl.Reactive.Operators;
using Paillave.Etl.XmlFile.Core;

Expand All @@ -7,7 +8,7 @@ namespace Paillave.Etl.XmlFile
public class XmlNodeOfTypeFileArgs<TOut>
{
public IStream<XmlNodeParsed> MainStream { get; set; }
public string NodeDefinitionName { get; set; }
public string? NodeDefinitionName { get; set; }
}
public class XmlNodeOfTypeStreamNode<TOut> : StreamNodeBase<TOut, IStream<TOut>, XmlNodeOfTypeFileArgs<TOut>>
{
Expand All @@ -24,22 +25,32 @@ protected override IStream<TOut> CreateOutputStream(XmlNodeOfTypeFileArgs<TOut>
return CreateUnsortedStream(obs.Map(i => (TOut)i.Value));
}
}
public class XmlNodeOfTypeCorrelatedStreamNode<TOut> : StreamNodeBase<Correlated<TOut>, IStream<Correlated<TOut>>, XmlNodeOfTypeFileArgs<TOut>>

public class XmlNodeOfTypeCorrelatedFileArgs<TOut>
{
public IStream<XmlNodeParsed> MainStream { get; set; }
public string? NodeDefinitionName { get; set; }
public string CorrelationPath { get; set; }
}
public class XmlNodeOfTypeCorrelatedStreamNode<TOut> : StreamNodeBase<Correlated<TOut>, IStream<Correlated<TOut>>, XmlNodeOfTypeCorrelatedFileArgs<TOut>>
{
public override ProcessImpact PerformanceImpact => ProcessImpact.Light;

public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
public XmlNodeOfTypeCorrelatedStreamNode(string name, XmlNodeOfTypeFileArgs<TOut> args) : base(name, args) { }
protected override IStream<Correlated<TOut>> CreateOutputStream(XmlNodeOfTypeFileArgs<TOut> args)
public XmlNodeOfTypeCorrelatedStreamNode(string name, XmlNodeOfTypeCorrelatedFileArgs<TOut> args) : base(name, args) { }
protected override IStream<Correlated<TOut>> CreateOutputStream(XmlNodeOfTypeCorrelatedFileArgs<TOut> args)
{
var type = typeof(TOut);
var obs = args.MainStream.Observable.Filter(i => i.Type == type);
if (args.NodeDefinitionName != null)
obs = obs.Filter(i => i.NodeDefinitionName == args.NodeDefinitionName);
return CreateUnsortedStream(obs.Map(i => new Correlated<TOut>
return CreateUnsortedStream(obs.Map(i =>
{
CorrelationKeys = default,
Row = (TOut)i.Value
return new Correlated<TOut>
{
CorrelationKeys = new[] { i.CorrelationKeys[args.CorrelationPath].CorrelationId }.ToHashSet(),
Row = (TOut)i.Value
};
}));
}
}
Expand Down
Loading
Loading