diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index 0e9819d8..df2ce906 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -6,7 +6,7 @@ net6.0 true - 3.0.0-rc.14.3 + diff --git a/src/Propulsion/StreamFilter.fs b/src/Propulsion/StreamFilter.fs index b9a4a7bf..7d327572 100644 --- a/src/Propulsion/StreamFilter.fs +++ b/src/Propulsion/StreamFilter.fs @@ -36,11 +36,11 @@ type StreamFilter([] allowCats, [] denyCats, [] al let denyCats = if includeSystem_ then denyCats else Array.append denyCats [| "^\$" |] let allowSns, denySns = match allowSns, denySns with [||], [||] -> [|".*"|], [||] | x -> x let allowEts, denyEts = match allowEts, denyEts with [||], [||] -> [|".*"|], [||] | x -> x - log.Value.Information("Categories ☑️ {@allowCats} 🚫{@denyCats} Streams ☑️ {@allowStreams} 🚫{denyStreams} Events ☑️ {allowEts} 🚫{@denyEts}", + log.Value.Information("Categories ✅{@allowCats} 🚫{@denyCats} Streams ✅{@allowStreams} 🚫{denyStreams} Events ✅{allowEts} 🚫{@denyEts}", asRe allowCats, asRe denyCats, asRe allowSns, asRe denySns, asRe allowEts, asRe denyEts) fun sn -> validCat sn && validStream sn && (includeSystem || isTransactionalStream sn) - member val EventFilter = filter (fun (x: Propulsion.Sinks.Event) -> x.EventType) (allowEts, denyEts) + member _.CreateEventFilter<'EventBody>() = filter (fun (x: FsCodec.ITimelineEvent<'EventBody>) -> x.EventType) (allowEts, denyEts) diff --git a/tools/Propulsion.Tool/Sync.fs b/tools/Propulsion.Tool/Sync.fs index e3a1c682..2da43236 100644 --- a/tools/Propulsion.Tool/Sync.fs +++ b/tools/Propulsion.Tool/Sync.fs @@ -260,7 +260,8 @@ let run appName (c: Args.Configuration, p: ParseResults) = async { |> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events } - Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats, + let eventFilter = a.Filters.CreateEventFilter() + Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle eventFilter, stats, requireAll = requireAll) | SubCommand.Sync sa -> let eventsContext = sa.ConnectEvents() |> Async.RunSynchronously