In order to function as a Kafka read gateway, Dekaf needs to handle several Kafka APIs related to consumer group management:
JoinGroup
SyncGroup
LeaveGroup
Heartbeat
OffsetFetch
OffsetCommit
These APIs hide some substantial complexity, so in order to ship Dekaf in a reasonable timeframe we initially decided to forward group management requests to a real Kafka cluster rather than implementing them ourselves. This has worked out fairly well so far, insofar as we have a number of Dekaf users successfully using consumer groups.
Now that things are stable and Dekaf usage is growing, we're seeing this model of forwarding to an existing Kafka cluster start to break down in various ways. So in this proposal, we'll see what it would take to implement our own group management protocol handling from the ground up.
Architecture Overview
Today, Dekaf instances are intentionally stateless, interchangeable servers speaking the Kafka wire protocol. They talk to Gazette to serve reads and some metadata like high-watermarks, agent-api for most other metadata and authentication, and the control plane database for some other data like AVRO schemas.
Kafka's implementation of group management, on the other hand, uses a "coordinator" model where each group is assigned to a specific broker for the duration of its lifetime, and all relevant requests must be sent to that coordinator.
The proposed design keeps Dekaf itself stateless by introducing two more external sources of state. The role of the "group coordinator" will be purely logical, fulfilled by any Dekaf instance that receives a client request. There is no single, sticky Dekaf instance responsible for a given group.
Etcd: Serves as the shared source of truth for all group-related state (membership, generation, leader, assignments, etc). It is also used for inter-instance signaling for long-polling operations via its watches. All state modifications in etcd will use optimistic locking.
BigTable: Used for storing committed offsets, providing fast read/write access and strong consistency.
Data Models
Group State (etcd)
Group state will be stored in etcd as a JSON object at /dekaf/groups/{TaskName}/{GroupId}/state.
TaskName: The Dekaf materialization this group belongs to.
GroupId: The consumer-provided group identifier.
Optimistic locking is done against the key's mod_revision.
Committed Offsets (BigTable)
Row key: {TaskName}#{GroupId}#{TopicName}#{PartitionId}, e.g. myMaterialization#myConsumerGroup#myTopic#0
Column family o (offsets):
o:offset (int64): The committed offset.
o:commit_ts (int64): Unix timestamp of commit. Maybe use bigtable row timestamp for this?
Need to understand how these fields from OffsetCommitRequest fit in:
generation_id
member_epoch
metadata
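To make the stored shape concrete, here is a hypothetical example of what a Stable group's /state JSON value might look like. The exact field names and the base64 encoding of opaque assignment bytes are illustrative assumptions, not a finalized schema:

```json
{
  "state": "Stable",
  "epoch": 3,
  "generation_id": 17,
  "protocol_type": "consumer",
  "protocol_name": "range",
  "leader_id": "consumer-1-abc",
  "members": {
    "consumer-1-abc": {
      "client_id": "consumer-1",
      "client_host": "10.0.0.5",
      "session_timeout_ms": 45000,
      "rebalance_timeout_ms": 300000,
      "group_instance_id": null,
      "assignment": "<opaque bytes, base64>",
      "last_heartbeat": 1718000000000
    }
  }
}
```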
Group State Machine & Transitions
A group is a state machine, and the group management APIs represent its transitions.
The group state is stored in etcd. Transitions are effected by Dekaf instances performing optimistically-locked operations on this key. Dekaf instances also watch this key for changes relevant to their long-polling requests.
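The optimistically-locked transition loop can be sketched as follows. The `KvStore` trait and `InMemoryKv` below are illustrative stand-ins modeled on etcd's compare-and-swap semantics (`mod_revision`), not the real etcd client API:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical key-value interface with etcd-like compare-and-swap.
trait KvStore {
    /// Returns (value, mod_revision) for the key, if present.
    fn get(&self, key: &str) -> Option<(String, u64)>;
    /// Writes `value` only if the key's current mod_revision matches
    /// `expected_revision`. Returns true on success.
    fn compare_and_put(&self, key: &str, expected_revision: u64, value: String) -> bool;
}

// In-memory mock used only to exercise the loop.
struct InMemoryKv {
    inner: Mutex<HashMap<String, (String, u64)>>,
}

impl KvStore for InMemoryKv {
    fn get(&self, key: &str) -> Option<(String, u64)> {
        self.inner.lock().unwrap().get(key).cloned()
    }
    fn compare_and_put(&self, key: &str, expected_revision: u64, value: String) -> bool {
        let mut map = self.inner.lock().unwrap();
        match map.get(key) {
            Some((_, rev)) if *rev == expected_revision => {
                map.insert(key.to_string(), (value, expected_revision + 1));
                true
            }
            _ => false,
        }
    }
}

/// Retry a group-state transition until the compare-and-swap succeeds.
fn transition(kv: &dyn KvStore, key: &str, f: impl Fn(&str) -> String) -> String {
    loop {
        let (state, rev) = kv.get(key).expect("group state must exist");
        let next = f(&state);
        if kv.compare_and_put(key, rev, next.clone()) {
            return next;
        }
        // Another Dekaf instance won the race; re-read and retry.
    }
}
```

Any Dekaf instance can run this loop against the group's /state key; losers of a race simply re-read and re-apply their transition against the new state.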
Group States
Empty
No members are part of this group. It is either brand new, or all members have left. Data:
Epoch: This is the high-level generation counter for the group, used only to distinguish between different "instances" of this group ID. Only incremented when transitioning from Empty->PreparingRebalance.
PreparingRebalance
The group has at least one member, or an existing member has indicated a need to rebalance (e.g., a member left, a new member joined, or metadata changed). The group is waiting for all known members to (re-)join the group via JoinGroup requests. Data:
Epoch
GenerationId: A unique ID for the current generation of members. This is incremented each time the group rebalances.
LeaderId: Either the ID of the member designated as the leader in the previous generation, or unset. Will be determined in this state.
PendingMembers: A map of MemberId to member metadata. These members have sent a JoinGroup request in the current rebalance cycle but haven't been acknowledged with a JoinGroup response yet (and they won't be until we transition to AwaitingSync).
MemberId: Unique ID for each member (can be client-generated or server-generated).
ClientId: Client-specified ID.
ClientHost: Hostname of the client.
SessionTimeoutMs: How long a member can be silent before being considered dead.
RebalanceTimeoutMs: How long the group waits for members to rejoin during a rebalance.
SupportedProtocols: A list of (protocol_name, protocol_metadata) pairs supported by this member (e.g., for partition assignment strategies like "range", "roundrobin", "sticky").
GroupInstanceId: Optional client-provided persistent ID for static membership. Stored if provided by the client.
RebalanceDeadline: Timestamp by which all members must send their JoinGroup requests.
AwaitingSync
All expected members have JoinGroup'd (or the RebalanceDeadline has passed). A leader has been elected from the members. The group is now waiting for the leader to send a SyncGroup request containing the assignment of partitions/resources to each member. It is also waiting for all expected PendingMembers to call SyncGroup. Once every member has called in, including the leader, assignments are distributed to all members. Data:
Epoch
GenerationId
ProtocolName: The specific protocol chosen from the leader's SupportedProtocols. All members must support this protocol. If a common protocol cannot be found, fail.
LeaderId: MemberId of the chosen group leader
Members: Same as in PreparingRebalance, but now all members are considered joined for this generation
Assignments: Either the assignments returned by the leader, or empty. We store this so that we can still wait for other members to call SyncGroup after getting the assignments from the leader, so long as we're still before the deadline.
SyncDeadline: Timestamp by which the leader (or all members in client-coordinated groups) must send SyncGroup requests.
Stable
The leader has provided assignments, and these have been sent to all members via SyncGroup responses. The group is now actively consuming. The coordinator primarily expects Heartbeat requests from members. Data:
Epoch
GenerationId
ProtocolType
ProtocolName
LeaderId
Members: Same as in AwaitingSync. Each member entry now also includes:
Assignment: The opaque byte array representing the resources assigned to this member (provided by the leader via SyncGroup).
LastHeartbeat: Timestamp of the last successful heartbeat received from this member.
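The four states and their data above could be modeled in Rust roughly as follows. Field names track the proposal; the concrete types (millisecond timestamps, opaque `Vec<u8>` payloads) are assumptions for the sketch:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct Member {
    client_id: String,
    client_host: String,
    session_timeout_ms: u64,
    rebalance_timeout_ms: u64,
    // (protocol_name, opaque protocol_metadata) pairs.
    supported_protocols: Vec<(String, Vec<u8>)>,
    // Present only for static membership.
    group_instance_id: Option<String>,
    // Populated only once the group is Stable.
    assignment: Option<Vec<u8>>,
    last_heartbeat: Option<u64>, // Unix millis
}

#[derive(Debug, Clone)]
enum GroupState {
    Empty {
        epoch: u64,
    },
    PreparingRebalance {
        epoch: u64,
        generation_id: u64,
        leader_id: Option<String>,
        pending_members: HashMap<String, Member>, // keyed by MemberId
        rebalance_deadline: u64,
    },
    AwaitingSync {
        epoch: u64,
        generation_id: u64,
        protocol_name: String,
        leader_id: String,
        members: HashMap<String, Member>,
        assignments: HashMap<String, Vec<u8>>, // MemberId -> opaque assignment
        sync_deadline: u64,
    },
    Stable {
        epoch: u64,
        generation_id: u64,
        protocol_type: String,
        protocol_name: String,
        leader_id: String,
        members: HashMap<String, Member>,
    },
}
```

Modeling the states as enum variants (rather than one struct with many optional fields) makes illegal combinations, like a Stable group without a leader, unrepresentable.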
Direct State Watching & Implications:
Sessions with parked long-polling requests (JoinGroup, SyncGroup) will place an etcd watch directly on the group's /state key. When the watch fires, the session will evaluate whether the new state and its contents satisfy the conditions for unparking its specific request.
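As one example of such an unpark condition, a parked JoinGroup could complete once the watched state shows the group has reached AwaitingSync with this member in the joined set. The state is simplified here to just what the check needs; this is a sketch of the predicate, not the real watch handler:

```rust
// Can a parked JoinGroup for `my_member_id` be unparked, given the new
// state name and the set of members that made it into this generation?
fn can_unpark_join(state_name: &str, joined_member_ids: &[String], my_member_id: &str) -> bool {
    state_name == "AwaitingSync" && joined_member_ids.iter().any(|m| m == my_member_id)
}
```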
API Implementation Details
JoinGroup
Each member sends a JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until all expected members have sent their own join group requests ("expected" in this case means all members that were part of the previous generation). Once they have done so, the coordinator randomly selects a leader from the group and sends JoinGroup responses to all the pending requests.
The JoinGroup request contains an array of the group protocols that the member supports, along with member-specific metadata. This is used to ensure compatibility of group member metadata within the group. The coordinator chooses a protocol which is supported by all members of the group and returns it in the respective JoinGroup responses. If a member joins and doesn't support any of the protocols used by the rest of the group, it will be rejected.
Reads current group state from /dekaf/groups/{TaskName}/{GroupId}/state.
If Empty: Transition to PreparingRebalance. Increment epoch, initialize generation_id. Add member to pending_members (generate member_id if empty).
If PreparingRebalance: If request.generation_id is 0 (new member) or matches the current generation_id: Add/update member in pending_members. Else: Respond ILLEGAL_GENERATION.
If AwaitingSync or Stable: Transition to PreparingRebalance. Increment generation_id. Clear leader_id, protocol_name, assignments. Move members to pending_members. Set rebalance_deadline.
Park request.
"Coordinator" Actions (end of PreparingRebalance / rebalance_deadline):
If conditions met (still PreparingRebalance, members available): Elect leader, select common protocol_name.
Transition to AwaitingSync, moving pending_members to members, set sync_deadline.
Watchers for parked JoinGroup re-evaluate. Successful joiners get NO_ERROR, generation_id, leader_id, protocol_name, and the randomly selected leader gets a list of all current members (including their group_instance_id if provided).
Static Membership Handling:
If group_instance_id is provided in the request:
Store the group_instance_id with the member's details in etcd.
Fencing: Check for an existing member with the same group_instance_id but a different active member_id.
If such a collision is detected, the existing (old) member associated with that group_instance_id is considered fenced.
The coordinator must ensure that subsequent requests from the fenced (old) member_id (for that group_instance_id) receive a FENCED_INSTANCE_ID error.
The new joining member (with its new session/member_id) is then allowed to proceed.
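The common-protocol selection at the end of PreparingRebalance can be sketched as below. Note the tie-break is an assumption: this version prefers the leader's declared preference order, whereas Kafka's own coordinator tallies member preferences; the proposal only requires that the chosen protocol be supported by every member:

```rust
use std::collections::HashSet;

// Pick the first protocol in the leader's SupportedProtocols list that every
// member also supports. Returns None if no common protocol exists, in which
// case the rebalance fails.
fn select_protocol(
    leader_protocols: &[(String, Vec<u8>)],
    all_members: &[Vec<(String, Vec<u8>)>],
) -> Option<String> {
    // Each member's supported protocols, as a set of names.
    let member_sets: Vec<HashSet<&str>> = all_members
        .iter()
        .map(|ps| ps.iter().map(|(name, _)| name.as_str()).collect())
        .collect();

    leader_protocols
        .iter()
        .map(|(name, _)| name.as_str())
        .find(|name| member_sets.iter().all(|set| set.contains(*name)))
        .map(|name| name.to_string())
}
```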
SyncGroup
Once the group members have been stabilized by the completion of phase 1, the active leader must propagate state to the other members in the group. This is used in the new consumer protocol to set partition assignments. Similar to phase 1, all members send SyncGroup requests to the coordinator. Once group state has been provided by the leader, the coordinator forwards each member's state respectively in the SyncGroup response.
Reads current group state from /dekaf/groups/{TaskName}/{GroupId}/state
If state is not AwaitingSync or generation_id/member_id is invalid: Respond with ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, or REBALANCE_IN_PROGRESS.
If request is from Leader:
Store request.group_assignment into the assignments field in the etcd group state object.
Write updated state to etcd. This change will be picked up by watchers.
Mark this member as having sent SyncGroup
Park request.
"Coordinator" Actions:
When sync_deadline is hit OR all members in current generation_id have sent SyncGroup:
If leader failed to provide assignments: Rebalance failed. Transition to PreparingRebalance, increment generation_id.
Otherwise transition to Stable. Parked sessions get notified and distribute the assignment to their respective members.
Heartbeat
Reads current group state.
If state is Stable and generation_id/member_id match:
Update members[member_id].last_heartbeat to current time.
Write updated state
Respond NO_ERROR.
If state is PreparingRebalance or AwaitingSync: Respond REBALANCE_IN_PROGRESS.
If generation_id is stale: Respond ILLEGAL_GENERATION.
If member_id is unknown: Respond UNKNOWN_MEMBER_ID.
Static Memberships: If the requesting member_id has been fenced due to a newer instance claiming that group_instance_id: Respond FENCED_INSTANCE_ID.
Session Timeout Check: Periodically (e.g., by a background task in one or more Dekaf instances, or triggered on any group interaction), scan groups in etcd:
If, for a member in a Stable group, (now - member.last_heartbeat) > member.session_timeout_ms:
This member is dead. Lock group state.
Remove member from members.
Transition to PreparingRebalance, increment generation_id. Clear assignments.
Write updated state to etcd.
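The dead-member scan itself reduces to the timeout comparison above; a minimal sketch, assuming each member record carries `last_heartbeat` (Unix millis) and `session_timeout_ms`:

```rust
use std::collections::HashMap;

struct MemberSession {
    last_heartbeat: u64,     // Unix millis of last successful heartbeat
    session_timeout_ms: u64, // how long the member may be silent
}

/// Returns the MemberIds whose sessions have expired as of `now_ms`.
/// Any non-empty result should trigger a locked transition to
/// PreparingRebalance as described above.
fn expired_members(members: &HashMap<String, MemberSession>, now_ms: u64) -> Vec<String> {
    members
        .iter()
        .filter(|(_, m)| now_ms.saturating_sub(m.last_heartbeat) > m.session_timeout_ms)
        .map(|(id, _)| id.clone())
        .collect()
}
```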
LeaveGroup
Reads current group state.
If member exists in members or pending_members with matching generation_id:
Remove member.
If group becomes empty: Transition to Empty (or delete key after a grace period).
Else (if group was Stable or AwaitingSync): Transition to PreparingRebalance, increment generation_id. Clear assignments.
Write updated state to etcd.
Respond NO_ERROR.
Else: Respond with UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION.
OffsetCommit
Reads group state from etcd. Verify member_id and generation_id if group is not Empty. (Kafka allows commits from unknown members or old generations if the group is Empty - this is the "simple consumer" case).
If group is Stable and member/generation is valid OR if allowing commits to Empty groups:
For each topic/partition: Construct BigTable row key: {TaskName}#{GroupId}#{TopicName}#{PartitionId}.
Write offset, commit_ts to BigTable.
Respond NO_ERROR for successfully committed partitions.
Else: Respond ILLEGAL_GENERATION, UNKNOWN_MEMBER_ID, or REBALANCE_IN_PROGRESS.
OffsetFetch
For each topic/partition: Construct BigTable row key.
Read offset data from BigTable.
Respond with fetched offsets or NO_ERROR with a special offset (-1) if not found.
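Row-key construction for both OffsetCommit and OffsetFetch is a straightforward join on the key format from the Data Models section. One caveat worth flagging: escaping of `#` within task/topic names is omitted here and would need handling if those names can contain it:

```rust
// Build the BigTable row key {TaskName}#{GroupId}#{TopicName}#{PartitionId}
// for a committed offset.
fn offset_row_key(task: &str, group: &str, topic: &str, partition: i32) -> String {
    format!("{task}#{group}#{topic}#{partition}")
}
```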
Failure Scenarios
Dekaf Instance Failure: Stateless. If an instance fails, client connections are dropped. Clients will reconnect to other instances and retry as they are able. Parked requests on the failed instance are lost; etcd state remains valid.
This probably means that Dekaf restarts will cause group rebalances. That's fine, but worth noting.
Client (Member) Failure: Detected by missed heartbeats. We will need some logic to periodically check for these and update etcd state, triggering a rebalance.
Etcd/BigTable Unavailability: We should have retry logic for transient issues. Prolonged unavailability will lead to service disruption.
Opaque Data
protocol_metadata (JoinGroup) and Assignment data (SyncGroup) are treated as opaque byte arrays. Dekaf does not need to interpret the contents of these fields.
For SyncGroup, the leader provides a map of MemberId to AssignmentData. When responding to follower SyncGroup requests, the Session retrieves this map and sends the specific opaque AssignmentData to the corresponding member.
Admin APIs
ListGroups: Could be implemented by scanning keys in etcd under /dekaf/groups/{TaskName}/. Nice-to-have.
DeleteGroups: Could be implemented by deleting the group's /state key in etcd and all associated offset data in BigTable. Also nice-to-have, nobody so far has cared that we're missing it. Make sure to only delete Empty groups. Empty groups should probably also time out after a while...
Testing
In order to have any hope of getting this right, we need robust end-to-end testing of Dekaf. Currently we have good snapshot tests for AVRO document generation, field selection, schemas, etc., but we don't have much of anything in the way of e2e coverage, nor coverage of the wire protocol or group management. Here's what I think we'll need before even trying to build group management from scratch.
Raw bytes in order to test various deserialization failure modes
Various different volumes of data to test e.g max wait ms timeout
Easily validate that data read matches mock data written
Group state
Committed Offsets
Things that need to be easily mockable:
Mocking Gazette
We'll need to come up with a generalized interface over reading data from journals: either adapting Read to be generic over some kind of ReadJsonLines trait, or adding mockability to ReadJsonLines itself.
Mocking control-plane
We'll need to generalize Dekaf over the control-plane in order to implement a mock for testing
Mocking State
We'll need to generalize the interfaces to group state (etcd) and committed offsets (bigtable).
Example happy path test:
```rust
fn test_single_consumer_happy_path() {
    let instance = dekaf::mock_server(vec![MockTask {
        name: "my-test",
        bindings: vec![MockBinding { topic_name, schema, ... }],
    }]);
    let handle = instance.serve();

    // Writes documents to the mocked "journals"
    let writer = dekaf::mock_writer(instance);
    let docs_written = writer.write("my_topic_name", 100).await?;

    // Uses real librdkafka, since that's what we care about testing.
    // Could also have lower-level test infra using e.g. `kafka-protocol`.
    let reader = dekaf::mock_consumer(instance);
    reader.join_group("test_group").await?;
    assert_snapshot!(instance.get_group_state("test_group"));

    let docs_read = reader.consume("my_topic_name").await?;
    // Assert that we can read all of the docs we wrote
    assert_eq!(docs_read, docs_written);

    reader.commit().await?;
    let offsets = reader.offsets;
    reader.disconnect().await?;

    // Now re-join the same group
    let reader_2 = dekaf::mock_consumer(instance);
    reader_2.join_group("test_group").await?;
    assert_snapshot!(instance.get_group_state("test_group"));

    let offsets_2 = reader_2.offsets;
    // Assert that we see the offset that the previous member committed
    assert_eq!(offsets_2, offsets);
    // Assert no more docs were read
    assert_empty(reader_2.consume("my_topic_name").await?);
}
```
Open Questions & Future Considerations
Load ballparking: Conservatively we could see 1 write per second per actively consumed partition. We could easily have thousands of partitions being read. Can bigtable handle that? I assume the answer is "yes, easily". Etcd should have much less load, given that it'll only be heavily used/watched during rebalances.
Do we need to break out group member state into sub-keys in etcd for optimization? For example, updates to group state due to updating the "last heartbeat" could come in at something like one every 3 seconds, per group member (partition). That could be substantial. So far I've seen upwards of 40 group members, so this would look like ~14 writes/sec to a single key. So... yeah, this key is going to get contended quickly; we probably do need to separate these.