author-avatar
Dominik Martyniak
Spring Boot 13 minut

Spring Cloud Stream 4.0: Konfiguracja, Konsumenci, Producenci

Wraz z pojawieniem się Spring Boota wersji 3, zaktualizowana została także biblioteka Spring Cloud Stream , która umożliwia komunikację z systemami kolejek takimi jak RabbitMQ, Kafka i inne. Przejście na nową wersję 4.0 wiąże się z istotnymi zmianami, które zmusiły mnie do głębszego zanurzenia się w dokumentacji. Większość dostępnych poradników i tutoriali skupia się na zastosowaniu adnotacji, które w najnowszej wersji zostały zastąpione przez konfigurację w pliku application.yml. 


Czym jest Spring Cloud Stream ?



Spring Cloud Stream to framework opracowany na bazie Spring Boot i Spring Integration, jego głównym celem jest umożliwienie łatwej i efektywnej komunikacji przez wykorzystanie wiadomości.


Dependency:

 

Cały kod znajduje się w repozytorium na koncu artykułu.


W projekcie wykorzystującym Spring Cloud Stream dla komunikacji z RabbitMQ, konieczne jest dodanie odpowiednich zależności w pliku build.gradle. Poniżej przedstawiam wymagane zależności:

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-web' //webówka
	testImplementation 'org.springframework.boot:spring-boot-starter-test' //testy
	implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit:4.1.0' //binder dla rabbita
}

 

  • Spring Boot Starter Web: Ta zależność (org.springframework.boot:spring-boot-starter-web) jest niezbędna do tworzenia aplikacji webowych. Zapewnia wsparcie dla tworzenia RESTful aplikacji z wykorzystaniem Spring MVC. Umożliwia łatwe tworzenie endpointów webowych.

 

  • Spring Boot Starter Test: Moduł testowy (org.springframework.boot:spring-boot-starter-test) dostarcza niezbędne narzędzia i biblioteki do przeprowadzania testów jednostkowych i integracyjnych. Zawiera takie biblioteki jak JUnit, Spring Test, AssertJ, Hamcrest i inne, co ułatwia efektywne tworzenie i zarządzanie testami w projekcie.

 

  • Spring Cloud Stream Binder Rabbit: Zależność org.springframework.cloud:spring-cloud-stream-binder-rabbit:4.1.0 jest kluczowa dla integracji z RabbitMQ. Binder ten umożliwia aplikacji komunikację z RabbitMQ przez abstrakcję dostarczaną przez Spring Cloud Stream, co znacznie upraszcza konfigurację i zarządzanie strumieniami danych.

 

Konsumer zdarzeń



Poniższy fragment kodu ilustruje, jak można zaimplementować prostego listinera na wiadomości w aplikacji używającej RabbitMQ z Spring Cloud Stream, z prostym printowaniem wiadomosci 

package pl.devset.scse.events;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;

@Configuration
public class MQEvent {

    @Bean
    public Consumer<String>  consumeEvent() {
        return message -> System.out.println("Received: " + message);
    }

}

@Configuration:

Ta adnotacja wskazuje, że klasa służy jako źródło definicji beanów. Klasy oznaczone tą adnotacją są przetwarzane przez kontener Springa w celu generowania definicji beanów i usług.


@Bean:

Adnotacja używana nad metodą wskazuje, że metoda ta generuje bean, który ma być zarządzany przez kontener Spring. Bean ten jest rejestrowany pod domyślną nazwą metody, chyba że podano inną.


Consumer<String>

Typ zwracany przez metodę, wskazuje na interfejs Consumer z generykiem String. Oznacza to, że akceptuje on funkcję przyjmującą jeden argument typu String.


Lambda message -> System.out.println("Received: " + message):

Ciało metody definiuje, że dla każdego przetwarzanego komunikatu (typu String) ma zostać wyświetlona informacja o jego odbiorze na standardowym wyjściu.

Dodawanie producenta do Spring Cloud Stream


Po skonfigurowaniu odbiorcy (listenera) w Twojej aplikacji Spring Cloud Stream, kolejnym krokiem jest dodanie producenta, który będzie odpowiedzialny za wysyłanie wiadomości do kolejki RabbitMQ. 

