Title: Lightweight Portable Message Queue Using 'SQLite'
Version: 1.1.0
Author: Gábor Csárdi
Maintainer: Gábor Csárdi <csardi.gabor@gmail.com>
Description: Temporary and permanent message queues for R. Built on top of 'SQLite' databases. 'SQLite' provides locking, and makes it possible to detect crashed consumers. Crashed jobs can be automatically marked as "failed", or put in the queue again, potentially a limited number of times.
License: MIT + file LICENSE
LazyData: true
URL: https://github.com/r-lib/liteq#readme
BugReports: https://github.com/r-lib/liteq/issues
RoxygenNote: 6.1.1
Imports: assertthat, DBI, rappdirs, RSQLite
Suggests: callr, covr, processx, testthat, withr
Encoding: UTF-8
NeedsCompilation: no
Packaged: 2019-03-08 10:41:18 UTC; gaborcsardi
Repository: CRAN
Date/Publication: 2019-03-08 13:40:10 UTC

Lightweight Portable Message Queue Using 'SQLite'

Description

Message queues for R. Built on top of 'SQLite' databases.

Concurrency

liteq works with multiple producer and/or consumer processes accessing the same queue, via the locking mechanism of 'SQLite'. If a queue is locked by 'SQLite', the process that tries to access it must wait until it is unlocked. By default the maximum waiting time is 10 seconds; it can be changed via the R_LITEQ_BUSY_TIMEOUT environment variable, which is given in milliseconds. If you have many concurrent processes using the same liteq database and see "database locked" errors, try increasing the timeout value.
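As a minimal sketch, the timeout could be raised from within R before the queue is used. This assumes that liteq reads R_LITEQ_BUSY_TIMEOUT when it opens its database connections; the value of 30000 and the queue name are only illustrative.

```r
library(liteq)

# Wait up to 30 seconds (the value is in milliseconds) instead of the
# default 10 seconds. Assumption: the variable is read at connection time.
Sys.setenv(R_LITEQ_BUSY_TIMEOUT = "30000")

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "ping", message = "hello")
count <- message_count(q)
```

The variable can also be set outside of R, for example in the shell that starts the worker processes, so that all workers share the same value.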

Examples

# We don't run this, because it writes to the cache directory
db <- tempfile()
q <- ensure_queue("jobs", db = db)
q
list_queues(db)

# Publish two messages
publish(q, title = "First message", message = "Hello world!")
publish(q, title = "Second message", message = "Hello again!")
is_empty(q)
message_count(q)
list_messages(q)

# Consume one
msg <- try_consume(q)
msg

ack(msg)
list_messages(q)
msg2 <- try_consume(q)
nack(msg2)
list_messages(q)

# No more messages
is_empty(q)
try_consume(q)

Acknowledge that the work on a message has finished successfully

Description

Acknowledge that the work on a message has finished successfully

Usage

ack(message)

Arguments

message

The message object.

See Also

liteq for examples

Other liteq messages: consume, is_empty, list_failed_messages, list_messages, message_count, publish, remove_failed_messages, requeue_failed_messages, try_consume


Consume a message from a queue

Description

Blocks and waits for a message if there isn't one to work on currently.

Usage

consume(queue, poll_interval = 500)

Arguments

queue

The queue object.

poll_interval

Poll interval in milliseconds. How often to poll the queue for new jobs, if none are immediately available.

Value

A message.
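A typical worker could combine consume() with ack() and nack() roughly as follows. This is a sketch only: it assumes that the message object exposes the body as msg$message, and the square root stands in for real work.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "sqrt job", message = "16")

# Blocks until a message is available, polling every 100 milliseconds.
msg <- consume(q, poll_interval = 100)

result <- tryCatch({
  value <- sqrt(as.numeric(msg$message))  # placeholder for real work
  ack(msg)                                # success: remove the message
  value
}, error = function(e) {
  nack(msg)                               # failure: mark the message as failed
  NULL
})
```

Acknowledging inside tryCatch() means a failing job is reported with nack() instead of being left in the working state.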

See Also

liteq for examples

Other liteq messages: ack, is_empty, list_failed_messages, list_messages, message_count, publish, remove_failed_messages, requeue_failed_messages, try_consume


Create a queue in a database

Description

It also creates the database, if it does not exist.

Usage

create_queue(name = NULL, db = default_db(), crash_strategy = "fail")

Arguments

name

Name of the queue. If not specified or NULL, a name is generated randomly.

db

Path to the database file.

crash_strategy

What to do with crashed jobs. The default is that they will "fail" (just like a negative acknowledgement). Another possibility is "requeue", in which case they are requeued immediately, potentially even multiple times. Alternatively it can be a number, in which case they are requeued at most the specified number of times.
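The three kinds of crash_strategy values described above could be passed like this. The queue names are arbitrary, and the limit of 3 is just an example value.

```r
library(liteq)

db <- tempfile()

# Default: crashed jobs are marked as failed.
q_fail <- create_queue("failfast", db = db)

# Crashed jobs are put back in the queue every time.
q_requeue <- create_queue("retry", db = db, crash_strategy = "requeue")

# Crashed jobs are requeued at most three times.
q_limited <- create_queue("retry3", db = db, crash_strategy = 3)

queues <- list(q_fail, q_requeue, q_limited)
```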

See Also

liteq for examples

Other liteq queues: delete_queue, ensure_queue, list_queues


Positive or negative acknowledgement

Description

If positive, then we need to remove the message from the queue. If negative, we just set the status to FAILED.

Usage

db_ack(db, queue, id, lock, success)

Arguments

db

DB file.

queue

Queue name.

id

Message id.

lock

Name of the message lock file.

success

Whether this is a positive or negative ACK.


Consume a message from a message queue

Description

This is the blocking version of try_consume(). Currently it just polls the queue every poll_interval milliseconds (twice a second by default), and sleeps between the polls. Each poll will also trigger a crash cleanup, if there are workers running.
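The polling behaviour can be approximated with the exported try_consume() function. The helper consume_sketch() below is hypothetical and not part of liteq; it only illustrates the poll-and-sleep loop, not the internal implementation.

```r
library(liteq)

# Hypothetical helper: poll until a message is available.
consume_sketch <- function(queue, poll_interval = 500) {
  repeat {
    msg <- try_consume(queue)
    if (!is.null(msg)) return(msg)
    Sys.sleep(poll_interval / 1000)  # poll_interval is in milliseconds
  }
}

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "hello", message = "world")

msg <- consume_sketch(q, poll_interval = 100)
ack(msg)
```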

Usage

db_consume(db, queue, poll_interval = 500)

Arguments

queue

The queue object.


Create a queue

Description

The database columns:

Usage

db_create_queue(name, db, crash_strategy)

Arguments

name

Name of the queue. If not specified or NULL, a name is generated randomly.

db

Path to the database file.

crash_strategy

What to do with crashed jobs. The default is that they will "fail" (just like a negative acknowledgement). Another possibility is "requeue", in which case they are requeued immediately, potentially even multiple times. Alternatively it can be a number, in which case they are requeued at most the specified number of times.


Try to consume a message from the queue

Description

If there is a message that is READY, it returns that. Otherwise it checks for crashed workers.

Usage

db_try_consume(db, queue, crashed = TRUE, con = NULL)

Arguments

db

DB file name.

queue

Name of the queue.

Details of the implementation

The database must be locked for the whole operation, including checking on or creating the lock databases.

  1. If there is a READY message, that one is taken.

  2. Otherwise if there are WORKING messages, then we check them one by one. This might take a lot of time, and the DB must be locked for the whole search, so it is not ideal. But I don't have a better solution right now.

Taking a message means

  1. Updating its row.status to WORKING.

  2. Creating another database that serves as the lock for this message.
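The first step of taking a message, performed while the database is locked, might look roughly like the following sketch. The table layout and the take_ready() helper are hypothetical and much simpler than what liteq uses; the per-message lock database and the check of WORKING messages are left out.

```r
library(DBI)

# Hypothetical, simplified schema; liteq's real tables may differ.
con <- dbConnect(RSQLite::SQLite(), tempfile())
dbExecute(con, "CREATE TABLE messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  title TEXT NOT NULL,
  status TEXT NOT NULL)")
dbExecute(con, "INSERT INTO messages (title, status)
  VALUES ('first', 'READY'), ('second', 'READY')")

take_ready <- function(con) {
  # Hold an exclusive lock for the whole select-and-update operation.
  dbExecute(con, "BEGIN EXCLUSIVE")
  tryCatch({
    row <- dbGetQuery(con, "SELECT id, title FROM messages
      WHERE status = 'READY' ORDER BY id LIMIT 1")
    if (nrow(row) == 1) {
      dbExecute(con, "UPDATE messages SET status = 'WORKING' WHERE id = ?",
                params = list(row$id))
    }
    dbExecute(con, "COMMIT")
    if (nrow(row) == 1) row else NULL
  }, error = function(e) {
    dbExecute(con, "ROLLBACK")
    stop(e)
  })
}

first <- take_ready(con)
second <- take_ready(con)
none <- take_ready(con)
statuses <- dbGetQuery(con, "SELECT status FROM messages ORDER BY id")$status
dbDisconnect(con)
```

Because the select and the update happen inside one exclusive transaction, two consumers cannot take the same READY message.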


The name of the default database

Description

If the queue database is not specified explicitly, then liteq uses this file. Its location is determined via the rappdirs package, see rappdirs::user_data_dir().

Usage

default_db()

Value

A character scalar, the name of the default database.


Delete a queue

Description

Delete a queue

Usage

delete_queue(queue, force = FALSE)

Arguments

queue

The queue to delete.

force

Whether to delete the queue even if it contains messages.

See Also

liteq for examples

Other liteq queues: create_queue, ensure_queue, list_queues


Ensure that the DB exists and has the right columns

Description

We try a query, and if it fails then we try to create the DB.

Usage

ensure_db(db)

Arguments

db

DB file.


Make sure that a queue exists

Description

If it does not exist, then the queue will be created.

Usage

ensure_queue(name, db = default_db(), crash_strategy = "fail")

Arguments

name

Name of the queue. If not specified or NULL, a name is generated randomly.

db

Path to the database file.

crash_strategy

What to do with crashed jobs. The default is that they will "fail" (just like a negative acknowledgement). Another possibility is "requeue", in which case they are requeued immediately, potentially even multiple times. Alternatively it can be a number, in which case they are requeued at most the specified number of times.

Value

The queue object.
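Because ensure_queue() only creates the queue when it is missing, it can be called by every producer and consumer at startup. A short sketch, with an arbitrary queue name:

```r
library(liteq)

db <- tempfile()

# The first call creates the database and the queue.
q1 <- ensure_queue("jobs", db = db)
publish(q1, title = "hello", message = "world")

# The second call finds the existing queue instead of creating a new one.
q2 <- ensure_queue("jobs", db = db)
count <- message_count(q2)
```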

See Also

liteq for examples

Other liteq queues: create_queue, delete_queue, list_queues


Check if a queue is empty

Description

Check if a queue is empty

Usage

is_empty(queue)

Arguments

queue

The queue object.

Value

Logical, whether the queue is empty.

See Also

liteq for examples

Other liteq messages: ack, consume, list_failed_messages, list_messages, message_count, publish, remove_failed_messages, requeue_failed_messages, try_consume


List failed messages in a queue

Description

List failed messages in a queue

Usage

list_failed_messages(queue)

Arguments

queue

The queue object.

Value

Data frame with columns: id, title, status.

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_messages, message_count, publish, remove_failed_messages, requeue_failed_messages, try_consume


List all messages in a queue

Description

List all messages in a queue

Usage

list_messages(queue)

Arguments

queue

The queue object.

Value

Data frame with columns: id, title, status.

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, message_count, publish, remove_failed_messages, requeue_failed_messages, try_consume


List all queues in a database

Description

List all queues in a database

Usage

list_queues(db = default_db())

Arguments

db

The queue database to query.

Value

A list of liteq_queue objects.

See Also

liteq for examples

Other liteq queues: create_queue, delete_queue, ensure_queue


Make a message object

Description

It creates the lock for the message as well.

Usage

make_message(id, title, message, db, queue, lockdir)

Arguments

id

Message id, integer, auto-generated.

title

Title of message.

message

The message itself.

db

Main DB file.

queue

Name of the queue.

lockdir

Directory to create the message lock in.

Details

The message object contains the connection to the message lock. If the worker crashes, then there will be no reference to the connection, and the lock will be released. This is how we detect crashed workers.

Value

message object


Get the number of messages in a queue.

Description

Get the number of messages in a queue.

Usage

message_count(queue)

Arguments

queue

The queue object.

Value

Number of messages in the queue.

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, list_messages, publish, remove_failed_messages, requeue_failed_messages, try_consume


Report that the work on a message has failed

Description

Report that the work on a message has failed

Usage

nack(message)

Arguments

message

The message object.

See Also

liteq for examples


Publish a message in a queue

Description

Publish a message in a queue

Usage

publish(queue, title = "", message = "")

Arguments

queue

The queue object.

title

The title of the message. It can be the empty string.

message

The body of the message. It can be the empty string.

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, list_messages, message_count, remove_failed_messages, requeue_failed_messages, try_consume


Remove failed messages from the queue

Description

Remove failed messages from the queue

Usage

remove_failed_messages(queue, id = NULL)

Arguments

queue

The queue object.

id

Ids of the messages to remove. If it is NULL, then all failed messages will be removed.
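The following sketch removes one failed message by id, and then all remaining failed messages. The ids come from the id column of list_failed_messages().

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "first", message = "a")
publish(q, title = "second", message = "b")

# Take both messages and report both as failed.
nack(try_consume(q))
nack(try_consume(q))
failed <- list_failed_messages(q)

# Remove a single failed message by id.
remove_failed_messages(q, id = failed$id[1])
remaining <- list_failed_messages(q)

# Remove all remaining failed messages.
remove_failed_messages(q)
left <- message_count(q)
```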

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, list_messages, message_count, publish, requeue_failed_messages, try_consume


Requeue failed messages

Description

Requeue failed messages

Usage

requeue_failed_messages(queue, id = NULL)

Arguments

queue

The queue object.

id

Ids of the messages to requeue. If it is NULL, then all failed messages will be requeued.
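A failed message can be given another attempt as sketched below. This assumes that the message object exposes its title as msg$title.

```r
library(liteq)

db <- tempfile()
q <- ensure_queue("jobs", db = db)
publish(q, title = "flaky", message = "try me")

# The first attempt fails.
msg <- try_consume(q)
nack(msg)
failed <- list_failed_messages(q)

# Put the failed message back in the queue and work on it again.
requeue_failed_messages(q, id = failed$id)
retry <- try_consume(q)
ack(retry)
```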

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, list_messages, message_count, publish, remove_failed_messages, try_consume


Consume a message if there is one available

Description

Consume a message if there is one available

Usage

try_consume(queue)

Arguments

queue

The queue object.

Value

A message, or NULL if there is no message to work on.

See Also

liteq for examples

Other liteq messages: ack, consume, is_empty, list_failed_messages, list_messages, message_count, publish, remove_failed_messages, requeue_failed_messages