Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.0k views
in Technique[技术] by (71.8m points)

swift - How to limit flatMap concurrency in Combine still having all source events processed?

If I specify the maxPublishers parameter then source events after first maxPublishers events won't be flat mapped. While I want to limit only concurrency. That is to continue processing next events after some of the first maxPublishers flat map publishers have completed.

Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
        .compactMap { $0 }
        .flatMap(maxPublishers: .max(3)) { self.addImage($0) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

I've also tried to limit concurrency with help of OperationQueue. But maxConcurrentOperationCount seems doesn't have an effect.

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage($0) }
        .compactMap { $0 }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage($0) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

Flat map publishers look this way:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

You have stumbled very beautifully right into the use case for the .buffer operator. Its purpose is to compensate for .flatMap backpressure by accumulating values that would otherwise be dropped.

I will illustrate by a completely artificial example:

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

So, our publisher is emitting an incremented integer every second, but our flatMap has .max(3) and takes 3 seconds to republish a value. The result is that we start to miss values:

1
2
3
5
6
7
9
10
11
...

The solution is to put a buffer in front of the flatMap. It needs to be large enough to hold any missed values long enough for them to be requested:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

The result is that all the numeric values do in fact arrive at the sink. Of course in real life we could still lose values if the buffer is not large enough to compensate for disparity between the rate of value emission from the publisher and the rate of value emission from the backpressuring flatMap.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...