I have worked with the Apache Kafka protocol at a low level quite a bit. It wasn't easy to get started with only the official guide, and I ended up reading the code a lot. With this post, I want to give you a head start by guiding you step by step from primitive values to meaningful requests.
In this post:
- Explore the Kafka protocol code and the protocol in action with Wireshark.
- Learn how to read and write primitive values.
- Combine primitives to perform meaningful requests.
We will use Python as the programming language. However, the code will be zero-dependency and easily portable to the language of your choice.
Intro
Apache Kafka has a custom binary protocol that is versioned, with various data types, optional fields, etc. Unfortunately, it doesn't use a well-known serialization format like Protobuf. The protocol message schema is described in JSON. The actual Java code that does serialization and deserialization is generated from this description.
When you're in the Java world, you can use the official client library. But if you're on another platform, you have to rely on third-party implementations. They exist, but they focus mostly on the producer and consumer, and rarely cover all aspects of the admin client. If you need to do something else, you're on your own.
This post will help you start hacking on the Kafka protocol. (If you are looking for a ready Python (de-)serialization library for the Kafka protocol, check out Kio1. For Rust, have a look at the library I'm working on.)
You can find the code from this post, along with some extras like tests, in this repository on GitHub.
Protocol overview
You can find the official protocol description on this page. I encourage you to familiarize yourself with it, at least read the "Preliminaries" and "The Protocol" sections.
Here are some highlights. The Kafka protocol is a TCP-based binary request-response protocol:
- TCP-based: the Kafka broker listens on TCP ports (which gives some benefits, like ordering guarantees).
- Binary: messages are encoded in binary form and require special serialization and deserialization according to predefined schemas.
- Request-response: exchanges are initiated by the client, the server is passive and only replies to requests.
Each API message type consists of a request and response pair and is identified by a numeric value called the API key. For example, Produce and Fetch, the most characteristic Kafka RPCs, have API keys 0 and 1 respectively. Nowadays, there are close to 90 API message types (some of them are inter-broker, not client-broker).
Requests and responses are described by versioned schemas. Versioning allows for protocol evolution, for example, adding or removing fields or changing their data type.
First steps
Here are some things you can do to start working with the Kafka protocol.
Learn Kafka protocol code
The Kafka code is the source of truth (practically) about the protocol. Check out the Kafka code from Github and switch to the release you're interested in (e.g. 3.8.0):
git clone git@github.com:apache/kafka.git
cd kafka
git checkout 3.8.0
You can find the API message definitions in JSON in clients/src/main/resources/common/message. Each JSON file contains the definition of one message2 type with all its versions. clients/src/main/resources/common/message/README.md gives a good overview of the schema definition format. Pay attention to stuff like default values, flexible versions, and tagged fields.
Apart from the concrete API message types you're interested in, have a look at clients/src/main/resources/common/message/RequestHeader.json and ResponseHeader.json, which describe headers used in each request-response exchange.
Let's run the code generator:
./gradlew processMessages
Now you can find the generated classes in clients/src/generated/java/org/apache/kafka/common/message.
Have a look at clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java. This utility:
- describes the whole set of existing API message types along with their schemas and versions;
- maps API message versions to request and response header versions in the requestHeaderVersion and responseHeaderVersion functions.
Other files are one-to-one generated from the corresponding schema JSONs (sometimes with the Data postfix, it's a compatibility thing). In these files you'll find:
- The versioned schema definitions SCHEMA_0, SCHEMA_1, etc. Sometimes schemas stay the same between versions. This is normal and means only the request-response counterpart changed.
- read and write methods where you can find the ground truth for the protocol serialization and deserialization.
Pay attention to the inner classes as well, as they represent the complex structures within the message.
Run Kafka in Docker
Running Kafka in Docker is a convenient way to get a broker running to test the protocol or capture the network exchange. Since version 3.7.0, the Kafka team builds official Docker images, which you can run as:
docker run --rm -ti -p 9092:9092 apache/kafka:3.8.0
If you're interested in older versions, search Docker Hub for other images. However, this may not be needed, considering the Kafka protocol is backward and forward compatible: new brokers will recognize old protocol versions just fine, and old clients can communicate with newer brokers.
If you read this, you probably already have the Kafka command line tools on your machine, but just in case, you can also run them in Docker. For example, run this to create a topic:
docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
Inspect protocol with Wireshark
Having familiarized ourselves with the Kafka code, let's look at the protocol in action. Wireshark is a widely used tool for such inspections. It can dissect the Kafka protocol (and supports the latest versions if your version is fresh enough).
I built Wireshark 4.5.0 from source, because my operating system's package is old and can't dissect recent Kafka protocol versions. Wireshark 4.5.0 should mostly support Kafka 3.7 protocol versions. However, you can try the available version and see how it works for you.
Let's run Wireshark on the loopback interface with the port 9092 capture filter and the kafka display filter.
Create a topic and see what Wireshark shows us:
docker run --rm -ti --net=host apache/kafka:3.8.0 \
  /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --create \
  --topic test-topic1 --partitions 2
The display filter removes everything irrelevant and leaves only Kafka requests and responses. As Wireshark understands most message versions in the protocol (depending on the Wireshark version, of course), you can conveniently look into the structure of each message. Wireshark will also show the corresponding bytes.
Wireshark is a great debugging tool that could help you understand how the protocol works in a particular case and what's wrong with your implementation.
Reading and writing primitive values
The protocol defines a number of primitive types, whose full description you can find here. Let's implement the read and write code for them. You can find all the functions in this file; also check out the corresponding test file.
Fixed length integer values: INT8, INT16, INT32, INT64, and UINT16
These are integer numbers with a known fixed length: 1, 2, 4, or 8 bytes. Naturally, such fields are used a lot throughout the protocol. In this class you can see how (trivially) their reading and writing are implemented in Kafka.
Let's first define the function for reading an exact number of bytes from a buffer3:
from typing import BinaryIO

def read_exact(buffer: BinaryIO, num_bytes: int) -> bytes:
    value = buffer.read(num_bytes)
    if len(value) != num_bytes:
        raise ValueError(f"Buffer underflow: expected {num_bytes}, got {len(value)}")
    return value
The BinaryIO type hint in Python represents an object from which bytes can be read and to which they can be written. It has methods like read, write, tell (for getting the current position), seek (for changing the position).
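For example, io.BytesIO gives us an in-memory object with this interface, which is handy for trying the functions out (a small illustrative snippet of my own, not part of the original code):

from io import BytesIO

buffer = BytesIO(b"\x00\x03foo")
print(read_exact(buffer, 2))  # b'\x00\x03'
print(buffer.tell())          # 2 -- positioned right after the bytes we've read
buffer.seek(0)                # rewind to the beginning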
Now we can implement reading INT8:
def read_int8(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 1), byteorder="big", signed=True)
Kafka uses the big-endian (AKA network) byte ordering, hence byteorder="big".
Now writing:
def write_int8(value: int, buffer: BinaryIO) -> None:
    if -(2**7) <= value <= 2**7 - 1:
        buffer.write(value.to_bytes(1, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT8")
I won't repeat this for INT16, INT32, and INT64: the only significant differences are the number of bytes (2, 4, and 8 correspondingly) and checked ranges ([-(2**15), 2**15 - 1], [-(2**31), 2**31 - 1], and [-(2**63), 2**63 - 1] correspondingly).
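Since the 16- and 32-bit variants are used later in this post, here's a sketch of them following the same pattern (the names mirror read_int8/write_int8 and are my own):

def read_int16(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 2), byteorder="big", signed=True)

def write_int16(value: int, buffer: BinaryIO) -> None:
    if -(2**15) <= value <= 2**15 - 1:
        buffer.write(value.to_bytes(2, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT16")

def read_int32(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 4), byteorder="big", signed=True)

def write_int32(value: int, buffer: BinaryIO) -> None:
    if -(2**31) <= value <= 2**31 - 1:
        buffer.write(value.to_bytes(4, byteorder="big", signed=True))
    else:
        raise ValueError(f"Value {value} is out of range for INT32")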
UINT16 is similar to INT16:
def read_uint16(buffer: BinaryIO) -> int:
    return int.from_bytes(read_exact(buffer, 2), byteorder="big", signed=False)

def write_uint16(value: int, buffer: BinaryIO) -> None:
    if 0 <= value <= 2**16 - 1:
        buffer.write(value.to_bytes(2, byteorder="big", signed=False))
    else:
        raise ValueError(f"Value {value} is out of range for UINT16")
Note the signed=False here.
BOOLEAN
BOOLEAN is essentially INT8 with extra logic: == 0 means false, != 0 means true.
def read_boolean(buffer: BinaryIO) -> bool:
    return read_int8(buffer) != 0

def write_boolean(value: bool, buffer: BinaryIO) -> None:
    write_int8(1 if value else 0, buffer)
You can see an example of BOOLEAN in the allowAutoTopicCreation field of the MetadataRequestData generated class.
FLOAT64
FLOAT64 is a double-precision 64-bit IEEE 754 value. Python doesn't have to_bytes and from_bytes for float like it has for int. So instead we will use the struct module from the standard library.
import struct

def read_float64(buffer: BinaryIO) -> float:
    return struct.unpack(">d", read_exact(buffer, 8))[0]

def write_float64(value: float, buffer: BinaryIO) -> None:
    buffer.write(struct.pack(">d", value))
>d means "a double value in the big-endian byte order".
UNSIGNED_VARINT: Variable length integer values
Variable length integers are an approach that allows using fewer bytes per value when values are small. Kafka uses the varint approach from Protocol Buffers. The idea is simple:
Each byte in the varint has a continuation bit that indicates if the byte that follows it is part of the varint. This is the most significant bit (MSB) of the byte (sometimes also called the sign bit). The lower 7 bits are a payload; the resulting integer is built by appending together the 7-bit payloads of its constituent bytes.
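As a small worked example (my own illustration), here's how the value 300 is encoded:

300 = 0b10_0101100
lowest 7 bits: 0101100 -> more bytes follow, so the MSB is set -> 1_0101100 = 0xAC
next 7 bits:   0000010 -> the last byte, so the MSB is clear  -> 0_0000010 = 0x02
result: AC 02 (the least significant 7-bit group goes first)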
You can check the Protobuf specification and Kafka implementation (read, write) for details.
This type isn't used in the protocol fields per se, but it's used for compact collections described below.
Let's implement it. For confidence, we get some examples directly from the source of truth, Kafka's ByteUtils class:
// A quick way to dump some reference encodings using Kafka's ByteUtils.
import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.ByteUtils;

public class VarintExamples {
    public static void main(String[] args) {
        for (int value : new int[]{0, 1, 127, 128, 300, 2147483647}) {
            ByteBuffer buffer = ByteBuffer.allocate(5);
            ByteUtils.writeUnsignedVarint(value, buffer);
            buffer.flip();
            StringBuilder hex = new StringBuilder();
            while (buffer.hasRemaining()) {
                hex.append(String.format("%02x ", buffer.get()));
            }
            System.out.println(value + ": " + hex.toString().trim());
        }
    }
}
Running this, we'll get:
0: 00
1: 01
127: 7f
128: 80 01
300: ac 02
2147483647: ff ff ff ff 07
Let's implement this in probably not the most performant, but a straightforward way:
def read_unsigned_varint(buffer: BinaryIO) -> int:
    result = 0
    shift = 0
    while True:
        byte = read_exact(buffer, 1)[0]
        # The lower 7 bits are the payload.
        result |= (byte & 0x7F) << shift
        # The most significant bit is the continuation bit.
        if byte & 0x80 == 0:
            return result
        shift += 7
        if shift > 28:
            raise ValueError("Varint is too long")

def write_unsigned_varint(value: int, buffer: BinaryIO) -> None:
    if not 0 <= value <= 2**31 - 1:
        raise ValueError(f"Value {value} is out of range for UNSIGNED_VARINT")
    while True:
        byte = value & 0x7F
        value >>= 7
        if value == 0:
            buffer.write(byte.to_bytes(1, byteorder="big"))
            return
        buffer.write((byte | 0x80).to_bytes(1, byteorder="big"))
UUID
UUIDs are 128-bit values used for uniquely identifying entities. For example, they are used to pass topic IDs in CreateTopicsResponse.
You can see how they are read and written in the Kafka code. It's simple to reproduce:
import uuid

def read_uuid(buffer: BinaryIO) -> uuid.UUID | None:
    byte_value = read_exact(buffer, 16)
    if byte_value == b"\x00" * 16:
        return None
    return uuid.UUID(bytes=byte_value)

def write_uuid(value: uuid.UUID | None, buffer: BinaryIO) -> None:
    if value is None:
        buffer.write(b"\x00" * 16)
    else:
        buffer.write(value.bytes)
Note that Kafka treats null/None as zero UUID, so we're doing the same here.
Strings
The Kafka protocol has 4 types of strings:
| | compact | non-compact |
|---|---|---|
| nullable | COMPACT_NULLABLE_STRING | NULLABLE_STRING |
| non-nullable | COMPACT_STRING | STRING |
Compactness indicates whether the string length is encoded with INT16 or with UNSIGNED_VARINT. It depends on the message version (it was introduced around 2017). Nullability is whether the value can be null. It depends on the message purpose and the version as well (sometimes string fields become optional during the protocol evolution).
Strings are ubiquitous in the protocol. For example, see the field name in the generated class MetadataRequestData.MetadataRequestTopic.
Strings are encoded pretty straightforwardly: first goes the length and then comes the UTF-8 encoded body. The maximum allowed length is 32767 bytes. In the non-compact case, null strings have the length of -1 and obviously no body; in the compact case, the length is encoded as N + 1, so 0 denotes null.
As the only difference between compact and non-compact is how the string length is encoded, we can have one function for both modes.
Let's start with reading and writing nullable strings:
def write_nullable_string(value: str | None, compact: bool, buffer: BinaryIO) -> None:
    if value is None:
        if compact:
            # In the compact mode, the length is encoded as N + 1, so 0 means null.
            write_unsigned_varint(0, buffer)
        else:
            write_int16(-1, buffer)
        return
    value_bytes = value.encode("utf-8")
    if len(value_bytes) > 32767:
        raise ValueError(f"String length {len(value_bytes)} is out of range")
    if compact:
        write_unsigned_varint(len(value_bytes) + 1, buffer)
    else:
        write_int16(len(value_bytes), buffer)
    buffer.write(value_bytes)

def read_nullable_string(compact: bool, buffer: BinaryIO) -> str | None:
    if compact:
        length = read_unsigned_varint(buffer) - 1
    else:
        length = read_int16(buffer)
    if length < 0:
        return None
    return read_exact(buffer, length).decode("utf-8")
Non-nullable string functions can be built on top of these:
def write_string(value: str, compact: bool, buffer: BinaryIO) -> None:
    if value is None:
        raise ValueError("Non-nullable string cannot be None")
    write_nullable_string(value, compact, buffer)

def read_string(compact: bool, buffer: BinaryIO) -> str:
    value = read_nullable_string(compact, buffer)
    if value is None:
        raise ValueError("Non-nullable string was read as None")
    return value
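To make the difference between the two length encodings concrete, here's a quick round-trip (an illustrative snippet of my own; the hex strings in the comments are what you should see):

from io import BytesIO

buffer = BytesIO()
write_string("fun", True, buffer)   # compact: the length is N + 1 as UNSIGNED_VARINT
print(buffer.getvalue().hex(" "))   # 04 66 75 6e

buffer = BytesIO()
write_string("fun", False, buffer)  # non-compact: the length is INT16
print(buffer.getvalue().hex(" "))   # 00 03 66 75 6e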
Byte arrays
Byte arrays are very similar to strings. They have the same potential nullability and compactness:
| | compact | non-compact |
|---|---|---|
| nullable | COMPACT_NULLABLE_BYTES | NULLABLE_BYTES |
| non-nullable | COMPACT_BYTES | BYTES |
They are also encoded in the same way: the length followed by the body. Naturally, the body is not treated as a UTF-8 string, but as an opaque byte array. The maximum length of a byte array is 2147483647.
You can find an example of bytes in the field metadata in the generated class JoinGroupRequestData.JoinGroupRequestProtocol.
def read_array_length(compact: bool, buffer: BinaryIO) -> int:
    if compact:
        # In the compact mode, the length is encoded as N + 1, so 0 means null.
        return read_unsigned_varint(buffer) - 1
    return read_int32(buffer)

def write_array_length(length: int, compact: bool, buffer: BinaryIO) -> None:
    if compact:
        write_unsigned_varint(length + 1, buffer)
    else:
        write_int32(length, buffer)

def read_nullable_bytes(compact: bool, buffer: BinaryIO) -> bytes | None:
    length = read_array_length(compact, buffer)
    if length < 0:
        return None
    return read_exact(buffer, length)

def write_nullable_bytes(value: bytes | None, compact: bool, buffer: BinaryIO) -> None:
    if value is None:
        write_array_length(-1, compact, buffer)
        return
    write_array_length(len(value), compact, buffer)
    buffer.write(value)
As you can see, the difference between these functions and the corresponding functions for strings is small.
Other arrays
The protocol supports arrays of types other than bytes: strings, numbers, and structs (but not nested arrays). These are ARRAY and COMPACT_ARRAY; compactness works the same as for byte arrays and strings.
Nullability is not explicitly mentioned in the protocol specification for some reason. However, arrays can be nullable. This is controlled by nullableVersions in the schema definitions, like here.
Considering we already implemented read_array_length and write_array_length, let's implement the reader and writer functions:
from typing import Callable, TypeVar

T = TypeVar("T")

def read_nullable_array(read_element: Callable[[BinaryIO], T], compact: bool, buffer: BinaryIO) -> list[T] | None:
    length = read_array_length(compact, buffer)
    if length < 0:
        return None
    return [read_element(buffer) for _ in range(length)]

def write_nullable_array(value: list[T] | None, write_element: Callable[[T, BinaryIO], None], compact: bool, buffer: BinaryIO) -> None:
    if value is None:
        write_array_length(-1, compact, buffer)
        return
    write_array_length(len(value), compact, buffer)
    for element in value:
        write_element(element, buffer)
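Here's a quick look at the resulting wire format (an illustrative snippet of my own, using the write_int32 helper shown earlier):

from io import BytesIO

buffer = BytesIO()
write_nullable_array([1, 2, 3], write_int32, True, buffer)  # a compact array of INT32
print(buffer.getvalue().hex(" "))  # 04 00 00 00 01 00 00 00 02 00 00 00 03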
RECORDS
RECORDS encode Kafka records. The structure is pretty complex and I'm not going to describe it in this guide (however, please let me know in the comments if you would like it covered). For simplicity, we can treat records as NULLABLE_BYTES or COMPACT_NULLABLE_BYTES (depending on the message version).
Tagged fields
Tagged fields are an extension to the Kafka protocol which allows optional data to be attached to messages. The idea is twofold:
- If the client or server doesn't understand the tagged field, it'll keep it as unknown and ignore it.
- If a field is rarely used, its default value doesn't have to be transferred at all.
Have a look, for instance, at this field. It has taggedVersions, which says since which version this field is tagged (in most cases, it's the same version when the field was added).
A tagged field consists of:
- The tag of the UNSIGNED_VARINT type.
- The data preceded by its size in bytes, where the size is also an UNSIGNED_VARINT (similar to COMPACT_BYTES, but the size is not offset by one).
You can find more details about tagged fields in KIP-482.
Let's implement:
from dataclasses import dataclass, field

@dataclass
class RawTaggedField:
    tag: int
    data: bytes

def read_tagged_fields(buffer: BinaryIO) -> list[RawTaggedField]:
    # The tagged field section starts with the number of fields as UNSIGNED_VARINT.
    num_fields = read_unsigned_varint(buffer)
    result = []
    for _ in range(num_fields):
        tag = read_unsigned_varint(buffer)
        size = read_unsigned_varint(buffer)
        data = read_exact(buffer, size)
        result.append(RawTaggedField(tag=tag, data=data))
    return result

def write_tagged_fields(fields: list[RawTaggedField], buffer: BinaryIO) -> None:
    write_unsigned_varint(len(fields), buffer)
    for tagged_field in fields:
        write_unsigned_varint(tagged_field.tag, buffer)
        write_unsigned_varint(len(tagged_field.data), buffer)
        buffer.write(tagged_field.data)
Here, all tagged fields are treated as "unknown" raw fields. Known tagged fields need to be handled inside the concrete structures they belong to.
Message structure
The high-level message structure is very straightforward. According to the specification:
RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32
That is, it's the message itself preceded by its size in bytes. Both request and response messages consist of the header immediately followed by the body. For some reason, this isn't explicitly documented4, but you can trust me or check the code.
Request and response header
The request header exists in three versions: 0, 1, and 2. They are specified in the protocol as:
Request Header v0 => request_api_key request_api_version correlation_id
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32

Request Header v1 => request_api_key request_api_version correlation_id client_id
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING

Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER
  request_api_key => INT16
  request_api_version => INT16
  correlation_id => INT32
  client_id => NULLABLE_STRING
TAG_BUFFER is the tagged fields mentioned earlier.
Let's implement them as Python data classes:
@dataclass
class RequestHeaderV0:
    request_api_key: int
    request_api_version: int
    correlation_id: int

    def write(self, buffer: BinaryIO) -> None:
        write_int16(self.request_api_key, buffer)
        write_int16(self.request_api_version, buffer)
        write_int32(self.correlation_id, buffer)

@dataclass
class RequestHeaderV1(RequestHeaderV0):
    client_id: str | None = None

    def write(self, buffer: BinaryIO) -> None:
        super().write(buffer)
        write_nullable_string(self.client_id, False, buffer)

@dataclass
class RequestHeaderV2(RequestHeaderV1):
    tagged_fields: list[RawTaggedField] = field(default_factory=list)

    def write(self, buffer: BinaryIO) -> None:
        super().write(buffer)
        write_tagged_fields(self.tagged_fields, buffer)
As you can see, version 2 has some tagged fields, but no known tagged fields are expected there. If a tagged field is sent to the broker erroneously, it will be ignored.
The response header exists in two versions: 0 and 1. They are specified in the protocol as:
Response Header v0 => correlation_id
  correlation_id => INT32

Response Header v1 => correlation_id TAG_BUFFER
  correlation_id => INT32
Let's also implement them:
@dataclass
class ResponseHeaderV0:
    correlation_id: int

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ResponseHeaderV0":
        return cls(correlation_id=read_int32(buffer))

@dataclass
class ResponseHeaderV1(ResponseHeaderV0):
    tagged_fields: list[RawTaggedField] = field(default_factory=list)

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ResponseHeaderV1":
        return cls(
            correlation_id=read_int32(buffer),
            tagged_fields=read_tagged_fields(buffer),
        )
We don't implement read for the request headers and write for the response ones. This is for brevity: we're not going to send response headers and receive the request ones in our examples as we're not programming the server side. However, if you're interested in the server side as well, you need to implement both functions (which should be straightforward).
Correlation ID
Note particularly the correlation_id field in the request and response headers. The protocol supports pipelining: the client can have more than one outstanding request per connection. The correlation ID allows it to match responses to requests.
Header version selection
Which version must be used is a function of the API key and message version. It's not currently documented in the protocol guide explicitly5.
Use the requestHeaderVersion and responseHeaderVersion functions in the generated class ApiMessageType as the reference.
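For example, for ApiVersions the mapping boils down to something like the following sketch (my own Python rendering of what the generated Java does for this particular API, not the full mapping):

def api_versions_request_header_version(api_version: int) -> int:
    # ApiVersions became flexible in version 3, so it switches to request header v2 there.
    return 2 if api_version >= 3 else 1

def api_versions_response_header_version(api_version: int) -> int:
    # ApiVersions responses always use header v0 so that even very old clients can parse them.
    return 0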
Sending requests and receiving responses
Now, having all this knowledge and code, let's finally send an ApiVersions request and receive and read a response. ApiVersions is normally the first request that the client sends. Its purpose is to find the API versions and features supported by the broker. We will implement the latest version, 3.
In the protocol specification, it's defined as:
ApiVersions Request (Version: 3) => client_software_name client_software_version TAG_BUFFER
  client_software_name => COMPACT_STRING
  client_software_version => COMPACT_STRING
Let's make the data class:
@dataclass
class ApiVersionsRequestV3:
    client_software_name: str
    client_software_version: str
    tagged_fields: list[RawTaggedField] = field(default_factory=list)

    def write(self, buffer: BinaryIO) -> None:
        write_string(self.client_software_name, True, buffer)
        write_string(self.client_software_version, True, buffer)
        write_tagged_fields(self.tagged_fields, buffer)
And the response:
ApiVersions Response (Version: 3) => error_code [api_keys] throttle_time_ms TAG_BUFFER
  error_code => INT16
  api_keys => api_key min_version max_version TAG_BUFFER
    api_key => INT16
    min_version => INT16
    max_version => INT16
  throttle_time_ms => INT32
[api_keys] means "an array of api_keys", where api_keys is the structure defined two lines below.
Converting this to Python data classes:
@dataclass
class ApiVersion:
    api_key: int
    min_version: int
    max_version: int
    tagged_fields: list[RawTaggedField] = field(default_factory=list)

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ApiVersion":
        return cls(
            api_key=read_int16(buffer),
            min_version=read_int16(buffer),
            max_version=read_int16(buffer),
            tagged_fields=read_tagged_fields(buffer),
        )

@dataclass
class ApiVersionsResponseV3:
    error_code: int
    api_keys: list[ApiVersion]
    throttle_time_ms: int
    tagged_fields: list[RawTaggedField] = field(default_factory=list)

    @classmethod
    def read(cls, buffer: BinaryIO) -> "ApiVersionsResponseV3":
        error_code = read_int16(buffer)
        api_keys = read_nullable_array(ApiVersion.read, True, buffer)
        if api_keys is None:
            raise ValueError("api_keys must not be null")
        throttle_time_ms = read_int32(buffer)
        tagged_fields = read_tagged_fields(buffer)
        return cls(error_code, api_keys, throttle_time_ms, tagged_fields)
When we speak about arrays, we need to know whether we need compact or non-compact ones. To find this out, let's have a look at the schema definition in ApiVersionsRequest.json. You can see "flexibleVersions": "3+", which means that compact arrays are used starting from version 3 (more on this in README.md in the schema directory). Since we're working with version 3 here, we use compact arrays.
Having implemented the request and response classes, we can send and receive these requests. For ApiVersions v3, we need the v2 request header and the v0 response header (check the generated ApiMessageType.java). You can find the API key (18) in ApiVersionsRequest.json or in the protocol specification.
import socket
from io import BytesIO

# Serialize the request header and body into a single buffer.
request_buffer = BytesIO()
RequestHeaderV2(
    request_api_key=18,  # ApiVersions
    request_api_version=3,
    correlation_id=1234,
    client_id="test-client",
).write(request_buffer)
ApiVersionsRequestV3(
    client_software_name="test-client",
    client_software_version="0.0.1",
).write(request_buffer)
message = request_buffer.getvalue()

with socket.create_connection(("localhost", 9092)) as sock:
    # The message is preceded by its size as INT32.
    sock.sendall(len(message).to_bytes(4, byteorder="big", signed=True))
    sock.sendall(message)

    # Read the response size first, then the response itself.
    response_stream = sock.makefile("rb")
    response_size = read_int32(response_stream)
    response_buffer = BytesIO(read_exact(response_stream, response_size))

response_header = ResponseHeaderV0.read(response_buffer)
response = ApiVersionsResponseV3.read(response_buffer)
print(response_header)
print(response)
If you run this code, you will see the response header and message printed in the console. Congratulations, you've performed a correct network exchange with the Kafka broker!
You will notice three tagged fields in the response's tagged_fields (the generated Java code calls them _unknownTaggedFields). The read and write methods of the generated ApiVersionsResponseData class and also the message definition in ApiVersionsResponse.json will help you to interpret them. Consider this homework.
1. In my day job, we developed an open source library, Kio. It allows us to do arbitrary Kafka API calls from Python easily. The serialization/deserialization code, like in Kafka itself, is generated from the JSON protocol definitions. The generated code is rigorously tested, including property testing against real Java Kafka code.
2. Or "message" if you like: some of the schemas are not for the API, but e.g. for on-disk data.
3. The read_exact function has a drawback: it duplicates the data when the underlying buffer is already in memory. However, it's more convenient for educational purposes.
4. I made a PR to fix this.
5. Again, I made a PR to fix this.