Release Laravel jobs in queue without increasing attempts

Heads up: this article is over a year old. Some information might be out of date, as I don't always update older articles.

Introduction

I’m currently working on a platform for processing videoclips. Our job is to extract as much information as possible out of each clip (e.g. detect tags, detect faces, get dominant colors, etc.). Most of the heavy lifting is delegated to 3rd-party services, such as AWS Rekognition and Google Video Intelligence, which provide the most “reliable” and cheap Machine Learning platform for this kind of task.

Without going into too much detail: target videoclips are selected on the platform and submitted to a pipeline. Each step of the pipeline performs a specific job on the clip, and steps are carried out sequentially, each one starting as soon as the previous one finishes. The majority of these steps are asynchronous, meaning that the processing request is sent to a 3rd-party service, which is then queried for the result periodically (or after some kind of notification).

The entire pipeline is built using queued jobs on a Redis queue. Let’s use some array notation to visualize the pipeline. Each item of the array contains the name of the service, the operation it performs, and whether that operation is synchronous (it finishes in the same iteration) or asynchronous.

[
    ['service' => 'Zencoder', 'operation' => 'encode', 'type' => 'asynchronous'],
    ['service' => 'Mediainfo', 'operation' => 'extractMetadata', 'type' => 'synchronous'],
    ['service' => 'GoogleVideo', 'operation' => 'labelDetection', 'type' => 'asynchronous'],
    ['service' => 'AWSRekognition', 'operation' => 'detectObjects', 'type' => 'asynchronous'],
    ...
]

Warning: this notation does not match the actual source code and is written solely for the purpose of this post.

The Problem

When dealing with 3rd-party services you’re always subject to throttling or rate limits. There are no exceptions.

Amazon Rekognition Video, for example, “supports a maximum of 20 concurrent jobs per account” (source). This means that we can submit up to 20 jobs, but then we have to wait for some of them to complete before submitting any more.

This problem can be approached in different ways, with different degrees of complexity. We decided to consider the following solutions:

  1. Submit 20 jobs. As soon as a job completes in the AWS Rekognition step, submit the next one in the pipeline.
  2. Submit all jobs. Pause as soon as a bottleneck is detected, then keep retrying with some kind of backoff algorithm.

The first solution is easy to implement, but it’s inefficient because we waste processing slots while waiting for the AWS Rekognition platform to process its jobs. For example, in the following case the Zencoder service is free and Mediainfo and GoogleVideo can accept more jobs, but we cannot submit new clips because we could potentially hit the limit on the AWSRekognition service.

[
    ['service' => 'Zencoder'],       // free
    ['service' => 'Mediainfo'],      // 2 processing, but can accept more
    ['service' => 'GoogleVideo'],    // 3 processing, but can accept more
    ['service' => 'AWSRekognition'], // 15 processing, 0 free (cannot submit new jobs)
    ...
]

The ideal solution is the second one. We really want to push all of our videoclip processing jobs onto the queue, without having to deal with this sort of thing in advance. The queue should take care of it by itself, slowing down when there are no slots available.

Queued Jobs in Laravel

You might already be familiar with how Job classes look in Laravel. Normally they contain only a handle method, which is called when the job is processed by the queue.

Mohamed Said wrote a really interesting article about how queue workers work under the hood. Check it out!

Let’s take a look at the initial implementation of our ProcessingJob class, without any rate limiting mechanism. Again, the code shown here has been heavily edited for the purpose of this post.

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\ProcessingStep; // assumed namespace for the model

class ProcessingJob implements ShouldQueue
{
    use Dispatchable, Queueable, SerializesModels;

    /**
     * Model that stores the information
     * about the current processing step.
     *
     * @var ProcessingStep
     */
    public $step;

    /**
     * Create a new job instance.
     *
     * @param ProcessingStep $step
     * @return void
     */
    public function __construct(ProcessingStep $step)
    {
        $this->step = $step;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        // create a new instance of the Class
        // that is going to process the step
        $processor = app()->make($this->step->service);

        // if this step has not been submitted before to
        // the 3rd party service, we need to submit it
        // otherwise we can try to get the results
        if (! $this->step->hasBeenSubmitted())
        {
            $updatedStep = $processor->start($this->step);
        }
        else
        {
            $updatedStep = $processor->update($this->step);
        }

        // if the updated step has been completed
        // then dispatch the next one in the pipeline
        if ($updatedStep->isCompleted() and $updatedStep->hasNextStep())
        {
            ProcessingJob::dispatch($updatedStep->getNextStep());

            return;
        }

        // if this step somehow failed,
        if ($updatedStep->isFailed())
        {
            // block the entire pipeline
            // notify admin and handle error
            // ...

            return;
        }
    }
}

Now let’s update the code to account for the AWS Rekognition limit.

The number of jobs pushed to AWS Rekognition is tracked using a dedicated counter in Redis, which gets incremented when a job is submitted and decremented when it completes or fails.

<?php

use Illuminate\Support\Facades\Redis;

[...]

public function handle()
{
    $key = 'my-key';

    $processor = app()->make($this->step->service);

    if (! $this->step->hasBeenSubmitted())
    {
        if ($this->step->service === 'AWSRekognition')
        {
            if ((int) Redis::get($key) < 20)
            {
                Redis::incr($key);

                $updatedStep = $processor->start($this->step);
            }
        }
        else
        {
            $updatedStep = $processor->start($this->step);
        }

    }
    else
    {
        $updatedStep = $processor->update($this->step);

        if (
            $updatedStep->service === 'AWSRekognition'
            and
            ($updatedStep->isFailed() or $updatedStep->isCompleted())
        )
        {
            Redis::decr($key);
        }
    }

    [...]
}

The code is pretty straightforward. If the step to process uses the AWSRekognition service, we first check the value of the counter in Redis. If it is lower than 20 we can increment it and proceed, otherwise…

Yeah, otherwise? 🤔

It’s obvious: we have a problem. The handle method ends without throwing any exception, so Laravel considers the job completed and removes it from the queue, and our clip is never submitted to AWS Rekognition. Do we really have to retry manually, checking beforehand that the counter value is lower than 20? That’s definitely not what we want.

Release the Job and specify Max Job Attempts

The first countermeasure that we can set up is to release the job back onto the queue if we are not allowed to interact with AWS Rekognition because we have already reached the limit. We need to use the InteractsWithQueue trait inside our class, which offers the release method for this purpose.

<?php

use Illuminate\Queue\InteractsWithQueue;

[...]

class ProcessingJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    [...]

    public function handle()
    {
        $key = 'my-key';

        $processor = app()->make($this->step->service);

        if (! $this->step->hasBeenSubmitted())
        {
            if ($this->step->service === 'AWSRekognition')
            {
                if ((int) Redis::get($key) < 20)
                {
                    Redis::incr($key);

                    $updatedStep = $processor->start($this->step);
                }
                else
                {
                    // Lock not acquired! Release the job back
                    // into the queue. It will be retried after
                    // the delay specified in seconds.
                    $this->release(180);
                }
            }
            else
            {
                $updatedStep = $processor->start($this->step);
            }
        }

        [...]
    }
}

This option gives us more room, but it does not fully solve our problem, because the job will only be retried up to the maximum number of attempts. As a matter of fact, Laravel recommends specifying the maximum number of times a job may be attempted, either via the --tries switch on the Artisan command line or via the public $tries property on the job class itself. Running a job indefinitely is bad practice.
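For reference, here is what both options look like (the value 3 is only illustrative):

php artisan queue:work redis --tries=3

<?php

class ProcessingJob implements ShouldQueue
{
    // Give up and mark the job as failed after 3 attempts.
    public $tries = 3;
}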

In this specific case, if we exhaust the number of attempts before a slot frees up on AWS Rekognition, we are stuck in the same situation as before.

There’s no way for us to know the correct number of attempts and the correct delay value for releasing jobs. In my experience AWS Rekognition jobs commonly take 3-4 minutes to complete on 10-second clips, but that can go up to 20-25 minutes when several concurrent jobs are already submitted.
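One option is to make the delay grow with the attempt number instead of keeping it fixed. A minimal sketch (not what we ended up using), relying on the attempts() method provided by the InteractsWithQueue trait:

<?php

// Inside handle(), when the lock cannot be acquired:
// back off linearly, e.g. 3, 6, 9, ... minutes.
// Picking good bounds for $tries and the delay is still guesswork.
$this->release(180 * $this->attempts());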

Is this Avalon?

As far as I can tell, this problem is fairly common, but an official solution has not been proposed yet.

I’m adding my solution, based on the discussion carried out in the previous links. I’m not implying that what I’m going to write in this post can be considered the final solution, but it definitely worked in our case. I’m aware that this is some kind of hack, but it feels a bit cleaner and more the-Laravel-way than using Lua scripts to interact directly with Redis.

“What if we are able to catch when a Job runs out of attempts and instead of marking it as failed, we dispatch the job again in the queue?”

Luckily for us, Laravel allows us to define a failed method directly on the Job class, which can be used to perform job-specific clean-up when a failure occurs. The exception that caused the job to fail is passed to the failed method as an argument.

In our case we can check if the exception is an instance of MaxAttemptsExceededException and if the service of the current step is AWSRekognition. If both conditions are true, we can delete the current job and dispatch a new one on the same queue after some delay.

<?php

use Exception;
use Illuminate\Queue\MaxAttemptsExceededException;

class ProcessingJob implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 3;

    // handle implementation
    [...]

    /**
     * Handle job exception.
     *
     * @param Exception $exception
     */
    public function failed(Exception $exception)
    {
        if (
            $exception instanceof MaxAttemptsExceededException
            and
            $this->step->service === 'AWSRekognition'
        )
        {
            $this->delete();

            $this->dispatch($this->step)
                ->onConnection($this->connection)
                ->onQueue($this->queue)
                ->delay(180); // 3 minutes

            return;
        }

        // handle a valid pipeline failure
        [...]
    }
}

This approach however has its own drawbacks:

  • attempts are set back to zero every time a new job is dispatched
  • potentially the job could continue to run forever

In the first case we are not interested in the number of attempts. We really don’t care if the clip is submitted to AWS Rekognition after 1 retry or after 100.

The second point is more relevant and is in contrast with what I wrote earlier in the post: running a job indefinitely is bad practice. However, our code is written so that all exceptions are caught at a higher level, inside the start and update methods of each processor service. Failures are then handled separately.

Therefore the likelihood of having an error inside the ProcessingJob itself is really low, thanks also to an extensive test suite.
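That said, if you want an extra safety net against endless re-dispatching, you can cap the number of re-dispatches inside failed. A minimal sketch, assuming a hypothetical redispatches column on the ProcessingStep model (it does not exist in our actual code):

<?php

public function failed(Exception $exception)
{
    if (
        $exception instanceof MaxAttemptsExceededException
        and
        $this->step->service === 'AWSRekognition'
    )
    {
        // hypothetical hard cap on the number of re-dispatches
        if ($this->step->redispatches >= 100)
        {
            // treat it as a real pipeline failure
            // ...
            return;
        }

        $this->step->increment('redispatches');

        $this->delete();

        self::dispatch($this->step)
            ->onConnection($this->connection)
            ->onQueue($this->queue)
            ->delay(180);
    }
}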


Last Note about Rate Limiting

You might ask yourself why we are manually using a Redis counter to throttle our requests, instead of one of the methods provided by Laravel for this purpose.

Throttle

The throttle method allows a given type of job to only run a predefined number of times during a specific period of time. If a lock can not be obtained, you should typically release the job back onto the queue so it can be retried later.

<?php

use Illuminate\Support\Facades\Redis;

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

This method definitely does not help us, because we don’t know in advance how long a job is going to take on AWS Rekognition. We can come up with good estimates for those values, but there is still the likelihood of failure.
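To make the mismatch concrete, suppose we allowed 20 submissions every 25 minutes, a generous estimate for a batch (the key and the values below are hypothetical):

<?php

use Illuminate\Support\Facades\Redis;

// 20 submissions every 1500 seconds (25 minutes). If a batch takes
// longer than that, the next window opens while the previous jobs are
// still running and we exceed the 20 concurrent jobs limit: the
// throttle window and the real job duration are simply unrelated.
Redis::throttle('aws-rekognition')->allow(20)->every(1500)->then(function () {
    // Job logic...
}, function () {
    return $this->release(60);
});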

Funnel

Alternatively, the funnel method allows you to specify the maximum number of workers that may simultaneously process a given job.

<?php

use Illuminate\Support\Facades\Redis;

Redis::funnel('key')->limit(1)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});

However, this method does not work in our case, because our problem is not concurrency on the local queue, but concurrency on AWS Rekognition. Even a single worker that tries to submit a clip to AWS Rekognition while 20 clips are still being processed is going to cause an exception.