package pl.devset.scse.events;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;

import java.time.LocalDateTime;

@Service
public class MQService {


    @Autowired
    private StreamBridge streamBridge;


    @Scheduled(fixedDelay =1000 )
    void send(){
        Message<String> message = MessageBuilder.withPayload("Hello, RabbitMQ from DEVSET! " + LocalDateTime.now().toString()).build();
        streamBridge.send("events-stream-producer", message);
    }
}

 

Ten fragment kodu demonstruje jak można skonfigurować serwis do cyklicznego wysyłania wiadomości do kolejki RabbitMQ, wykorzystując mechanizm planowania zadań (scheduler) oraz abstrakcje Spring Cloud Stream. Klasa MQService jest przykładem usługi, która wysyła testowe wiadomości w określonych odstępach czasu.

 

StreamBridge to komponent Spring Cloud Stream, który umożliwia dynamiczne wysyłanie wiadomości do określonych kanałów bez konieczności definiowania wyjść w interfejsie programowania aplikacji (API). Jest to szczególnie przydatne w aplikacjach mikroserwisowych, gdzie elastyczność i zdolność do adaptacji do zmieniających się wymagań są kluczowe. Oto główne powody, dla których warto używać StreamBridge:

  1. Dynamiczne kierowanie wiadomości
    StreamBridge pozwala na wysyłanie wiadomości do różnych kanałów w czasie rzeczywistym bez potrzeby rekonfiguracji lub ponownego uruchamiania usługi. 
  2. Prostota użycia
    Za pomocą StreamBridge można wysyłać wiadomości bez deklarowania dedykowanych binderów lub konfiguracji specyficznych dla kanałów
  3. Integracja z mikroserwisami
    StreamBridge jest naturalnym wyborem w architekturze mikroserwisowej, gdzie poszczególne serwisy często muszą komunikować się w elastyczny sposób.
  4. Obsługa różnorodnych źródeł danych
    Możliwość wysyłania wiadomości do różnych kanałów pozwala na efektywniejszą obsługę różnorodnych źródeł i celów danych. Na przykład, aplikacja może odbierać dane z wielu źródeł i na podstawie logiki biznesowej decydować, gdzie dane powinny być dalej przekierowane lub przetworzone.
  5. Zwiększenie odporności systemu
    Użycie StreamBridge może przyczynić się do zwiększenia odporności systemu poprzez dekupling producentów danych od konsumentów. Producenci nie muszą znać szczegółów konsumpcji, co sprawia, że system jest bardziej modularny i łatwiejszy w utrzymaniu.
  6. Łatwa skalowalność
    Wraz z rosnącymi wymaganiami dotyczącymi przetwarzania danych, StreamBridge pozwala na łatwe skalowanie wysyłki wiadomości bez zmian w kodzie aplikacji. Można dynamicznie dodawać nowe kanały i obsługiwać większą ilość danych bez wprowadzania istotnych zmian w infrastrukturze.

Podsumowując, StreamBridge oferuje znaczną elastyczność i wygodę w zarządzaniu komunikacją w aplikacjach opartych na Spring Cloud Stream, co czyni go idealnym narzędziem dla dynamicznie rozwijających się, złożonych systemów mikroserwisowych.

Ale wracając do kolejnych linijek kodu

@Service:

Oznacza, że klasa pełni rolę serwisu, który zarządza logiką biznesową aplikacji.

@Autowired:

Adnotacja używana do automatycznego wstrzykiwania zależności. Tutaj StreamBridge jest wstrzykiwany do serwisu, co umożliwia wysyłanie wiadomości bez konieczności ręcznej konfiguracji. (zalecałbym jednak używać wstrzykiwania przez konstruktor, ale na potzreby tego prostego przykładu użyłem tej adnotacji)

@Scheduled(fixedDelay = 1000):

Adnotacja określająca, że metoda send powinna być wykonywana co 1000 milisekund (co sekundę). Jest to przydatne do testowania i symulacji ciągłego strumienia danych.

MessageBuilder.withPayload(...):

 Tworzy nową wiadomość z załączonym ładunkiem, który zawiera tekst i aktualny czas. To demonstruje, jak dynamicznie tworzyć treść wiadomości.

