场景
N 个生产者往 db 里面插入新任务,N 个消费者从 db 取出新任务执行并更新任务状态为“已执行”。
CREATE TABLE `t_job_queue` (
`id` INT NOT NULL AUTO_INCREMENT,
`status` INT NOT NULL DEFAULT '0',
`params` VARCHAR(1024) NOT NULL DEFAULT '',
`result` VARCHAR(1024) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 1
- 2
- 3
- 4
- 5
- 6
- 7
关键问题
避免多个消费者取到同一个新任务。
有问题的解决方案
方案1 - 锁表
LOCK TABLES t_job_queue WRITE, t_job_queue as t1 READ;
SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;
UNLOCK TABLES;
-- execute task
UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
这样做的问题在于锁住整张表,性能会比较低,而且多业务使用同一个库时可能因为交叉锁表造成死锁。
方案2 - SELECT … FOR UPDATE
-- disable autocommit
START TRANSACTION;
SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1 FOR UPDATE;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;
COMMIT;
-- execute task
UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
使用 SELECT ... FOR UPDATE
可以发挥 innodb 行锁(row-level locking) 的特性,但终究还是会加锁,只是锁的粒度降低了。
无锁任务队列
-- enable autocommit
SELECT id, status, params FROM t_job_queue WHERE status=0 ORDER BY id ASC LIMIT 1;
UPDATE t_job_queue SET status=STATUS_PROCESSING WHERE id=xxx;
-- if affected_rows == 1 then execute task else continue end
UPDATE t_job_queue SET status=STATUS_FINISHED WHERE id=xxx;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
但是这种方案有个问题,就是可能多个消费者会取到同一个任务去 update, 尽管不会造成一个任务对多个消费者执行,但是终究还是有瑕疵,因此进一步完善,改成先 UPDATE, 再 SELECT 的方式。
关键问题及解决方案
由于 UPDATE 只能返回 affected rows, 并不能返回被更新的记录内容,所以需要一种方式来取到被更新的任务。这里有 2 种思路:
- 使用 MySQL 特有的 User-Defined Variables
UPDATE t_job_queue SET status=STATUS_PROCESSING, id=(@id:=id) WHERE status=0 ORDER BY id ASC LIMIT 1;
SELECT id, status, params FROM t_job_queue WHERE id=@id;
- 1
- 2
- 1
- 2
- 增加唯一标识码字段
CREATE TABLE `t_job_queue` (
`id` INT NOT NULL AUTO_INCREMENT,
`status` INT NOT NULL DEFAULT '0',
`params` VARCHAR(1024) NOT NULL DEFAULT '',
`result` VARCHAR(1024) NOT NULL DEFAULT '',
`update_id1` INT UNSIGNED NOT NULL DEFAULT '0',
`update_id2` INT UNSIGNED NOT NULL DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
UPDATE t_job_queue SET status=STATUS_PROCESSING, id=(@id:=id) WHERE status=0 ORDER BY id ASC LIMIT 1;
SELECT * FROM t_job_queue WHERE update_id1=xxx AND update_id2=xxx;
- 1
- 2
- 1
- 2
使用“自定义变量”的好处是无需增加额外的字段,但缺点是自定义变量只在一个 session 内生效,也就是说,如果 UPDATE 和 SELECT 之间连接断开的话,自定义变量会失效:
User-defined variables are session specific. A user variable defined by one client cannot be seen or used by other clients.
所以,推荐的做法是:
- 优先使用“自定义变量”
- 当 UPDATE 和 SELECT 之间连接断开时重连时使用唯一标识码
笔者设置了两个唯一标识码字段,update_id1 填入时间戳,update_id2 填入随机生成的 seq.
Note: 使用该方法必须设置 autocommit 为 1
References
5 subtle ways you’re using MySQL as a queue, and why it’ll bite you
A job queue in MySQL