How do you integrate Spring Boot with Kafka (@KafkaListener, KafkaTemplate)?

9 min · advanced · kafka · kafkatemplate · kafkalistener

Quick Answer

spring-kafka provides KafkaTemplate for producing messages to a topic and @KafkaListener for consuming them. With Spring Boot, both are configured through standard application.yml properties (bootstrap servers, consumer group ID, serializers/deserializers) rather than manual Kafka client boilerplate. A producer calls kafkaTemplate.send(topic, key, value) to publish a message; send() is asynchronous and returns a future (CompletableFuture in spring-kafka 3.x, ListenableFuture in 2.x) that can be used to confirm delivery or handle a failure. A consumer method annotated @KafkaListener(topics = "...", groupId = "...") is invoked by the framework whenever a new message arrives on that topic for its consumer group.

Detailed Answer

spring-kafka wraps the plain Apache Kafka Java client with Spring-friendly abstractions, removing most of the manual producer/consumer setup boilerplate.

Configuration (application.yml):

spring:
  kafka:
    bootstrap-servers: localhost:9092
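    consumer:
      group-id: order-service-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        # JsonDeserializer only instantiates classes from trusted packages.
        # Assumption: the event classes live in com.example.orders; adjust to your package.
        spring.json.trusted.packages: com.example.orders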
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Producing messages with KafkaTemplate:

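import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
class OrderEventPublisher {
    private static final Logger log = LoggerFactory.getLogger(OrderEventPublisher.class);

    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;

    OrderEventPublisher(KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    void publish(OrderPlacedEvent event) {
        // The order ID is the record key, so all events for one order land on the same partition.
        // send() is asynchronous; whenComplete assumes spring-kafka 3.x (CompletableFuture return type).
        kafkaTemplate.send("order-events", event.orderId(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish order event {}", event.orderId(), ex);
                }
            });
    }
}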
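
When the caller needs to know whether delivery succeeded before continuing, the future returned by send() can be awaited with a timeout instead of handled in a callback. The following is a minimal sketch in plain Java, assuming the spring-kafka 3.x CompletableFuture return type. A CompletableFuture<String> stands in for the real CompletableFuture<SendResult<K, V>>, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendConfirmationSketch {

    /** Waits for the (stand-in) send future; returns true only if delivery was confirmed in time. */
    static boolean sendAndWait(CompletableFuture<String> sendFuture, long timeoutMillis) {
        try {
            sendFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // the send failed, or no acknowledgement arrived within the timeout
        }
    }

    public static void main(String[] args) {
        // Confirmed delivery
        System.out.println(sendAndWait(CompletableFuture.completedFuture("acknowledged"), 1000)); // true
        // Send failed
        System.out.println(sendAndWait(
            CompletableFuture.failedFuture(new RuntimeException("broker unavailable")), 1000)); // false
        // No acknowledgement before the timeout
        System.out.println(sendAndWait(new CompletableFuture<>(), 50)); // false
    }
}
```

Blocking on every send trades throughput for certainty, so the callback style shown above is usually the better fit for high-volume publishing.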

Consuming messages with @KafkaListener:

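import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
class OrderEventConsumer {
    private final InventoryService inventoryService; // assumed application service, injected by Spring
    OrderEventConsumer(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    @KafkaListener(topics = "order-events", groupId = "inventory-service-group")
    void handleOrderPlaced(OrderPlacedEvent event) {
        inventoryService.reserveStock(event.orderId());
        // offsets are committed by the listener container after processing (default AckMode.BATCH)
    }
}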

Key concepts worth understanding:

  • Consumer groups: instances of a service that share the same groupId divide a topic's partitions among themselves, so each message is delivered to only one instance within the group. This is what makes horizontal scaling of consumers work (instead of every instance redundantly processing every message). Parallelism is capped by the partition count: instances beyond the number of partitions sit idle.
  • Serialization: producer and consumer must agree on serializer/deserializer pairs (JsonSerializer/JsonDeserializer is a common, simple default for structured messages). A mismatch is a frequent cause of consumers that start up but never successfully process a record; with JsonDeserializer this includes a target class whose package is not listed in spring.json.trusted.packages.
  • Delivery/processing guarantees: by default, the spring-kafka listener container commits offsets on the consumer's behalf once the listener has processed the records without throwing (per polled batch, with the default AckMode.BATCH). Kafka's at-least-once delivery model means a message can be redelivered after certain failures, for example a crash or rebalance that happens after processing but before the offset commit. Two things therefore need careful thought: idempotency (can this handler safely process the same message twice?) and error handling (a DefaultErrorHandler can be configured with retry/backoff and a dead-letter-topic strategy for messages that repeatedly fail processing).
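
The idempotency point can be illustrated with a small plain-Java model of a handler that tolerates redelivery. This is only a sketch under stated assumptions: the class name, the sku and quantity parameters, and the in-memory set of processed order IDs are all hypothetical, and no Spring or Kafka API is used.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class IdempotentReservationSketch {
    // Hypothetical in-memory stores; a real service would need durable storage.
    private final Set<String> processedOrderIds = new HashSet<>();
    private final Map<String, Integer> reservedBySku = new HashMap<>();

    /** Returns true if stock was reserved, false if this order was already handled (a redelivery). */
    boolean handleOrderPlaced(String orderId, String sku, int quantity) {
        if (!processedOrderIds.add(orderId)) {
            return false; // duplicate delivery: skip the side effect
        }
        reservedBySku.merge(sku, quantity, Integer::sum);
        return true;
    }

    int reserved(String sku) {
        return reservedBySku.getOrDefault(sku, 0);
    }

    public static void main(String[] args) {
        IdempotentReservationSketch handler = new IdempotentReservationSketch();
        handler.handleOrderPlaced("order-1", "sku-9", 2);
        handler.handleOrderPlaced("order-1", "sku-9", 2); // simulated redelivery of the same message
        System.out.println(handler.reserved("sku-9")); // prints 2, not 4
    }
}
```

In a real service the record of processed IDs would typically be written in the same database transaction as the stock reservation, so that a crash between the two cannot leave them out of sync.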

When Kafka fits well: high-throughput event streaming, durable event logs that multiple independent consumers can replay and process at their own pace, and scenarios where strict message ordering within a partition (e.g., all events for a given order ID) matters.
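
Per-partition ordering depends on records with the same key always mapping to the same partition. The sketch below models that idea in plain Java. It is not Kafka's actual partitioner (the default producer hashes the serialized key with murmur2); String.hashCode() is used here purely as a stand-in, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionOrderingSketch {

    /** Stand-in for key hashing; NOT Kafka's real murmur2-based partitioner. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    /** Appends each {key, payload} pair to the log of the partition its key maps to. */
    static Map<Integer, List<String>> route(List<String[]> events, int partitionCount) {
        Map<Integer, List<String>> logs = new TreeMap<>();
        for (String[] event : events) {
            int partition = partitionFor(event[0], partitionCount);
            logs.computeIfAbsent(partition, p -> new ArrayList<>()).add(event[0] + ":" + event[1]);
        }
        return logs;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
            new String[] {"order-1", "PLACED"},
            new String[] {"order-2", "PLACED"},
            new String[] {"order-1", "PAID"},
            new String[] {"order-1", "SHIPPED"});
        // Every order-1 event ends up in one partition log, in the order it was sent.
        route(events, 3).forEach((partition, log) ->
            System.out.println("partition " + partition + " -> " + log));
    }
}
```

Events for different orders may interleave or land on different partitions, so ordering is only guaranteed among records that share a key, and only while the partition count stays unchanged.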
