
Possible improvements to the codebase #457

Z1kkurat opened this issue Mar 20, 2023

Here I'm outlining my thoughts on what we can change to improve readability, make introducing new features less complex,
and ease the burden of maintaining the library.

Different approach to restoring state from snapshots

The current approach to restoring state from snapshots is dictated by how we restore it from Cassandra:
first we read all the keys, then we read the snapshot for each individual key.
The logic is implemented in KeyStateOf and used in PartitionFlow. This is very inconvenient, in particular when
using Kafka as snapshot storage, where we're supposed to read the state topic only once. What we could do is rework
KeyStateOf, adding a method that reads all snapshots at once. This could delegate to a new abstraction implemented
separately for each underlying storage: it could read keys and then snapshots from Cassandra, or read everything
in a single pass from Kafka. An example of what I mean (we could design it any other way):

trait KeyStateOf[F[_]] {

  // Read state from a snapshot for a single key
  def apply(
    topicPartition: TopicPartition,
    key: String,
    createdAt: Timestamp,
    context: KeyContext[F]
  ): Resource[F, KeyState[F, ConsRecord]]

  // Read all snapshots for a partition in one go
  def all(topicPartition: TopicPartition): Stream[F, KeyState[F, ConsRecord]]
}

This will most likely be a huge change, as it'll require moving a lot of code around.
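
A rough sketch of what such a storage-specific abstraction could look like (SnapshotsOf is a placeholder name, not
an existing API, and the element type is up for discussion):

trait SnapshotsOf[F[_], S] {
  // A Cassandra implementation would read all keys first and then the snapshot for each key;
  // a Kafka implementation would read the whole state topic in a single pass.
  def all(topicPartition: TopicPartition): Stream[F, (String, S)]
}

KeyStateOf#all could then be implemented on top of this without knowing which storage is used.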

State consolidation

Currently, we keep multiple fragments of state in different data classes and access them via different APIs to read
or modify them (essentially, getters and setters). This creates extra layers of indirection,
making it harder to predict what impact any given change may have. The obvious ones I found:

  • the state of the aggregate itself: created/modified in KeyFlow.of, modified in FoldToState and TickToState
  • timestamps (Timestamps) and offsets designating when the aggregate was last persisted, processed and
    'touched' (the 'current' timestamp): read in at least 4 different places (usages of methods of ReadTimestamps),
    modified in at least 2 (usages of methods of WriteTimestamps)
  • timestamps (Timers) of scheduled actions for an aggregate (essentially, when to run TimerFlow for an
    aggregate): read and modified in two different places in the code
  • an offset to commit for an aggregate (KeyContext): read in PartitionFlow before committing, modified
    in TimerFlowOf when the state is flushed.

The last one has the worst impact on discoverability, making the logic really 'implicit'. For example, PartitionFlow
updates the offset of the last record in a batch in Timestamps before triggering timers; TimerFlowOf#persistPeriodically
then reads that offset from Timestamps and, after persisting the snapshot, records it in KeyContext via its hold method;
finally, PartitionFlow uses KeyContext#holding to calculate the correct offset to commit across all aggregates.
Changing this logic in any way requires knowing about this chain of calls and the proper way of passing data around
via these mutable shared state references. The more functionality we have, the more complex it becomes, potentially
resulting in breaking something seemingly unrelated to the code one wants to change.

What I would like to see here is a single immutable data model describing the internal representation of an
aggregate's state. Instances of such a model should be stored in a single unit of code, with other units of code
returning new, updated instances. Essentially, KeyFlow#apply, TimerFlow#onTimer etc. would return a new version of
the aggregate's state.
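
A minimal sketch of what such a consolidated model could look like (all names and fields below are illustrative
only; the real model would carry whatever KeyFlow, TimerFlow and PartitionFlow actually need):

// A single, immutable per-key state replacing the scattered mutable pieces
final case class AggregateState[S](
  value: Option[S],                 // the aggregate itself, None until the first record is folded
  persistedAt: Option[Timestamp],   // when a snapshot was last persisted
  processedAt: Option[Timestamp],   // when a record was last processed
  scheduledTimers: List[Timestamp], // when TimerFlow should fire for this key
  offsetToCommit: Option[Offset]    // replaces the mutable offset currently held in KeyContext
)

// Operations take the current state and return an updated copy instead of
// writing into Timestamps, Timers or KeyContext (signatures are illustrative)
trait KeyFlow[F[_], S] {
  def apply(state: AggregateState[S], records: List[ConsRecord]): F[AggregateState[S]]
}

trait TimerFlow[F[_], S] {
  def onTimer(state: AggregateState[S]): F[AggregateState[S]]
}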

Tick and TimerFlow execution

Currently, Tick and TimerFlow are executed only together, at a single configured interval. This is inconvenient
when one wants to run Tick at a different interval than TimerFlow (when implementing state clean-up, for example).
We could untangle the two so that they can run at separate intervals.
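
Purely as an illustration (not an existing config), the configuration could expose two independent intervals
instead of a single shared one:

import scala.concurrent.duration._

// Hypothetical config: Tick and TimerFlow each get their own interval
final case class TriggerConfig(
  tickInterval: FiniteDuration = 10.minutes,    // e.g. state clean-up
  timerFlowInterval: FiniteDuration = 1.minute  // e.g. persistPeriodically
)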

Tick/Timer parallelism

Timer and Tick are started in a separate fiber for each individual key, without limiting parallelism in any way.
This can negatively impact the performance of an application when persistPeriodically is used with a large
number of aggregates, potentially resulting in hundreds of thousands of fibers started simultaneously. This puts
pressure on the IO runtime and can even cause it to break. We could limit the degree of parallelism.
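
A sketch of what bounding the parallelism could look like, assuming cats-effect 3; triggerTimersForKey stands in
for whatever currently runs in the per-key fiber, and 32 is an arbitrary limit:

import cats.effect.IO
import cats.effect.syntax.all._
import cats.syntax.all._

// Stand-in for the work that is currently forked into its own fiber per key
def triggerTimersForKey(key: String): IO[Unit] = ???

// Trigger timers for all keys with at most 32 running concurrently,
// instead of starting an unbounded number of fibers at once
def triggerAll(keys: List[String]): IO[Unit] =
  keys.parTraverseN(32)(triggerTimersForKey).void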

Usage of Cache in PartitionFlow

Instances of KeyState are stored in a Cache (from scache) inside PartitionFlow, because creating a KeyState is a
Resource and Cache supports that natively. In fact, using it as a Resource is required only for two purposes that
I've found:

  • automatic unregistering of an aggregate from EntityRegistry (in KeyFlow)
  • persisting the snapshot via TimerFlowOf#flushOnRevoke when partitions are revoked and aggregates are deallocated

Using scache might be overkill in our case, as we have no need for TTLs, loading values on the fly, etc.
Instead, we sometimes have to deal with its APIs, which have their own peculiarities.
We could investigate whether our use cases can be implemented in another way without dropping any functionality.
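
As one possible direction, a plain Ref-backed map of key to state plus its finalizer would still let us release
resources when a key is removed or a partition is revoked. This is a minimal sketch assuming cats-effect 3; none
of these names exist in the codebase:

import cats.effect.{Concurrent, Ref, Resource}
import cats.syntax.all._

final class KeyStates[F[_]: Concurrent, S](
  ref: Ref[F, Map[String, (S, F[Unit])]]
) {

  // Create the state for a key if it is not stored yet, keeping its finalizer
  // so it can be run on removal (concurrent creation of the same key is not
  // handled here, to keep the sketch short)
  def getOrCreate(key: String, create: Resource[F, S]): F[S] =
    ref.get.map(_.get(key)).flatMap {
      case Some((state, _)) => state.pure[F]
      case None =>
        create.allocated.flatMap { case (state, release) =>
          ref.update(_ + (key -> (state -> release))).as(state)
        }
    }

  // Remove a single key and release its resources
  // (e.g. unregistering from EntityRegistry)
  def remove(key: String): F[Unit] =
    ref.modify(m => (m - key, m.get(key))).flatMap {
      case Some((_, release)) => release
      case None               => ().pure[F]
    }

  // Release everything on partition revocation
  // (this is where flushOnRevoke-style logic would run)
  def removeAll: F[Unit] =
    ref.getAndSet(Map.empty).flatMap(_.values.toList.traverse_(_._2))
}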
