Currently there seems to be no way to consume mongo event streams in a Kotlin coroutines setup. See the following test:
@DataMongoTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
@Import(TestcontainersConfiguration::class)
class MongoAggregateChangesTest(private val mongoTemplate: ReactiveMongoTemplate) {

    @Test
    fun testFlux() { // success
        runTest { onHit ->
            mongoTemplate.changeStream<TestDocument>().listen()
                .subscribe { onHit.apply(it.body!!) }
        }
    }

    @Test
    fun testLaunch() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen()
                    .subscribe { onHit.apply(it.body!!) }
            }
        }
    }

    @Test
    fun testCoroutine() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen().asFlow()
                    .collect { onHit.apply(it.body!!) }
            }
        }
    }

    private fun runTest(listener: suspend CoroutineScope.(EventHandler) -> Unit) {
        runBlocking {
            val check = mutableSetOf<Int>()
            listener(DefaultEventHandler(check))
            Flux.range(0, 100).flatMap { mongoTemplate.save(TestDocument(it)) }.subscribe { println("Saved: ${it.id}") }
            await().atMost(Duration.ofSeconds(3)).until { check.size == 100 }
        }
    }

    class TestDocument(val id: Int)

    class DefaultEventHandler(private val check: MutableSet<Int>) : EventHandler {
        override fun apply(document: TestDocument) {
            val id = document.id
            println("Received: $id")
            check.add(id)
        }
    }

    fun interface EventHandler {
        fun apply(document: TestDocument)
    }
}
In my project, I cannot consume event streams as a Flux, because the events need to be mapped by suspend functions. Is there any way to use event streams with coroutines? asFlow() seems to cause issues not only in this test, but also in projects where the flow is consumed by controller methods.
I think this would require a dedicated CoroutineMongoTemplate that makes use of the Kotlin MongoClient.
I found a solution to my problem: instead of using asFlow(), I subscribe to the change stream and emit all events to a Kotlin MutableSharedFlow. My clients can then consume the SharedFlow without any issues. Still, dedicated coroutine support would be nice.
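The workaround described above was not posted as code, so the following is only a minimal sketch of how such a bridge might look. `SharedFlowBridge`, `describe`, and `bridgeDemo` are hypothetical names, and a Reactor `Sinks` instance stands in for the change stream so the snippet runs without a database. The buffer size and the `DROP_OLDEST` overflow policy are assumptions as well; that policy is lossy when collectors fall far behind.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

// Hypothetical helper: subscribes to a hot Flux once and republishes
// every element into a SharedFlow that coroutine code can collect.
class SharedFlowBridge<T : Any>(source: Flux<T>, bufferSize: Int = 256) {
    private val events = MutableSharedFlow<T>(
        extraBufferCapacity = bufferSize,
        // Assumption: dropping the oldest buffered event is acceptable.
        // With this policy tryEmit never suspends and never fails.
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    // The Reactor subscription stays outside any coroutine.
    private val subscription: Disposable = source.subscribe { events.tryEmit(it) }

    val flow: SharedFlow<T> = events.asSharedFlow()

    fun close() = subscription.dispose()
}

// Stand-in for the suspend mapping mentioned in the issue.
suspend fun describe(id: Int): String {
    delay(1)
    return "doc-$id"
}

fun bridgeDemo(): List<String> = runBlocking {
    // A multicast sink plays the role of the hot change stream here.
    val sink = Sinks.many().multicast().directBestEffort<Int>()
    val bridge = SharedFlowBridge(sink.asFlux())
    try {
        // UNDISPATCHED runs the collector up to its first suspension,
        // so it is subscribed before the first event is emitted.
        val mapped = async(start = CoroutineStart.UNDISPATCHED) {
            bridge.flow.take(3).map { describe(it) }.toList()
        }
        (1..3).forEach { sink.tryEmitNext(it) }
        mapped.await()
    } finally {
        bridge.close()
    }
}

fun main() {
    println(bridgeDemo())
}
```

In a real application the source would presumably be something like `mongoTemplate.changeStream<TestDocument>().listen()` from the test above, passed to the bridge in place of `sink.asFlux()`. Because the shared flow has no replay cache in this sketch, collectors only see events emitted after they subscribe.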
Kotlin Coroutines follow a different concept than Reactive Streams. While a Flow is somewhat similar to a Publisher, coroutines try to advance the flow step by step, as if you were chaining Futures. The change stream is a hot publisher that emits a potentially infinite number of events. Trying to collect these in order to advance later on isn't conceptually going to work.
We do not plan to add another Kotlin-specific extension for change streams.
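To illustrate what "hot" means in practice, here is a small sketch that uses only kotlinx.coroutines. A `MutableSharedFlow` stands in for the change stream, which is an assumption made purely for illustration, and `lateCollectorDemo` is a hypothetical name. The point it shows is that a hot source does not wait for a collector: anything emitted before the collector subscribes is gone.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun lateCollectorDemo(): List<Int> = runBlocking {
    // Hot source without a replay cache; the buffer only serves active collectors.
    val hot = MutableSharedFlow<Int>(extraBufferCapacity = 16)

    // Emitted while nobody is collecting: the value is discarded.
    hot.tryEmit(1)

    // UNDISPATCHED starts the collector immediately, so it is subscribed
    // by the time the next two values are emitted.
    val seen = async(start = CoroutineStart.UNDISPATCHED) { hot.take(2).toList() }

    hot.tryEmit(2)
    hot.tryEmit(3)
    seen.await()
}

fun main() {
    println(lateCollectorDemo()) // prints [2, 3]
}
```

The first value never reaches the collector, which is the behaviour to keep in mind whenever a change stream is collected from a coroutine that may start late.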