streamBridge.send(...):

Metoda wysyłająca skonstruowaną wiadomość do kanału o nazwie events-stream-producer. 

 

Konfiguracja Spring Cloud Stream i RabbitMQ: Odkrywanie Magii

 

Poniżej przedstawiam prosty przykład konfiguracji, jak możesz ona wyglądać dla naszego konsumera i producenta

 

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: ''
    password: ''
    virtual-host: 

  cloud:
    stream:
      bindings:
        # Defines a specific binding for your application
        incoming-events-stream-consumer:
          destination: incoming-events
          group: devset
        events-stream-producer:
          destination: incoming-events
          group: devset
      rabbit:
        bindings:
          incoming-events-stream-consumer: # RabbitMQ-specific configuration for the binding defined above
            consumer:
              bindQueue: true
              bindingRoutingKey: '#' # The routing key to use for the binding; empty means all messages are received
          events-stream-producer:
            producer:
              exchangeType: topic
      function:
        bindings:
          consumeEvent-in-0: incoming-events-stream-consumer

  1. host: Adres serwera RabbitMQ.
  2. port: Port, na którym serwer RabbitMQ nasłuchuje połączeń.
  3. username i password: Dane uwierzytelniające, które są używane do połączenia z serwerem.
  4. virtual-host: Specyficzna przestrzeń wirtualna na serwerze RabbitMQ, która izoluje pewne zasoby i konfiguracje.
  • Bindings

incoming-events-stream-consumer i events-stream-producer:

To są nazwy logiczne dla połączeń konsumenckich i produkcyjnych. Oba używają tego samego destination, czyli incoming-events, co oznacza, że obie strumienie operują na tym samym punkcie wymiany danych (destination).

group:

Wszystkie instancje mikroserwisu subskrybujące ten strumień należą do grupy devset. Dzięki temu RabbitMQ wie, jak zarządzać wiadomościami dla skalowania poziomego i zapewnienia, że każda wiadomość zostanie przetworzona tylko raz.

  • Specyfika RabbitMQ

bindQueue:

Parametr określający, czy kolejka ma być automatycznie powiązana z exchange’em (punkt wymiany).
bindingRoutingKey: Klucz routingu # oznacza, że konsumer będzie odbierać wszystkie wiadomości przekazywane przez exchange.

exchangeType:

Typ topic umożliwia bardziej elastyczne routowanie wiadomości oparte na wzorcach kluczy routingu.

  • Sekcja function.bindings 

Mapuje funkcje w aplikacji do określonych strumieni, co umożliwia łatwą integrację i zarządzanie przepływem danych. W tym przypadku consumeEvent-in-0 jest przypisany do incoming-events-stream-consumer, co pozwala funkcji consumeEvent nasłuchiwać na wiadomości przychodzące z tego strumienia.


Efekt końcowy

 

Jak widać wiadomości są wysyłane na odpwienie que, a następnie odczytywane prez naszego konsumera.

Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:32.677521100
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:33.680558
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:34.681920900
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:35.697853300
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:36.712690600
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:37.724929500
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:38.738118700
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:39.738906400
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:40.752305800
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:41.758376500
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:42.772234700
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:43.785111
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:44.801253900
Received: Hello, RabbitMQ from DEVSET! 2024-04-26T20:45:45.802241700

 

Podsumowując, artykuł stanowi kompleksowy przewodnik po implementacji i konfiguracji Spring Cloud Stream, prezentując zmiany w wersji 4.0 oraz szczegółowo omawiając kluczowe aspekty integracji z RabbitMQ.  Dodatkowo, omówiłem wykorzystanie StreamBridge do dynamicznego wysyłania wiadomości, co jest użyteczne w różnorodnych scenariuszach biznesowych i technologicznych. Spring Cloud Stream 4.0 oferuje rozbudowane możliwości dla developerów i architektów systemów, którzy poszukują skutecznych rozwiązań do budowy odpornych na błędy i skalowalnych systemów mikroserwisowych, korzystających z asynchronicznej komunikacji między usługami.

Tutaj znajduje sie kod źródłowy

2
[spring boot, spring cloud]

Więcej od Dominik Martyniak

Więcej artykułów