Add Kotlin Coroutine extension (or API) for Change Streams consumption #4987

Closed
huberchrigu opened this issue May 29, 2025 · 2 comments
Labels
status: declined A suggestion or change that we don't feel we should currently apply

Comments

@huberchrigu

Currently there seems to be no way to consume MongoDB change streams in a Kotlin coroutines setup. See the following test:

@DataMongoTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
@Import(TestcontainersConfiguration::class)
class MongoAggregateChangesTest(private val mongoTemplate: ReactiveMongoTemplate) {
    @Test
    fun testFlux() { // success
        runTest { onHit ->
            mongoTemplate.changeStream<TestDocument>().listen()
                .subscribe { onHit.apply(it.body!!) }
        }
    }

    @Test
    fun testLaunch() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen()
                    .subscribe { onHit.apply(it.body!!) }
            }
        }
    }

    @Test
    fun testCoroutine() { // fails
        runTest { onHit ->
            launch {
                mongoTemplate.changeStream<TestDocument>().listen().asFlow()
                    .collect { onHit.apply(it.body!!) }
            }
        }
    }

    private fun runTest(listener: suspend CoroutineScope.(EventHandler) -> Unit) {
        runBlocking {
            val check = mutableSetOf<Int>()
            listener(DefaultEventHandler(check))
            Flux.range(0, 100).flatMap { mongoTemplate.save(TestDocument(it)) }.subscribe { println("Saved: ${it.id}") }

            await().atMost(Duration.ofSeconds(3)).until { check.size == 100 }
        }
    }

    class TestDocument(val id: Int)

    class DefaultEventHandler(private val check: MutableSet<Int>) : EventHandler {
        override fun apply(document: TestDocument) {
            val id = document.id
            println("Received: $id")
            check.add(id)
        }
    }

    fun interface EventHandler {
        fun apply(document: TestDocument)
    }
}

In my project, I cannot consume change streams as a Flux, because the events need to be mapped by suspend functions. Is there any way to use change streams with coroutines? asFlow() seems to cause issues not only in this test, but also in projects where the flow is consumed by controller methods.

I think this would require a dedicated CoroutineMongoTemplate that makes use of the Kotlin MongoClient.
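
A possible explanation for the two failing tests, offered as an assumption rather than something confirmed in this thread: `launch` inside `runBlocking` queues the coroutine on the `runBlocking` event loop, which can only run it once the calling thread is free. Awaitility's `await().until { ... }` blocks that thread, so the subscription (or `collect`) may not start before the timeout. The sketch below reproduces the effect with kotlinx.coroutines only; `collectorStarted` is a hypothetical name and a `CountDownLatch` stands in for Awaitility.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
 * Hypothetical helper: returns true if the launched coroutine ran
 * before the blocking wait timed out.
 */
fun collectorStarted(useSeparateDispatcher: Boolean): Boolean = runBlocking {
    val started = CountDownLatch(1)

    // EmptyCoroutineContext inherits the runBlocking event loop (same thread).
    val context: CoroutineContext =
        if (useSeparateDispatcher) Dispatchers.Default else EmptyCoroutineContext

    // Stands in for launch { changeStream.listen().asFlow().collect { ... } }.
    launch(context) { started.countDown() }

    // Blocking wait, standing in for await().atMost(...).until { ... }.
    // It occupies the runBlocking thread, so an inherited-context coroutine cannot run yet.
    started.await(200, TimeUnit.MILLISECONDS)
}

fun main() {
    println("inherited context: started = ${collectorStarted(useSeparateDispatcher = false)}")
    println("Dispatchers.Default: started = ${collectorStarted(useSeparateDispatcher = true)}")
}
```

Under this reading, launching the collector with `launch(Dispatchers.Default) { ... }`, or replacing the blocking wait in `runTest` with a suspending one, would let the collector run. `testFlux` is unaffected because it subscribes directly in the `runBlocking` body instead of inside `launch`.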

@spring-projects-issues added the status: waiting-for-triage label May 29, 2025
@mp911de self-assigned this Jun 2, 2025
@huberchrigu
Author

I found a solution to my problem: instead of using asFlow(), I subscribe to the change stream and emit all events to a Kotlin MutableSharedFlow. My clients can then consume the SharedFlow without any issues. Still, dedicated coroutine support would be nice.
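
The workaround described above might look roughly like the following sketch. It assumes `reactor-core` and `kotlinx-coroutines-core` on the classpath. `SharedFlowBridge`, `describe`, and `demo` are hypothetical names, and a Reactor `Sinks.Many` stands in for `mongoTemplate.changeStream<TestDocument>().listen()`. The buffer size and the `DROP_OLDEST` policy are illustrative choices, not something stated in this thread.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks

/** Hypothetical helper: forwards every element of a hot Flux into a SharedFlow. */
class SharedFlowBridge<T : Any>(source: Flux<T>, bufferSize: Int = 256) {
    // DROP_OLDEST keeps tryEmit non-suspending; a slow collector loses the oldest events.
    private val events = MutableSharedFlow<T>(
        extraBufferCapacity = bufferSize,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    // Plain Reactor subscription, as in the passing testFlux case.
    private val subscription: Disposable = source.subscribe { events.tryEmit(it) }

    /** Read-only view that coroutine-based clients collect from. */
    val flow: SharedFlow<T> = events.asSharedFlow()

    fun close() = subscription.dispose()
}

/** Stand-in for a suspend function that maps an event. */
suspend fun describe(id: Int): String {
    delay(1)
    return "doc-$id"
}

fun demo(): List<String> = runBlocking {
    // Assumption: a multicast sink plays the role of the change stream here.
    val sink = Sinks.many().multicast().directBestEffort<Int>()
    val bridge = SharedFlowBridge(sink.asFlux())

    // UNDISPATCHED runs the collector up to its first suspension,
    // so it is subscribed before the events below are emitted.
    val result = async(start = CoroutineStart.UNDISPATCHED) {
        bridge.flow.take(3).map { describe(it) }.toList()
    }

    (1..3).forEach { sink.tryEmitNext(it) }
    result.await().also { bridge.close() }
}

fun main() {
    println(demo())
}
```

Events emitted while no collector is subscribed are dropped in this sketch; setting a `replay` value on the `MutableSharedFlow` would be one way to retain recent events for late collectors.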

@mp911de
Member

mp911de commented Jun 4, 2025

Kotlin Coroutines follow a different concept than Reactive Streams. While a Flow is somewhat similar to a Publisher, Coroutines try to advance the flow step by step, as if you were chaining Futures. The change stream is a hot publisher that emits a potentially infinite number of events. Trying to collect these in order to advance later isn't conceptually going to work.

We do not plan to add another Kotlin-specific extension for change streams.
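
One way to read the hot-publisher point is that a hot source does not wait for its consumer: whatever is emitted before a collector subscribes is gone. The following minimal sketch illustrates only that property, with a `MutableSharedFlow` standing in for the change stream (an assumption, no MongoDB involved); `lateCollectorSees` is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

/** Returns the events observed by a collector that subscribes after two emissions. */
fun lateCollectorSees(): List<Int> = runBlocking {
    // replay defaults to 0: nothing is kept for collectors that arrive later.
    val hot = MutableSharedFlow<Int>(extraBufferCapacity = 16)

    // No collector yet, so these events are dropped.
    hot.tryEmit(1)
    hot.tryEmit(2)

    // UNDISPATCHED subscribes the collector before the next emissions.
    val seen = async(start = CoroutineStart.UNDISPATCHED) {
        hot.take(2).toList()
    }

    hot.tryEmit(3)
    hot.tryEmit(4)
    seen.await()
}

fun main() {
    println(lateCollectorSees())
}
```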

@mp911de closed this as not planned Jun 4, 2025
@mp911de added the status: declined label and removed the status: waiting-for-triage label Jun 4, 2025
@mp911de changed the title from "Change streams with Kotlin coroutines" to "Add Kotlin Coroutine extension (or API) for Change Streams consumption" Jun 4, 2025