author-avatar
Dominik Martyniak
Kafka 8 minut

Kafka w Spring Boot: Dynamiczne Tworzenie Listenerów

W dzisiejszym świecie technologii, elastyczność i skalowalność są kluczowe dla wydajnych systemów przetwarzania danych. W tym artykule przedstawie, jak stworzyć dynamiczny listener Kafka w Spring Boot, który może obsługiwać zmienną liczbę partycji. Dzięki temu rozwiązaniu będziesz mógł łatwo skalować swoją aplikację w zależności od potrzeb.

 

Apache Kafka jest jedną z najpopularniejszych platform do przetwarzania strumieniowego, która umożliwia obsługę dużych ilości danych w czasie rzeczywistym. Spring Boot, z jego prostotą i bogatym ekosystemem, ułatwia integrację z Kafką. W tym artykule pokaże, jak stworzyć dynamiczne listenery Kafka, które automatycznie dostosowują się do liczby partycji.

 

Krok po kroku: Tworzenie Dynamicznego Listenera Kafka

 

Krok 1: Konfiguracja Projektu

Na początek, musisz mieć zainstalowane Spring Boot oraz Kafka. Poniżej przedstawiamy niezbędne zależności w pliku pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>4.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>4.1.1</version>
</dependency>

 

lub gradle:

 

dependencies {
	implementation 'org.springframework.cloud:spring-cloud-stream:4.1.1'
	implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:4.1.1'
}

 

Krok 2: Implementacja Dynamicznego Listenera

Poniżej znajduje się kompletny kod klasy DynamicKafkaListeners, która dynamicznie tworzy listenery Kafka:

package pl.devset.scse;


import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Configuration
public class DynamicKafkaListeners {

    @Value("${spring.kafka.partition:1}")
    private int partitionCount;

    @Value("${spring.kafka.consumer.topic-name}")
    private String topicName;

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
    private final EventServices services;

    public DynamicKafkaListeners(ConcurrentKafkaListenerContainerFactory<String, String> factory, EventServices services) {
        this.factory = factory;
        this.services = services;
    }

    private final List<ConcurrentMessageListenerContainer<String, String>> containers = new ArrayList<>();

    @PostConstruct
    public void createListeners() {
        for (int i = 0; i < partitionCount; i++) {
            containers.add(createContainer(i));
        }
    }

    private ConcurrentMessageListenerContainer<String, String> createContainer(int partition) {
        ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topicName, partition));
        containerProps.setGroupId("devset");

        containerProps.setMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> message) {
                var userId= extractHeaderValue(message, "userId");

                services.processEvent(userId, message.value());
            }
        });

        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProps);
        container.setConcurrency(1);
        container.start();
        return container;
    }

    private String extractHeaderValue(ConsumerRecord<String, String> record, String headerKey) {
        for (Header header : record.headers().headers(headerKey)) {
            return new String(header.value(), StandardCharsets.UTF_8);
        }
        return null;
    }
}

 

Szczegóły implementacji


Pola konfiguracyjne:

  • partitionCount: Liczba partycji w temacie Kafka, określona w pliku konfiguracyjnym aplikacji (application.properties).
  • topicName: Nazwa tematu Kafka, również określona w pliku konfiguracyjnym.

List kontenerów:

  • containers: Lista przechowująca dynamicznie stworzone kontenery Kafka.

Metoda createListeners:

  • Adnotacja @PostConstruct zapewnia, że metoda ta zostanie wywołana po inicjalizacji beana Spring.
  • Tworzy listenery dla każdej partycji, zgodnie ze zdeklarowaną ilościa w partitionCount

Metoda createContainer w 4 krokach: 

1. Tworzenie właściwości kontenera (ContainerProperties):

  • ContainerProperties to klasa konfigurująca właściwości kontenera Kafka.
  • new ContainerProperties(new TopicPartitionOffset(topicName, partition)): Określa, dla której partycji (przekazywanej jako argument partition) danego tematu (topicName) ma być stworzony kontener.
  • containerProps.setGroupId("devset"): Ustawia identyfikator grupy konsumentów na "devset". Grupa konsumentów to logiczna grupa konsumentów, które wspólnie konsumują komunikaty z partycji tematu.

