W dzisiejszym świecie technologii, elastyczność i skalowalność są kluczowe dla wydajnych systemów przetwarzania danych. W tym artykule przedstawie, jak stworzyć dynamiczny listener Kafka w Spring Boot, który może obsługiwać zmienną liczbę partycji. Dzięki temu rozwiązaniu będziesz mógł łatwo skalować swoją aplikację w zależności od potrzeb.
Apache Kafka jest jedną z najpopularniejszych platform do przetwarzania strumieniowego, która umożliwia obsługę dużych ilości danych w czasie rzeczywistym. Spring Boot, z jego prostotą i bogatym ekosystemem, ułatwia integrację z Kafką. W tym artykule pokaże, jak stworzyć dynamiczne listenery Kafka, które automatycznie dostosowują się do liczby partycji.
Krok po kroku: Tworzenie Dynamicznego Listenera Kafka
Krok 1: Konfiguracja Projektu
Na początek, musisz mieć zainstalowane Spring Boot oraz Kafka. Poniżej przedstawiamy niezbędne zależności w pliku pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>4.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>4.1.1</version>
</dependency>
lub gradle:
dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream:4.1.1'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka:4.1.1'
}
Krok 2: Implementacja Dynamicznego Listenera
Poniżej znajduje się kompletny kod klasy DynamicKafkaListeners, która dynamicznie tworzy listenery Kafka:
package pl.devset.scse;
import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.apache.kafka.common.header.Header;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class DynamicKafkaListeners {
@Value("${spring.kafka.partition:1}")
private int partitionCount;
@Value("${spring.kafka.consumer.topic-name}")
private String topicName;
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
private final EventServices services;
public DynamicKafkaListeners(ConcurrentKafkaListenerContainerFactory<String, String> factory, EventServices services) {
this.factory = factory;
this.services = services;
}
private final List<ConcurrentMessageListenerContainer<String, String>> containers = new ArrayList<>();
@PostConstruct
public void createListeners() {
for (int i = 0; i < partitionCount; i++) {
containers.add(createContainer(i));
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(int partition) {
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset(topicName, partition));
containerProps.setGroupId("devset");
containerProps.setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> message) {
var userId= extractHeaderValue(message, "userId");
services.processEvent(userId, message.value());
}
});
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProps);
container.setConcurrency(1);
container.start();
return container;
}
private String extractHeaderValue(ConsumerRecord<String, String> record, String headerKey) {
for (Header header : record.headers().headers(headerKey)) {
return new String(header.value(), StandardCharsets.UTF_8);
}
return null;
}
}
Szczegóły implementacji
Pola konfiguracyjne:
- partitionCount: Liczba partycji w temacie Kafka, określona w pliku konfiguracyjnym aplikacji (application.properties).
- topicName: Nazwa tematu Kafka, również określona w pliku konfiguracyjnym.
List kontenerów:
- containers: Lista przechowująca dynamicznie stworzone kontenery Kafka.
Metoda createListeners:
- Adnotacja @PostConstruct zapewnia, że metoda ta zostanie wywołana po inicjalizacji beana Spring.
- Tworzy listenery dla każdej partycji, zgodnie ze zdeklarowaną ilościa w partitionCount
Metoda createContainer w 4 krokach:
1. Tworzenie właściwości kontenera (ContainerProperties):
- ContainerProperties to klasa konfigurująca właściwości kontenera Kafka.
- new ContainerProperties(new TopicPartitionOffset(topicName, partition)): Określa, dla której partycji (przekazywanej jako argument partition) danego tematu (topicName) ma być stworzony kontener.
- containerProps.setGroupId("devset"): Ustawia identyfikator grupy konsumentów na "devset". Grupa konsumentów to logiczna grupa konsumentów, które wspólnie konsumują komunikaty z partycji tematu.
2. Definiowanie listenera wiadomości:
- containerProps.setMessageListener(new MessageListener<String, String>() {...}): Ustawia listener wiadomości, który będzie reagował na komunikaty z Kafka.
- Listener (MessageListener) implementuje metodę onMessage, która zostanie wywołana dla każdej otrzymanej wiadomości.
- var userId= extractHeaderValue(message, "userId"): Wyodrębnia wartość z nagłówka wiadomości o kluczu "userId" (przydatne np. do identyfikacji użytkownika).
- services.processEvent(userId, message.value()): Przekazuje wyodrębnioną wartość nagłówka oraz treść wiadomości do serwisu EventServices w celu przetworzenia.
3.Tworzenie i konfigurowanie kontenera listenera:
- ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory.getConsumerFactory(), containerProps): Tworzy nowy kontener listenera, korzystając z fabryki konsumentów (factory.getConsumerFactory()) oraz właściwości kontenera (containerProps).
- container.setConcurrency(1): Ustawia stopień równoległości przetwarzania na 1, co oznacza, że dla tej partycji będzie działał jeden wątek konsumenta.
4. Uruchomienie kontenera:
- container.start(): Uruchamia kontener, aby zaczął nasłuchiwać komunikatów z przypisanej partycji tematu.
- return container: Zwraca utworzony i uruchomiony kontener
Listener
- Implementacja MessageListener odbiera wiadomości z Kafka i przetwarza je przy pomocy serwisu EventServices.
- Metoda onMessage wywołuje processEvent z EventServices, przekazując wyodrębnioną wartość z nagłówka i treść wiadomości.
Metoda extractHeaderValue:
- Wyodrębnia wartość z nagłówka wiadomości Kafka na podstawie klucza nagłówka.
Konfiguracja application.yml
Poniżej znajduje się przykładowa konfiguracja pliku application.yml, zawierająca właściwości dla dynamicznych listenerów Kafka:
spring:
kafka:
partition: 4
bootstrap-servers: localhost:29092
consumer:
topic-name: incoming-events
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.partition:
Określa liczbę partycji tematu Kafka, które będą obsługiwane przez dynamiczne listenery. W tym przykładzie ustawiono 4 partycje.
spring.kafka.bootstrap-servers:
Adres(y) serwera(ów) Kafka. W tym przypadku jest to localhost:29092. Może to być pojedynczy adres lub lista adresów, które umożliwiają połączenie z klastrem Kafka.
spring.kafka.consumer.topic-name:
Nazwa tematu, z którego konsument będzie odbierał wiadomości. W tym przykładzie jest to incoming-events.
spring.kafka.consumer.auto-offset-reset:
Określa, co zrobić, gdy nie ma początkowego przesunięcia lub przesunięcie jest nieprawidłowe (np. gdy partycja nie istnieje). earliest oznacza, że konsument zacznie czytać wiadomości od najwcześniejszego dostępnego przesunięcia.
spring.kafka.consumer.key-deserializer:
Klasa deserializatora dla kluczy wiadomości. W tym przypadku używana jest klasa org.apache.kafka.common.serialization.StringDeserializer, która deserializuje klucze jako ciągi znaków (String).
spring.kafka.consumer.value-deserializer:
Klasa deserializatora dla wartości wiadomości. Podobnie jak w przypadku kluczy, używana jest klasa org.apache.kafka.common.serialization.StringDeserializer, która deserializuje wartości jako ciągi znaków (String).
Podsumowanie
Implementacja dynamicznego listenera Kafka w Spring Boot pozwala na elastyczne skalowanie aplikacji, dostosowując ją do liczby partycji tematu. Powyższy kod jest prosty w implementacji i zapewnia efektywne przetwarzanie wiadomości w systemach opartych na Kafce. Dzięki takiemu podejściu, Twoja aplikacja będzie mogła łatwo obsługiwać rosnące obciążenia, co jest kluczowe w nowoczesnych rozwiązaniach przetwarzania strumieniowego.
Link do repo