Testing Kafka Streams topologies with Kafka Interceptors

Posted by Nacho Munoz

We rely heavily on Kafka and Kafka Streams at Funding Circle to build event-driven microservices, so testing those Kafka Streams topologies is key to validating the correctness of our platform.

The aim of this blog post is to compare two integration testing approaches for Kafka Streams topologies. The first is based on Kafka TestUtils, which is commonly used in the Apache Kafka project and in some of the Kafka Streams examples found in Confluent’s repository; the second is based on Kafka Interceptors, which seem to have some advantages over the first approach.

Kafka Interceptors were introduced in Kafka 0.10.0 as a way to make instrumentation easier for applications relying on the Kafka Client API. It is possible to chain several interceptors and use them on either the Consumer or the Producer side, which provides flexibility and a great range of scenarios where we can apply them: logging, message encryption, distributed tracing, filtering fields with sensitive information, custom metrics and, as we are going to see soon, testing.
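Because interceptors are enabled purely through configuration, adding or removing them never touches application code. A minimal sketch of what that configuration could look like on the producer side (the interceptor class names here are hypothetical, and several can be chained in the order listed):

```clojure
;; Sketch: enabling a chain of producer interceptors via configuration only.
;; "my.audit.LoggingInterceptor" and "my.audit.TracingInterceptor" are
;; made-up class names standing in for real interceptor implementations.
(def producer-config
  {"bootstrap.servers"   "localhost:9092"
   "key.serializer"      "org.apache.kafka.common.serialization.LongSerializer"
   "value.serializer"    "org.apache.kafka.common.serialization.StringSerializer"
   ;; comma-separated chain, invoked in the order listed
   "interceptor.classes" "my.audit.LoggingInterceptor,my.audit.TracingInterceptor"})
```

The same "interceptor.classes" key works on the consumer side, where the classes must implement ConsumerInterceptor instead.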

In order to illustrate these different approaches, we are going to use a toy topology as an example.


As shown in the image, we would like to enrich our investors’ information with their portfolios and split the output into different topics depending on the investor type. The topology looks like the code below:

(def topology
  (let [builder   (KStreamBuilder.)
        ;; inputs
        investor  (.stream builder (into-array String ["investors"]))
        portfolio (.globalTable builder "portfolios")
        ;; processing
        [institutions retail] (.. investor
                                  (join portfolio
                                        (fn [k v] (:portfolio-id v))
                                        (fn [v v'] (merge v v')))
                                  (branch (into-array
                                           [(predicate institutional?)
                                            (predicate (complement institutional?))])))]
    ;; outputs
    (.to institutions "enriched-institutional-investors")
    (.to retail "enriched-retail-investors")
    builder))
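The predicate helper used in the branch above is not shown in this post; one plausible implementation, assuming it simply wraps a plain Clojure function over the record value in Kafka Streams’ Predicate interface, would be:

```clojure
;; Assumed shape of the `predicate` helper: reify Kafka Streams' Predicate
;; around a Clojure function of the record value.
(import 'org.apache.kafka.streams.kstream.Predicate)

(defn predicate [f]
  (reify Predicate
    (test [_ k v]
      ;; Predicate/test must return a primitive boolean
      (boolean (f v)))))
```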

How do we check that the logic of this topology is correct? Let’s look at some integration testing approaches we can use to test our topology logic. We will assume that Kafka is running for the integration tests, perhaps via a docker-compose file. Also, for this kind of test we will treat the topology as a black box, so we will only care about the outputs generated by a given input.

Using a dedicated consumer

In this approach we need to use Kafka’s client API to create a dedicated consumer for our tests so we can check whether the records in our output topics match our expectations. Putting it in the context of the Four-Phase Test pattern, we can describe this approach as:

  • Setup: This step involves populating the input topics (investors and portfolios) with some sample records. Ideally, we will choose inputs that follow different execution paths in our topology.

  • Exercise: Run the topology and wait for it to process the inputs. How do we know when the topology has finished? We simply don’t, so either we put the thread to sleep for long enough for the topology to finish, or we poll the output topics until a condition is met (e.g. there are 3 messages) or a timeout is reached. Kafka’s test utilities include a static method, TestUtils.waitForCondition, which does exactly that. The thread-sleep approach is simpler; however, it can slow down your tests or make them flaky if the chosen sleep time is too tight.

  • Verify: Check that the records in the output topics match our expectations. As mentioned before, we need a dedicated consumer in our tests to retrieve the records from the topics.

  • Teardown: Reset the system to its previous state, perhaps by deleting the output topics and cleaning Kafka Streams’ state folder. As an alternative, you can run Kafka inside your process using, for instance, EmbeddedSingleNodeKafkaCluster. In that case you don’t need to worry about deleting the topics, but booting and shutting down this in-memory instance of Kafka every time we run a test can make the test suite slower. Besides, it makes auditability harder because the topics are only reachable while the tests are running, so you cannot use tools like kafkacat or kafka-console-consumer against your embedded cluster as you would with a separate Docker cluster.
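The with-topology helper that appears in the tests is not shown in this post; a possible sketch, assuming it just starts a KafkaStreams instance around the body and always cleans up afterwards, could be:

```clojure
;; Assumed shape of the `with-topology` macro: start the streams instance,
;; run the body, then always close it and wipe the local state folder so
;; consecutive runs stay independent.
(defmacro with-topology [topology config & body]
  `(let [streams# (org.apache.kafka.streams.KafkaStreams. ~topology ~config)]
     (try
       (.start streams#)
       ~@body
       (finally
         (.close streams#)
         ;; cleanUp must only be called while the instance is not running
         (.cleanUp streams#)))))
```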

We can see these steps applied in the code excerpt below:

(deftest topology-with-consumer-test
  (testing "when institutional investor"
    (let [investor      {:id 1000 :name "lol" :institutional? true :portfolio-id 1}
          portfolio     {:id 1 :amount 100}
          output-topics [:enriched-institutional-investors :enriched-retail-investors]
          consumer      (create-consumer-for output-topics)]

      (send-to :portfolios 1 (str portfolio))
      (send-to :investors 1 (str investor))

      (with-topology topology config
        (TestUtils/waitForCondition (expected-messages consumer 1)
                                    "Timing out"))

      (is (= 1 (count (consumed-messages :enriched-institutional-investors))))
      (is (= (merge investor portfolio) (-> (consumed-messages :enriched-institutional-investors)
                                            first)))
      (is (nil? (consumed-messages :enriched-retail-investors)))

      (.close consumer)
      (remove-topics output-topics))))
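The expected-messages and consumed-messages helpers in this test are not shown either. One way they could fit together, assuming expected-messages returns a TestUtils condition that polls the dedicated consumer into an in-memory buffer:

```clojure
;; Hypothetical helpers for the consumer-based test. The condition keeps
;; polling and buffering records by topic; the accessor reads that buffer.
(def consumed (atom {}))

(defn consumed-messages [topic]
  (seq (get @consumed topic)))

(defn expected-messages [consumer n]
  (reify org.apache.kafka.test.TestUtils$TestCondition
    (conditionMet [_]
      ;; buffer anything seen on this poll, grouped by topic
      (doseq [record (.poll consumer 100)]
        (swap! consumed update (keyword (.topic record)) conj (.value record)))
      ;; the condition is met once we have seen at least n records overall
      (>= (count (apply concat (vals @consumed))) n))))
```

This is only a sketch of the shape such helpers could take, not the post’s actual implementation.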

When following this approach you need to take into account consumer configuration settings like group.id and auto.offset.reset to avoid undesirable rebalances and to force the consumer to read from the beginning instead of using a previously committed offset.
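For example, a test consumer configuration along these lines (values are illustrative) avoids both problems:

```clojure
;; Sketch of a consumer configuration for tests. A fresh group.id per run
;; keeps the consumer clear of stale group members, and auto.offset.reset
;; set to "earliest" makes it read the topic from the beginning.
(def consumer-config
  {"bootstrap.servers"  "localhost:9092"
   "group.id"           (str "test-consumer-" (java.util.UUID/randomUUID))
   "auto.offset.reset"  "earliest"
   "key.deserializer"   "org.apache.kafka.common.serialization.LongDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
```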

In a nutshell, there is nothing wrong with this approach aside from the fact that you need to write some utility code to retrieve the outputs from the topics and clean up that state afterwards.

Using Kafka Interceptors

In the previous section we leaned on a dedicated consumer to verify our expectations; let’s see how we can leverage interceptors to get rid of that additional consumer. Given the pluggable nature of Kafka Interceptors, we can easily add them to or remove them from our applications with just a configuration change.

Our goal is to avoid checking the output topics in Kafka by maintaining an in-memory map that associates each output topic with the records produced to it. A possible implementation can be found below:

(deftype MyTestingInterceptor []
  ProducerInterceptor
  (close [_])
  (configure [_ configs])
  (onAcknowledgement [_ metadata exception])
  (onSend [_ record]
    (let [topic (keyword (.topic record))
          key   (deserialize (Serdes/Long) (.key record))
          value (deserialize (Serdes/String) (.value record))]
      (swap! output-results update-in [topic key] conj value)
      ;; onSend must hand the record back so it continues to the broker
      record)))

The implementation uses an atom to accumulate state in a map keyed first by topic name and then by record key, holding a vector of the records written to each topic. Those records are deserialised before being appended to the map to facilitate the assertions in the test.
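The two pieces the interceptor relies on could be as simple as the sketch below, assuming deserialize just applies a serde’s deserializer to the raw bytes the Streams producer hands to the interceptor:

```clojure
;; Assumed supporting pieces for the interceptor: the shared atom holding
;; the {topic {key [values]}} map, and a small serde-based deserializer.
(def output-results (atom {}))

(defn deserialize [serde bytes]
  (when bytes
    ;; the topic argument is unused by the plain Long/String deserializers
    (.deserialize (.deserializer serde) nil bytes)))
```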

Revisiting our description based on the Four-Phase Test pattern, we find the following steps:

  • Setup: Same as before but with an additional change in the Kafka Streams configuration to include our interceptor.

  • Exercise: Again, run the topology and wait for the outputs, but this time we don’t have to poll any topic because of the visibility gained with the interceptor. For instance, we could use a CountDownLatch initialised with the expected number of messages and count the latch down whenever the atom that holds the state changes. Still, we need a timeout to wait for a bounded amount of time in case something goes wrong.

  • Verify: We run our assertions against the in-memory map that contains the results of the execution. One nice thing about this is that we can assert the absence of messages on a given topic without having to wait for a timeout to expire.

  • Teardown: In this case we do not need to clean the output topics; we just reset our atom to clean the state. However, it is still necessary to clean Kafka Streams’ state folder.

Translating this into code we will have:

(deftest topology-with-interceptors-test
  (testing "when institutional investor"
    ;; setup
    (let [investor  {:id 1000 :name "lol" :institutional? true :portfolio-id 1}
          portfolio {:id 1 :amount 100}
          config    (assoc kafka-stream-settings
                           "producer.interceptor.classes"
                           (.getName MyTestingInterceptor))
          latch     (expected-messages 1)
          _         (add-watch output-results :test (fn [_ _ o n]
                                                      (.countDown latch)))]
      (send-to :portfolios 1 (str portfolio))
      (send-to :investors 1 (str investor))

      ;; exercise
      (with-topology topology config
        (.await latch 30 TimeUnit/SECONDS))

      ;; verify
      (is (= 1 (count (:enriched-institutional-investors @output-results))))
      (is (= (merge investor portfolio) (some-> @output-results
                                                :enriched-institutional-investors
                                                (get 1)
                                                first)))
      (is (nil? (:enriched-retail-investors @output-results)))

      ;; teardown
      (remove-watch output-results :test)
      (reset! output-results {}))))
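In this version, expected-messages can be as simple as a latch sized to the number of records we expect; since the watch on the atom counts the latch down on every change, one produced record releases one count. This is an assumption about the helper’s shape:

```clojure
;; Assumed helper: a latch that the atom's watch counts down once per
;; produced record, releasing the test once n records have been seen.
(defn expected-messages [n]
  (java.util.concurrent.CountDownLatch. n))
```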

In my opinion, avoiding consuming from the output topics to make our assertions has the following advantages:

  • More visibility over when to transition to the Verify phase
  • Easier state management (e.g. Kafka topics vs Clojure map)
  • Easier to debug failing tests in topologies with intermediate topics


This blog post has presented two different approaches to testing our Kafka Streams topologies. The first is more general and is widely used in Kafka Streams’ codebase and code examples. The second shows how we can use Kafka Interceptors for testing, optimising over the general approach.

To sum up, being aware of Kafka Interceptors can make our life easier when testing Kafka Streams applications.

Thanks to Sasha Gerrand, Niel Drummond and Shkodran Gerguri for their help reviewing this post.