vapor/queues

ScheduledJob management [Draft]

maximkrouk opened this issue · 3 comments

Now it's not possible to edit the schedule of a job or to remove a job from the queue.

jdmcd commented

Just to make sure I'm understanding the question right, are you talking about scheduling a new job on the fly, not at startup? If so, then yes, I don't believe that's possible right now since we capture all of the scheduled jobs on startup.

Yep, I mean on the fly. Probably there is already a workaround: app.queues.schedule() can accept a builder, which can be stored in app.storage for example and probably can be used to change the job schedule. Also, a wrapper-job with optional action or some sort of flag can be implemented by the user and be stored too to fulfill a need to stop/continue the execution.
It's not very convenient to store multiple objects for the management of one job. So the user can create another wrapper. I'm not sure, but I'll try to use something like this

import Queues
import Vapor

extension Application.Queues {
    func schedule(_ task: Task) -> TaskContainer { .init(task, schedule(task)) }
}

final class TaskContainer {
    let execute: ScheduleBuilder
    let task: Task
    init(_ task: Task, _ scheduleBuilder: ScheduleBuilder) {
        self.task = task
        self.execute = scheduleBuilder
    }
    
    func store(in variable: inout TaskContainer) {
        variable = self
    }
    
    func store<Key: StorageKey>(in storage: inout Storage, forKey key: Key.Type)
    where Key.Value == TaskContainer {
        storage[key] = self
    }
}

final class Task: ScheduledJob {
    private var _action: (() -> Void)?
    private let lock: Lock = .init()
    
    private init(action: (() -> Void)?) {
        self._action = action
    }
    
    init(action: @escaping () -> Void) {
        self._action = action
    }
    
    func cancel() {
        lock.withLock {
            self._action = .none
        }
    }
    
    func `override`(with task: Task) {
        lock.withLock {
            self._action = task._action
        }
    }
    
    @inlinable
    func run(context: QueueContext) -> EventLoopFuture<Void> {
        _action?()
        return context.eventLoop.future(())
    }
    
    @inlinable
    static func empty() -> Task { .init(action: .none) }
}

and if it works maybe it would be nice to have some kind of manager object out-of-the-box. 🌚

Usage

// initial setup

app.queues
    .schedule(.sendPushnotifications(app))
    .store(in: &app.storage, forKey: Task.StorageKeys.SendPushNotifications.self)
    .execute.weekly().on(.monday).at(.noon)

// edits

app.storage[Task.StorageKeys.SendPushNotifications.self].map { container in
    container.execute.minutely().at(0)
}

app.storage[Task.StorageKeys.SendPushNotifications.self].map { container in
    container.task.cancel()
}

app.storage[Task.StorageKeys.SendPushNotifications.self].map { container in
    container.task.override(with: .sendPushNotifications(app))
}
3a4oT commented

+1 for this functionality.