Spring Cloud Stream running on top of the Kafka binder
Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.
It offers an abstraction (the binding) that works the same way whatever underlying implementation (the binder) we use.
In a previous post, "Spring Cloud Stream Kafka Streams first steps", I got a simple example working with the Kafka Streams binder. In this one, the goal is to use the Kafka Streams binder and the Kafka Streams Processor API to implement the following scenario:
Ready? Let's code! 🤓
Test-first using kafka-streams-test-utils
Once kafka-streams-test-utils is properly set up, we can implement this test:
UserStateStream implementation
We start with our UserStateStream implementation as a Function:
Now step by step ...
1. Aggregation by userId
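As a plain-Java illustration of what aggregating by userId means, the sketch below folds a sequence of token events into one state per user. It models the grouping logic only and does not use the Kafka Streams API. The `userId` and `token` fields, and the idea that a `UserState` is the list of tokens seen so far, are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event and state shapes: the field names are assumptions for illustration.
record UserTokenEvent(String userId, int token) {}

record UserState(String userId, List<Integer> tokens) {
    // Returns a new state with the event's token appended (states stay immutable).
    UserState plus(UserTokenEvent event) {
        List<Integer> updated = new ArrayList<>(tokens);
        updated.add(event.token());
        return new UserState(userId, List.copyOf(updated));
    }
}

final class UserStateAggregation {
    // Folds events into one UserState per userId, mimicking a group-by-key plus aggregate step.
    static Map<String, UserState> aggregate(List<UserTokenEvent> events) {
        Map<String, UserState> store = new LinkedHashMap<>();
        for (UserTokenEvent event : events) {
            UserState current = store.getOrDefault(
                event.userId(), new UserState(event.userId(), List.of()));
            store.put(event.userId(), current.plus(event));
        }
        return store;
    }
}
```

In a Kafka Streams topology the `Map` used here would typically be replaced by a state store, so the aggregated state survives restarts.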
2. Completed UserStateEvents
We can generate completed UserStateEvents straight away once we receive the last UserTokenEvent:
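A minimal plain-Java sketch of this rule, assuming tokens are numbered and a hypothetical `LAST_TOKEN` constant identifies the final one. The record shapes and the `COMPLETED`/`EXPIRED` values are illustrative assumptions, and nothing here comes from the Kafka Streams API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical shapes: field names and enum values are assumptions for illustration.
record UserState(String userId, List<Integer> tokens) {}

enum UserStateEventType { COMPLETED, EXPIRED }

record UserStateEvent(String userId, UserStateEventType type) {}

final class CompletionRule {
    // Assumption: the token with this number is the last one expected per user.
    static final int LAST_TOKEN = 5;

    // Emits a COMPLETED event once the state contains the last token, otherwise nothing.
    static Optional<UserStateEvent> completedEvent(UserState state) {
        return state.tokens().contains(LAST_TOKEN)
            ? Optional.of(new UserStateEvent(state.userId(), UserStateEventType.COMPLETED))
            : Optional.empty();
    }
}
```

Returning an `Optional` keeps the "nothing to emit yet" case explicit. In a stream, the equivalent would be a filter followed by a mapping step placed after the aggregation.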
3. UserStateProcessor implementation
Our UserStateProcessor will periodically scan the "user-state" store and apply our expiration logic to every UserState:
We just apply the expiration logic this way:
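The periodic scan and its expiration check can be modelled in plain Java as follows. This sketch stands in for the state store with a `Map` and takes the current time as a parameter. The `lastUpdated` field, the one-minute `EXPIRATION` and the decision to delete expired entries are all assumptions for illustration, not details taken from the Processor API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical state shape: lastUpdated is an assumed field used to decide expiration.
record TimedUserState(String userId, Instant lastUpdated) {}

final class ExpirationScan {
    // Assumption: a state expires when it has not been updated for this long.
    static final Duration EXPIRATION = Duration.ofMinutes(1);

    // Scans every entry, removes the expired ones and returns their userIds
    // (a real processor would forward an expired event for each of them).
    static List<String> expire(Map<String, TimedUserState> store, Instant now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, TimedUserState>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            TimedUserState state = it.next().getValue();
            if (Duration.between(state.lastUpdated(), now).compareTo(EXPIRATION) >= 0) {
                expired.add(state.userId());
                it.remove();
            }
        }
        return expired;
    }
}
```

Passing `now` in as a parameter keeps the logic deterministic and easy to test. In Kafka Streams, a scan like this is usually registered through `ProcessorContext.schedule(...)` with a `Punctuator` that iterates over `KeyValueStore.all()`.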
4. UserStateStream and UserStateProcessor integration
At this point our UserStreamTest should pass 🟩 👌
Kafka Streams binder configuration
Easy!
With this configuration:
You can find all the available configuration properties in the Kafka Streams binder documentation.
UserStateStream bean
As required by our configuration, we need to create a @Bean whose name matches the function name used in that configuration:
Integration Test
We already "unit test" our UserStateStream with kafka-streams-test-utils, but we also need an integration test using a Kafka container ... Testcontainers to the rescue!
Which of the following binder implementations does Spring Cloud Stream support?
Spring Cloud Stream relies on implementations of the Binder SPI to perform the task of connecting channels to message brokers.
What is Kafka Streams binder?
Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams: KStream, KTable and GlobalKTable. Kafka Streams applications typically follow a model in which records are read from an inbound topic, business logic is applied, and the transformed records are written to an outbound topic.
What is the difference between Spring Cloud Stream and Kafka Streams?
While the contracts established by Spring Cloud Stream are maintained from a programming model perspective, Kafka Streams binder does not use MessageChannel as the target type. The binder implementation natively interacts with Kafka Streams "types": KStream or KTable.
Is StreamListener deprecated?
Support for StreamListener is deprecated starting with version 3.1.0 of Spring Cloud Stream.