Kafka in the Cloud: Why it’s 10x better with Confluent | Find out more
Confluent has published official Docker containers for many years. They are the basis for deploying clusters with Confluent for Kubernetes (CFK) and one of the underpinning technologies behind Confluent Cloud.
These containers are convenient for quickly setting up a local cluster for testing, including components required, such as the Confluent Schema Registry or Confluent Control Center. You can configure each container via a set of environment variables, using the internal Docker network and hostname resolution to reference the various components, such as an Apache Kafka® broker. This can be done using docker-compose, which creates a network for your cluster and allows you to keep all configurations in a single document.
This process is relatively easy if you have a single ZooKeeper or KRaft controller and a single Kafka broker in your cluster. However, when you are trying to configure a large cluster, for example, for failover testing, you might run into some challenges.
For example, to test how the Kafka cluster handles the loss of a broker, you need to configure multiple Kafka brokers in your docker-compose.yml file. Be careful to not reuse the same hostname or external port number. When referencing your bootstrap servers, you need to use the correct hostnames and ports. Mistakes in copying and pasting or editing entries can lead to a degraded cluster, even if it appears to be running.
You can create a template for a three ZooKeeper/three broker cluster, but it does not scale well if you need to test four or six brokers. This is a common problem we see some customers face who want to test various configurations.
Reconfiguring multiple Docker Compose files for different scenarios can become very tedious. To solve this, you can use a Python script I wrote that generates the docker-compose file: kafka-docker-composer.
In this guide, we'll share more details about the installation, examples, use cases, and more.
The tool requires a working Python 3 environment. (It has been tested with Python 3.8 - 3.11.) You also need Jinja2, which comes automatically with many distributions. To check if Jinja 2 is installed, run this command:
If you see a ModuleNotFoundError
, install jinja2 with pip3:
You also need to have Docker and docker-compose installed. Once you have them, you can clone the repository.
Now you're ready to start creating docker-compose files.
The templating engine Jinja2 allows you to create a template document with variables that can be populated through the application.
This template loops through all configured services and populates the required field in the final docker-compose.yml file for each container. Each container represents a service such as a Confluent Server or a ZooKeeper instance, a Schema Registry, and so on.
For each entry, define the container image to be run as well as the host and container name. Each instance can have optional parameters, including:
Health checks: Define dependencies that will wait for the prerequisites to be healthy.
Dependencies: Define the order in which containers are created. If health checks are defined as a dependency, they will also be listed here.
Environment variables: Used for configuring the service within the container.
Ports: Mapped to the docker host.
Volumes: Mounted to inject additional files, such as Kafka Connect Connector plugins or metrics configurations for monitoring using Prometheus.
The application kafka_docker_composer.py takes a list of arguments and determines how the template should be populated. It creates the dependencies between the different components, ensures that names and ports are unique, sets up advertised listeners correctly, and ensures that dependent services like Schema Registry of Kafka Connect point to the corresponding Confluent Server brokers.
There are many different configurations, such as using ZooKeeper or KRaft, all controlled by a set of arguments or a configuration file. Here is an overview:
This is best explained through the following use cases.
You can create a docker-compose file for the simplest case: one ZooKeeper and one broker using the latest Docker image configuration reference for Confluent Platform (currently, this is 7.6.0).
Not convinced? Try it out.
You might have to use “docker-compose” instead of “docker compose” on your platform or upgrade your Docker environment to use Compose V2.
The broker ports start with 9091 for the first broker. Use kafka-topics to create and list topics:
After you are done with the cluster, shut it down with the following:
The -v
option removes the volumes as well, avoiding the potential problem of reusing stale data.
ZooKeeper is deprecated; therefore, modern versions of Kafka prefer KRaft. Just change the option from zookeeper
to controller
:
Look inside the generated docker-compose.yml file to see which environment variables you must set to create a controller-broker pair successfully.
Setting these variables by hand can be tedious and error-prone, but this tool simplifies the process.
Creating single broker setups is nice, but more is needed to show the power of this tool. A minimum standard cluster consists of three controllers and three brokers:
Note that each broker has its own externally visible port mapped to the host so that you can access each broker individually. The other ports are for the JMX agents if you want to configure Prometheus and Grafana for your cluster:
As you can see, Prometheus is exposed on port 9090 and Grafana on port 3000. Try out Grafana by pointing your browser to http://localhost:3000. The user and password are set to “admin/adminpass”, but you can adjust that in volumes/config.ini.
There are separate dashboards for ZooKeeper and KRaft controllers as indicated by their names.
The exporter configuration files, dashboards, and the exporter jar are in the volumes directory, so you do not have to download anything separately.
Remember that it takes a few minutes for JMX exporters to start up. Check the Status/Targets page in Prometheus to see if your metrics scrapes succeeded.
In addition to the brokers and controllers, you can add Confluent Schema Registries, Kafka Connect worker nodes, ksqlDB nodes, and Confluent Control Center to the mix. You will probably need to increase the memory of your Docker environment, specifically, if you run this on your notebook with Docker Desktop.
Ensure your Docker Desktop is configured with sufficient resources. For example, 8 cores and 16 GB of memory provide ample room to run a large cluster in Docker Compose.
The health checks are built for this purpose. If, for example, Schema Registry starts up before the broker finishes booting, it will fail since it cannot create its topic, and it will not try again. Verifying the brokers are up and running and ready to receive clients ensures that the dependent components do not fail on startup.
One specific component is the Connect cluster. This cluster comes with a bare-bones set of connector plugins installed. By mapping a volume containing unzipped connector plugin jar files before starting up the cluster, you can easily use the same setup to test various connectors. For example, the Datagen Connector is useful for acting as a producer for testing without external dependencies.
Here is an example plugin configuration. Bring up a cluster with:
This will list all installed connector plugins. You might have to install jq, a handy JSON formatting and filtering tool.
Create a target topic, then install the Datagen connector:
Other connector plugins can be installed by downloading the zip file from the connector hub, for example, the Elasticsearch Sink connector. Unzip the file into the volumes/connect-plugin-jars directory and restart your connect clusters:
The Kafka Connect clusters take a while to start up, so you need to be a little patient. You can use the following command to monitor progress:
The original purpose of the kafka-docker-composer was to test failover scenarios in multiple data centers.
For this purpose, there are two additional options: racks and ZooKeeper groups.
Specify the number of racks for the number of unique racks or data centers you want to test with. A typical example is to choose three racks, which is a common setup for production clusters.
The configured brokers will then be assigned round-robin to the racks. This is particularly useful if you configure more brokers than the number of racks to test the distribution of partitions across the different racks.
Have a look at the generated docker-compose.yml file. You will notice that both the controllers and the brokers have a new environment variable, for example:
Create a new topic called products, but add the option --partitions 6
, then run:
If you go through every single partition and compare their placement on the individual replicas, you will notice that each replica is in a different rack. Even in the case of a loss of one rack each partition will still have two replicas online. Since min.insync.replicas
is set to 2, producers and consumers will still be able to work.
You can then test what happens if a data center goes down by using standard docker-compose methods to kill the containers. Attach a producer and consumer to the cluster to convince yourself that the cluster is still accessible and capable of processing requests. This demonstrates the resilience features of Apache Kafka.
In our example, we have the following distribution:
This is because the tool assigned the rack ID round-robin.
To produce and consume some data, start with the consumer readily waiting for some messages:
Note that specifying three brokers across all three racks here is good practice in case a broker or a whole rack (data center) is down while starting the application. For this test, it is not strictly necessary because we will receive the full list of all brokers upon connection anyway.
Then we start the producer:
The tool seq generates a sequence of numbers separated by a newline character. The formatting avoids presenting the numbers in scientific notation. The result is a new message for each number, nicely sequenced for easy verification. Note that there is no guarantee that the consumer will return the numbers in the same order since we created the topic with six partitions.
The producer will work through this sequence quite quickly, so hurry with the next step. To simulate an outage of one data center, kill all containers in one rack. Let’s pick rack-2:
The consumer will stumble for a moment while the active controller sorts out the leadership for each partition, but it will then pick up again. When the consumer no longer finds any new messages, it will wait, and the consumer can be shut down. It should print out the total number of messages: 9000001.
You can vary the sequence to test out the loss of a rack for producers. Many error messages may appear when the producer complains about brokers not being reachable anymore, but it will sort itself out after a while. This is also a good example for the idempotent producer since no messages will be duplicated during this time, as you will be able to verify with your consumer.
Bring the cluster back up again with a simple:
Depending on how long the producer was working while the cluster was in a degraded state, this last command might take a while to finish, because the brokers have to catch up first before they respond to the health-check command.
The other option is for configuring ZooKeeper groups, specifically for a two-data center scenario with hierarchical groups. You will need six ZooKeeper instances at a minimum. The tool will calculate the distribution and set up the docker-compose file accordingly.
If you inspect the generated docker-compose.yml file, you will find these lines:
These are the generated groups, with ZooKeeper instances distributed between them. The cluster will come up just fine with these settings, but when one (simulated) data center is shut down, you will see why a two-data center solution is always inferior to a three-data center setup.
The ZooKeeper instances do not automatically failover to the degraded state of three ZooKeepers. Instead, they will refuse to accept all connections until the situation is resolved. This also means brokers will refuse to acknowledge producers and even consumers will fail because they need to update the ZooKeeper about their progress.
You can resolve the situation by starting the second data center up again, in which case the cluster will recover. If you want to continue in the degraded state, manually remove the groups from the configuration files and restart the remaining ZooKeeper instances.
The required procedure goes beyond the scope of this post. In production environments, we advise administrators to keep a second configuration file handy that can be swapped in after the ZooKeeper instances have been shut down.
This is not something you can and want to automate, but you can script it and invoke the failover script manually should the need arise. The procedure should be well documented and tested in a non-production environment to ensure that administrators know what to do during the panic of an outage. Note that this is not possible without downtime (RTO > 0).
Looking through the help list, you might notice a few other arguments that have not been discussed.
This is an experimental feature for upgrading KRaft controllers to full brokers. This is not officially supported for production environments, but the --shared-mode
argument can be used for experimentation. If nothing else, it will show you which additional environment variables you need for this setup.
When you start up this cluster, you will see that you have six active brokers rather than the original three since the controllers act as brokers as well. If you have kcat (kafkacat) installed, you can use kcat -L -b localhost:9094
to verify this.
A cluster running with KRaft needs a UUID to identify membership for controllers and brokers. A UUID has been created and hardcoded, but if you want to change this value, use the --uuid
option.
The infrastructure for TC is built but not much testing has been done. This option enables a build for all components to create a new image that contains the tc tool that can be used to inject latency.
Check this Medium article that explains a little more about this feature if you are interested, but most of the ideas for this tool come from the Confluent tutorial on multi-region clusters.
Building the Docker images with tc enabled involves multiple steps:
Adjust the .env file in the root directory to the release you want to base your test on. These are straight from the underlying source and have a lot of redundancies. The current update is to 7.6.0.
Run the build script script/build_docker_images.sh
. This will download the configured base images and build new images stored in your local image cache.
Run kafka-docker-composer with the --with-tc
option to create your docker-compose file. Ensure the version matches the version you have just created. Use the --release
option if necessary.
You now have a configuration with Docker images that have been enhanced with the tc utility.
To start this up and test it, do the following:
Bring the cluster up as before
Execute a ping within a container against another instance to verify the normal latency
Enable latency injection in each node
Run ping again to verify the effect
Perform your tests with a multi-region simulator
Here is a simple example:
The option -u0
gives you root access to execute commands with elevated permissions within the container.
You should now observe a difference in the ping round time of around 200ms, with 100ms latency injected from each side.
This is by no means a complete list of all features present since this tool is continuously updated to suit testing and understanding requirements. For example, the JMX and HTTP ports are exposed to the host to be able to use Visual VM to understand more of the metrics and to understand the REST interface to the Confluent server.
Check the GitHub repository for the latest changes. You can set up a watcher if you want to receive updates via email. Why not add a star to the repository while you are there?
This tool does not have any authentication or authorization features yet. This is because there are other tools to test and demonstrate security. Still, it might be a worthwhile project for SASL/PLAIN or even SASL/SCRAM. TLS certificates are a bit trickier because generating these would require a new image. The goal is to start the whole project with a single docker composer up, not a script.
The same is true for Kerberos and LDAP for RBAC, which would require a Samba service in a separate container and some configuration. Let us know in the comments or file an issue on GitHub.
Apache Kafka 3.7 is now available and comes with an official Docker image as of KIP-975.
The image has been successfully tested separately from the kafka-docker-composer. It might be useful to enable the swapping out of the Confluent Server (cp-server) image to test out the new features in the next Kafka release—since these releases are published typically three to six months before the corresponding Confluent release. The main difficulty will be that open-source Apache Kafka has no built-in REST interface, making health checks more challenging.
I have been using and extending kafka-docker-composer for the last five years to demonstrate how to set up a cluster in Docker. The main purpose of this tool was to show how resilient a Confluent cluster is even during a large outage.
Lately, I have used the same tool to teach myself KRaft, experiment with it, and use the setup for connector testing and development.
I'd like to know what you will use this tool for. You can let me know by commenting on the GitHub repository.
Happy hacking!
We are proud to announce the release of Apache Kafka 3.9.0. This is a major release, the final one in the 3.x line. This will also be the final major release to feature the deprecated Apache ZooKeeper® mode. Starting in 4.0 and later, Kafka will always run without ZooKeeper.
In this third installment of a blog series examining Kafka Producer and Consumer Internals, we switch our attention to Kafka consumer clients, examining how consumers interact with brokers, coordinate their partitions, and send requests to read data from Kafka topics.