I'm trying to implement a rate limiting scheduler in combine. The docs aren't great for Scheduler so I'm making some assumptions about how they operate internally. I'm assuming that the schedule
function gets called for each event in the stream but that's not what I'm seeing in practice. I'm only seeing a couple of calls to that function in a stream of many more events.
Here is my implementation which is doing effectively nothing right now and allowing all events through...
public class RateLimitedScheduler: Scheduler {
public typealias SchedulerTimeType = DispatchQueue.SchedulerTimeType
public typealias SchedulerOptions = DispatchQueue.SchedulerOptions
// MARK: API
public var now: SchedulerTimeType { queue.now }
public var minimumTolerance: SchedulerTimeType.Stride { queue.minimumTolerance }
public func schedule(options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
queue.schedule(after: nextDeadline(), tolerance: .init(floatLiteral: period / 10), options: options, action)
}
public func schedule(after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
let deadline = max(nextDeadline(), date)
queue.schedule(after: deadline, tolerance: tolerance, options: options, action)
}
public func schedule(after date: SchedulerTimeType,
interval: SchedulerTimeType.Stride,
tolerance: SchedulerTimeType.Stride,
options: SchedulerOptions? = nil,
_ action: @escaping () -> Void) -> Cancellable {
let deadline = max(nextDeadline(), date)
return queue.schedule(after: deadline, interval: interval, tolerance: tolerance, options: options, action)
}
// MARK: Initialization
public init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
self.maxEvents = maxEvents
self.period = period
self.queue = queue
}
// MARK: Private
private let maxEvents: Int
private let period: TimeInterval
private let queue: DispatchQueue
private var eventCount = 0
private var windowStartTime = DispatchTime.now()
private func nextDeadline() -> SchedulerTimeType {
let now = DispatchTime.now()
if eventCount < maxEvents {
eventCount += 1
return SchedulerTimeType(max(windowStartTime, now))
}
let nextStartTime = windowStartTime + period
eventCount = 1
if now > nextStartTime {
windowStartTime = now
return SchedulerTimeType(max(windowStartTime, now))
}
windowStartTime = nextStartTime
return SchedulerTimeType(max(nextStartTime, now))
}
}
And to test:
let cancellable = Array(repeating: 0, count: 20).publisher
.map { $0 * 1 }
.subscribe(on: RateLimitedScheduler(maxEvents: 5, period: 1))
.sink(receiveValue: { _ in
print(Date())
})
My algorithm for nextDeadline
definitely works as I've tested it outside of the scheduler so that's not the issue.
So my questions are:
- Is my assumption correct that each event should fire the schedule function in the scheduler? And if not, why not?
- Can anyone help figure this out or provide me with an alternative?
question from:
https://stackoverflow.com/questions/65950123/custom-combine-ratelimitedscheduler 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…