This demo showcases Confluent Platform's Multi-Region Replication capability built directly into Confluent Server starting with release 5.4.
For more information:
- Blog post: Built-In Multi-Region Replication with Confluent Platform 5.4-preview
- Confluent Platform documentation
NOTE: There is a different demo for a multi-datacenter design with two instances of Confluent Replicator copying data bidirectionally between the datacenters.
This demo has the following architecture.
There are three regions: `west`, `central`, and `east`. The naming convention for the brokers is `broker-[region]-[broker_id]`.
The full broker configurations are in the docker-compose.yml file. Here are some relevant configuration parameters that are used by Multi-Region Replication.
- `broker.rack`: identifies the location of the broker. For the demo, it represents a region, either `east` or `west`.
- `replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector`: allows clients to read from followers (in contrast, clients are typically allowed to read only from leaders).
- `client.rack`: identifies the location of the client. For the demo, it represents a region, either `east` or `west`.
- `--replica-placement <path-to-replica-placement-policy-json>`: at topic creation, this defines the replica placement policy for the topic.
Replicas are brokers assigned to a topic-partition, and they can be a Leader, Follower, or Observer. A Leader is the broker/replica accepting produce messages. A Follower is a broker/replica that can join an ISR list and participate in the calculation of the high watermark (used by the leader when acknowledging messages back to the producer).
An ISR list (in-sync replicas) includes brokers that have a given topic-partition. The data is copied from the leader to every member of the ISR before the producer gets an acknowledgement. The followers in an ISR can become the leader if the current leader fails.
An Observer is a broker/replica that also has a copy of the data for a given topic-partition, and consumers are allowed to read from it even though it is not the leader (known as "Follower Fetching"). However, the data is copied asynchronously from the leader, so the producer does not wait for observers before getting an acknowledgement. By default, observers do not participate in the ISR list and cannot automatically become the leader if the current leader fails, but if a user manually changes the leader assignment, they can participate in the ISR list.
Clone the confluentinc/examples GitHub repo and go to the folder with the Multi-Region Replication demo:
git clone https://github.com/confluentinc/examples
cd examples/multiregion
Run the following command.
docker-compose up -d
You should see the following Docker containers with `docker-compose ps`:
Name Command State Ports
----------------------------------------------------------------------------------------------------------------
broker-east-3 /etc/confluent/docker/run Up 0.0.0.0:8093->8093/tcp, 9092/tcp, 0.0.0.0:9093->9093/tcp
broker-east-4 /etc/confluent/docker/run Up 0.0.0.0:8094->8094/tcp, 9092/tcp, 0.0.0.0:9094->9094/tcp
broker-west-1 /etc/confluent/docker/run Up 0.0.0.0:8091->8091/tcp, 0.0.0.0:9091->9091/tcp, 9092/tcp
broker-west-2 /etc/confluent/docker/run Up 0.0.0.0:8092->8092/tcp, 0.0.0.0:9092->9092/tcp
zookeeper-central /etc/confluent/docker/run Up 2181/tcp, 0.0.0.0:2182->2182/tcp, 2888/tcp, 3888/tcp
zookeeper-east /etc/confluent/docker/run Up 2181/tcp, 0.0.0.0:2183->2183/tcp, 2888/tcp, 3888/tcp
zookeeper-west /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
To simulate the WAN link between the regions, this demo uses Pumba to inject latency and packet loss between the containers.
Run the Dockerized Pumba scripts:
./scripts/latency_docker.sh
You should see the following Docker containers with `docker container ls --filter "name=pumba"`:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
652fcf244c4d gaiaadm/pumba:0.6.4 "/pumba netem --dura…" 9 seconds ago Up 8 seconds pumba-loss-east-west
5590c230aef1 gaiaadm/pumba:0.6.4 "/pumba netem --dura…" 9 seconds ago Up 8 seconds pumba-loss-west-east
e60c3a0210e7 gaiaadm/pumba:0.6.4 "/pumba netem --dura…" 9 seconds ago Up 8 seconds pumba-high-latency-west-east
d3c1faf97ba5 gaiaadm/pumba:0.6.4 "/pumba netem --dura…" 9 seconds ago Up 8 seconds pumba-medium-latency-central
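For reference, the containers above apply netem rules of roughly the following shape. This is a hedged sketch based on general Pumba 0.6 usage, not the demo's actual script: the durations, delay, jitter, and target containers are placeholders, and scripts/latency_docker.sh also configures packet loss and scopes its rules differently.
# Illustrative only: a Pumba netem rule adding ~100 ms of delay (with jitter)
# to traffic leaving the west brokers for one hour. --tc-image supplies the tc
# binary in case the target containers do not ship it.
docker run -d --rm \
  -v /var/run/docker.sock:/var/run/docker.sock \
  gaiaadm/pumba:0.6.4 netem \
  --duration 60m \
  --tc-image gaiadocker/iproute2 \
  delay --time 100 --jitter 10 \
  broker-west-1 broker-west-2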
View IP addresses in the demo:
docker inspect -f '{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq)
Create three Kafka topics by running the following script.
./scripts/create-topics.sh
The script creates each topic with a different replica placement policy that specifies a set of matching constraints, i.e., `count` and `rack`, for `replicas` and `observers`. The replica placement policy file is defined with the `--replica-placement <path-to-replica-placement-policy-json>` argument mentioned earlier (these files are in the config directory). Each placement also has a minimum `count` associated with it that allows users to guarantee a certain spread of replicas throughout the cluster. A concrete example follows the table below.
Topic name | Leader | Followers (sync replicas) | Observers (async replicas) | ISR list |
---|---|---|---|---|
single-region | 1x west | 1x west | n/a | {1,2} |
multi-region-sync | 1x west | 1x west, 2x east | n/a | {1,2,3,4} |
multi-region-async | 1x west | 1x west | 2x east | {1,2} |
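As a concrete illustration, a placement policy like the one used for `multi-region-async` can be written and applied as shown below. The JSON mirrors the `confluent.placement.constraints` string that `describe-topics.sh` prints later in this demo; the file path and the topic name `my-multi-region-async-test` are hypothetical, and the bootstrap address reuses `broker-east-4:19094` from the leader-election command later in this demo.
# Open a shell in a broker container, where the Kafka CLI tools are available:
docker-compose exec broker-east-4 bash

# Inside the container: write a placement file (the demo's real placement
# files live in the config directory of the repo; this path is hypothetical):
cat > /tmp/placement-multi-region-async.json <<'EOF'
{
  "version": 1,
  "replicas": [
    { "count": 2, "constraints": { "rack": "west" } }
  ],
  "observers": [
    { "count": 2, "constraints": { "rack": "east" } }
  ]
}
EOF

# Still inside the container: create a topic with that placement. Illustrative
# only -- multi-region-async already exists once create-topics.sh has run, so a
# different topic name is used here.
kafka-topics --create \
  --bootstrap-server broker-east-4:19094 \
  --topic my-multi-region-async-test \
  --partitions 1 \
  --replica-placement /tmp/placement-multi-region-async.json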
The playbook below highlights client performance differences between these topics depending on the relative location of clients and brokers.
Verify topic replica placement:
./scripts/describe-topics.sh
Sample output:
==> Describe topic single-region
Topic: single-region PartitionCount: 1 ReplicationFactor: 2 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[]}
Topic: single-region Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Offline:
==> Describe topic multi-region-sync
Topic: multi-region-sync PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}},{"count":2,"constraints":{"rack":"east"}}],"observers":[]}
Topic: multi-region-sync Partition: 0 Leader: 1 Replicas: 1,2,3,4 Isr: 1,2,3,4 Offline:
==> Describe topic multi-region-async
Topic: multi-region-async PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[{"count":2,"constraints":{"rack":"east"}}]}
Topic: multi-region-async Partition: 0 Leader: 2 Replicas: 2,1,3,4 Isr: 2,1 Offline: Observers: 3,4
Observations:
- Topic `multi-region-async` has replicas across the `west` and `east` regions, but only brokers 1 and 2 are in the ISR; brokers 3 and 4 are observers.
This section tests how the different replica placement policies affect producer performance. Run the producer perf test:
./scripts/run-producer.sh
Sample output:
==> Produce: Single-region Replication (topic: single-region)
5000 records sent, 240.453977 records/sec (1.15 MB/sec), 10766.48 ms avg latency, 17045.00 ms max latency, 11668 ms 50th, 16596 ms 95th, 16941 ms 99th, 17036 ms 99.9th.
==> Produce: Multi-region Sync Replication (topic: multi-region-sync)
100 records sent, 2.145923 records/sec (0.01 MB/sec), 34018.18 ms avg latency, 45705.00 ms max latency, 34772 ms 50th, 44815 ms 95th, 45705 ms 99th, 45705 ms 99.9th.
==> Produce: Multi-region Async Replication to Observers (topic: multi-region-async)
5000 records sent, 228.258388 records/sec (1.09 MB/sec), 11296.69 ms avg latency, 18325.00 ms max latency, 11866 ms 50th, 17937 ms 95th, 18238 ms 99th, 18316 ms 99.9th.
Observations:
- In the first and third cases, topics `single-region` and `multi-region-async` have nearly the same throughput performance (e.g., `1.15 MB/sec` and `1.09 MB/sec`, respectively, in the above example), because only the replicas in the `west` region need to ack.
- In the second case, for topic `multi-region-sync`, throughput took a big hit (e.g., `0.01 MB/sec` in the above example) due to the poor network bandwidth between the `east` and `west` regions and an ISR made up of brokers in both regions. This is because the producer waits for an `ack` from all members of the ISR before continuing, including those in `west` and `east`.
- In the third case, the observers for topic `multi-region-async` did not affect the overall producer throughput, because the `west` region sends an `ack` back to the producer once the data has been replicated twice in the `west` region; it does not wait for the async copy to the `east` region.
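For reference, `./scripts/run-producer.sh` drives these numbers with the standard kafka-producer-perf-test tool. Below is a hedged sketch of an equivalent invocation: the record count and approximate record size can be read off the sample output above, while the container used, the bootstrap address, and the `acks` setting are assumptions rather than the script's exact values.
# Illustrative only: produce 5000 records of ~5 KB to multi-region-async with
# acks=all, so the producer waits on the full ISR (which excludes observers)
# before each acknowledgement.
docker-compose exec broker-east-4 kafka-producer-perf-test \
  --topic multi-region-async \
  --num-records 5000 \
  --record-size 5000 \
  --throughput -1 \
  --producer-props bootstrap.servers=broker-east-4:19094 acks=all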
This section tests how Follower Fetching affects consumer performance. Run the consumer perf test, where the consumer runs in the `east` region:
./scripts/run-consumer.sh
Sample output:
==> Consume from east: Multi-region Async Replication reading from Leader in west (topic: multi-region-async)
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-09-25 17:10:27:266, 2019-09-25 17:10:53:683, 23.8419, 0.9025, 5000, 189.2721, 1569431435702, -1569431409285, -0.0000, -0.0000
==> Consume from east: Multi-region Async Replication reading from Observer in east (topic: multi-region-async)
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-09-25 17:10:56:844, 2019-09-25 17:11:02:902, 23.8419, 3.9356, 5000, 825.3549, 1569431461383, -1569431455325, -0.0000, -0.0000
Observations:
- In the first case, the consumer running in `east` reads from the leader in `west`, so it is negatively impacted by the low bandwidth between `east` and `west`. Its throughput is lower (e.g., `0.9025` MB.sec in the above example).
- In the second case, the consumer running in `east` reads from the observer that is also in `east`. Its throughput is higher (e.g., `3.9356` MB.sec in the above example).
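The difference comes from the consumer's `client.rack` setting. Below is a minimal sketch of reading the topic from the `east` region with Follower Fetching in effect, using kafka-console-consumer rather than the perf-test tool the script uses; the bootstrap address reuses `broker-east-4:19094` from the leader-election command later in this demo.
# Illustrative only: with client.rack=east, the broker's RackAwareReplicaSelector
# lets this consumer fetch from the observer in east instead of crossing the WAN
# to the leader in west.
docker-compose exec broker-east-4 kafka-console-consumer \
  --bootstrap-server broker-east-4:19094 \
  --topic multi-region-async \
  --consumer-property client.rack=east \
  --from-beginning \
  --max-messages 10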
Notice that for the `multi-region-async` topic, the JMX metric `ReplicasCount` includes observers, whereas `InSyncReplicasCount` excludes them. The new JMX metric `CaughtUpReplicasCount` (`kafka.cluster:type=Partition,name=CaughtUpReplicasCount,topic=([-.\w]+),partition=([0-9]+)`), reported across all brokers in the cluster, reflects whether all the replicas, including observers, are caught up with the leader such that their log end offset is at least at the high watermark. View these JMX metrics by running:
./scripts/jmx_metrics.sh
Sample output:
==> Monitor ReplicasCount
single-region: 2
multi-region-sync: 4
multi-region-async: 4
==> Monitor InSyncReplicasCount
single-region: 2
multi-region-sync: 4
multi-region-async: 2
==> Monitor CaughtUpReplicasCount
single-region: 2
multi-region-sync: 4
multi-region-async: 4
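If you want to sample one of these MBeans directly rather than through the script, kafka.tools.JmxTool can query it. The sketch below assumes broker-west-1 exposes JMX on port 8091 (the non-listener port published in the docker-compose ps output above); if your environment wires JMX differently, adjust the URL.
# Hedged example: read CaughtUpReplicasCount for multi-region-async partition 0
# from broker-west-1 (JMX port 8091 is an assumption based on the published ports).
docker-compose exec broker-west-1 kafka-run-class kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:8091/jmxrmi \
  --object-name 'kafka.cluster:type=Partition,name=CaughtUpReplicasCount,topic=multi-region-async,partition=0' \
  --one-time true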
Simulate a failure of the `west` region by stopping its brokers and ZooKeeper:
docker-compose stop broker-west-1 broker-west-2 zookeeper-west
Verify the new topic replica placement:
./scripts/describe-topics.sh
Sample output:
==> Describe topic single-region
Topic: single-region PartitionCount: 1 ReplicationFactor: 2 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[]}
Topic: single-region Partition: 0 Leader: none Replicas: 2,1 Isr: 1 Offline: 2,1
==> Describe topic multi-region-sync
Topic: multi-region-sync PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}},{"count":2,"constraints":{"rack":"east"}}],"observers":[]}
Topic: multi-region-sync Partition: 0 Leader: 3 Replicas: 1,2,3,4 Isr: 3,4 Offline: 1,2
==> Describe topic multi-region-async
Topic: multi-region-async PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[{"count":2,"constraints":{"rack":"east"}}]}
Topic: multi-region-async Partition: 0 Leader: none Replicas: 2,1,3,4 Isr: 1 Offline: 2,1 Observers: 3,4
Observations:
- In the first case, the topic `single-region` has no leader, because it had only two replicas in the ISR, both of which were in the `west` region and are now down.
- In the second case, the topic `multi-region-sync` automatically elected a new leader in `east` (e.g., replica 3 in the above output). Clients can fail over to those replicas in the `east` region.
- In the third case, the topic `multi-region-async` also has no leader, because it had only two replicas in the ISR, both of which were in the `west` region and are now down. The observers in the `east` region are not eligible to become leaders automatically because they were not in the ISR.
To explicitly fail over the observers in the `multi-region-async` topic to the `east` region, trigger leader election (note that unclean leader election may result in data loss):
docker-compose exec broker-east-4 kafka-leader-election --bootstrap-server broker-east-4:19094 --election-type UNCLEAN --topic multi-region-async --partition 0
Describe the topics again.
./scripts/describe-topics.sh
Sample output:
...
==> Describe topic multi-region-async
Topic: multi-region-async PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[{"count":2,"constraints":{"rack":"east"}}]}
Topic: multi-region-async Partition: 0 Leader: 3 Replicas: 2,1,3,4 Isr: 3,4 Offline: 2,1 Observers: 3,4
...
Observations for topic `multi-region-async`:
- It has a leader again (e.g., replica 3 in the above output)
- The observers are now in the ISR list (e.g., replicas 3 and 4 in the above output)
Restore the `west` region by restarting its brokers and ZooKeeper:
docker-compose start broker-west-1 broker-west-2 zookeeper-west
Wait for 5 minutes, the default value of `leader.imbalance.check.interval.seconds`, for the automatic leader election to restore the preferred replicas. (You can also trigger it immediately with `docker-compose exec broker-east-4 kafka-leader-election --bootstrap-server broker-east-4:19094 --election-type PREFERRED --all-topic-partitions`.)
Verify that the original topic replica placement is restored:
./scripts/describe-topics.sh
Sample output:
==> Describe topic single-region
Topic: single-region PartitionCount: 1 ReplicationFactor: 2 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[]}
Topic: single-region Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2 Offline:
==> Describe topic multi-region-sync
Topic: multi-region-sync PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}},{"count":2,"constraints":{"rack":"east"}}],"observers":[]}
Topic: multi-region-sync Partition: 0 Leader: 1 Replicas: 1,2,3,4 Isr: 3,4,2,1 Offline:
==> Describe topic multi-region-async
Topic: multi-region-async PartitionCount: 1 ReplicationFactor: 4 Configs: min.insync.replicas=1,confluent.placement.constraints={"version":1,"replicas":[{"count":2,"constraints":{"rack":"west"}}],"observers":[{"count":2,"constraints":{"rack":"east"}}]}
Topic: multi-region-async Partition: 0 Leader: 2 Replicas: 2,1,3,4 Isr: 2,1 Offline: Observers: 3,4
Observations:
- All topics have leaders again, in particular `single-region`, which lost its leader when the `west` region failed.
- The leaders are restored to the `west` region. If they are not, wait a full 5 minutes (the duration of `leader.imbalance.check.interval.seconds`).
Note: On failback from a failover to observers, any data that was not replicated to observers will be lost because logs are truncated before catching up and joining the ISR.
All the individual steps above can be run with this automated script:
./scripts/start.sh
Stop the demo and all Docker containers.
./scripts/stop.sh
- If containers fail to ping each other (e.g., failures seen when running `./scripts/validate_connectivity.sh`), then stop the demo, clean up the Docker environment, and restart the demo. If it still fails, restart Docker and run again.
# Stop demo
./scripts/stop.sh
# Clean up the Docker environment
for c in $(docker container ls -q --filter "name=pumba"); do docker container stop "$c" && docker container rm "$c"; done
docker-compose down -v --remove-orphans
for v in $(docker volume ls -q --filter="dangling=true"); do docker volume rm "$v"; done
# Restart demo
./scripts/start.sh
- Pumba may be overloading the Docker inter-container network. Consider tweaking the Pumba settings in scripts/latency_docker.sh and re-test in your environment.