Spring Boot Kafka example


The goal of this tutorial is to create a working Spring Boot and Kafka project, so that you can start developing without spending too much time on the Kafka configuration. This is useful if you need to build a proof of concept or want to learn and test in your own environment.

Our focus here is Spring Boot.

You can find the source code of this example on GitHub: https://github.com/marco76/demo-spring-kafka

Kafka quick-start with Docker

To get started with Kafka quickly, we use a Docker Compose file that starts ZooKeeper, the broker, the schema registry and a UI to monitor the Kafka cluster.

Our yaml is a ‘light’ version of the one provided by Kafka UI available here: https://github.com/provectus/kafka-ui/blob/master/documentation/compose/kafka-ui.yaml

In our yaml we have only one Cluster and we removed Kafka Connect.

version: '2' 
services: 
 
  kafka-ui: 
    container_name: kafka-ui 
    image: provectuslabs/kafka-ui:latest 
    ports: 
      - 8082:8080 
    depends_on: 
      - zookeeper0 
      - kafka0 
      - schemaregistry0 
    environment: 
      KAFKA_CLUSTERS_0_NAME: local 
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181 
      KAFKA_CLUSTERS_0_JMXPORT: 9997 
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 
  zookeeper0: 
    image: confluentinc/cp-zookeeper:5.2.4 
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181 
      ZOOKEEPER_TICK_TIME: 2000 
    ports: 
      - 2181:2181 
 
  kafka0: 
    image: confluentinc/cp-kafka:5.3.1 
    depends_on: 
      - zookeeper0 
    ports: 
      - 9092:9092 
      - 9997:9997 
    environment: 
      KAFKA_BROKER_ID: 1 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181 
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092 
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT 
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT 
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 
      JMX_PORT: 9997 
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 
 
  schemaregistry0: 
    image: confluentinc/cp-schema-registry:5.5.0 
    ports: 
      - 8085:8085 
    depends_on: 
      - zookeeper0 
      - kafka0 
    environment: 
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181 
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT 
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 
 
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" 
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO 
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas 

You can start the services with: docker-compose up
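
Before moving on, it can help to confirm that the published ports accept connections. The following is a minimal sketch using only the JDK; the class name PortCheck, the method isReachable and the 1000 ms timeout are illustrative choices, while the ports (9092, 8085, 8082) are the host ports from the compose file above. An open port only shows that the container is listening, not that the service has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening (yet)
            return false;
        }
    }

    public static void main(String[] args) {
        // Host ports published by the docker-compose file
        String[] names = {"Kafka broker", "Schema Registry", "Kafka UI"};
        int[] ports = {9092, 8085, 8082};
        for (int i = 0; i < ports.length; i++) {
            System.out.printf("%-16s localhost:%d reachable=%b%n",
                    names[i], ports[i], isReachable("localhost", ports[i], 1000));
        }
    }
}
```

If one of the lines reports reachable=false, the corresponding container is probably still starting or the port mapping differs from the one shown here.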

If you are running the Docker Desktop application you can have an overview of the containers:

Docker Kafka containers

Once everything has started you can access the UI at http://localhost:8082/. In the original docker-compose file the UI uses port 8080, but we changed it because it conflicts with our Spring Boot instance.

Kafka UI

The 3 existing Topics are internal and are not relevant for our tutorial.

Kafka UI All Topics

Spring Boot

Our Spring Boot application simply:

  • creates a Topic if it does not exist already
  • creates a REST controller to receive messages from HTTP POST
  • sends a test message to the Topic
  • receives the messages from the Topic and writes them to the output stream

At startup the application sends an initial message, and a @Controller lets you send further messages using POST.

In our pom.xml we have spring-kafka as a dependency. More information is available in the official Spring for Apache Kafka documentation.

<dependency> 
  <groupId>org.springframework.kafka</groupId> 
  <artifactId>spring-kafka</artifactId> 
</dependency> 
<dependency> 
  <groupId>org.springframework.kafka</groupId> 
  <artifactId>spring-kafka-test</artifactId> 
  <scope>test</scope> 
</dependency> 
<dependency> 
  <groupId>org.springframework.boot</groupId> 
  <artifactId>spring-boot-starter-web</artifactId> 
</dependency> 
<dependency> 
  <groupId>org.springframework.boot</groupId> 
  <artifactId>spring-boot-starter-test</artifactId> 
  <scope>test</scope> 
</dependency> 

Spring Boot Application

Our application is very simple. We put the configuration in the application class to keep this post short; this is not recommended for a real application:

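import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class DemoKafkaApplication {

    private final String TOPIC_NAME = "kafka-spring-demo";

    public static void main(String[] args) {
        SpringApplication.run(DemoKafkaApplication.class, args);
    }

    /**
     * With NewTopic we create a topic in Kafka if it doesn't exist yet
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(5)
                .replicas(1)
                .build();
    }

    // Consumer: called for every message that arrives on the topic
    @KafkaListener(id = "kafka-spring-listener", topics = TOPIC_NAME)
    public void listen(String message) {
        System.out.println("message received:" + message);
    }

    // Producer: sends one test message once the application has started
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(TOPIC_NAME, "test message at bootstrap");
        };
    }
}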

With private final String TOPIC_NAME = "kafka-spring-demo"; we define the name of the topic used in Kafka. In a production project this value should come from external configuration.

The @Bean topic initializes a new topic in Kafka if it doesn't exist yet. We can define some properties, such as partitions and replicas.

@KafkaListener is the consumer: it waits for messages added to the topic. When a message is received, it prints it to the output stream. In a real project this consumer would call the services that process the received data.

The ApplicationRunner @Bean sends a test message when the application starts.

As you can see, Spring Boot auto-configures the KafkaTemplate and injects it without any explicit declaration.
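
As mentioned, the topic name would normally live in external configuration rather than in a constant. A minimal sketch of how that could look is shown here; the property name demo.kafka.topic and the class name ExternalTopicConfig are hypothetical and not part of the example project. It assumes a line such as demo.kafka.topic=kafka-spring-demo in application.properties.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

// Sketch only: "demo.kafka.topic" is a hypothetical property name
@Configuration
public class ExternalTopicConfig {

    // The topic name is injected from the external configuration
    @Bean
    public NewTopic topic(@Value("${demo.kafka.topic}") String topicName) {
        return TopicBuilder.name(topicName)
                .partitions(5)
                .replicas(1)
                .build();
    }

    // The topics attribute accepts a property placeholder
    @KafkaListener(id = "kafka-spring-listener", topics = "${demo.kafka.topic}")
    public void listen(String message) {
        System.out.println("message received:" + message);
    }
}
```

With this variant the same build can point at a different topic per environment without recompiling.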

Spring Boot controller

We can dynamically send messages to Kafka using REST.

In our @Controller we declare a KafkaTemplate and use it to send a message to the Kafka topic, specifying the topic name (‘kafka-spring-demo’) and the message (text).

@Autowired 
private KafkaTemplate<Object, Object> template; 
 
/** 
* We create a rest controller that receives a text and forwards it to kafka 
*/ 
@PostMapping(path = "/send/message/{text}") 
public void sendFoo(@PathVariable String text) { 
  this.template.send("kafka-spring-demo", text); 
} 
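
To call this endpoint from Java rather than from a command-line tool, a small client based on the JDK HttpClient (Java 11+) could look like the sketch below. The class name SendMessageClient and the helper buildSendUri are illustrative; the base URL assumes the application runs on the default port 8080. Because the text travels in the URL path, the sketch percent-encodes it first.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendMessageClient {

    // Builds the endpoint URI for the given message text.
    static URI buildSendUri(String baseUrl, String text) {
        // URLEncoder produces form encoding ('+' for spaces), so convert to %20 for a path segment
        String encoded = URLEncoder.encode(text, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/send/message/" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // POST without a body: the message is carried by the path variable
        HttpRequest request = HttpRequest.newBuilder(buildSendUri("http://localhost:8080", "hello"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Running main while the application and the broker are up should make the listener print the text that was sent.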

(No) Configuration

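In our application we didn’t do any special configuration. By default Spring Boot connects to a Kafka broker at localhost:9092, which is the address our docker-compose file advertises to the host (PLAINTEXT_HOST://localhost:9092), as the startup log confirms: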

INFO 11756 --- [-listener-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Cluster ID: f0GOdnHLRquHKai9Gf4IfA 
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null) 
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] (Re-)joining group 
INFO 11756 --- [-listener-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Resetting offset for partition kafka-spring-demo-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}. 
INFO 11756 --- [-listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : kafka-spring-listener: partitions assigned: [kafka-spring-demo-4, kafka-spring-demo-3, kafka-spring-demo-2, kafka-spring-demo-1, kafka-spring-demo-0] 

You can override the default behavior in application.properties:

spring.kafka.bootstrap-servers=localhost:9093 

Running the application

After you start the application you can see that the new Topic has been created

Kafka UI Topics

and the test message

@Bean 
public ApplicationRunner runner(KafkaTemplate<String, String> template) { 
  return args -> { 
    template.send(TOPIC_NAME, "test message at bootstrap"); 
  }; 
} 

has been received by Kafka

Kafka messages

… and then received by the Spring listener:

@KafkaListener(id = "kafka-spring-listener", topics = TOPIC_NAME ) 
public void listen(String message) { 
  System.out.println("message received:" + message); 
} 

The Spring Boot listener is visible in the UI in the Consumers category. As a subscriber, it receives the new messages from Kafka.

Kafka Spring listener

The listener receives the notification from Kafka and reads the message. In the terminal you should see the following output:

INFO 75599 --- [-listener-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-kafka-spring-listener-1, groupId=kafka-spring-listener] Resetting offset for partition kafka-spring-demo-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=absent}}. 
INFO 75599 --- [-listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : kafka-spring-listener: partitions assigned: [kafka-spring-demo-4, kafka-spring-demo-3, kafka-spring-demo-2, kafka-spring-demo-1, kafka-spring-demo-0] 
message received:"test message at bootstrap" 

If you now send a POST request to the @Controller: curl -X POST http://localhost:8080/send/message/hello

Spring Boot receives the request and sends the message to the topic in Kafka.

@PostMapping(path = "/send/message/{text}")
public void sendFoo(@PathVariable String text) {
  this.template.send("kafka-spring-demo", text);
}

The terminal should show the initialization of the Spring DispatcherServlet, followed by the message:

INFO 75599 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet' 
INFO 75599 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet' 
INFO 75599 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 17 ms 
message received:"hello" 

In the Kafka UI we can see that the message has been received
Kafka Spring messages received

… coming soon ...

Kafka testing with Spring Boot

