FES consumes events from the fes-async-in and sts-delivery-tracking-events kafka event streams. The latter has never posed a problem, so this document focuses on the former. Our order management systems send and receive kafka messages through fes-async-in to communicate with our fulfillment management system. Before we built the solution described below, we in FMS would occasionally receive a kafka message that we couldn't consume due to a bug, bad data, etc. When this happened, our kafka consumer retried the message again and again, literally forever. This "just try again" policy is great for transient errors, such as database locking. However, it's a problem when the error isn't transient, because every other message behind it is now blocked and will never be processed. This is what we call a "stuck message".
Inspired by this Uber blog post, we wrote special "punt" code that cordons a stuck message off to a separate "punt" topic. This reduces a stuck message from an emergency to just a (rare) nuisance, since it's no longer blocking every unrelated message. With the message cordoned off, we can take our time deciding whether to fix something so the stuck message can be consumed, or to simply skip it.
There will be a rollbar repeating non-stop, and it will contain app/consumers/ in its stack trace. Here is an example of such a rollbar. Also, if you click "Occurrences" on the rollbar and then click one of the occurrences in the table, you will see metadata relating to the event, such as its topic, partition, offset, and payload.
This rollbar may have triggered an automated message in slack, and likely a page to the engineer on-call. Additionally, a datadog alert will ping slack if it detects a significant number of unconsumed messages in the fes-async-in stream. This alert is based on this dashboard.
Note, however, that we can have small amounts of "lag" in consuming messages due to transient errors, such as database locking. While it would be better if we didn't have this kind of performance problem, it usually has no business impact and happens only occasionally and for short periods of time. (For an example of why lag can be a problem, see the story of suspend unsuspend.)
Not so fast. We don't skip messages by default, for good reason. If we skip a message, none of the other messages about the same order will be processable. At that point, our only hope is that someone watching the anomalous order dashboard will step in and manually save the order; otherwise we're going to have an unhappy customer who has paid for something they will never receive.
On the other hand, there are lots of errors during message consumption that we already rescue and skip. This is fertile ground for improvement, as skipping is probably not the best solution in many of those cases.
If the message is stuck due to a legitimate data issue, can we fix the data via rails console?
If the message is stuck due to a legitimate bug in the code, can we merge a fix?
If the message is stuck due to it raising an error that we never intend to handle, can we merge a fix with a rescue for that error?
If we have a stuck message that we're sure we want to skip, there's a way to do it. First, find the rollbar related to the stuck message and take note of what partition it's stuck on. Also save the URL for later, because we'll probably need the fulfillmentReferenceId (the order id) for an impact report.
The following steps show how to skip one message in the staging environment.
Our tool of choice is the kafka-consumer-groups CLI command. This tool is available in the docker image confluentinc/cp-kafka:latest. We can temporarily deploy this to Kubernetes via a one-off Kubernetes pod. (If you don't have kubectl set up, follow this guide.)
- Make yourself an alias for kubectl
Just to make this documentation less verbose, let's use an alias that combines kubectl with the argument telling it where to find its config file. In this example, I'm calling it ks, short for "kubernetes staging."
alias ks="kubectl --kubeconfig=$HOME/.kube/config-staging-blue-wms"
Note: This use of the --kubeconfig flag assumes that you've got a bunch of config files in the place where goba installs them ($HOME/.kube/) when you do ba login. These files will look something like this:
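The exact contents depend on our clusters and on what ba login writes, so treat the following as a generic sketch of a kubeconfig's shape; the cluster name, endpoint, and credentials below are placeholders, not our real values.
# generic kubeconfig sketch; real values are written by ba login
apiVersion: v1
kind: Config
clusters:
- name: staging-blue-wms
  cluster:
    server: https://<staging-cluster-endpoint>
    certificate-authority-data: <redacted>
contexts:
- name: staging-blue-wms
  context:
    cluster: staging-blue-wms
    user: staging-blue-wms-user
current-context: staging-blue-wms
users:
- name: staging-blue-wms-user
  user:
    token: <redacted>
A quick way to confirm that the alias and config file actually work is to list pods in the wms namespace:
ks get pods -n wms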
- Launch the pod
ks run kafka-client -n wms --image=confluentinc/cp-kafka:latest -it --restart=Never --rm -- bash
Once inside, you can make use of the amazing kafka-consumer-groups CLI.
- List the topics to which the consumer group is subscribed
You need to know what consumer "group" you're looking for. For FES, it's fulfillment_management_service_fes-async-in. If that's not what you need, list out all the groups:
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --list
We've got two groups that we care a lot about: fulfillment_management_service_fes-async-in and fulfillment_management_service_fes-async-in-punts. The former handles all messages from blueaproncom and Agora, and the latter consumes the topic that we "punt" messages to from the former after giving up on consuming them. For the rest of this guide, we'll assume that we're targeting the latter.
To start, we need to scale down the appropriate kafka consumer. (The fulfillment_management_service_fes-async-in group is served by the kafka-client service, the fulfillment_management_service_fes-async-in-punts by the kafka-client-punts service.)
- Scale down the consumer
ba ps scale --instances=0 --environment=staging --repository=fulfillment-engine --service=kafka-client-punts --businessUnit=wms
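If you want to confirm the scale-down took effect before proceeding, you can list the pods in the namespace and check that the consumer's pods are gone; the grep pattern here is an assumption about how the pods are named, based on the service name:
ks get pods -n wms | grep kafka-client-punts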
Once you know the name of the group that you're interested in, use the --group flag to specify it, and --describe it:
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --group fulfillment_management_service_fes-async-in-punts --describe
If your consumer isn't scaled down, this output will be hard to read because it has very wide columns that line-wrap. If that's the case, try piping it to awk and column to clean it up:
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --group fulfillment_management_service_fes-async-in-punts --describe | awk '{ if (NR == 2) { print $2, $3, $4, $5, $6 } else if ($2 ~ /fes-async-in/) { print $2, $3, $4, $5, $6 | "sort -k1,1 -k2,2n" } }' | column -t
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
fes-async-in-punts 0 0 0 0
fes-async-in-punts 1 262 269 7
fes-async-in-punts 2 0 0 0
fes-async-in-punts 3 150 164 14
fes-async-in-punts 4 403 431 28
fes-async-in-punts 5 766 766 0
fes-async-in-punts 6 148 148 0
fes-async-in-punts 7 138 141 3
fes-async-in-punts 8 0 0 0
fes-async-in-punts 9 638 672 34
fes-async-in-punts 10 442 442 0
fes-async-in-punts 11 468 468 0
"CURRENT-OFFSET" is the consumer's current position in each partition. "LOG-END-OFFSET" is where it will be when it's caught up. "LAG" is the difference between the two (for partition 4 above, 431 - 403 = 28); if it's greater than a few dozen, we probably have a stuck message.
- Skip just one message
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --group fulfillment_management_service_fes-async-in-punts --reset-offsets --shift-by 1 --topic fes-async-in-punts:YOUR_PARTITION_HERE --execute
Note the YOUR_PARTITION_HERE part of the argument to --topic. Replace that with the partition number, so that it looks something like this: --topic fes-async-in-punts:3. The choice of partition should be informed by the metadata reported in the rollbar.
In this example, if you run the above command for partition 4, it will log something along the lines of:
fulfillment_management_service_fes-async-in-punts fes-async-in-punts 4 404
Note that the NEW-OFFSET is exactly one more than what was just reported when you did --describe.
Note that if you don't specify a partition, the command applies the shift to all partitions! On production, we never want to skip more than one message at a time, so we definitely don't want this. However, on staging, we often skip stuck messages en masse. One more thing that can help on staging: replace --shift-by 1 with --to-latest. Combining this with the --topic fes-async-in-punts argument without the partition suffix, you can very quickly clean up a stuck staging consumer.
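Putting those two together, a staging-only mass skip (never do this on production) would look something like this:
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --group fulfillment_management_service_fes-async-in-punts --reset-offsets --to-latest --topic fes-async-in-punts --execute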
- Check if you're good
In a shell outside of the Kubernetes pod, scale the consumer back up. This example scales the consumer to only one instance because it's for the staging environment. As of the time of this writing, production is scaled to five. After a message has been stuck for a while, there may be lots of messages to process in order to catch up. Assuming that we're not database-bound and adding more consumers will help speed up consumption, feel free to scale all the way up to the number of partitions (12 by default). Scaling any further will have no effect; kafka will never let more than one consumer in a group consume a partition, as this risks race conditions.
ba ps scale --instances=1 --environment=staging --repository=fulfillment-engine --service=kafka-client-punts --businessUnit=wms
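For reference, scaling production up to the partition count while it catches up would look something like the following; the --environment value is an assumption inferred from the staging command above:
ba ps scale --instances=12 --environment=production --repository=fulfillment-engine --service=kafka-client-punts --businessUnit=wms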
Scaling back up turns the consumer back on. Now check rollbar. You should either see no more occurrences of the error, or at worst see that it's now stuck on a different partition and offset. If so, scale the consumer back down, get back into that Kubernetes pod, and handle this new stuck message with the same series of kafka-consumer-groups commands.
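You can also re-run the --describe command from inside the pod to confirm that LAG is shrinking across all partitions (with the consumer running again, the awk/column pipeline from earlier helps keep the output readable):
kafka-consumer-groups --bootstrap-server kafka-new.staging.b--a.cloud:9092 --group fulfillment_management_service_fes-async-in-punts --describe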
- Delete your Kubernetes pod
If you've succeeded in getting the consumer un-stuck, don't forget to clean up your Kubernetes pod.
ks delete pod kafka-client
Also, don't forget that you've probably got some manual work to do elsewhere to compensate for never processing the stuck message.