ReactiveX/RxSwift

Setting `maxConcurrentOperationCount` has no effect on `OperationQueueScheduler`.

guyniv opened this issue · 3 comments

Short description of the issue:

When an OperationQueue with a custom maxConcurrentOperationCount is passed to OperationQueueScheduler, all tasks still execute at the same time.

Expected outcome:
maxConcurrentOperationCount should limit the number of async tasks running concurrently at any given time.

What actually happens:
It behaves as if maxConcurrentOperationCount has the default value - all tasks run concurrently.

Self contained code example that reproduces the issue:

import Foundation
import RxSwift
import RxCocoa // provides bind(onNext:) on Observable

class SampleCode {
  func start() {
    let operationQueue = OperationQueue()
    operationQueue.maxConcurrentOperationCount = 1
    let scheduler = OperationQueueScheduler(operationQueue: operationQueue)
    _ = heavyLiftingTask()
      .subscribe(on: scheduler)
      .bind { _ in
        NSLog("## task ended")
      }

    _ = heavyLiftingTask()
      .subscribe(on: scheduler)
      .bind { _ in
        NSLog("## task ended")
      }
  }

  private func heavyLiftingTask() -> Observable<Void> {
    return Observable.create { subscriber in
      NSLog("## task started")
      return Observable<Int>.interval(.seconds(2), scheduler: MainScheduler.instance)
        .take(1)
        .bind { _ in
          subscriber.onNext(())
          subscriber.onCompleted()
        }
    }
  }
}

Calling start() prints:

2023-08-12 11:08:30.285476+0300 Video[10418:2952148] ## task started
2023-08-12 11:08:30.285800+0300 Video[10418:2952148] ## task started
2023-08-12 11:08:32.440357+0300 Video[10418:2952425] ## task ended
2023-08-12 11:08:32.440640+0300 Video[10418:2952425] ## task ended

I would expect the first task to finish before the second starts, because maxConcurrentOperationCount is set to 1, so the queue should be serial.

RxSwift/RxCocoa/RxBlocking/RxTest version/commit

RxSwift 6.5.0

Xcode version:

  14.3.1 (14E300c)

This is as expected. The same behavior would occur if you used this code instead:

func heavyLiftingTask() -> Observable<Void> {
    return Observable.create { subscriber in
        NSLog("## task started")
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            subscriber.onNext(())
            subscriber.onCompleted()
        }
        return Disposables.create()
    }
}

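It doesn't behave as you expect because the closure passed into Observable.create returns immediately: it only kicks off the asynchronous work and hands back a disposable. Once the closure has returned, the operation that the scheduler enqueued is finished, so the operation queue is free to start the second operation.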
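The same effect can be reproduced without RxSwift at all. The following is a minimal sketch using only Foundation; `EventLog` and `runNonBlockingTasks` are hypothetical names made up for this illustration, and a global dispatch queue stands in for the timer. Each operation only schedules its work and returns, so a queue limited to one concurrent operation still lets the second task start before the first one has ended.

```swift
import Foundation

// Thread-safe event log (hypothetical helper for this sketch).
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock()
        storage.append(event)
        lock.unlock()
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

// Enqueues `count` operations that merely *start* asynchronous work.
func runNonBlockingTasks(count: Int, delay: TimeInterval) -> [String] {
    let log = EventLog()
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1 // serial, as in the issue

    let allDone = DispatchGroup()
    for index in 1...count {
        allDone.enter()
        queue.addOperation {
            log.append("task \(index) started")
            // Schedule the real work elsewhere and return right away.
            DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
                log.append("task \(index) ended")
                allDone.leave()
            }
            // The block returns here, so the queue considers the operation
            // finished and moves on to the next one.
        }
    }

    allDone.wait() // block the caller until every async callback has fired
    return log.events
}

let nonBlockingEvents = runNonBlockingTasks(count: 2, delay: 0.5)
print(nonBlockingEvents) // both "started" entries come before any "ended"
```

The queue did honor its limit here: only one block ran at a time. The limit just does not extend to work that outlives the block.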

The code would work exactly as you expect if the heavyLiftingTask were a synchronous function. Like this:

func heavyLiftingTask() -> Observable<Void> {
    return Observable.create { subscriber in
        NSLog("## task started")
        sleep(2)
        subscriber.onNext(())
        subscriber.onCompleted()
        return Disposables.create()
    }
}
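To see why blocking makes the difference, here is a rough Foundation-only sketch (no RxSwift involved). `OrderLog` and `runBlockingTasks` are hypothetical names, and the semaphore is an illustrative stand-in for any synchronous work such as the sleep(2) call: the operation does not return until the asynchronous callback has fired, so the queue cannot start the next operation early.

```swift
import Foundation

// Thread-safe event log (hypothetical helper for this sketch).
final class OrderLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock()
        storage.append(event)
        lock.unlock()
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

// Enqueues `count` operations that each block until their work is done.
func runBlockingTasks(count: Int, delay: TimeInterval) -> [String] {
    let log = OrderLog()
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 1 // serial, as in the issue

    for index in 1...count {
        queue.addOperation {
            log.append("task \(index) started")
            let done = DispatchSemaphore(value: 0)
            DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
                log.append("task \(index) ended")
                done.signal()
            }
            done.wait() // keep the operation alive until the callback fires
        }
    }

    queue.waitUntilAllOperationsAreFinished()
    return log.events
}

let blockingEvents = runBlockingTasks(count: 2, delay: 0.2)
print(blockingEvents)
// ["task 1 started", "task 1 ended", "task 2 started", "task 2 ended"]
```

Blocking like this ties up a thread for the whole duration of the task, which is the trade-off of making the work synchronous from the queue's point of view.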

It helps to think of the subscribe(on:) operator as a tool for routing synchronous code onto a specific scheduler. Code that is written to perform its work asynchronously (like interval(_:scheduler:) or my first example above) gets no real benefit from being subscribed on a specific scheduler.
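If the goal is simply to run asynchronous observables one after another, one option that does not involve subscribe(on:) at all is the concat operator, which subscribes to the next observable only after the previous one has completed. This is a sketch under assumptions, not something proposed in the thread: the `label` and `log` parameters, the `Recorder` class, the 200 ms delay, and the background timer scheduler are all made up for the illustration.

```swift
import Foundation
import RxSwift

// Thread-safe recorder (hypothetical helper for this sketch).
final class Recorder: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ event: String) {
        lock.lock()
        storage.append(event)
        lock.unlock()
    }

    var events: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

// Same shape as the asynchronous heavyLiftingTask from the issue, but with a
// label and a background timer so the main thread can wait for the result.
func heavyLiftingTask(_ label: String, log: Recorder) -> Observable<Void> {
    return Observable.create { observer in
        log.append("\(label) started")
        return Observable<Int>
            .timer(.milliseconds(200),
                   scheduler: ConcurrentDispatchQueueScheduler(qos: .default))
            .subscribe(onNext: { _ in
                log.append("\(label) ended")
                observer.onNext(())
                observer.onCompleted()
            })
    }
}

func runConcatenatedTasks() -> [String] {
    let log = Recorder()
    let finished = DispatchSemaphore(value: 0)

    // concat subscribes to task 2 only once task 1 has completed.
    let subscription = Observable<Void>
        .concat([
            heavyLiftingTask("task 1", log: log),
            heavyLiftingTask("task 2", log: log)
        ])
        .subscribe(onCompleted: { finished.signal() })

    finished.wait() // block until the whole chain has completed
    subscription.dispose()
    return log.events
}

let concatEvents = runConcatenatedTasks()
print(concatEvents)
// ["task 1 started", "task 1 ended", "task 2 started", "task 2 ended"]
```

Here the ordering comes from the operator chain itself rather than from any property of the scheduler or its underlying queue.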

As I have said often on Stack Overflow to these sorts of questions... subscribe(on:) doesn't do what you think it does, and once you understand what it does do, you will likely almost never use it. This issue can be closed.

It makes sense. Thank you!

Thanks for the wonderful answer as usual, @danielt1263 :)