Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


swift - Custom Combine RateLimitedScheduler

I'm trying to implement a rate-limiting scheduler in Combine. The documentation for Scheduler is sparse, so I'm making some assumptions about how schedulers operate internally. I assumed that the schedule function is called for each event in the stream, but that's not what I'm seeing in practice: I see only a couple of calls to that function for a stream of many more events.

Here is my implementation, which currently does effectively nothing and lets all events through:

import Combine
import Foundation

public class RateLimitedScheduler: Scheduler {
    public typealias SchedulerTimeType = DispatchQueue.SchedulerTimeType
    public typealias SchedulerOptions = DispatchQueue.SchedulerOptions
    
    // MARK: API
    
    public var now: SchedulerTimeType { queue.now }
    public var minimumTolerance: SchedulerTimeType.Stride { queue.minimumTolerance }

    public func schedule(options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
        queue.schedule(after: nextDeadline(), tolerance: .init(floatLiteral: period / 10), options: options, action)
    }

    public func schedule(after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
        let deadline = max(nextDeadline(), date)
        queue.schedule(after: deadline, tolerance: tolerance, options: options, action)
    }

    public func schedule(after date: SchedulerTimeType,
                         interval: SchedulerTimeType.Stride,
                         tolerance: SchedulerTimeType.Stride,
                         options: SchedulerOptions? = nil,
                         _ action: @escaping () -> Void) -> Cancellable {
        let deadline = max(nextDeadline(), date)
        return queue.schedule(after: deadline, interval: interval, tolerance: tolerance, options: options, action)
    }
    
    // MARK: Initialization
    
    public init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        self.maxEvents = maxEvents
        self.period = period
        self.queue = queue
    }
    
    // MARK: Private
    
    private let maxEvents: Int
    private let period: TimeInterval
    private let queue: DispatchQueue

    private var eventCount = 0
    private var windowStartTime = DispatchTime.now()

    private func nextDeadline() -> SchedulerTimeType {
        let now = DispatchTime.now()

        if eventCount < maxEvents {
            eventCount += 1
            return SchedulerTimeType(max(windowStartTime, now))
        }

        let nextStartTime = windowStartTime + period
        eventCount = 1
        
        if now > nextStartTime {
            windowStartTime = now
            return SchedulerTimeType(max(windowStartTime, now))
        }
        
        windowStartTime = nextStartTime
            
        return SchedulerTimeType(max(nextStartTime, now))
    }
}

And to test:

let cancellable = Array(repeating: 0, count: 20).publisher
    .map { $0 * 1 }
    .subscribe(on: RateLimitedScheduler(maxEvents: 5, period: 1))
    .sink(receiveValue: { _ in
        print(Date())
    })

My algorithm for nextDeadline definitely works, as I've tested it outside of the scheduler, so that's not the issue.
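
For reference, a check of the window logic outside the scheduler could look roughly like the sketch below. It is not the original test: WindowLimiter is a hypothetical name, and plain Double seconds stand in for DispatchTime so that the current time can be passed in explicitly.

```swift
// Hypothetical stand-in for the scheduler's private state; times are plain seconds.
struct WindowLimiter {
    let maxEvents: Int
    let period: Double
    var eventCount = 0
    var windowStart: Double

    init(maxEvents: Int, period: Double, windowStart: Double) {
        self.maxEvents = maxEvents
        self.period = period
        self.windowStart = windowStart
    }

    // Same branching as nextDeadline() in the question, with `now` injected.
    mutating func nextDeadline(now: Double) -> Double {
        if eventCount < maxEvents {
            eventCount += 1
            return max(windowStart, now)
        }

        // The current window is full: open the next one and count this event in it.
        let nextStart = windowStart + period
        eventCount = 1

        if now > nextStart {
            windowStart = now
            return now
        }

        windowStart = nextStart
        return max(nextStart, now)
    }
}

// Twelve events that all arrive at t = 0, with at most 5 events per 1-second window.
var limiter = WindowLimiter(maxEvents: 5, period: 1, windowStart: 0)
let deadlines = (0..<12).map { _ in limiter.nextDeadline(now: 0) }
print(deadlines)
// [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0]
```

With every event arriving at the same instant, the deadlines fall into groups of five spaced one period apart, which is the behaviour the scheduler is meant to produce.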

So my questions are:

  1. Is my assumption correct that each event should fire the schedule function in the scheduler? And if not, why not?
  2. Can anyone help figure this out or provide me with an alternative?

Question from: https://stackoverflow.com/questions/65950123/custom-combine-ratelimitedscheduler
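
One possible explanation for the small number of schedule calls, offered as a sketch rather than a confirmed answer: as Apple's documentation describes the operators, subscribe(on:) uses the scheduler for the subscribe, request, and cancel operations, while receive(on:) uses it to deliver each element downstream. The snippet below counts schedule calls under both operators. CountingScheduler is a hypothetical helper that forwards to ImmediateScheduler so that everything runs synchronously, and the exact counts are not asserted here because they may vary between Combine versions.

```swift
#if canImport(Combine)
import Combine

/// Hypothetical helper: forwards to ImmediateScheduler and counts
/// calls to schedule(options:_:).
final class CountingScheduler: Scheduler {
    typealias SchedulerTimeType = ImmediateScheduler.SchedulerTimeType
    typealias SchedulerOptions = ImmediateScheduler.SchedulerOptions

    private(set) var immediateCalls = 0
    private let base = ImmediateScheduler.shared

    var now: SchedulerTimeType { base.now }
    var minimumTolerance: SchedulerTimeType.Stride { base.minimumTolerance }

    func schedule(options: SchedulerOptions?, _ action: @escaping () -> Void) {
        immediateCalls += 1
        base.schedule(options: options, action)
    }

    func schedule(after date: SchedulerTimeType,
                  tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions?,
                  _ action: @escaping () -> Void) {
        base.schedule(after: date, tolerance: tolerance, options: options, action)
    }

    func schedule(after date: SchedulerTimeType,
                  interval: SchedulerTimeType.Stride,
                  tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions?,
                  _ action: @escaping () -> Void) -> Cancellable {
        return base.schedule(after: date, interval: interval, tolerance: tolerance,
                             options: options, action)
    }
}

let values = Array(repeating: 0, count: 20)

// Same shape as the test in the question: the scheduler is only used upstream.
let subscribeOnCounter = CountingScheduler()
let subscribeOnSink = values.publisher
    .subscribe(on: subscribeOnCounter)
    .sink { _ in }

// Alternative: each element is handed to the scheduler before reaching the sink.
let receiveOnCounter = CountingScheduler()
let receiveOnSink = values.publisher
    .receive(on: receiveOnCounter)
    .sink { _ in }

print("subscribe(on:) schedule calls:", subscribeOnCounter.immediateCalls)
print("receive(on:) schedule calls:", receiveOnCounter.immediateCalls)
#endif
```

If the counts differ as expected, replacing subscribe(on:) with receive(on:) in the test pipeline would be the first thing to try, since nextDeadline() would then run once per element.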


