Skip to content

Commit

Permalink
Target Equinox 4.0.0-rc.13
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 24, 2023
1 parent d4a079a commit d873a31
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 51 deletions.
4 changes: 2 additions & 2 deletions src/Propulsion.CosmosStore/ChangeFeedProcessor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ type ChangeFeedProcessor =
let leaseRenewInterval = defaultArg leaseRenewInterval (TimeSpan.FromSeconds 3.)
let leaseTtl = defaultArg leaseTtl (TimeSpan.FromSeconds 10.)

log.Information("ChangeFeed {processorName} Lease acquire {leaseAcquireIntervalS:n0}s ttl {ttlS:n0}s renew {renewS:n0}s feedPollDelay {feedPollDelayS:n0}s Items limit {maxItems}",
processorName, leaseAcquireInterval.TotalSeconds, leaseTtl.TotalSeconds, leaseRenewInterval.TotalSeconds, feedPollDelay.TotalSeconds, Option.toNullable maxItems)
log.Information("ChangeFeed {processorName} Lease acquire {leaseAcquireIntervalS:n0}s ttl {ttlS:n0}s renew {renewS:n0}s feedPollDelay {feedPollDelayS:n0}s Items limit {maxItems} fromTail {fromTail}",
processorName, leaseAcquireInterval.TotalSeconds, leaseTtl.TotalSeconds, leaseRenewInterval.TotalSeconds, feedPollDelay.TotalSeconds, Option.toNullable maxItems, defaultArg startFromTail false)
let processorName_ = processorName + ":"
let leaseTokenToPartitionId (leaseToken : string) = int (leaseToken.Trim[|'"'|])
let processor =
Expand Down
3 changes: 2 additions & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ type CosmosStoreSource =
processorName, count, total, lagged, synced)
return! Async.Sleep(TimeSpan.toMs interval) }
let maybeLogLag = lagReportFreq |> Option.map logLag
let startFromTail = defaultArg startFromTail false
let source =
ChangeFeedProcessor.Start
( log, monitored, leases, processorName, observer, ?notifyError=notifyError, ?customize=customize,
?maxItems = maxItems, ?feedPollDelay = tailSleepInterval, ?reportLagAndAwaitNextEstimation = maybeLogLag,
startFromTail = defaultArg startFromTail false,
startFromTail = startFromTail,
leaseAcquireInterval = TimeSpan.FromSeconds 5., leaseRenewInterval = TimeSpan.FromSeconds 5., leaseTtl = TimeSpan.FromSeconds 10.)
lagReportFreq |> Option.iter (fun s -> log.Information("ChangeFeed {processorName} Lag stats interval {lagReportIntervalS:n0}s", processorName, s.TotalSeconds))
source
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.12.21" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.12.21" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.10.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.12.21" />
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.13" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStoreDb/Propulsion.EventStoreDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.12.21" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-rc.13" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.MemoryStore/Propulsion.MemoryStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.10.1" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.12.21" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-rc.13" />
</ItemGroup>

<ItemGroup>
Expand Down
46 changes: 5 additions & 41 deletions tools/Propulsion.Tool/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,42 +60,6 @@ type Configuration(tryGet : string -> string option) =

module Cosmos =

module CosmosStoreConnector =

let logContainer (role: string) (databaseId: string) (containerId: string) =
Log.Information("CosmosDb {role} Database {database} Container {container}", role, databaseId, containerId)

