Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

kotlin - Is there a more idiomatic way to perform a subscribe & async / await operation?

I have a Spring Boot Kotlin app that opens a WebSocket connection to another Spring app, sends multiple "subscribe" messages, and then needs to wait for one response per subscription on that connection. Up to a few thousand subscriptions may be open at a given time.

I've come up with a basic working solution using CompletableFuture and coroutines, as below. Is there a more idiomatic or concise way to do this task, or is this a fine solution? Any suggestions for improvement are appreciated.

// InputObject / ResponseObject are generic placeholders
fun getItems(inputObjects: List<InputObject>): List<ResponseObject> {
    val ret: ConcurrentLinkedQueue<ResponseObject> = ConcurrentLinkedQueue()

    // create a completable future for each input object
    val subscriptions: MutableMap<String, CompletableFuture<ResponseObject>> = mutableMapOf()
    inputObjects.forEach {
        subscriptions[it.id] = CompletableFuture()
    }

    // create web socket client configured with a lambda handler to
    // fulfill each subscription
    // each responseObject.id matches one inputObject.id
    val client = createWebSocketClient({
        try {
            val responseObject = objectMapper.readValue(it, ResponseObject::class.java)
            subscriptions[responseObject.id]?.complete(responseObject)
        } catch (e: Exception) {
            logger.warn("Exception reading data: ${e.message}")
        }
    })

    runBlocking {
        coroutineScope {
            for (item in inputObjects) {
                launch {
                    // create and send a subscribe request
                    client.sendMessage(createSubscribe(item.id))

                    // wait for each future to complete
                    // uses CompletableFuture extension await() from kotlinx-coroutines-jdk8
                    val result = subscriptions[item.id]?.await()
                    if (result != null) {
                        ret.add(result)
                    }
                }
            }
        }
    }

    client.close()

    return ret.toList()
}

edit: I found a similar question: How to pass result as it comes using coroutines?

Which option makes the most sense?

Question from: https://stackoverflow.com/questions/66047038/is-there-a-more-idiomatic-way-to-perform-a-subscribe-async-await-operation


1 Reply

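fun getItems(inputObjects: List<InputObject>): List<ResponseObject> {
    // one future per input object, keyed by id; the map is never mutated after creation
    val subscriptions = inputObjects.associate { it.id to CompletableFuture<ResponseObject>() }

    // the handler completes the future whose id matches the incoming response
    val client = createWebSocketClient({
        try {
            val responseObject = objectMapper.readValue(it, ResponseObject::class.java)
            subscriptions[responseObject.id]?.complete(responseObject)
        } catch (e: Exception) {
            logger.warn("Exception reading data: ${e.message}")
        }
    })

    try {
        return runBlocking(Dispatchers.IO) {
            // note: each subscribe is sent only after the previous response has arrived
            inputObjects.mapNotNull { item ->
                client.sendMessage(createSubscribe(item.id))
                // await() is the CompletableFuture extension from kotlinx-coroutines-jdk8
                subscriptions[item.id]?.await()
            }
        }
    } finally {
        // close the connection as the question's version does
        client.close()
    }
}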
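
Note that the version above awaits each response before sending the next subscribe. With a few thousand subscriptions it may be preferable to send every subscribe first and then wait for all responses together. Below is a minimal sketch of that idea, not a drop-in replacement. It swaps CompletableFuture for CompletableDeferred from kotlinx-coroutines-core, so the jdk8 await() extension is not needed. FakeClient, the simplified InputObject / ResponseObject classes, and the timeoutMs parameter are hypothetical stand-ins introduced only so the example runs on its own; the real client and JSON parsing from the question would take their place.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

// Hypothetical stand-ins for the question's placeholder types.
data class InputObject(val id: String)
data class ResponseObject(val id: String)

// Hypothetical client: immediately hands one response per subscribe to the handler.
class FakeClient(private val onMessage: (ResponseObject) -> Unit) {
    fun sendMessage(id: String) = onMessage(ResponseObject(id))
    fun close() {}
}

suspend fun getItems(
    inputObjects: List<InputObject>,
    timeoutMs: Long = 5_000 // assumed limit so a lost response cannot hang the caller forever
): List<ResponseObject> {
    // One CompletableDeferred per id. The map is not mutated after creation,
    // so the handler can read it from another thread.
    val pending = inputObjects.associate { it.id to CompletableDeferred<ResponseObject>() }
    val client = FakeClient { response -> pending[response.id]?.complete(response) }
    try {
        return withTimeout(timeoutMs) {
            // Send every subscribe first, then suspend until all responses are in.
            inputObjects.forEach { client.sendMessage(it.id) }
            pending.values.awaitAll()
        }
    } finally {
        client.close()
    }
}

fun main() = runBlocking {
    val items = getItems(listOf(InputObject("a"), InputObject("b")))
    println(items.map { it.id })
}
```

Because getItems is a suspend function here, the caller decides where to block (for example with runBlocking at the edge of the application). withTimeout throws TimeoutCancellationException if any response is still missing when the limit expires, so partial results would need extra handling.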
