Devious SQL: Message Queuing Using Native PostgreSQL
An interesting question came up on the #postgresql IRC channel about how to use native PostgreSQL features to handle queuing behavior. There are existing solutions for queuing, both within PostgreSQL, with the venerable pgq project, and in dedicated message queues like RabbitMQ, Kafka, etc. I wanted to explore what could be done with native Postgres primitives, and I thought this warranted an entry in my Devious SQL series.
Requirements
So what makes up a minimal queuing solution? Effectively, we need the following:
- a table to hold events or items to be processed
- something to enqueue/put items in the table
- something to dequeue/consume these items
- do so without workers blocking one another
We also want to ensure that each item is processed only once. We don't want more than one worker process to charge a credit card, for example, and we also want to make sure that if there is an issue while handling an item, it doesn't disappear without being handled.
So what sorts of primitives can we use in PostgreSQL in order to handle the basic requirements here?
Basics
In this design the processing of these work items happens outside the database. We can structure the design of this queue processor around the following pseudocode:
// sample queue worker pseudocode
dbh = connect_to_postgres()
while (1) {
    dbh->begin()
    rows = get_batch()
    do_something_with_rows(rows)
    delete_old_rows(rows)
    dbh->commit()
}
Work storage for queuing is trivial in PostgreSQL; we just use a basic table with the items to be processed. Since we want the data to be persistent (i.e., we don't want to lose our work items if the server crashes in the middle of processing), we will want to use a normal table instead of an UNLOGGED table or other such optimization.
Adding new tasks to the queue is accomplished with straight INSERT statements. Once the external processing is done, we remove the rows with DELETE statements. These transactional semantics are easy enough to adapt to our desired workflow.
Here is a simple example schema for our table:
CREATE TABLE queue_table (
    id int not null primary key generated always as identity,
    queue_time timestamptz default now(),
    payload text
);
INSERT INTO queue_table (payload) SELECT 'data-' || text(generate_series(1,1000));
CREATE TABLE
INSERT 0 1000
Trickier stuff
Say we want to process batches of 10 items with multiple worker processes, while ensuring that we don't double-process items. The naïve approach here is to do a SELECT * FROM queue_table LIMIT 10. However, this could result in multiple workers getting the same list of items to process. That violates our stated requirements, and in practice would be bad. (Some systems can be designed to tolerate repeated messages, but by definition, this is not one of them.)
So what to do?
Row locks to the rescue
Using row locks (a la SELECT FOR UPDATE), we can lock the rows that we are handling in a particular backend. Our query then looks like:
SELECT * FROM queue_table LIMIT 10 FOR UPDATE;
Let's test it out in two backends. Since row locks only last for the duration of a transaction, we will need to run this query inside an open transaction in each backend.
-- backend 1
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE;
BEGIN
 id |          queue_time           | payload
----+-------------------------------+----------
  1 | 2021-08-31 10:36:34.872794-05 | data-1
  2 | 2021-08-31 10:36:34.872794-05 | data-2
  3 | 2021-08-31 10:36:34.872794-05 | data-3
  4 | 2021-08-31 10:36:34.872794-05 | data-4
  5 | 2021-08-31 10:36:34.872794-05 | data-5
  6 | 2021-08-31 10:36:34.872794-05 | data-6
  7 | 2021-08-31 10:36:34.872794-05 | data-7
  8 | 2021-08-31 10:36:34.872794-05 | data-8
  9 | 2021-08-31 10:36:34.872794-05 | data-9
 10 | 2021-08-31 10:36:34.872794-05 | data-10
-- backend 2
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE;
BEGIN
<hangs>
SKIP LOCKED
As you can see, the second backend hangs. Fortunately, PostgreSQL supports skipping already locked rows and can return the "next" batch of rows in the queue table. Let's adjust our query as follows:
SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;
And our subsequent test shows the concurrent backends selecting batches that do not overlap:
-- backend 1
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;
BEGIN
 id |          queue_time           | payload
----+-------------------------------+----------
  1 | 2021-08-31 10:36:34.872794-05 | data-1
  2 | 2021-08-31 10:36:34.872794-05 | data-2
  3 | 2021-08-31 10:36:34.872794-05 | data-3
  4 | 2021-08-31 10:36:34.872794-05 | data-4
  5 | 2021-08-31 10:36:34.872794-05 | data-5
  6 | 2021-08-31 10:36:34.872794-05 | data-6
  7 | 2021-08-31 10:36:34.872794-05 | data-7
  8 | 2021-08-31 10:36:34.872794-05 | data-8
  9 | 2021-08-31 10:36:34.872794-05 | data-9
 10 | 2021-08-31 10:36:34.872794-05 | data-10
-- backend 2
BEGIN;
SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED;
 id |          queue_time           | payload
----+-------------------------------+----------
 11 | 2021-08-31 10:36:34.872794-05 | data-11
 12 | 2021-08-31 10:36:34.872794-05 | data-12
 13 | 2021-08-31 10:36:34.872794-05 | data-13
 14 | 2021-08-31 10:36:34.872794-05 | data-14
 15 | 2021-08-31 10:36:34.872794-05 | data-15
 16 | 2021-08-31 10:36:34.872794-05 | data-16
 17 | 2021-08-31 10:36:34.872794-05 | data-17
 18 | 2021-08-31 10:36:34.872794-05 | data-18
 19 | 2021-08-31 10:36:34.872794-05 | data-19
 20 | 2021-08-31 10:36:34.872794-05 | data-20
Next steps
So now we have a solution that can SELECT individual batches of rows and hand them out to multiple backends. So how do we deal with processing the rows and marking them as done?
Let's review our pseudocode:
// sample queue worker pseudocode
dbh = connect_to_postgres()
while (1) {
    dbh->begin()
    rows = get_batch()
    do_something_with_rows(rows)
    delete_old_rows(rows)
    dbh->commit()
}
As you can see from this illustration, we have implemented the get_batch() functionality. The do_something_with_rows() routine would be application-specific handling/processing of the underlying work item. But how do we delete the rows in question?
Cleanup
If our table has a primary key (which good database design dictates you should always have), then we could use it to DELETE the rows we just selected in the batch. However, there are some potential tradeoffs/issues with this approach:
- Statements would have to be built up based on the returned data. If you issued a DELETE FROM queue_table WHERE id = ? statement for each row, this would become more and more inefficient as you increase the batch size from 10 to 100, for example.
- Issuing individual DELETE statements isn't good, but if you try to improve this by batching the deletes you would need to consider the number of rows actually returned, not just the nominal batch size. If you expected to process 10 rows each time but only got 3 back, you would need to prepare a DELETE statement that deleted only those 3 records with the associated IDs; that, or use a language which supports array bindings and pass the ids in that way.
Either way, we are increasing the complexity when we don't need to; we already know which rows we want to delete: they are the exact ones that we previously SELECT-ed.
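For illustration only, the delete-by-primary-key approach described above might look something like the following; the application would have to collect the ids from the batch it just selected and bind them back in (the ids shown here are hypothetical):
-- client-side cleanup by primary key, built up from the previously SELECTed batch
DELETE FROM queue_table
WHERE id IN (1, 2, 3);   -- or WHERE id = ANY($1) with an array bind parameter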
There's got to be a better way!
Hmm, let's think. The DELETE statement is able to return data to us as if it were a SELECT statement via the RETURNING clause. However, DELETE lacks a LIMIT clause, nor can we take row locks on it explicitly. That said, we can use the USING clause to join the table against itself and get both the limit and the row locks.
This gives us a final query as follows:
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
We are using the self-join method of deletion here: the subquery takes the row locks on the batch (skipping any rows already locked), while the outer DELETE removes those rows and, via RETURNING, hands back all of their values as if this were an ordinary SELECT statement.
-- backend 1
BEGIN;
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
BEGIN
 id |          queue_time           | payload
----+-------------------------------+----------
  1 | 2021-08-31 10:36:34.872794-05 | data-1
  2 | 2021-08-31 10:36:34.872794-05 | data-2
  3 | 2021-08-31 10:36:34.872794-05 | data-3
  4 | 2021-08-31 10:36:34.872794-05 | data-4
  5 | 2021-08-31 10:36:34.872794-05 | data-5
  6 | 2021-08-31 10:36:34.872794-05 | data-6
  7 | 2021-08-31 10:36:34.872794-05 | data-7
  8 | 2021-08-31 10:36:34.872794-05 | data-8
  9 | 2021-08-31 10:36:34.872794-05 | data-9
 10 | 2021-08-31 10:36:34.872794-05 | data-10
DELETE 10
COMMIT
-- backend 2
BEGIN;
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
BEGIN
 id |          queue_time           | payload
----+-------------------------------+----------
 11 | 2021-08-31 10:36:34.872794-05 | data-11
 12 | 2021-08-31 10:36:34.872794-05 | data-12
 13 | 2021-08-31 10:36:34.872794-05 | data-13
 14 | 2021-08-31 10:36:34.872794-05 | data-14
 15 | 2021-08-31 10:36:34.872794-05 | data-15
 16 | 2021-08-31 10:36:34.872794-05 | data-16
 17 | 2021-08-31 10:36:34.872794-05 | data-17
 18 | 2021-08-31 10:36:34.872794-05 | data-18
 19 | 2021-08-31 10:36:34.872794-05 | data-19
 20 | 2021-08-31 10:36:34.872794-05 | data-20
DELETE 10
As long as the transaction stays open until we are done processing the work items, other worker processes will not be handed those same items, and will instead retrieve ones which are not currently locked. If a worker process is not able to handle its batch of items, it merely needs to ROLLBACK (or have the application abort) and the original work items will be returned to the queue. As long as the worker obeys the BEGIN, DELETE ... RETURNING, <process>, COMMIT flow, you have a performant, non-blocking, independent queue manager built using only native PostgreSQL features.
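As a sketch of the failure path (using the same queue_table as above), a worker that cannot finish its batch simply rolls back, and those rows immediately become available to other workers again:
-- worker that encounters an error mid-batch
BEGIN;
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table LIMIT 10 FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
-- ...processing fails in the application...
ROLLBACK;  -- the DELETE is undone; the rows are unlocked and back in the queue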
Additionally, since we are using DELETE, we know that autovacuum should kick in and clean this table up periodically. If needed, we could tune the storage parameters for this queue table to ensure that the database itself handles this sort of cleanup.
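For example, one possible way to make autovacuum more aggressive on just this table; the specific values below are only illustrative and would need tuning for your workload:
-- per-table autovacuum settings so dead queue rows are reclaimed promptly
ALTER TABLE queue_table SET (
    autovacuum_vacuum_scale_factor = 0,    -- ignore relative table size...
    autovacuum_vacuum_threshold = 1000     -- ...and vacuum once ~1000 rows are dead
);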
Caveats
One of the considerations when using PostgreSQL (or any RDBMS with MVCC) for high-turnover queuing is table bloat. If you were using this technique in a production setting, you would need to monitor table bloat with a tool such as pg_bloat_check (part of pgmonitor) and tune autovacuum settings appropriately. Even then, you may occasionally need to rotate your queue table to deal with bloat.
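A lightweight first check, before reaching for a full bloat report, is to watch dead-tuple counts and autovacuum activity for the queue table in the pg_stat_user_tables view:
-- rough view of queue-table churn and autovacuum activity
SELECT relname, n_live_tup, n_dead_tup, last_autovacuum
FROM pg_stat_user_tables
WHERE relname = 'queue_table';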
Future improvements
Since many queue systems support more than a basic FIFO model, you could enhance the system here by adding fields to our basic queue_table, such as topics, priorities, etc. The basic recipe stays the same: we would not need to change much about the worker itself other than the sub-SELECT used to identify the rows in question (see the sketch below). Everything else about the system could stay as-is.
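As a sketch, assuming we added hypothetical topic and priority columns to queue_table, the batch query inside our DELETE might become something like:
-- hypothetical columns: topic (text) and priority (int) added to queue_table
DELETE FROM
    queue_table
USING (
    SELECT * FROM queue_table
    WHERE topic = 'billing'          -- only consume one topic
    ORDER BY priority DESC, id       -- highest priority first, then FIFO by id
    LIMIT 10
    FOR UPDATE SKIP LOCKED
) q
WHERE q.id = queue_table.id RETURNING queue_table.*;
The locking and delete-with-RETURNING mechanics are untouched; only the row-selection logic changes.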
This is a powerful model that PostgreSQL itself handles with basic primitives. While special-case tools often have their niche, it is interesting to see what we can accomplish using the database itself.