let createMonitoredAndLeases (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId auxContainerId =
logContainer "Source" databaseId containerId
logContainer "Leases" databaseId auxContainerId
let db = client.GetDatabase(databaseId)
db.GetContainer(containerId), db.GetContainer(auxContainerId)

type Equinox.CosmosStore.CosmosStoreConnector with

member private x.LogConfiguration(role) =
let o = x.Options
let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
Log.Information("CosmosDb {role} {mode} {endpointUri} timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
role, o.ConnectionMode, x.Endpoint, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)

// NOTE uses CreateUninitialized as the Database/Container may not actually exist yet
member x.CreateLeasesContainer(databaseId, auxContainerId) =
x.LogConfiguration("Feed")
CosmosStoreConnector.logContainer "Leases" databaseId auxContainerId
x.CreateUninitialized().GetDatabase(databaseId).GetContainer(auxContainerId)

member x.ConnectFeed(databaseId, containerId, auxContainerId) = async {
x.LogConfiguration("Feed")
let! cosmosClient = x.Connect(databaseId, [| containerId; auxContainerId |])
return CosmosStoreConnector.createMonitoredAndLeases cosmosClient databaseId containerId auxContainerId }

/// Connect a CosmosStoreClient Configure with default packing and querying policies, including warming up
member x.Connect(role, databaseId, containerId: string) =
x.LogConfiguration(role, databaseId, containerId)
let maxEvents = 256
Equinox.CosmosStore.CosmosStoreContext.Connect(x, databaseId, containerId, tipMaxEvents = maxEvents)

open Configuration.Cosmos
type [<NoEquality; NoComparison>] Parameters =
| [<AltCommandLine "-m">] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
Expand Down Expand Up @@ -140,7 +104,7 @@ module Cosmos =
member _.MaybeLogLagInterval = p.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes

member x.CreateCheckpointStore(group, cache, storeLog) = async {
let! context = connector.Connect("Checkpoints", databaseId, x.ContainerId)
let! context = connector.ConnectContext("Checkpoints", databaseId, x.ContainerId, 256)
return Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (group, checkpointInterval) (context, cache) }

module Dynamo =
Expand All @@ -159,10 +123,10 @@ module Dynamo =

type Equinox.DynamoStore.DynamoStoreClient with

member x.CreateContext(role, table, ?queryMaxItems, ?maxBytes) =
let c = Equinox.DynamoStore.DynamoStoreContext(x, table, ?queryMaxItems = queryMaxItems, ?maxBytes = maxBytes)
Log.Information("DynamoStore {role:l} Table {table} QueryMaxItems {queryMaxItems} MaxBytes {maxBytes}",
role, table, c.QueryOptions.MaxItems, c.TipOptions.MaxBytes)
member x.CreateContext(role, table, ?queryMaxItems, ?maxBytes, ?archiveTableName: string) =
let c = Equinox.DynamoStore.DynamoStoreContext(x, table, ?queryMaxItems = queryMaxItems, ?maxBytes = maxBytes, ?archiveTableName = archiveTableName)
Log.Information("DynamoStore {role:l} Table {table} Archive {archive} Tip thresholds: {maxTipBytes}b {maxTipEvents}e Query paging {queryMaxItems} items",
role, table, Option.toObj archiveTableName, c.TipOptions.MaxBytes, Option.toNullable c.TipOptions.MaxEvents, c.QueryOptions.MaxItems)
c

type [<NoEquality; NoComparison>] Parameters =
Expand Down
51 changes: 51 additions & 0 deletions tools/Propulsion.Tool/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,54 @@ type Logging() =
[<System.Runtime.CompilerServices.Extension>]
static member Sinks(configuration : LoggerConfiguration, configureMetricsSinks, verboseStore, verboseConsole) =
configuration.Sinks(configureMetricsSinks, Sinks.console verboseConsole, ?isMetric = if verboseStore then None else Some Metrics.logEventIsMetric)

module CosmosStoreConnector =

let private get (role: string) (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId =
Log.Information("CosmosDB {role} Database {database} Container {container}", role, databaseId, containerId)
client.GetDatabase(databaseId).GetContainer(containerId)
let getSource = get "Source"
let getLeases = get "Leases"
let createMonitoredAndLeases client databaseId containerId auxContainerId =
getSource client databaseId containerId, getLeases client databaseId auxContainerId

type Equinox.CosmosStore.CosmosStoreContext with

member x.LogConfiguration(role, databaseId: string, containerId: string) =
Log.Information("CosmosStore {role:l} {db}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}",
role, databaseId, containerId, x.TipOptions.MaxEvents, x.TipOptions.MaxJsonLength, x.QueryOptions.MaxItems)

type Equinox.CosmosStore.CosmosStoreClient with

member x.CreateContext(role: string, databaseId, containerId, tipMaxEvents, ?queryMaxItems, ?tipMaxJsonLength, ?skipLog) =
let c = Equinox.CosmosStore.CosmosStoreContext(x, databaseId, containerId, tipMaxEvents, ?queryMaxItems = queryMaxItems, ?tipMaxJsonLength = tipMaxJsonLength)
if skipLog = Some true then () else c.LogConfiguration(role, databaseId, containerId)
c

type Equinox.CosmosStore.CosmosStoreConnector with

member private x.LogConfiguration(role, databaseId: string, containers: string[]) =
let o = x.Options
let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests
Log.Information("CosmosDB {role} {mode} {endpointUri} {db} {containers} timeout {timeout}s Throttling retries {retries}, max wait {maxRetryWaitTime}s",
role, o.ConnectionMode, x.Endpoint, databaseId, containers, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds)
member private x.CreateAndInitialize(role, databaseId, containers) =
x.LogConfiguration(role, databaseId, containers)
x.CreateAndInitialize(databaseId, containers)
member private x.Connect(role, databaseId, containers) =
x.LogConfiguration(role, databaseId, containers)
x.Connect(databaseId, containers)

// NOTE uses CreateUninitialized as the Database/Container may not actually exist yet
member x.CreateLeasesContainer(databaseId, auxContainerId) =
x.LogConfiguration("Feed", databaseId, [| auxContainerId |])
let client = x.CreateUninitialized()
CosmosStoreConnector.getLeases client databaseId auxContainerId

member x.ConnectFeed(databaseId, containerId, auxContainerId) = async {
let! cosmosClient = x.CreateAndInitialize("Feed", databaseId, [| containerId; auxContainerId |])
return CosmosStoreConnector.createMonitoredAndLeases cosmosClient databaseId containerId auxContainerId }

member x.ConnectContext(role, databaseId, containerId: string, maxEvents) = async {
let! client = x.Connect(role, databaseId, [| containerId |])
return client.CreateContext(role, databaseId, containerId, tipMaxEvents = maxEvents) }
2 changes: 0 additions & 2 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,6 @@ module Project =
| Choice2Of3 sa -> Choice2Of3 sa, Equinox.DynamoStore.Core.Log.InternalMetrics.dump
| Choice3Of3 sa -> Choice3Of3 sa, (fun _ -> ())
let group, startFromTail, maxItems = p.GetResult ConsumerGroupName, p.Contains FromTail, p.TryGetResult MaxItems
match maxItems with None -> () | Some bs -> Log.Information("ChangeFeed Max items Count {changeFeedMaxItems}", bs)
if startFromTail then Log.Warning("ChangeFeed (If new projector group) Skipping projection of all existing events.")
let producer =
match p.GetSubCommand() with
| Kafka a ->
Expand Down

0 comments on commit d873a31

Please sign in to comment.