Polidea/RxBluetoothKit

Timeout properly on peripheral.establishConnection

daniel-dumortier opened this issue · 4 comments

Is your feature request related to a problem? Please describe.
My problem is the following :
I want to connect to a known device, from which I stored the UUID, but timeout after 30s if the device is not reachable.
I use peripheral.establishConnection observable, and I put a timeOut operator on establishConnection observable.
I works well when the device is not reachable, the onError.
But when the device is reachable, it disconnects it after 30s (when the timeout is reached).
I tried to add a .take(1) operator, but it then completes and dispose the observable and thus disconnect the device just after connection.

Describe the solution you'd like
I would like to be able to apply a timeout on peripheral.establishConnection that just fires if the device is not found, but does not close the connection if it is found

Additional context
Here is my code snippet :

let connectObservableCreator = {
                return peripheral.establishConnection( options: [
                    CBConnectPeripheralOptionNotifyOnConnectionKey: true,
                    CBConnectPeripheralOptionNotifyOnDisconnectionKey: true,
                    CBConnectPeripheralOptionNotifyOnNotificationKey: true
                    ])
                   
                    .do(onNext: { [weak self] _ in
                        //SdkLogger.logError(textToLog: "connectPeripheral : do onNext connected")
                        self?.observeDisconnect(for: peripheral, timeout: timeout, failed: failed)
                    })
                    .timeout(RxTimeInterval(BleManager.connectTimeoutValue), scheduler: MainScheduler.instance)
                    .take(1)
            }
            let disposable = connectObservableCreator()
            
                .subscribe(onNext: { _ in
                    SdkLogger.logError(textToLog: "connectPeripheral : connected")
                    
                    
                    // discover its services and characteristics
                    _ = self.discoverServices(for: peripheral, hardwareDevice: hardwareDevice, connected: connected, timeout: timeout, failed: failed)
                
                    
                }, onError : { [weak self] error in
                    DispatchQueue.main.async(execute : {
                        
                        self?.onError(error: error, failed: failed, timeout:  timeout)}
                    )}, onCompleted: {
                        //SdkLogger.logError(textToLog: "connection completed")
                }, onDisposed: {
                    //SdkLogger.logError(textToLog: "connection disposed")
                })

The problem is that timeout will fire when you don't get values from the upstream for some time.
So in your case, when the connection is established you receive the Peripheral from the "establishConnection" observable, your upstream, telling you that a new connection was made. Then you don't get more news from that, which is expected, your timeout throws an error that cancels the subscription, and without the subscription your connection is closed.

You want to hold whatever comes from establishConnection to keep the connection alive.

I'm doing similar thing:

private func connect(peripheral: Peripheral) {
    peripheralConnection = peripheral
        .establishConnection(options: [CBConnectPeripheralOptionEnableTransportBridgingKey: true])
        .map { $0.isConnected }
        .startWith(false)
        .distinctUntilChanged()
        .flatMapLatest { connected in
            connected
                ? peripheral.observeConnection()
                : Observable<Bool>
                    .error(RxError.timeout)
                    .delaySubscription(ConnectivityFlowService.connectingTimeout, scheduler: MainScheduler.instance)
        }
        .subscribe(
            onNext: { [weak self] connected in
                self?.context().dispatch(.peripheralConnectionStateChanged(uuid: peripheral.identifier, connected: connected))
            },
            onError: { [weak self] error in
                // Here you want to account for:
                // - connection established and later lost
                // - connection never established

                if case BluetoothError.peripheralIsAlreadyObservingConnection = error { return }
                guard let self = self else { return }

                if self.context().getState().connection.stage.connected != nil {
                    self.context().dispatch(.lostConnection(uuid: peripheral.identifier))
                    return
                }

                guard self.context().getState().connection.stage.tryingKnownDevices != nil else { return }

                self.context().dispatch(
                    .couldNotConnectToDevice(
                        uuid: peripheral.identifier,
                        error: {
                            switch error {
                            case RxSwift.RxError.timeout:
                                return .timeout
                            case BluetoothError.peripheralConnectionFailed,
                                 BluetoothError.peripheralDisconnected,
                                 BluetoothError.destroyed:
                                return .deviceConnectionFailed
                            case BluetoothError.bluetoothUnsupported,
                                 BluetoothError.bluetoothUnauthorized,
                                 BluetoothError.bluetoothPoweredOff,
                                 BluetoothError.bluetoothInUnknownState,
                                 BluetoothError.bluetoothResetting:
                                return .radioNotReady
                            default:
                                return .deviceConnectionFailed
                            }
                        }()
                    )
                )
            }
        )
}

