How we upgraded Kafka

This is a story about upgrading Kafka from 0.8.2.0 to 0.9.0.1 and what to look out for.

The documentation tells us the upgrade process is straightforward. It almost was for us, but there were a couple of pain points which I’ll talk about below.


Watch out for the log cleaner

Problem

Before Kafka 0.9.0.1, the log cleaner wasn’t enabled by default. As a result, when we restarted the first upgraded broker, it started cleaning logs.

The log cleaner only affects topics which use log compaction. We don’t use this in our topics, but Kafka uses it to handle the consumer offsets topic (__consumer_offsets).

In our cluster, some consumer offsets partition logs were almost 300GB, and Kafka took a really long time to clean them (more than an hour in some cases).

This wouldn’t matter if we could still commit and fetch offsets while the log cleaner did its job. But for some reason that I don’t quite understand, a request for a consumer group coordinator pointed us to the broker that was still loading and cleaning the partition, and that broker responded with either a GroupLoadInProgress error or a NotCoordinatorForGroup error.

It stayed that way until the broker finished cleaning and loading the partition, ultimately meaning some of our consumer groups were stuck, unable to fetch or commit offsets.

Workaround

Unfortunately, for the first broker we had no idea what to do, so we let it do its thing and accepted that the affected consumer groups couldn’t progress.

But we couldn’t do that for every broker: at some point we would migrate a broker that handles an offsets partition for consumer groups that we can’t bring down for long, since they serve our live services.
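To see which consumer groups a given broker migration would affect, it helps to know which __consumer_offsets partition each group maps to. The following is a minimal Python sketch, assuming the broker picks the partition as the non-negative Java hashCode of the group name modulo the partition count of the offsets topic (50 by default). The function names are hypothetical, and the formula is an assumption based on reading the broker source, so verify it against your version.

```python
def java_string_hash(s):
    """Replicate Java's String.hashCode(): signed 32-bit, over UTF-16 code units."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - (1 << 32) if h >= (1 << 31) else h


def offsets_partition_for(group_id, num_partitions=50):
    """Assumed mapping from a group name to its __consumer_offsets partition."""
    h = java_string_hash(group_id)
    # Assumption: Integer.MIN_VALUE is mapped to 0, other values to abs(h).
    non_negative = 0 if h == -(1 << 31) else abs(h)
    return non_negative % num_partitions


if __name__ == "__main__":
    # "my-live-service" is a made-up group name for illustration.
    print(offsets_partition_for("my-live-service"))
```

Once you know the partition number, the leader of that partition is the broker whose migration will block the group.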

At that point we had the cluster in the following state:

  • 1 broker migrated to 0.9.0.1, all logs cleaned and fully operational.
  • all remaining brokers still on 0.8.2.0.

We ended up doing the following:

  • list biggest (by byte size) partitions on the broker to be migrated.
  • for each partition where the broker is the leader, reassign it to an already migrated broker.

To do that we used the tool kafka-reassign-partitions.sh and wrote the JSON input file manually:

[Image: reassignment]
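The two manual steps above could also be scripted. Here is a rough Python sketch, not what we actually ran: it sums the file sizes of each partition directory under a broker's log directory, and builds a JSON document in the format kafka-reassign-partitions.sh reads. The function names, the partition numbers and the broker ID 1 are all hypothetical.

```python
import json
import os


def partition_sizes(log_dir):
    """Return [(partition_dir_name, total_bytes)], biggest first."""
    sizes = []
    for entry in os.scandir(log_dir):
        if not entry.is_dir():
            continue
        total = sum(f.stat().st_size for f in os.scandir(entry.path) if f.is_file())
        sizes.append((entry.name, total))
    return sorted(sizes, key=lambda item: item[1], reverse=True)


def build_reassignment(topic, partitions_to_replicas):
    """Build the document passed to --reassignment-json-file."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": p, "replicas": replicas}
            for p, replicas in sorted(partitions_to_replicas.items())
        ],
    }


if __name__ == "__main__":
    # Hypothetical plan: move partitions 12 and 37 to the migrated broker 1.
    plan = build_reassignment("__consumer_offsets", {12: [1], 37: [1]})
    print(json.dumps(plan, indent=2))
```

The resulting file is then given to the tool with the --reassignment-json-file and --execute options, along with --zookeeper pointing at your own ensemble.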

Unfortunately again, this triggered another problem: the log cleaner crashed on some partitions, complaining about not having enough deduplication buffer capacity:

[Screenshot: log cleaner]

The error gives us a tip, but with a buffer capacity of 1 GB already, and the buffer being technically limited to 2 GB, there is no way that many messages would fit.

A quick Google search suggested that this was a known issue, with no clear workaround or fix in sight. So we waited, not knowing exactly how we would fix it.
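To get a feel for the numbers, here is a back-of-the-envelope Python sketch of how many messages the deduplication buffer can track. It rests on assumptions taken from reading the 0.9 cleaner code rather than from anything documented: each entry costs 24 bytes (a 16-byte MD5 hash plus an 8-byte offset), the buffer is split evenly between cleaner threads, each thread is capped at 2^31 - 1 bytes, and only 90% of the slots are used (the default load factor). The function name is hypothetical.

```python
def offset_map_capacity(buffer_bytes, threads=1, load_factor=0.9, bytes_per_entry=24):
    """Estimate how many messages one cleaner thread's offset map can hold."""
    # Assumption: each thread gets an equal share, capped at a signed 32-bit size.
    per_thread = min(buffer_bytes // threads, 2**31 - 1)
    slots = per_thread // bytes_per_entry
    return int(slots * load_factor)


if __name__ == "__main__":
    print(offset_map_capacity(1 * 1024**3))  # 1 GB buffer
    print(offset_map_capacity(4 * 1024**3))  # anything above the 2 GB cap
```

Under these assumptions a 1 GB buffer holds roughly 40 million entries and the 2 GB ceiling roughly 80 million, so raising the setting only helps up to that point.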

A week later, we attempted another fix: since (we thought at the time) the segment size determines the number of messages, why not reduce the segment size so that each segment holds few enough messages to fit in the buffer? This is something we can control easily for the __consumer_offsets topic.

We modified the segment.bytes configuration on the __consumer_offsets topic, reassigned the partition away from the broker and back again (in order to rewrite the logs completely) and restarted the broker, hoping this would work.

It didn’t work, but something was strange: all segments were now 1 MB, yet the error was the same, with the same number of messages in the segment. This was weird.

Of course, in hindsight we could have realized this way earlier: the number of messages is determined by looking at the offset in the filename of the log segment, and calculating the difference with the next log segment.

As you can see in the screenshot above, the first log segment is at offset 0, while the next log segment is at offset 1232493227. Kafka then thinks there are 1232493227 messages in the first log segment, which is of course completely false.
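The reasoning above is easy to reproduce. This small Python sketch is an illustration of the arithmetic, not Kafka's actual code: it takes segment file names, reads the base offset from each name, and reports the gap to the next segment as the apparent message count. The function name is hypothetical; the two file names correspond to the offsets mentioned above.

```python
def apparent_segment_sizes(segment_files):
    """Return [(base_offset, apparent_message_count)] for all but the last segment."""
    # Segment files are named after their base offset, zero-padded to 20 digits.
    base_offsets = sorted(int(name.split(".")[0]) for name in segment_files)
    return [(base, nxt - base) for base, nxt in zip(base_offsets, base_offsets[1:])]


if __name__ == "__main__":
    files = ["00000000000000000000.log", "00000000001232493227.log"]
    for base, count in apparent_segment_sizes(files):
        print(f"segment at offset {base}: apparently {count} messages")
```

Running something like this over a partition directory makes an oversized gap between two consecutive segments stand out immediately, whatever the segments' size in bytes.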

Fortunately for us, the fix to this is easy: just delete the first log segment with offset 0 and restart the broker.

Make sure you respect the client protocol

Problem

This problem wasn’t documented in any way and surprised us a bit. Maybe it shouldn’t have, but I’ll let you decide that.

We use Kafka’s offset commit/fetch API to handle our offsets. When we commit an offset, we send a commit request.

Until then, we had used the v1 format but forgotten to explicitly set the field ConsumerGroupGenerationId to -1 (which is what needs to be used on 0.9 brokers to indicate that the commit is not part of a group membership). Kafka 0.8.2.0 accepted that happily, but the new version returned an IllegalGeneration error.

Workaround

The fix was simple: set the ConsumerGroupGenerationId to -1 in every app.
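For reference, here is a minimal Python sketch of what the body of a v1 commit request looks like on the wire with the generation ID set to -1. It is not taken from our client code: the function names are hypothetical, the request header and size prefix are left out, and the empty consumer ID and the -1 timestamp (meaning "let the broker pick the time") are assumptions based on the protocol guide.

```python
import struct


def _string(value):
    """Kafka protocol string: int16 length followed by UTF-8 bytes."""
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def offset_commit_v1_body(group, topic, partition, offset,
                          generation_id=-1, consumer_id="",
                          timestamp=-1, metadata=""):
    """Encode a v1 commit body for a single topic and partition."""
    return b"".join([
        _string(group),
        struct.pack(">i", generation_id),  # ConsumerGroupGenerationId
        _string(consumer_id),              # assumption: empty when not in a group
        struct.pack(">i", 1),              # number of topics
        _string(topic),
        struct.pack(">i", 1),              # number of partitions
        struct.pack(">iqq", partition, offset, timestamp),
        _string(metadata),
    ])


if __name__ == "__main__":
    # "my-group" and "events" are made-up names for illustration.
    print(offset_commit_v1_body("my-group", "events", 0, 42).hex())
```

The four bytes right after the group name are ff ff ff ff, which is -1 as a big-endian signed 32-bit integer. If your client leaves that field at 0 or some other default, a 0.9 broker will treat the commit as belonging to a group generation.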

Deploying the fix to every app, on the other hand, was annoying and took me a day.

Future

This is the first migration we did on our Kafka cluster. It could have gone a lot smoother if we had known exactly what each broker was doing and why, but since we’re not familiar with the internals of Kafka, it took us by surprise and we had to figure things out on the fly, mainly by looking through the source code and rereading the documentation.

I think this is the biggest takeaway for me: understand how your tools work so that you can efficiently debug them when they don’t do what you expect. Kafka’s internals are not super hard to understand; it’s just a matter of taking the time to read and analyze the source code and documentation.

Of course, maybe we could have detected the problems without affecting our production services if we had a staging environment identical to production. This is also something we’ll look into in the future.