My name is Sergey Kalinets, I am an architect at Parimatch Tech, and in this article, I want to share our experience in the field of message search in Kafka.
For our company, Kafka is the central nervous system through which microservices exchange information. From input to output, a message can pass through a dozen services that filter and transform it, moving it from one topic to another. These services are owned by different teams, so it is very useful to be able to see what a particular message contains. This is especially interesting when something does not go according to plan: it is important to understand at what stage things went awry. From a distance, the solution seems simple: fetch the relevant messages from Kafka and see what is wrong with them. But, as usual, the details are the most interesting part.
To begin with, Kafka is not just a message broker, as many people think of and use it, but a distributed log. That means a lot of things, but what we find interesting is that messages are not deleted from topics after consumers have read them, so technically you can read them again at any time and see what's inside. The complication is that Kafka can only be read sequentially. You need to know the offset (for simplicity, the ordinal number of a message within a topic) from which you want to start. It is also possible to specify a timestamp as the starting point, but from there you still read all the messages in order.
It turns out that in order to find the right message, you have to read a whole batch and pick out the interesting ones. For example, if we want to investigate a problem with the player with id=42, we need to find all messages where he is mentioned (playerId: 42), line them up in a chain, and then see at what point everything went wrong.
Unlike databases such as MySQL or MSSQL, which come with GUI client applications, Kafka does not spoil us with such conveniences and only offers console utilities with rather narrow (at first glance) functionality.
But there is good news as well. There are a number of solutions on the market that make the whole process easier. I would like to point out right away that "on the market" here does not mean "for money": all the tools discussed below are free.
I will describe some of the most frequently used ones, and at the end, we will analyze in more detail what, in our opinion, is close to ideal.
So, what can you choose from?
In fact, you just need to prepend docker run --rm -it taion809/kafka-cli:2.2.0 to the command, which tells Docker to run this image, show its output on your screen, and remove the container when it finishes. You can go even further and add an alias like:
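For instance, something along these lines (the alias name is just an example):

```shell
# Add to ~/.bashrc or ~/.zshrc: run Kafka's console tools
# from a throwaway container instead of installing them locally.
alias kafka-cli='docker run --rm -it taion809/kafka-cli:2.2.0'
```

After that, assuming the image exposes the standard scripts, something like `kafka-cli kafka-console-consumer.sh --bootstrap-server broker:9092 --topic players` reads a topic as if the tools were installed locally (the broker address and topic name here are placeholders).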
The utility seems archaic, and its console nature can scare off inexperienced GUI adepts, but like most console tools it becomes quite powerful when used correctly. How exactly? Let's look at the example of the next tool, also a console one.
This is already a very powerful tool that lets you read from and write to Kafka and list topics. It is also console-based, but at the same time more convenient than the standard utilities like kafka-console-consumer (and it, too, can be run from Docker).
This is how you can save 10 messages in a messages file (in JSON format):
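A sketch of such a command (the broker address and topic name are placeholders):

```shell
# Consume 10 messages from the beginning of the topic and save them
# to the messages file, one JSON envelope per line (-J).
kafkacat -b broker:9092 -t players -C \
  -o beginning \
  -c 10 \
  -J > messages
```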
The file can then be used for analysis or to reproduce some scenario. Of course, the native consumer could have been used to solve this problem, but kafkacat allows you to express it in a shorter way.
Or here is a more complicated example (in the sense that it takes a bit more typing, but the solution is simpler than many alternatives).
The code is taken from the blog of one of the Kafka ecosystem’s leading promoters — Robin Moffatt.
In a nutshell, one kafkacat instance reads messages from a Kafka topic, the messages are transformed into the required format and fed to another kafkacat instance, which writes them to another topic. To many, this may look like an incomprehensible conundrum, but in fact we have a ready-made solution that can be run in Docker, and it will work. Implementing the same scenario in your .NET or Java would definitely require more code.
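The general shape of such a pipeline looks like this (brokers, topics, and the jq filter below are placeholders, not Robin's original code):

```shell
# Read the whole source topic (-C -o beginning -e), reshape each
# message with jq, and produce the result (-P) into the target topic.
kafkacat -b source-broker:9092 -t raw-events -C -o beginning -e -u | \
  jq -c '{playerId: .playerId, amount: .amount}' | \
  kafkacat -b target-broker:9092 -t clean-events -P
```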
But this is a short digression from the topic. The article is about search. So, in a similar way, you can organize the search for messages: just redirect the output to some kind of grep and that's it.
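For example, to pull out all messages mentioning our player (again, the broker and topic are placeholders):

```shell
# Scan the whole topic and keep only messages that mention player 42.
kafkacat -b broker:9092 -t players -C -o beginning -e | grep '"playerId": 42'
```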
There is room for improvement, though: kafkacat supports Avro out of the box, but does not support Protobuf.
Kafka Connect + ELK
All of the above works and solves the tasks at hand, but it is not convenient for everyone. When analyzing incidents, you need to search different topics for a specific piece of text, such as an identifier or a name. Our QA engineers (who handle such investigations in 90% of cases) are used to Kafka Tool, and some to the console utilities. But all of this pales in comparison with the possibilities offered by Kibana, the UI wrapper around the Elasticsearch database, which our QA also use to analyze logs. More than once the question was raised: "Let's log all messages so that we can search them in Kibana." But it turned out that there is a much easier way than adding a logger call to each of our services, and its name is Kafka Connect.
Kafka Connect is a solution for integrating Kafka with other systems. In a nutshell, it allows you to export data from and import data into Kafka without writing any code. All that is needed is a running Connect cluster and JSON configurations describing what we want. The word "cluster" sounds expensive and complicated, but in fact it is just one or more instances that can be deployed anywhere. For example, we run them alongside regular services in Kubernetes.
Kafka Connect provides a REST API that you can use to manage connectors that move data into and out of Kafka. Connectors are defined by configurations; in the case of Elasticsearch, the configuration can look like this:
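A sketch based on the standard Elasticsearch sink connector (the topic name, Elasticsearch address, and other values are placeholders; the connector's name is not part of this body but of the URL it is sent to):

```json
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "3",
  "topics": "players",
  "connection.url": "http://elasticsearch:9200",
  "key.ignore": "true",
  "schema.ignore": "true"
}
```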
If such a config is sent via HTTP PUT to the Connect server then, if everything goes well, a connector named ElasticSinkConnector will be created, which will read data from the topic in three threads and write it to Elasticsearch.
Everything looks very simple, but the most interesting thing, of course, is in the details.
Most of the problems are data-related. Usually, as in our case, you have to work with data formats whose developers clearly did not anticipate that the data would one day end up in Elasticsearch.
To deal with such data nuances, there are transformations: functions that can be applied to messages to mutate them, adjusting them to the requirements of the recipient. For cases where transformations are powerless, you can always fall back on any Kafka client technology. So what scenarios did we solve with the help of transformations?
In our case, there are four transformations. First we list them, then we configure each one. Transformations are applied in the order they are listed, which allows you to combine them in interesting ways.
First, we add the ability to search by topic name: we simply enrich our messages with a field containing it.
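This can be done with the built-in InsertField transformation; a fragment of the connector configuration could look like this (the field name kafka_topic is just an example):

```json
{
  "transforms": "addTopic",
  "transforms.addTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTopic.topic.field": "kafka_topic"
}
```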
Elasticsearch works with indexes, which tend to get clogged and unnerve DevOps engineers, so we needed a manageable indexing scheme. In our case, we settled on an index per service/team with daily rotation. This also lets us store data from different topics in one index while controlling its age.
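Rotation of this kind can be expressed with the built-in TimestampRouter transformation, which rewrites the destination index name per message; the index prefix and date format below are examples:

```json
{
  "transforms": "routeIndex",
  "transforms.routeIndex.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.routeIndex.topic.format": "team-index-${timestamp}",
  "transforms.routeIndex.timestamp.format": "yyyy-MM-dd"
}
```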
In order to search by date in Kibana, you need to tell it which field contains the date. We use the time the message was published to Kafka. To get it, we first insert a field with the message timestamp and then convert it to UTC format. The conversion was meant to help Elasticsearch recognize the field as a timestamp, but in our case that did not always happen, so we added an index template that explicitly says "this field is a date":
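Such a template, sent to Elasticsearch via the legacy template API (PUT /_template/some-name), could look like this; the index pattern and the field name are examples:

```json
{
  "index_patterns": ["team-index-*"],
  "mappings": {
    "properties": {
      "kafka_timestamp": { "type": "date" }
    }
  }
}
```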
As a result, our messages almost instantly become available for analysis, and the time spent on the analysis of incidents decreases.
Here, of course, it is worth noting that this initiative is now at the implementation stage, so it cannot be said that everyone is massively looking for everything they need in Kibana, but we are confidently moving towards this.
In general, Kafka Connect is applicable to more than just such tasks. It can be used wherever integration with other systems is needed. For example, you can implement full-text search in your application using two connectors: one reads updates from the operational database and writes them to Kafka, the second reads from Kafka and sends the data to Elasticsearch. The application then runs a search query against Elasticsearch, gets an id, and uses it to find the necessary data in the database.
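A sketch of the first of those two connectors, using the standard JDBC source connector (the connection details, table, and column names are all placeholders):

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db:5432/app",
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "id",
  "table.whitelist": "products",
  "topic.prefix": "db-"
}
```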
I really hope that you have learned something new for yourself; otherwise, what was all this for? If something did not work out, if you strongly disagree with something, or if you know more convenient ways to solve such problems, we can discuss it.