ksassnowski/venture

Mark a job as gate inside the job

Hi!

First of all - awesome package!

I have this job, where depending on certain criteria, I would like to mark the job as gated for manual review:

class MyJob implements WorkflowableJob
{
    use WorkflowStep;

    // ...

    private Document $document;

    public function __construct(Document $document)
    {
        $this->document = $document;
    }

    public function handle()
    {
        if ($this->document->classification === 'other') {
            // Gate the job...?
        }

        // Otherwise, complete the job as normal.
    }
}

Please note that $this->document->classification value is set earlier in the workflow by another job, and is not known at the time the workflow is triggered.

Hi @oliverbj!

You may be able to do this by marking the job as gated inside of the job via $this->step()->markAsGated():

public function handle()
{
    if ($this->document->classification === 'other') {
        return $this->step()->markAsGated();
    }

    // ...
}

Then, you'll have to retrieve the workflow job and manually start it after manual review:

$workflow = Workflow::findOrFail(1);

$job = $workflow->jobs->findOrFail(1);

$job->start();

Can you give that a shot?

I haven't tested this yet, but I would assume that marking the job as gated inside the job's own handle method won't work, because once the handle method finishes without an error, Venture will consider the job to be completed.
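To illustrate that concern, here is a minimal, self-contained sketch. `NaiveRunner` is a hypothetical stand-in, not Venture's code; it only models the rule "handle() returned without throwing, so the step is finished".

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a workflow runner; this is NOT Venture's code.
// It only models the rule "handle() returned without throwing => finished".
final class NaiveRunner
{
    /** @var list<string> names of the steps considered finished */
    public array $finished = [];

    public function run(string $name, callable $handle): void
    {
        try {
            $handle();
        } catch (Throwable $e) {
            return; // a thrown exception is the only "not finished" signal
        }

        // An early `return` inside $handle looks exactly like real work.
        $this->finished[] = $name;
    }
}

$runner = new NaiveRunner();

$runner->run('early-return', function (): void {
    $needsReview = true;
    if ($needsReview) {
        return; // intended as "pause for manual review"
    }
});

$runner->run('throws', function (): void {
    throw new RuntimeException('needs review');
});
```

In this model the step that returned early ends up in `$runner->finished`, while the one that threw does not, which is why an early return alone cannot pause a step.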

Hi both. Thanks for a quick reply!

@stevebauman I was not able to use the markAsGated. I receive this error:

Only gated jobs can be marked as gated

FYI: This is my workflow definition:

return $this->define('Publish new document')
                    ->addJob(new GetPageCount($this->document))
                    ->addJob(new CountDocumentTokens($this->document));

And then inside the CountDocumentTokens() job:

public function handle()
{
    $condition = true;
    if($condition) {
        return $this->step()->markAsGated();
    }

    Log::info('This is being processed...');

}

Is there any way around this? I think being able to dynamically move a job into a certain state is quite useful for complex business logic.

I was thinking of creating a plugin, but not sure if this would be the best option?

I was thinking something like this:

<?php

//Plugin
use Sassnowski\Venture\AbstractWorkflow;
use Sassnowski\Venture\Events\JobCreating;
use Sassnowski\Venture\Plugin\Plugin;
use Sassnowski\Venture\Plugin\PluginContext;

class GatingPlugin implements Plugin
{
    public function install(PluginContext $context): void
    {
        $context->onJobCreating(function (JobCreating $event) {
            $job = $event->job;

            if($job instanceof GatableJob && $job->isGatable()) {
                // Mark the job as gated
                $event->job->step()->markAsGated();

                // Fail the workflow
                $event->workflow->fail();
            }
        });
    }
}

//Interface:
interface GatableJob
{
    public function isGatable(): bool;
}

This will mark the job as gated, then fail the workflow to stop it from processing the rest of the jobs.

Do we have an option to resume a "failed" job?

I think what we should do is change this line in the WorkflowEventSubscriber to check for $event->job->isDeletedOrReleased() instead. I'm happy to accept a PR for that.

This way, you would be able to call $event->job->delete() to stop the job from being processed after marking it as gated.

- $context->onJobCreating(function (JobCreating $event) {
+ $context->onJobProcessing(function (JobProcessing $event) {
    $job = $event->job;

    if($job instanceof GatableJob && $job->isGatable()) {
        // Mark the job as gated
        $job->step()->markAsGated();

-       $event->workflow->fail();
+       $job->delete();
    }
});

This wouldn't fail the workflow and everything would resume normally once you manually start the gated job again.

You should probably also listen on the JobProcessing event instead. This event gets fired after the job was picked up by a queue worker. JobCreating is called when you call the addJob method on a WorkflowDefinition. Since you want to mark a job as gated at runtime, that's probably not what you want.
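The timing difference can be sketched with a simplified timeline. This is not Venture's implementation: the two closures merely stand in for listeners on the two events, and `Document` is a hypothetical plain object.

```php
<?php
declare(strict_types=1);

// Simplified timeline, not Venture's implementation. The two hook names mirror
// the events discussed above; everything else is a hypothetical stand-in.
final class Document
{
    public ?string $classification = null;
}

$document = new Document();
$seen = [];

$onJobCreating = function (Document $d) use (&$seen): void {
    $seen['creating'] = $d->classification;
};
$onJobProcessing = function (Document $d) use (&$seen): void {
    $seen['processing'] = $d->classification;
};

// 1. The workflow is defined: addJob() would fire JobCreating here.
$onJobCreating($document);

// 2. An earlier job in the workflow runs and classifies the document.
$document->classification = 'other';

// 3. A queue worker picks up the later job: JobProcessing fires here.
$onJobProcessing($document);
```

Only the listener that runs at processing time sees the classification, so a runtime gating decision has to be made there.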

That being said, I'm just writing this off the top of my head. I haven't actually tested this. So it's probably worth it to verify that my suggestion actually works as expected before creating a PR for it.

@ksassnowski Sounds like a much better plan. I will start testing, and hopefully a PR will emerge. Thank you!

I changed the line in the WorkflowEventSubscriber to use isDeletedOrReleased() and made the changes in the plugin. That gives me this error:

Only gated jobs can be marked as gated

If I explicitly set the job to gated before calling markAsGated(), the workflow runs without errors.

    public function install(PluginContext $context): void
    {
        $context->onJobProcessing(function (JobProcessing $event) {
            $job = $event->job;
            if($job instanceof GateableJob && $job->isGateable()) {

                $job->step()->update(['gated' => true]); //<-- I added this line.
                $job->step()->markAsGated();

                // Delete the job from the queue
                $job->delete();
            }
        });
    }

However, the gated job is still being executed, as I see This is being processed... in my log file:

class CountDocumentTokens implements WorkflowableJob, GateableJob
{
    use Dispatchable, InteractsWithQueue, SerializesModels, WorkflowStep;

    /**
     * Execute the job.
     */
    public function handle()
    {
        Log::info('This is being processed...');
    }

    public function isGateable(): bool
    {
        return true;
    }
}

I would expect that since isGateable() is returning true, that the job wouldn't execute the handle() method - but perhaps I am reading the events wrong.

OK, so there are a couple of things going on.

First of all, I linked the wrong line in the WorkflowEventSubscriber since that part of the code is dealing with the JobProcessed event which happens after the job was run.

What we actually need to do is update the withWorkflowJob method in the same file to look like this:

private function withWorkflowJob(
    JobProcessing|JobProcessed|JobFailed $event,
    Closure $callback,
): void {
    $jobInstance = $this->jobExtractor->extractWorkflowJob($event->job);

    if (null !== $jobInstance) {
+        $this->setJobInstanceIfNecessary($event->job, $jobInstance);
+
        $callback($jobInstance);
    }
}

+ private function setJobInstanceIfNecessary(
+   Job $job,
+   WorkflowableJob $instance,
+ ): void {
+    if (\in_array(InteractsWithQueue::class, class_uses_recursive($instance))) {
+        $instance->setJob($job);
+    }
+ }

This is because the delete method of the InteractsWithQueue trait requires that a Laravel queue job instance has been set on the job itself. Normally, this happens inside Laravel's CallQueuedHandler class. The issue is that this happens after the JobProcessing event was fired. That's why we have to set the job instance ourselves in order to be able to delete a job before it gets fired.
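The ordering problem can be modeled in a few lines. Both classes below are hypothetical stand-ins, and the sketch assumes that delete() has nothing to act on until a queue job has been attached.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; they only model the ordering problem described above.
final class FakeQueueJob
{
    public bool $deleted = false;

    public function delete(): void
    {
        $this->deleted = true;
    }
}

final class FakeWorkflowJob
{
    private ?FakeQueueJob $job = null;

    public function setJob(FakeQueueJob $job): void
    {
        $this->job = $job;
    }

    // Assumption: without a queue job attached, delete() has nothing to act on.
    public function delete(): void
    {
        if ($this->job !== null) {
            $this->job->delete();
        }
    }
}

$queueJob = new FakeQueueJob();
$instance = new FakeWorkflowJob();

$instance->delete();                  // too early: no queue job attached yet
$deletedBeforeSetJob = $queueJob->deleted;

$instance->setJob($queueJob);         // the step the new helper would perform
$instance->delete();
$deletedAfterSetJob = $queueJob->deleted;
```

The first call has no effect on the queue job; only the call made after setJob() marks it as deleted.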

The second thing I noticed during my testing is that the sync queue actually works slightly differently than the other queue connections. Normally, the queue worker checks if the job has been deleted after firing the JobProcessing event but before actually handling the job itself. If it was deleted, the job will not get handled. Notably, the sync queue is missing this check. This means that calling delete on the job won't have any effect there. This looks like an oversight to me and I will open an issue for it on the Laravel repository itself.
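A simplified model of the two code paths, written from the description above rather than copied from Laravel's source; all names are hypothetical.

```php
<?php
declare(strict_types=1);

// Simplified model of the two code paths described above, not Laravel's source.
final class FakeJob
{
    public bool $deleted = false;
    public bool $handled = false;
}

/** Models a worker that re-checks the job after the "processing" listeners. */
function processWithDeletedCheck(FakeJob $job, callable $onProcessing): void
{
    $onProcessing($job);

    if ($job->deleted) {
        return; // a listener deleted the job, so it is never handled
    }

    $job->handled = true;
}

/** Models a connection that skips that check (as reported for sync). */
function processWithoutDeletedCheck(FakeJob $job, callable $onProcessing): void
{
    $onProcessing($job);
    $job->handled = true;
}

// Listener that deletes the job, like the gating plugin does.
$gate = function (FakeJob $job): void {
    $job->deleted = true;
};

$checked = new FakeJob();
processWithDeletedCheck($checked, $gate);

$unchecked = new FakeJob();
processWithoutDeletedCheck($unchecked, $gate);
```

With the check in place the deleted job is skipped; without it, the job is handled even though a listener deleted it.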

After I made the changes outlined above I could then call delete from inside the plugin and the job would not get handled as expected.

I tried to make the suggested changes in the WorkflowEventSubscriber and change my queue connection to use the database instead of sync, but I am still not able to get the GatingPlugin to work and stop the gated job from being executed.

Running queue:listen and having the plugin installed like this:

//AppServiceProvider.php
Venture::registerPlugin(
    GatingPlugin::class,
);

Then my GatingPlugin file.

class GatingPlugin implements Plugin
{
    public function install(PluginContext $context): void
    {
        Log::info('Installing GatingPlugin...');
        $context->onJobProcessing(function (JobProcessing $event) {
            $job = $event->job;
            if($job instanceof GateableJob && $job->isGateable()) {
                $job->step()->update(['gated' => true]);
                $job->step()->markAsGated();

                // Delete the job from the queue
                $job->delete();
            }
        });
    }
}

My second job (CountDocumentTokens) that uses the GatingPlugin has not been changed, and is as above. It just returns true from the isGateable() method. However, the job is being executed as I see the This is being processed... in the log file.

Another thing I noticed is that the message Installing GatingPlugin... that I added to the plugin file is logged every 3-4 seconds. Is this the expected behavior?

I really appreciate you taking the time to dig into this with me.

After digging into this a little more, I'm not sure if this is even possible to implement, unfortunately. Once a job has been dispatched, there are only limited ways in which we can interact with it.

The problem is that we would need some way of determining inside Venture's JobProcessed event listener whether the job's handle method was even called before the job got deleted. Unfortunately, Laravel also deletes the job after it was executed. This means that, as far as I can tell, there's no way for us to differentiate between

  • Job was deleted before it got handled, and
  • Job was deleted after it was handled

That's an issue because we only want to dispatch the next jobs in the workflow in the latter case.

I'll have to think about this a bit more, but it doesn't look promising at the moment.

I just thought of something that could potentially work, although I haven't tested it yet.

We could check ourselves if the job was deleted after we fire our own JobProcessing event here. Because at this point we know for a fact that the job's handle method hasn't been called yet. We could then add some kind of information to the job that we would then be able to inspect inside the JobProcessed listener to help differentiate the two cases mentioned above.
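A rough sketch of that idea, under stated assumptions: `skippedBeforeHandle` is an invented flag and `runStep()` an invented function, neither of which exists in Venture or Laravel.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch of the idea above; `skippedBeforeHandle` is an invented
// flag, not an existing Venture or Laravel property.
final class TrackedJob
{
    public bool $deleted = false;
    public bool $skippedBeforeHandle = false;
}

/** Returns true when the next jobs in the workflow should be dispatched. */
function runStep(TrackedJob $job, callable $onProcessing): bool
{
    $onProcessing($job);

    if ($job->deleted) {
        // handle() has not run yet, so record that fact for later.
        $job->skippedBeforeHandle = true;
    } else {
        // ... handle() would run here ...
        $job->deleted = true; // the queue deletes a job after it has run
    }

    // "Processed" listener: both paths arrive here with $job->deleted === true,
    // so only the extra flag tells them apart.
    return !$job->skippedBeforeHandle;
}

$gated = new TrackedJob();
$gatedContinues = runStep($gated, function (TrackedJob $job): void {
    $job->deleted = true; // a plugin gates the job before it is handled
});

$normal = new TrackedJob();
$normalContinues = runStep($normal, function (TrackedJob $job): void {
});
```

Both jobs end up deleted, but only the one that was actually handled lets the workflow continue.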

I can't really see a way of not making this a breaking change, however, so I have to think about the specifics a bit more. I also need to decide if I actually want to make a change like that if it only helps in this specific scenario. But perhaps this could be useful in other situations, too.

I see the constraints and that it's actually quite complex. I don't want to burden you - appreciate the work so far! If possible, I think it would make sense for others to be able to set a job as gated inside a specific job, as I often find myself needing this when evaluating business logic. Especially for tasks that require human-in-the-loop.

BUT, I do understand that it might not be possible, or that it's too big of a change. Could a simple workaround be to handle this in the job's handle method instead?

  /**
   * Execute the job.
   */
  public function handle()
  {
      if ($this->isGateable() && !$this->step()->gated) {
          Log::info('This job is gated. Aborting...');

          // Update the step status in the database
          $this->step()->update(['gated' => true]);
          // Mark the job as gated
          $this->step()->markAsGated();

          // The job is aborted without further execution
          return;
      }

      Log::info('This is being processed...');
  }

  public function isGateable(): bool
  {
      return true;
  }

In this case, I set the job as gated only if isGateable() returns true and the job has not already been gated.

Playing around with this a bit, it is "working" to some extent. The job is gated and I can start it from there, but from Venture's perspective, even the gated job is marked as finished (together with the workflow itself). Once the gated job is started, it is also added twice to the workflows table:

["App\\Jobs\\GetPageCount", "App\\Jobs\\CountDocumentTokens", "App\\Jobs\\CountDocumentTokens"]

EDIT:

If I set $this->gated directly in the job I want to gate, I think I can gate the job the same way addGatedJob() normally would:

  /**
   * Create a new job instance.
   */
  public function __construct(private Document $document)
  {
      $this->gated = true;
  }

So, my solution right now is to have this as a trait:

trait GateableJob
{
    public function shouldGateJob()
    {
        if ($this->gate()) {
            $this->gated = true;
        }
    }

    abstract protected function gate(): bool;
}

Then in my job's constructor, I check if the job should be gated:

/**
 * Create a new job instance.
 */
public function __construct(private Document $document)
{
    $this->shouldGateJob();
}

//And the `gate()` method in the same job class:
  protected function gate(): bool
  {
      return $this->document->classification === 'other';
  }

This seems to work - albeit not the most elegant way.

but from Venture's perspective, even the gated job is marked as finished

Yes, because there is no way for us to tell if a job was "aborted" or finished successfully. All we know is that the handle method finished without any errors. We have no idea what happens inside the handle method.

That's why I think we would have to add some kind of method that adds this additional meta information to the job. This way we could inspect the job inside the JobProcessed event listener to see if the job was actually finished or was aborted/paused/gated. The reason this would most likely be a breaking change is that we would have to store this information somewhere and adding a new column to the workflow_jobs table is definitely a breaking change. We might be able to use the Cache for that, however.
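One way the cache idea could look, as a sketch only: an in-memory array stands in for a real cache store, and the key format and function names are invented for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch: an in-memory array stands in for a cache store, and the
// key format and function names are invented for illustration.
final class ArrayCache
{
    /** @var array<string, bool> */
    private array $items = [];

    public function put(string $key, bool $value): void
    {
        $this->items[$key] = $value;
    }

    /** Reads a value and removes it, so the marker is consumed only once. */
    public function pull(string $key, bool $default = false): bool
    {
        $value = $this->items[$key] ?? $default;
        unset($this->items[$key]);

        return $value;
    }
}

/** Called from inside a job that wants to pause instead of finishing. */
function markPaused(ArrayCache $cache, string $stepId): void
{
    $cache->put("venture:paused:{$stepId}", true);
}

/** Called from the "processed" listener to decide whether the step finished. */
function stepFinished(ArrayCache $cache, string $stepId): bool
{
    return !$cache->pull("venture:paused:{$stepId}");
}

$cache = new ArrayCache();
markPaused($cache, 'step-1');

$firstFinished = stepFinished($cache, 'step-1');  // paused, so not finished
$secondFinished = stepFinished($cache, 'step-2'); // never paused
```

Because the marker is removed when it is read, the same step counts as finished the next time it runs, which matches the "start it again after review" flow. Keeping the marker outside the workflow_jobs table is what would avoid a schema change.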

I'll play around with this a bit over the next few days, but it might take a while before I actually make any changes.

Thanks @ksassnowski! Not sure if you saw my edits above, but I think it's a doable workaround!

Edit: I figured out that my approach doesn't work, because the constructor is called immediately when the workflow is started...
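The reason can be reproduced with plain objects. `Doc` and `ConstructorGatedJob` are hypothetical stand-ins that mirror the trait-based workaround; no Venture code is involved.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins that mirror the trait-based workaround above.
final class Doc
{
    public ?string $classification = null;
}

final class ConstructorGatedJob
{
    public bool $gated = false;

    public function __construct(private Doc $document)
    {
        // Evaluated once, while the workflow is still being defined.
        $this->gated = $this->gate();
    }

    public function gate(): bool
    {
        return $this->document->classification === 'other';
    }
}

$doc = new Doc();
$job = new ConstructorGatedJob($doc);  // workflow definition time

$doc->classification = 'other';        // set later by an earlier job

$gatedAtDefinition = $job->gated;      // decided before the value existed
$wouldGateAtRuntime = $job->gate();    // what a runtime check would see
```

The flag computed in the constructor stays false even though a check at runtime would gate the job.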