I have a spring boot kotlin app that creates a web socket connection to another spring app, sends multiple "subscribe" messages, and then needs to wait for receipt of one response per subscription on the web socket connection. The number of subscriptions open at a given time could be up to a few thousand.
I've come up with a basic working solution using CompletableFuture and coroutines, as below. Is there a more idiomatic or concise way to do this task, or is this a fine solution? Any suggestions for improvement are appreciated.
// InputObject / ResponseObject are generic placeholders
fun getItems(inputObjects: List<InputObject>): List<ResponseObject> {
val ret: ConcurrentLinkedQueue<ResponseObject> = ConcurrentLinkedQueue()
// create a completable future for each input object
val subscriptions: MutableMap<String, CompletableFuture<ResponseObject>> = mutableMapOf()
inputObjects.forEach {
subscriptions[it.id] = CompletableFuture()
}
// create web socket client configured with a lambda handler to
// fulfill each subscription
// each responseObject.id matches one inputObject.id
val client = createWebSocketClient({
try {
val responseObject = objectMapper.readValue(it, ResponseObject::class.java)
subscriptions[responseObject.id]?.complete(responseObject)
} catch (e: Exception) {
logger.warn("Exception reading data: ${e.message}")
}
})
runBlocking {
coroutineScope {
for (item in inputObjects) {
launch {
// create and send a subscribe request
client.sendMessage(createSubscribe(item.id))
// wait for each future to complete
// uses CompletableFuture extension await() from kotlinx-coroutines-jdk8
val result = subscriptions[item.id]?.await()
if (result != null) {
ret.add(result)
}
}
}
}
}
client.close()
return ret.toList()
}
edit: I found a similar question: How to pass result as it comes using coroutines?
Which options makes the most sense?
question from:
https://stackoverflow.com/questions/66047038/is-there-a-more-idiomatic-way-to-perform-a-subscribe-async-await-operation 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…