You should scope your concurrency appropriately using structured concurrency. Your coroutines can leak if you don't do this. In your case, scoping them to the processing of a single message seems appropriate.
Here's an example:
/* I don't know Kafka, but let's pretend this function gets
* called when you receive a new message
*/
suspend fun onMessage(msg: Message) {
val ids: List<Int> = msg.getIds()
val jobs = ids.map { id ->
GlobalScope.launch { restService.post(id) }
}
jobs.joinAll()
}
If one of the calls to restService.post(id)
fails with an exception, the example will immediately rethrow the exception, and all the jobs that hasn't completed yet will leak. They will continue to execute (potentially indefinitely), and if they fail, you won't know about it.
To solve this, you need to scope your coroutines. Here's the same example without the leak:
suspend fun onMessage(msg: Message) = coroutineScope {
val ids: List<Int> = msg.getIds()
ids.forEach { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}
In this case, if one of the calls to restService.post(id)
fails, then all other non-completed coroutines inside the coroutine scope will get cancelled. When you leave the scope, you can be sure that you haven't leaked any coroutines.
Also, because coroutineScope
will wait until all child-coroutines are done, you can drop the jobs.joinAll()
call.
Side note:
A convention when writing a function that start some coroutines, is to let the caller decide the coroutine scope using the receiver parameter. Doing this with the onMessage
function could look like this:
fun CoroutineScope.onMessage(msg: Message): List<Job> {
val ids: List<Int> = msg.getIds()
return ids.map { id ->
// launch is called on "this", which is the coroutineScope.
launch { restService.post(id) }
}
}
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…