Spring Cloud Stream on top of the Kafka Streams binder

Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.

It offers an abstraction (the binding) that works the same whichever underlying implementation we use (the binder):

  • Apache Kafka
  • RabbitMQ
  • Kafka Streams
  • Amazon Kinesis
  • ...
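The same functional code runs unchanged on top of any of these brokers: only the binder dependency changes. As a minimal sketch (not part of this post's scenario, all names are illustrative), a trivial function bean like this would get bound to an input and an output destination whatever binder is on the classpath:

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.function.Function

@Configuration
class UppercaseConfiguration {
  // Spring Cloud Stream binds this bean to the uppercase-in-0 and
  // uppercase-out-0 destinations, whatever the underlying binder
  @Bean
  fun uppercase(): Function<String, String> = Function<String, String> { it.uppercase() }
}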

In a previous post, Spring Cloud Stream Kafka Streams first steps, I got a simple example working using the Kafka Streams binder.

In this one the goal is to use the Kafka Streams binder and the Kafka Streams Processor API to implement the following scenario:


  1. We receive messages with key = userId and value = { userId: string, token: number } from topic pub.user.token
  2. For every userId for which we receive tokens 1, 2, 3, 4 and 5 within 1 minute, we send a completed event to topic pub.user.state
  3. For every userId for which we receive at least one token but not the complete 1, 2, 3, 4 and 5 sequence within 1 minute, we send an expired event to topic pub.user.state
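For example, an illustrative timeline (user ids, timings and payloads are made up for this example):

00:00  pub.user.token  user1 → { userId: "user1", token: 1 }
00:10  pub.user.token  user1 → { userId: "user1", token: 2 }
00:20  pub.user.token  user1 → { userId: "user1", token: 3 }
00:30  pub.user.token  user1 → { userId: "user1", token: 4 }
00:40  pub.user.token  user1 → { userId: "user1", token: 5 }
00:40  pub.user.state  user1 → { userId: "user1", state: "COMPLETED" }

00:00  pub.user.token  user2 → { userId: "user2", token: 1 }
01:05  pub.user.state  user2 → { userId: "user2", state: "EXPIRED" }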

Ready? Let's code! 🤓


Test-first using kafka-streams-test-utils

Once kafka-streams-test-utils is properly set up in our project, we can implement these tests:

data class UserTokenEvent(val userId: String, val token: Int)
enum class UserStateEventType { COMPLETED, EXPIRED }
data class UserStateEvent(val userId: String, val state: UserStateEventType)
@Test
fun `should publish completed event for one user`() {
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 3))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 4))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 5))
  topologyTestDriver.advanceWallClockTime(EXPIRATION.minusMillis(10))
  assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
    assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
    assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, COMPLETED))
  })
}
@Test
fun `should publish expired event for one user`() {
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 1))
  topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, 2))
  topologyTestDriver.advanceWallClockTime(EXPIRATION.plus(SCHEDULE).plus(SCHEDULE))
  assertThat(topicOut.readKeyValuesToList()).singleElement().satisfies(Consumer { topicOutMessage ->
    assertThat(topicOutMessage.key).isEqualTo(USERNAME_1)
    assertThat(topicOutMessage.value).isEqualTo(UserStateEvent(USERNAME_1, EXPIRED))
  })
}
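These tests rely on a topologyTestDriver and two test topics that are not shown in this excerpt. Here is a minimal sketch of how they could be wired up, assuming spring-kafka's JsonSerde for JSON (de)serialization and illustrative values for the SCHEDULE, EXPIRATION and USERNAME_1 constants:

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.TestInputTopic
import org.apache.kafka.streams.TestOutputTopic
import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.springframework.kafka.support.serializer.JsonSerde
import java.time.Duration

private val SCHEDULE = Duration.ofSeconds(1) // illustrative value
private val EXPIRATION = Duration.ofMinutes(1) // 1 minute, as per the scenario
private const val USERNAME_1 = "user1" // illustrative value

class UserStateStreamTest {
  private lateinit var topologyTestDriver: TopologyTestDriver
  private lateinit var topicIn: TestInputTopic<String, UserTokenEvent>
  private lateinit var topicOut: TestOutputTopic<String, UserStateEvent>

  @BeforeEach
  fun setUp() {
    val stringSerde = Serdes.StringSerde()
    val builder = StreamsBuilder()
    // build the topology under test: pub.user.token -> UserStateStream -> pub.user.state
    val input = builder.stream("pub.user.token", Consumed.with(stringSerde, JsonSerde(UserTokenEvent::class.java)))
    UserStateStream(SCHEDULE, EXPIRATION).apply(input)
      .to("pub.user.state", Produced.with(stringSerde, JsonSerde(UserStateEvent::class.java)))
    topologyTestDriver = TopologyTestDriver(builder.build())
    topicIn = topologyTestDriver.createInputTopic("pub.user.token", stringSerde.serializer(), JsonSerde(UserTokenEvent::class.java).serializer())
    topicOut = topologyTestDriver.createOutputTopic("pub.user.state", stringSerde.deserializer(), JsonSerde(UserStateEvent::class.java).deserializer())
  }

  @AfterEach
  fun tearDown() {
    topologyTestDriver.close()
  }

  // the two tests above go here
}

Note that advanceWallClockTime is what fires the wall-clock punctuation our processor will register later on, which is how the expired event ends up being produced in the second test.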


UserStateStream implementation

We start with our UserStateStream implementation as a Function:

  • Its input is a KStream<String, UserTokenEvent>, as we want a String as the Kafka message's key and a UserTokenEvent as the Kafka message's value
  • Its output is a KStream<String, UserStateEvent>: again a String as the key but, this time, a UserStateEvent as the value

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    TODO()
  }
}


Now step by step ...

1. Aggregation by userId

private const val USER_STATE_STORE = "user-state"

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  operator fun plus(event: UserTokenEvent) = UserState(event.userId, tokens + event.token)
}

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      .selectKey { _, event -> event.userId } // just in case, but the key should be userId already
      .groupByKey()
      .aggregate(
        { UserState() },
        { userId, event, state ->
          logger.info("Aggregate $userId ${state.tokens} + ${event.token}")
          state + event // we use the UserState's plus operator
        },
        Materialized.`as`<String, UserState, KeyValueStore<Bytes, ByteArray>>(USER_STATE_STORE)
          .withKeySerde(Serdes.StringSerde())
          .withValueSerde(JsonSerde(UserState::class.java))
      )
      .toStream()
      // From here down it is just to avoid compilation errors
      .mapValues { userId, _ ->
        UserStateEvent(userId, COMPLETED)
      }
  }
}
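Note that the aggregation is materialized into a key-value store named "user-state" (USER_STATE_STORE), with a String serde for keys and a JSON serde for values; this is the very store that the UserStateProcessor will scan later on.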


2. Completed UserStateEvents

We can generate completed UserStateEvents straightaway once we receive the last UserTokenEvent:

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
  // ...
  fun isCompleted() = tokens.containsAll(listOf(1, 2, 3, 4, 5))
}

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      // ...
      .toStream()
      .mapValues { state ->
        logger.info("State $state")
        when {
          state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
          else -> null
        }
      }
      .filter { _, event -> event != null }
      .mapValues { event ->
        logger.info("Publish $event")
        event!!
      }
  }
}
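The map-to-null-and-filter combination is just how we drop states that are not complete yet: the final mapValues does not transform anything, it only narrows the Kotlin nullable type (hence the !!) and logs the event about to be published.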


3. UserStateProcessor implementation

Our UserStateProcessor will periodically scan the "user-state" store, applying our expiration logic to every UserState:

class UserStateProcessor(
  private val schedule: Duration,
  private val expiration: Duration
) : Processor<String, UserState, String, UserState> {
  override fun init(context: ProcessorContext<String, UserState>) {
    context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
      val stateStore = context.getStateStore<TimestampedKeyValueStore<String, UserState>>(USER_STATE_STORE)
      stateStore.all().forEachRemaining { it: KeyValue<String, ValueAndTimestamp<UserState>> ->
        logger.info("Do something with $it!!") // TODO
      }
    }
  }
  override fun process(record: Record<String, UserState>) {
    // we do not need to do anything here
  }
}
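A note on the scheduling: with PunctuationType.WALL_CLOCK_TIME the punctuation fires at every schedule interval based on real elapsed time, whether or not new records arrive; the alternative, PunctuationType.STREAM_TIME, only advances as new records are processed, which would not work here since we want to expire users precisely when they stop sending events.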


Just apply the expiration logic this way:

data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), private val expired: Boolean = false) {
  // ...
  fun isExpired() = expired
  fun expire() = UserState(userId, tokens, true)
}

class UserStateProcessor(
  private val schedule: Duration,
  private val expiration: Duration
) : Processor<String, UserState, String, UserState> {
  override fun init(context: ProcessorContext<String, UserState>) {
    context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
      val stateStore = context.getStateStore<TimestampedKeyValueStore<String, UserState>>(USER_STATE_STORE)
      stateStore.all().forEachRemaining {
        val age = Duration.ofMillis(time - it.value.timestamp())
        if (age > expiration) {
          if (it.value.value().isExpired()) {
            // if it is already expired from a previous execution, we delete it
            logger.info("Delete ${it.key}")
            stateStore.delete(it.key)
          } else {
            // if it has expired right now, we mark it as expired and we update it
            logger.info("Expire ${it.key}")
            stateStore.put(it.key, ValueAndTimestamp.make(it.value.value().expire(), it.value.timestamp()))
          }
        }
      }
    }
  }
  override fun process(record: Record<String, UserState>) {
    // we do not need to do anything here
  }
}
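The store materialized by the aggregation is a TimestampedKeyValueStore, so every entry is a ValueAndTimestamp<UserState>: that is why the age is computed against it.value.timestamp() and why we re-wrap the expired state with ValueAndTimestamp.make, keeping the original timestamp so that the entry is old enough to be deleted on a later punctuation.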


4. UserStateStream and UserStateProcessor integration

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    return input
      // ...
      .toStream()
      // we add the UserStateProcessor
      .apply { process(ProcessorSupplier { UserStateProcessor(schedule, expiration) }, USER_STATE_STORE) }
      // downstream we will receive both the upstream realtime values and the ones "generated" by the UserStateProcessor
      .mapValues { state ->
        logger.info("State $state")
        when {
          // null states are sent downstream by the UserStateProcessor when deleting entries from the store
          state == null -> null
          // completed states are sent downstream from upstream
          state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
          // expired states are sent downstream by the UserStateProcessor when updating entries in the store
          state.isExpired() -> UserStateEvent(state.userId, EXPIRED)
          else -> null
        }
      }
      .filter { _, event -> event != null }
      .mapValues { event ->
        logger.info("Publish $event")
        event!!
      }
  }
}
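We use Kotlin's apply here because KStream.process is a terminal operation in the Kafka Streams version used (it does not return a new stream): apply executes it as a side effect, wiring the UserStateProcessor to the "user-state" store, while keeping the fluent chain going on the original stream.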


And just at this point our UserStateStream tests should pass 🟩 👌

Kafka Streams binder configuration

Easy!

spring:
  application:
    name: "spring-cloud-stream-kafka-streams-processor"
  cloud:
    stream:
      function:
        definition: userStateStream
      bindings:
        userStateStream-in-0:
          destination: "pub.user.token"
        userStateStream-out-0:
          destination: "pub.user.state"
      kafka:
        streams:
          binder:
            applicationId: "${spring.application.name}"
            brokers: "localhost:9094"
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde


With this configuration:

  • Spring Cloud Stream will create a Kafka Streams binder connected to localhost:9094
  • We need to create a @Bean named userStateStream implementing the Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> interface
    • This @Bean will connect a KStream subscribed to pub.user.token topic to another KStream publishing to pub.user.state topic

You can find all the available configuration properties documented in the Spring Cloud Stream Kafka Binder reference documentation.

UserStateStream bean

As required by our configuration, we need to create a @Bean named userStateStream.

1: we already have our UserStateStream implementation:

class UserStateStream(
  private val schedule: Duration,
  private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
  override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
    TODO()
  }
}

2: we declare it as a @Bean in a @Configuration class:

@Configuration
class ApplicationConfiguration {
  @Bean
  fun userStateStream(
    @Value("\${user.schedule}") schedule: Duration,
    @Value("\${user.expiration}") expiration: Duration
  ): Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> = UserStateStream(schedule, expiration)
}
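Both durations come from two custom properties, user.schedule and user.expiration, which Spring Boot converts to Duration automatically. A hedged example of what they could look like in application.yml (the 1-minute expiration matches the scenario, the schedule value is an assumption):

user:
  schedule: "5s"
  expiration: "1m"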


Integration Test

We already "unit test" our UserStateStream with kafka-streams-test-utils but we need also an integration test using a Kafka container ... Testcontainers to the rescue!
