Introduction
I’m currently working on a platform for processing videoclips. Our job is to extract as much information as possible (e.g. detect tags, detect faces, get dominant colors, etc.) out of each clip. Most of the heavy lifting is delegated to 3rd-party services, such as AWS Rekognition and Google Video Intelligence, which provide a reliable and inexpensive Machine Learning platform for these kinds of tasks.
Without going into too much detail, target videoclips are selected on the platform and submitted to a pipeline. Each step of the pipeline performs a specific job on the clip, and the steps are carried out sequentially, each one starting as soon as the previous one finishes. The majority of these steps are asynchronous, meaning that the processing request is sent to a 3rd-party service and the service is queried for the response periodically (or after some kind of notification).
The entire pipeline is built using queued jobs on a Redis queue. Let’s try to use some array notation to visualize the pipeline. Each item of the array contains the name of the service, the operation that it performs, and whether the operation is synchronous (it finishes in the same iteration) or asynchronous.
[
    ['service' => 'Zencoder', 'operation' => 'encode', 'type' => 'asynchronous'],
    ['service' => 'Mediainfo', 'operation' => 'extractMetadata', 'type' => 'synchronous'],
    ['service' => 'GoogleVideo', 'operation' => 'labelDetection', 'type' => 'asynchronous'],
    ['service' => 'AWSRekognition', 'operation' => 'detectObjects', 'type' => 'asynchronous'],
    ...
]
Warning: This notation does not match the actual source code and it is written solely for the purpose of this post.
The Problem
When dealing with 3rd-party services you’re always subject to throttling or rate limits. There are no exceptions.
Amazon Rekognition Video, for example, “supports a maximum of 20 concurrent jobs per account” (source). This means that we can submit up to 20 jobs, but then we have to wait for some of those jobs to complete before sending in any more.
This problem can be approached in different ways, with different degrees of complexity. We decided to consider the following solutions:
- Submit 20 jobs. As soon as a job completes in the AWS Rekognition step, submit the next one in the pipeline.
- Submit all jobs. Pause as soon as a bottleneck is detected, then keep retrying with some kind of backoff algorithm.
The first solution is easy to implement, but it’s inefficient because we’re wasting processing slots while waiting for the AWS Rekognition platform to process its jobs. For example, in the following case the Zencoder service is free, Mediainfo and GoogleVideo can accept more jobs, but we cannot submit new clips because we could potentially hit the limit on the AWSRekognition service.
[
    ['service' => 'Zencoder'],       // free
    ['service' => 'Mediainfo'],      // 2 processing, but can accept more
    ['service' => 'GoogleVideo'],    // 3 processing, but can accept more
    ['service' => 'AWSRekognition'], // 15 processing, 0 free (cannot submit new jobs)
    ...
]
The ideal solution would be the second one. We really want to push all of our videoclip processing jobs onto the queue, without having to deal with this sort of thing in advance. The queue should take care of it by itself, slowing down when there are no slots available.
Queued Jobs in Laravel
You might already be familiar with how job classes look in Laravel. Normally they contain only a handle method, which is called when the job is processed by the queue.
Mohamed Said wrote a really interesting article about how queue workers work under the hood. Check it out!
Let’s take a look at the initial implementation of our ProcessingJob class, without any rate limiting mechanism. Again, the entire code shown here has been heavily edited for the purpose of this post.
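The snippet embedded here in the original post is not available, so what follows is a minimal sketch of what such a class might look like. The ProcessorFactory helper, the $step object and its next() method are hypothetical names used for illustration, not the actual platform code:

```php
<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ProcessingJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    /** @var object The current pipeline step (service, operation, type, clip). */
    public $step;

    public function __construct($step)
    {
        $this->step = $step;
    }

    public function handle()
    {
        // Resolve the processor for this step (Zencoder, Mediainfo, ...)
        // and run the requested operation on the clip.
        // ProcessorFactory is a hypothetical helper, not part of Laravel.
        $processor = ProcessorFactory::make($this->step->service);
        $processor->{$this->step->operation}($this->step->clip);

        // Synchronous steps can dispatch the next step right away;
        // asynchronous ones are resumed later, when the 3rd-party
        // service reports a result.
        if ($this->step->type === 'synchronous') {
            self::dispatch($this->step->next());
        }
    }
}
```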
Now let’s update the code to account for the AWS Rekognition limit.
The number of jobs pushed to AWS Rekognition is tracked using a dedicated counter in Redis, which is incremented when a job is submitted and decremented when it completes.
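Again, the embedded snippet is missing, so here is a hedged reconstruction of what the updated handle method might look like. The rekognition_jobs_count key name and the ProcessorFactory helper are assumptions:

```php
<?php

use Illuminate\Support\Facades\Redis;

class ProcessingJob implements ShouldQueue
{
    // ... constructor and properties as before ...

    public function handle()
    {
        if ($this->step->service === 'AWSRekognition') {
            // Check the counter before submitting a new job to AWS Rekognition.
            if ((int) Redis::get('rekognition_jobs_count') < 20) {
                // Claim a slot. The counter is decremented elsewhere,
                // when the Rekognition job is reported as complete.
                Redis::incr('rekognition_jobs_count');
            } else {
                // No free slots: the method simply ends here, and Laravel
                // considers the job successfully completed. This is the
                // problem discussed below.
                return;
            }
        }

        $processor = ProcessorFactory::make($this->step->service);
        $processor->{$this->step->operation}($this->step->clip);
    }
}
```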
The code is pretty straightforward. If we need to process the step that uses the AWSRekognition service, we first check the value of the counter in Redis. If it is lower than 20 we increment it and proceed, otherwise…
Yeah, otherwise? 🤔
It’s obvious. We have a problem. The handle method ends without throwing any exception, so Laravel considers the job completed and removes it from the queue; our clip is never submitted to AWS Rekognition. Do we really have to retry manually, checking beforehand that the counter value is lower than 20? That’s definitely not what we want.
Release the Job and specify Max Job Attempts
The first countermeasure we can set up is to release the job back onto the queue if we are not allowed to interact with AWS Rekognition because we have already reached the limit. We need to use the InteractsWithQueue trait in our class, which provides the release method for this purpose.
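The embedded snippet is missing here as well; a sketch of the release-based variant, with the same hypothetical names (rekognition_jobs_count, ProcessorFactory) and an arbitrary 60-second delay, might look like this:

```php
<?php

use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Redis;

class ProcessingJob implements ShouldQueue
{
    use InteractsWithQueue;

    public function handle()
    {
        if ($this->step->service === 'AWSRekognition') {
            if ((int) Redis::get('rekognition_jobs_count') >= 20) {
                // No free slot: put the job back on the queue and let the
                // worker retry it after 60 seconds. Each release still
                // counts as an attempt.
                $this->release(60);

                return;
            }

            Redis::incr('rekognition_jobs_count');
        }

        $processor = ProcessorFactory::make($this->step->service);
        $processor->{$this->step->operation}($this->step->clip);
    }
}
```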
This option gives us more breathing room, but it does not fully solve our problem because the job will only be retried up to the maximum number of attempts. As a matter of fact, Laravel recommends specifying the maximum number of times a job may be attempted, either via the --tries switch on the Artisan command line or via the public $tries property on the job class itself. Running a job indefinitely is definitely bad practice.
In this specific case, if we exhaust the number of attempts before a slot frees up on AWS Rekognition, we are stuck in the same situation as before.
There’s no way for us to know the correct number of attempts or the correct delay value for releasing jobs. In my experience AWS Rekognition jobs commonly take 3-4 minutes to complete on 10-second clips, but that can go up to 20-25 minutes when several concurrent jobs have already been submitted.
Is this Avalon?
As far as I can tell, this problem is fairly common but an official solution has not been proposed yet.
- https://laracasts.com/discuss/channels/laravel/release-a-job-back-on-the-queue-without-increasing-the-number-of-tries
- https://laravel.io/forum/redis-rate-limiting-releasing-jobs-without-1-attempt
- https://github.com/laravel/ideas/issues/1381
- https://github.com/laravel/ideas/issues/735
I’m adding my solution based on the discussion carried out in the previous links. I’m not implying that what I’m going to write in this post is the final solution, but it definitely worked in our case. I’m aware that this is something of a hack, but it feels a bit cleaner and more the-Laravel-way than using Lua scripts to interact directly with Redis.
“What if we are able to catch when a Job runs out of attempts and instead of marking it as failed, we dispatch the job again in the queue?”
Luckily for us, Laravel allows us to define a failed method directly on the job class, which can be used to perform job-specific clean-up when a failure occurs. The Exception that caused the job to fail is passed to the failed method as an argument.
In our case we can check whether the exception is an instance of MaxAttemptsExceededException and whether the service of the current step is AWSRekognition. If both are true, we can delete the current job and dispatch a new one on the same queue after some delay.
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\MaxAttemptsExceededException;

class ProcessingJob implements ShouldQueue
{
    /**
     * The number of times the job may be attempted.
     *
     * @var int
     */
    public $tries = 3;

    // handle implementation
    [...]

    /**
     * Handle job exception.
     *
     * @param Exception $exception
     */
    public function failed(Exception $exception)
    {
        if (
            $exception instanceof MaxAttemptsExceededException
            && $this->step->service === 'AWSRekognition'
        ) {
            // Remove the exhausted job and push a fresh copy of the same
            // step onto the same connection and queue, with a delay.
            $this->delete();

            self::dispatch($this->step)
                ->onConnection($this->connection)
                ->onQueue($this->queue)
                ->delay(180); // 3 minutes

            return;
        }

        // handle a valid pipeline failure
        [...]
    }
}
This approach however has its own drawbacks:
- attempts are set back to zero every time a new job is dispatched
- potentially the job could continue to run forever
In the first case we are not interested in the number of attempts. We really don’t care whether the clip is submitted to AWS Rekognition after 1 retry or after 100.
The second point is more relevant and is in contrast with what I wrote earlier in the post: running a job indefinitely is bad practice. However, our code is written so that all exceptions are caught at a higher level, inside the start and update methods of each processor service. Failures are then handled separately.
Therefore the likelihood of an error inside the ProcessingJob class itself is really low, thanks also to an extensive test suite.
Last Note about Rate Limiting
You might ask yourself why we are manually using a Redis counter to throttle our requests, instead of using one of the methods provided by Laravel for this purpose.
Throttle
The throttle method allows a given type of job to run only a predefined number of times during a specific period of time. If a lock cannot be obtained, you should typically release the job back onto the queue so it can be retried later.
<?php

Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
This method definitely does not help us because we don’t know in advance how long a job is going to take on AWS Rekognition. We can come up with good estimates for those values, but there is still the likelihood of failure.
Funnel
Alternatively, the funnel method allows you to specify the maximum number of workers that may simultaneously process a given job.
<?php

Redis::funnel('key')->limit(1)->then(function () {
    // Job logic...
}, function () {
    // Could not obtain lock...

    return $this->release(10);
});
However, this method does not work in our case either, because our problem is not concurrency on the local queue, but on AWS Rekognition. Even a single worker that tries to submit a clip to AWS Rekognition while 20 clips are still being processed is going to cause an exception.