Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
476 views
in Technique[技术] by (71.8m points)

java - How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?

In my simplified case I want to broadcast a message sent by WebSocket client to all other clients. The application is built using reactive websockets with Spring.

My idea was to use single Sink and if a message is received from the client, emit it on this sink. WebsocketSession::send just forwards events emitted by this Sink to connected clients.

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

This implementation is not safe as Sink.emitNext can be called from different threads.

My attempt was to use publishOn and pass a singled threaded Scheduler so that onNext for all WebSocketSessions is called from a single thread. However this does not work. One item is emitted from a websocket client and then all subsequent websocket clients receive onClose event immediately after connection :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

Another option which I could see is to synchronize on some common lock so that emission is thread safe :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }


}

However I am not sure if this should be done like that.

The question is

Is it possible to use publishOn in this case so that emission is thread safe and if not what is other solution to this problem (apart of using synchronization like I have done with synchronized keyword).

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Instead of pessimistic locking with the synchronized option, you could create an EmitFailureHandler comparable to FAIL_FAST except it returns true for EmitResult.NON_SERIALIZED_ACCESS.

This would result in the concurrent emit attempts to be immediately retried, like in a busy loop.

Optimistically, this will end up succeeding. You can even make the custom handler introduce a delay or limit the number of times it returns true if you want to be extra defensive against infinite loops.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...