I have thought a bit about your use-case. With sharing the same connection you're introducing states to your application which needs a bit of state handling and therefore it is impossible (or at least I have no idea how) to be purely reactive.
I have focused on establishing a connection and performing a write-notify transmission to a BLE device that is serialized.
private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create();
private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create();
private Subscription connectionSubscription;
private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time
public void connect() {
Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection()
final UUID notificationUuid = // your notification characteristic UUID
final UUID writeUuid = // your write-only characteristic UUID
connectionSubscription = connectionObservable
.flatMap(
rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications
(rxBleConnection, notificationObservable) -> // connection is established and notification prepared
inputSubject // waiting for the data-packet to transmit
.onBackpressureBuffer()
.flatMap(bytesAndFilter -> {
return Observable.combineLatest( // subscribe at the same time to
notificationObservable.take(1), // getting the next notification bytes
rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device
(responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes
)
.doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier
},
1 // serializing communication as only one Observable will be processed at the same time
)
)
.flatMap(observable -> observable)
.subscribe(
response -> { /* ignored here - used only as side effect with outputSubject */ },
throwable -> outputSubject.onError(throwable)
);
}
public void disconnect() {
if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) {
connectionSubscription.unsubscribe();
connectionSubscription = null;
}
}
public Observable<byte[]> writeData(byte[] data) {
return Observable.defer(() -> {
final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response
inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect()
return outputSubject
.filter(responseIdPair -> responseIdPair.second == uniqueId)
.first()
.map(responseIdPair -> responseIdPair.first);
}
);
}
This is an approach that I think is good as the whole flow is described in one place and therefore easier to understand. The part of communication that is stateful (writing request and waiting for response) is serialized and it has the possibility to persist the connection till the disconnect()
call.
The downside is that the transmission relies on the side effects of different flow and calling the writeData()
before the connection is established and notification setup will never complete the returned Observable though it shouldn't be a problem to add handling for this scenario with a state check.
Best Regards
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…