In this example you can see that I observe the state of the peripheral, starting with false, and checking only for distinct events.
The first false will trigger a deferred error inside that flatMapLatest, which will cancel the whole chain.
When I get true in meantime, the deferred error is cancelled (that's what the "latest" in flatMapLatest does), and you switch to a new observable of the peripheral connection changes.

This is working well for me, I hope it helps you and please don't forget to share eventual improvements to this code.

@luizmb
Hi, thanks for your code. I implemented it in my swift App. The only problem I have is

peripheral.observeConnection()

It never return a value (It seems a PublishSubject under the hood). I am new to swift, so maybe I do something wrong.

my code:

localPeripheral.establishConnection()
            .map { $0.isConnected }
            .startWith(false)
            .distinctUntilChanged()
            .flatMapLatest { connected in
                       return connected
                           ? Observable.of(true) // localPeripheral.observeConnection()
                           : Observable<Bool>
                               .error(RxError.timeout)
                               .delaySubscription(RxTimeInterval.seconds(Constants.SecondsTimeout), scheduler: MainScheduler.instance)
                   }
            .subscribe(onNext: {(d) in
                print("subscribe") // never print this if I use localPeripheral.observeConnection()
}, onError:  { (error) in
                   
                })

regards

Hi @lfongaroScp

I changed a lot this code since I posted the response but I've never edited the response. My new approach is not solving everything in a single pipeline anymore, but one for connecting and another for monitoring device connection. I've done that because there's a "handshake" process I need to perform just to confirm that my device responds to certain GATT protocol I use and I shutdown the connection if not, but I don't want this to cause a "lostConnection" action.

This still solves the original question, about the timeout, and maybe this can be helpful for you as well, as I don't use observeConnection anymore (but it worked well for me back then).

This is where I establish the connection or give up after 5 seconds trying. This can happen either if I'm coming from discovery or if I created Peripheral by a known UUID I stored in UserDefaults (let peripheral = central.retrievePeripherals(withIdentifiers: [previouslyPairedDevice.uuid]))

private func connect(peripheral: Peripheral, delayStartConnecting: DispatchTimeInterval? = nil, model: SupportedModel) {
  peripheralConnection =
    // Let's propagate the sequence that reacts first
    // If it's establish connection, we will keep the connection forever
    // Otherwise the error will be subscribed after 5 seconds
    peripheral
      .establishConnection(options: [CBConnectPeripheralOptionEnableTransportBridgingKey: true])
      .amb(                     // The amb operator will pick the sequence that responds first: the establish connection (success)
        Observable<Peripheral>  // or the constant error that it's triggered after 5 seconds (connection attempt timeout).
          .error(RxError.timeout)
          .delaySubscription(ConnectivityFlowService.connectingTimeout, scheduler: MainScheduler.instance)
      )
      .maybeDelaySubscription(delayStartConnecting, scheduler: MainScheduler.instance) // For some flows, we give some time for the user to
      .subscribe(                                                                      // mark this device as "not mine", delaying the start
        onNext: { [weak self] peripheral in                                            // This postpones the amb as well, so it doesn't race
          guard peripheral.isConnected else { return }                                 // against the timeout above, which will start later.
          self?.output.dispatch(.peripheralConnectionStateChanged(uuid: peripheral.identifier, model: model, connected: true))
        },
        onError: { [weak self] error in self?.gotError(error, connectingTo: peripheral) }
      )
}

The Action peripheralConnectionStateChanged will cause other parts of my app to startHandshake, and once I get a handshakeSucceeded I save the UUID in my UserDefaults and monitorPeripheralConnection which calls:

private func monitorPeripheralConnection(uuid: UUID) {
  guard let peripheral = centralManager()
    .retrievePeripherals(withIdentifiers: [uuid])
    .first else { return }

  peripheralDisconnectionObservation = centralManager()
    .observeDisconnect(for: peripheral)
    .subscribe(
      onNext: { [weak self] peripheral, _ in
        guard let self = self, !peripheral.isConnected else { return }
        self.output.dispatch(.lostConnection(uuid: uuid))
      },
      onError: { _ in
        self.output.dispatch(.lostConnection(uuid: uuid))
      },
      onDisposed: { [weak self] in
        self?.peripheralConnection = nil
        self?.peripheralDisconnectionObservation = nil
      }
    )
}

A helper function to give context:

private func peripheralConnectionStateChanged(uuid: UUID, model: SupportedModel, connected: Bool) {
  if connected {
    output.dispatch(.startHandshake(uuid: uuid, model: model))
    return
  }

  switch getState().connection.stage {
  case let .connected(connectedDevice) where connectedDevice.uuid == uuid:
    output.dispatch(.lostConnection(uuid: connectedDevice.uuid))
  case .previouslyPairedDeviceFlow,
       .connected,
       .discoveryFlow,
       .lostConnection,
       .neverConnected:
    break
  }
}

Everything is kinda separated now, but it's not only easier to understand and maintain, but it's much better documented as well.

I hope it helps and if you see possible improvements please don't forget to share.

BR,

Luiz

And please ignore maybeDelaySubscription line, it's just a helper

extension ObservableType {
    public func maybeDelaySubscription(_ dueTime: DispatchTimeInterval?, scheduler: SchedulerType) -> Observable<Self.Element> {
        guard let dueTime = dueTime else { return self.asObservable() }
        return delaySubscription(dueTime, scheduler: scheduler)
    }
}