Debugging Kafka Connect Sinks

When we had to scale to billions of dollars of originations and multiple currencies at the same time here at Funding Circle, we chose Kafka for our new global marketplace lending platform. We still have a legacy stack, though, with a lot of important data, so we’ve been using Kafka Connect to help legacy apps access new data and new apps access legacy data. Kafka Connect streams data between Kafka topics and external systems such as PostgreSQL databases. A Kafka Connect job that reads rows from a PostgreSQL database and publishes them to a topic is a “source connector”; a job that writes messages from a topic into a PostgreSQL database is a “sink connector”. Kafka Connect has thoughtful configuration options, is quick to set up and, when it works, requires little to no maintenance. Kafka Connect ships with Apache Kafka, and the JDBC connectors we use are maintained and updated regularly by Confluent. This information is current as of version 3.1.1.
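For example, a source connector that streams rows from a legacy PostgreSQL table onto a topic might be configured roughly like the following, submitted to the Kafka Connect REST API. This is a sketch only: the connector name, connection URL, table and topic prefix are placeholders, and your connection details will differ.

{
  "name": "legacy-postgres-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://legacy-db:5432/legacy?user=connect&password=secret",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "loans",
    "topic.prefix": "legacy-"
  }
}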

Set your sink up for success

  1. Consume the topic you want to direct into your sink. Make sure you know what form the keys and values take. Knowing which key.converter and value.converter you need to use will save you a lot of time and hassle (there’s an example sink config after this list). Keep in mind that Kafka Connect does not currently support many data types (e.g. UUIDs) as keys in sinks, so a string converter is a safe bet.

  2. Think about your table name. By default the table is named after the topic, so if your-topic has hyphens in its name, your table will too. Setting “table.name.format: your_topic” overrides this.

  3. “auto.evolve” is a sink option that adds new columns to the table when new fields appear in the schema; it’s a good one to enable if you envision your schema changing.
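Putting those together, a sink config for a topic with Avro values and string keys might look something like the following. This is a sketch only: the names, connection URL and Schema Registry URL are placeholders, and depending on your setup the converters may need to be set in the worker config rather than per connector.

{
  "name": "your-topic-postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "your-topic",
    "connection.url": "jdbc:postgresql://db:5432/reporting?user=connect&password=secret",
    "table.name.format": "your_topic",
    "auto.create": "true",
    "auto.evolve": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}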

Unfortunately, some of the error messages we see when the sink connector doesn’t work can be unclear. Here are a few problems that we’ve run into and their error messages:

The general error that you’ll get when a sink fails:

Exiting WorkerSinkTask due to unrecoverable exception

If your topic is a JSON topic, there are some undocumented pitfalls:

  1. Kafka Connect cannot set primary keys with JSON topics (“pk.mode”, etc.), and for the same reason it can’t do “insert.mode: upsert”.

  2. Kafka Connect cannot whitelist fields (“fields.whitelist”) with JSON topics.

In general, the JDBC sink connector does not currently support the level of table customization for JSON topics that it does for Avro topics.
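That means a JSON-topic sink stays close to the defaults. A sketch, with the connector name, topic and connection details as placeholders:

{
  "name": "your-json-topic-postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "your-json-topic",
    "connection.url": "jdbc:postgresql://db:5432/reporting?user=connect&password=secret",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}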

Kafka expects Avro but gets data that isn’t Avro

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

You’ll want to check value.converter and key.converter in your config to see whether one or both are expecting Avro data. Consume the topic with keys and take a look at them; sometimes keys aren’t Avro and require a different converter (in our systems this is a string converter). Sometimes there may be a single bad message published to the topic, so while the rest of the messages are ok, the one bad message crashes the job.
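If it turns out the values are Avro but the keys are plain strings, the converter settings would look roughly like this (an assumption based on our own setup; the Schema Registry URL is a placeholder):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"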

Kafka expects JSON in a certain format

JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Consume the topic with keys and check the format. Kafka Connect is expecting an envelope along the lines of {"schema": {...}, "payload": {...}}, where "schema" describes the types of the fields in "payload". If it encounters a message without this format, the job will crash.
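As a concrete illustration, a message carrying a single string field might look like this when schemas.enable is on (the field and schema names here are made up):

{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "loan_id", "type": "string", "optional": false}
    ],
    "optional": false,
    "name": "loan"
  },
  "payload": {
    "loan_id": "abc-123"
  }
}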

Kafka encounters bad data

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: Unrecognized token 'unexpected-data-here-usually-something-weird': was expecting ('true', 'false' or 'null')

Somewhere in your pipeline you have a message that contains unexpected-data-here-usually-something-weird, and the sink doesn’t know what to do with it. If you grep for the unexpected data you can find the bad message and use it to figure out which process generated it.

There are a lot more errors that you may encounter but they’re usually pretty self-explanatory. Hope this helps!