This is the fourth part of Testing with Kafka for JUnit. See the list underneath for links to the other parts.

Part 1: Writing component tests for Kafka producers
Part 2: Writing component tests for Kafka consumers
Part 3: Writing system tests for a Kafka-enabled microservice
Part 4: Using Kafka for JUnit with Spring Kafka

The last articles gave a couple of examples on how to write Kafka-enabled integration tests at various levels of abstraction using Kafka for JUnit. For the component tests, we kept the scenarios quite simple and built a minimal producer and consumer on top of the official kafka-clients library for Java. This is perfectly fine, and my personal recommendation is to stick to this approach if you have requirements that aren't particularly standard. Oftentimes, though, another abstraction layer on top of kafka-clients that integrates well with your chosen application framework will suffice. We will take a look at the Spring ecosystem for that matter. Hence, the question is: What do I need to do if I want to write integration tests with Kafka for JUnit in the context of a Spring-based application that leverages Spring Kafka for its messaging capabilities?

Spring Kafka comes with its own set of testing tools (cf. spring-kafka-test) that you can use to write integration tests. While I do appreciate the effort that went into the design of the spring-kafka-test library, I'd rather use Kafka for JUnit, as it enables me to write integration tests that I, at least, consider more readable and concise.

Suppose we have written a simple producer abstraction like the one underneath.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

@Component
public class ResultAwareMessageProducer {

  private static final Logger log = LoggerFactory.getLogger(ResultAwareMessageProducer.class);

  private final String topicName;

  private final KafkaTemplate<String, String> kafkaTemplate;

  @Autowired
  public ResultAwareMessageProducer(
      @Value("${spring-kafka-introduction.topic}") String topicName,
      KafkaTemplate<String, String> kafkaTemplate) {
    this.topicName = topicName;
    this.kafkaTemplate = kafkaTemplate;
  }

  public void sendMessage(String message) {
    kafkaTemplate
      .send(topicName, message)
      .addCallback(this::onSuccess, this::onFailure);
  }

  private void onSuccess(SendResult<String, String> result) {
    log.info("Message has been written to partition {} of topic {} with ingestion timestamp {}.",
      result.getRecordMetadata().partition(),
      result.getRecordMetadata().topic(),
      result.getRecordMetadata().timestamp());
  }

  private void onFailure(Throwable t) {
    log.warn("Message has not been written to topic {}.", topicName, t);
  }
}
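A quick note on this design, since it matters for the tests that follow: in the Spring Kafka generation that this example is written against (2.x, where KafkaTemplate#send returns a ListenableFuture), sendMessage returns immediately and the registered callbacks fire asynchronously once the broker acknowledges the write or the send fails. A test therefore has to wait until the record actually shows up on the topic, which is exactly what Kafka for JUnit's observe does for us.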

Testing this component with Kafka for JUnit is easy. There are a couple of things that you should keep in mind though, and I'll walk you through them.

First of all, we need to annotate our test class, call it ResultAwareMessageProducerTest, with @SpringBootTest so that we have access to the application context and are able to wire our beans. In my example I'll also add @ActiveProfiles("test"), since I have a couple of configuration properties set that should be used when building the application context. We will use the embedded Kafka broker from Kafka for JUnit and thus have to initialize all Kafka-enabled components with the proper broker URLs. These may vary between integration tests, so I'd advise you to use @DirtiesContext in this case to ensure that the components you want to use are configured against the correct environment. To be frank, this is a bit annoying and one of the reasons why I try to avoid tests that need the Spring application context whenever I can. Sometimes it is the best option, though. By the way, this has nothing to do with Kafka for JUnit in particular; you'd have to do the same if you were using spring-kafka-test.

In order to set the correct broker URL, we simply set the system property spring.kafka.bootstrap-servers after the embedded Kafka cluster has been initialized and started. So, what have we got so far?

import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import net.mguenther.kafka.junit.ObserveKeyValues;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@SpringBootTest
@ActiveProfiles("test")
@DirtiesContext
public class ResultAwareMessageProducerTest {

  private static EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig());

  @Autowired
  private ResultAwareMessageProducer producer;

  @BeforeAll
  static void prepareEnvironment() {
    kafka.start();
    // make the embedded cluster known to Spring Boot's Kafka auto-configuration before the application context is created
    System.setProperty("spring.kafka.bootstrap-servers", kafka.getBrokerList());
  }

  @AfterAll
  static void tearDown() {
    if (kafka != null) kafka.stop();
  }
}
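One more detail worth pointing out: the topic that the producer writes to comes from the spring-kafka-introduction.topic property of the test profile, which I'll assume is set to test throughout the rest of this article. Depending on your broker settings, the topic may simply be auto-created on first use. If you'd rather create it explicitly, Kafka for JUnit lets you do that from the test as well; a minimal sketch of prepareEnvironment (requiring an additional import of net.mguenther.kafka.junit.TopicConfig) could look like this:

@BeforeAll
static void prepareEnvironment() {
  kafka.start();
  // create the topic up front instead of relying on automatic topic creation
  kafka.createTopic(TopicConfig.withName("test"));
  // make the embedded cluster known to Spring Boot's Kafka auto-configuration
  System.setProperty("spring.kafka.bootstrap-servers", kafka.getBrokerList());
}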

With this setup at hand, we are now able to leverage the full power of Kafka for JUnit to exercise the subject-under-test. For instance, a first test case would be to publish a message using the ResultAwareMessageProducer and check that it has been written correctly to the topic.

@Test
void producerShouldPublishMessageToKafkaTopic() throws InterruptedException {

  producer.sendMessage("message");

  // observe blocks until a single record has been consumed from the topic (and fails after a timeout if none arrives)
  var records = kafka.observe(ObserveKeyValues.on("test", 1));

  assertThat(records.get(0).getValue(), is("message"));
}
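Further test cases follow the same pattern. As another, purely illustrative sketch, the following test asserts that records arrive without a key, since sendMessage only hands a value to the KafkaTemplate (nullValue being the usual Hamcrest matcher):

@Test
void producerShouldPublishMessageWithoutKey() throws InterruptedException {

  producer.sendMessage("message");

  var records = kafka.observe(ObserveKeyValues.on("test", 1));

  // sendMessage only provides a value, so the record key is expected to be null
  assertThat(records.get(0).getKey(), is(nullValue()));
}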

And that's all you have to do to use Kafka for JUnit in combination with Spring Kafka. The code for this example is available on GitHub.

If you'd like to learn more about Kafka for JUnit, be sure to check out the other articles in this series and consult the comprehensive user's guide.
