Spring Cloud Stream은 메시지 기반 마이크로서비스를 쉽게 개발하기 위한 여러 추상화 방식을 제공한다.
Spring Cloud Stream은 rabbitmq, kafka 같은 미들웨어에 종속되지 않고 중립적으로 구성됩니다.
rabbitmq나 kafka에 특화된 기능들은 각 미들웨어에 맞는 binder 구현에서 처리를 합니다.
Destination Binders : 외부 메시징 시스템과의 통합을 제공하는 구성 요소
Bindings : 외부 메시징 시스템과 애플리케이션 제공 메시지의 메시지 생성자(Producers) 및 소비자(Consumers) 간의 Bridge
Binder의 역할은 메시징 시스템과의 연결, 위임 및 생산자와 소비자 간의 메시지 라우팅, 데이터 유형 변환, 사용자 코드 호출 등을 담당한다.
spring cloud stream를 사용한 프로젝트를 새로 셋팅하면서 몇년 전에 사용하던 애너테이션 방식은 이제 deprecated 되었다.
이제 함수형 프로그래밍 방식으로 설정해야된다.
아래와 같이 @EnableBinding, @Input, @Out, @StreamListener, @StreamMessageConverter 등이 모두 deprecated 되었다.
검색 중에 @EnableBinding, @StreamListner 등이 샘플로 나오는 것들은 모두 3.x.x 이하여서 공식 문서를 보는 것을 추천한다.
Bindings 외부 메시징 시스템(queue, topic) 과 Producers, Consumers 등을 제공하는 애플리케이션과의 다리(Bridge) 역할을 한다.
아래와 같이 Function을 Bean으로 생성하기만 하면 이미 메시지 처리를 할 수 있는 Spring Cloud Stream 애플리케이션을 작성한 것이다.
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
아래와 같은 설정을 해놓으면 my-topic이란 topic에 있는 메시지를 소비하는 Consumer가 완성된다.
spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic
위에 function의 네이밍 컨벤션은 다음과 같다. 아래 내용을 꼭 알고 있어야지 bindings 설정을 할 수 있다.
- input - <functionName> + -in- + <index>
- output - <functionName> + -out- + <index>
아래와 같이 명시적으로 uppercase-in-0이란 bindings 이름을 input으로 바꿀 수 있다.
spring.cloud.stream.function.bindings.uppercase-in-0=input
Spring Cloud Stream에서 설정 시 kafka나 rabbitmq에 특화된 기능들은 설정 위치가 다르니 꼭 문서를 참고하면서 확인해봐야한다.
아래 처럼 resetOffSets는 kafka에 설정이기 때문에 맞게 설정을 해주었다.
spring:
profiles:
active: local
cloud:
stream:
bindings:
log-in-0:
group: log-group
destination: log
log-out-0:
destination: log
personConsumer-in-0:
group: person-group
destination: person
personSupplier-out-0:
destination: person
product-in-0:
group: product-group
destination: product
product-out-0:
destination: product
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
bindings:
log-in-0:
consumer:
resetOffsets: false
streams:
binder:
auto-create-topics: false
function:
definition: log;personConsumer;personSupplier;product
공통 Consumer Properties 설정
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties
kafka binder properties 설정
https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties
그외 다른 Binder 들
autuator endpoints
http://localhost:8080/actuator/functions
http://localhost:8080/actuator/bindings
문제 해결
Supplier, Consumer를 한 Configuration에서 정의했을 때 발생함.
Can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it.
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_multiple_functions_in_a_single_application
Bean으로 생성 시에 로직이 너무 길어서 분리하고 싶을 때 아래 코드 참고
https://docs.spring.io/spring-cloud-function/docs/current/reference/html/spring-cloud-function.html#_comparing_functional_with_traditional_bean_definitions
메시지 전송 시에 기본 생성자가 꼭 있어야 됨.
Lombok 사용 시에 주의 해야됨. 아래와 같이 선언한 경우 꼭 @NoArgsContructor도 붙여주자.
@AllArgsConstructor
@NoArgsConstructor
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
샘플 코드
https://github.com/yepapa/spring-cloud-stream-samples
참고 자료
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations
https://spring.io/blog/2020/07/13/introducing-java-functions-for-spring-cloud-stream-applications-part-0
https://spring.io/blog/2020/07/20/introducing-java-functions-for-spring-cloud-stream-applications-part-1
https://spring.io/blog/2020/07/27/creating-a-supplier-function-and-generating-spring-cloud-stream-source
https://docs.spring.io/spring-cloud-function/docs/current/reference/html/spring-cloud-function.html#function_visualization
https://spring.io/blog/2020/12/15/testing-spring-cloud-stream-applications-part-2