2. Definiowanie listenera wiadomości:

  • containerProps.setMessageListener(new MessageListener<String, String>() {...}): Ustawia listener wiadomości, który będzie reagował na komunikaty z Kafka.
  • Listener (MessageListener) implementuje metodę onMessage, która zostanie wywołana dla każdej otrzymanej wiadomości.
  • var userId= extractHeaderValue(message, "userId"): Wyodrębnia wartość z nagłówka wiadomości o kluczu "userId" (przydatne np. do identyfikacji użytkownika).
  • services.processEvent(userId, message.value()): Przekazuje wyodrębnioną wartość nagłówka oraz treść wiadomości do serwisu EventServices w celu przetworzenia.

3.Tworzenie i konfigurowanie kontenera listenera:

  • ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProps): Tworzy nowy kontener listenera, korzystając z fabryki konsumentów (factory.getConsumerFactory()) oraz właściwości kontenera (containerProps).
  • container.setConcurrency(1): Ustawia stopień równoległości przetwarzania na 1, co oznacza, że dla tej partycji będzie działał jeden wątek konsumenta.


4. Uruchomienie kontenera:

  • container.start(): Uruchamia kontener, aby zaczął nasłuchiwać komunikatów z przypisanej partycji tematu.
  • return container: Zwraca utworzony i uruchomiony kontener

Listener

  • Implementacja MessageListener odbiera wiadomości z Kafka i przetwarza je przy pomocy serwisu EventServices.
  • Metoda onMessage wywołuje processEvent z EventServices, przekazując wyodrębnioną wartość z nagłówka i treść wiadomości.

Metoda extractHeaderValue:

  • Wyodrębnia wartość z nagłówka wiadomości Kafka na podstawie klucza nagłówka.

 

Konfiguracja application.yml

Poniżej znajduje się przykładowa konfiguracja pliku application.yml, zawierająca właściwości dla dynamicznych listenerów Kafka:

spring:
  kafka:
    partition: 4
    bootstrap-servers: localhost:29092
    consumer:
      topic-name: incoming-events
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

spring.kafka.partition:

Określa liczbę partycji tematu Kafka, które będą obsługiwane przez dynamiczne listenery. W tym przykładzie ustawiono 4 partycje.


spring.kafka.bootstrap-servers:

Adres(y) serwera(ów) Kafka. W tym przypadku jest to localhost:29092. Może to być pojedynczy adres lub lista adresów, które umożliwiają połączenie z klastrem Kafka.


spring.kafka.consumer.topic-name:

Nazwa tematu, z którego konsument będzie odbierał wiadomości. W tym przykładzie jest to incoming-events.


spring.kafka.consumer.auto-offset-reset:

Określa, co zrobić, gdy nie ma początkowego przesunięcia lub przesunięcie jest nieprawidłowe (np. gdy partycja nie istnieje). earliest oznacza, że konsument zacznie czytać wiadomości od najwcześniejszego dostępnego przesunięcia.


spring.kafka.consumer.key-deserializer:

Klasa deserializatora dla kluczy wiadomości. W tym przypadku używana jest klasa org.apache.kafka.common.serialization.StringDeserializer, która deserializuje klucze jako ciągi znaków (String).


spring.kafka.consumer.value-deserializer:

Klasa deserializatora dla wartości wiadomości. Podobnie jak w przypadku kluczy, używana jest klasa org.apache.kafka.common.serialization.StringDeserializer, która deserializuje wartości jako ciągi znaków (String).

 

Podsumowanie


Implementacja dynamicznego listenera Kafka w Spring Boot pozwala na elastyczne skalowanie aplikacji, dostosowując ją do liczby partycji tematu. Powyższy kod jest prosty w implementacji i zapewnia efektywne przetwarzanie wiadomości w systemach opartych na Kafce. Dzięki takiemu podejściu, Twoja aplikacja będzie mogła łatwo obsługiwać rosnące obciążenia, co jest kluczowe w nowoczesnych rozwiązaniach przetwarzania strumieniowego.

Link do repo

2
[spring boot, kafka]

Więcej od Dominik Martyniak

Więcej artykułów