场景

N 个生产者往 db 里面插入新任务,N 个消费者从 db 取出新任务执行并更新任务状态为“已执行”。

CREATE TABLE `t_job_queue` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `status` INT NOT NULL DEFAULT '0',
  `params` VARCHAR(1024) NOT NULL DEFAULT '',
  `result` VARCHAR(1024) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

关键问题

避免多个消费者取到同一个新任务。

有问题的解决方案

方案1 - 锁表

LOCK TABLES t_job_queue WRITE, t_job_queue as t1 READ;
SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;
UNLOCK TABLES;

-- execute task

UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

这样做的问题在于锁住整张表,性能会比较低,而且多业务使用同一个库时可能因为交叉锁表造成死锁。

方案2 - SELECT … FOR UPDATE

-- disable autocommit

START TRANSACTION;
SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1 FOR UPDATE;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;
COMMIT;

-- execute task

UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

使用 SELECT ... FOR UPDATE 可以发挥 innodb 行锁(row-level locking) 的特性,但终究还是会加锁,只是锁的粒度降低了。

无锁任务队列

-- enable autocommit

SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;

-- if affected_rows == 1 then execute task else continue end

UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

但是这种方案有个问题,就是可能多个消费者会取到同一个任务去 update, 尽管不会造成一个任务对多个消费者执行,但是终究还是有瑕疵,因此进一步完善,改成先 UPDATE, 再 SELECT 的方式。

关键问题及解决方案

由于 UPDATE 只能返回 affected rows, 并不能返回被更新的记录内容,所以需要一种方式来取到被更新的任务。这里有 2 种思路:

UPDATE t_job_queue SET status=STATUS_PROCESSING, id=(@id:=id) WHERE status=0 ORDER BY id ASC LIMIT 1;
SELECT id, status, params FROM t_job_queue WHERE id=@id;
  • 1
  • 2
  • 1
  • 2
  • 增加唯一标识码字段
CREATE TABLE `t_job_queue` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `status` INT NOT NULL DEFAULT '0',
  `params` VARCHAR(1024) NOT NULL DEFAULT '',
  `result` VARCHAR(1024) NOT NULL DEFAULT '',
  `update_id1` INT UNSIGNED NOT NULL DEFAULT '0',
  `update_id2` INT UNSIGNED NOT NULL DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
UPDATE t_job_queue SET status=STATUS_PROCESSING, id=(@id:=id) WHERE status=0 ORDER BY id ASC LIMIT 1;
SELECT * FROM t_job_queue WHERE update_id1=xxx AND update_id2=xxx;
  • 1
  • 2
  • 1
  • 2

使用“自定义变量”的好处是无需增加额外的字段,但缺点是自定义变量只在一个 session 内生效,也就是说,如果 UPDATE 和 SELECT 之间连接断开的话,自定义变量会失效:

User-defined variables are session specific. A user variable defined by one client cannot be seen or used by other clients.

所以,推荐的做法是:

  • 优先使用“自定义变量”
  • 当 UPDATE 和 SELECT 之间连接断开时重连时使用唯一标识码

笔者设置了两个唯一标识码字段,update_id1 填入时间戳,update_id2 填入随机生成的 seq.

Note: 使用该方法必须设置 autocommit 为 1

References

5 subtle ways you’re using MySQL as a queue, and why it’ll bite you
A job queue in MySQL

使用 MySQL 实现无锁任务队列(using MySQL as a job queue)
标签: