定时任务
BSV
使用的定时任务库是 BullMQ,一款基于 Redis
的 NodeJS
的高级消息队列。
快速开始
假如我们有一个 xx
的模块,需要一个定时执行打印的任务,那么在 src/modules/xx/
里新建一个 tasks
文件夹,
- 新建配置
src/modules/xx/tasks/config.ts
:
import config from "/src/config";import IORedis from "ioredis";
export default { /** 任务名称 */ printHello: "print_hello_task", /** 最大尝试多少次 */ // maxAttempts: 3, /** 并行数量 */ concurrency: 1, connection: new IORedis({ ...config.redis, enableAutoPipelining: true, maxRetriesPerRequest: null, enableReadyCheck: false, }),};
- 新建
src/modules/xx/tasks/queue.ts
import { Queue } from "bullmq";import config from "./config";
export const printHelloQueue = new Queue(config.printHello, { connection: config.connection,});
- 新建
src/modules/xx/tasks/worker.ts
import { Job, Worker } from "bullmq";import { pingWorker } from "/src/utils/ping-worker";import { Logger } from "/src/loaders/logger";import config from "./config";
const worker = new Worker<{ type?: "ping" }>( config.printHello, async (job) => { // 用于检测 if (job.data.type === "ping") { return pingWorker(config.printHello); }
// 我们的打印代码 console.log("hello world"); }, { concurrency: config.concurrency, connection: config.connection });
worker.on("completed", (job) => { Logger.info(`%s job.id(=%s) has completed!`, config.printHello, job.id);});
worker.on("failed", (job: Job, err: Error) => { Logger.warn( `%s job.id(=%s) has failed with %s`, config.printHello, job.id, err.message );});
export const printHelloWorker = worker;
- 新建
src/modules/xx/tasks/run.ts
import typeormLoader from "/src/loaders/typeorm";import { Logger } from "/src/loaders/logger";import { QueueScheduler } from "bullmq";import config from "./config";import { printHelloWorker } from "./worker";import { printHelloQueue } from "./queue";
export default async function run(needSetupTypeorm = false): Promise<void> { if (needSetupTypeorm) { // 如果有数据库相关操作需要,需要先初始化 typeorm 相关 await typeormLoader(); }
const options = { connection: config.connection, };
// 需要声明 scheduler 来处理定时相关逻辑 new QueueScheduler(config.printHello, options);
const workers = [printHelloWorker];
workers.forEach((i) => { i.on("failed", (err: Error) => { Logger.warn( `name(=%s) failed processing task job; error message: %s`, i.name, err.message ); }); });
Logger.info(`started workers: \n${workers.map((i) => i.name).join("\n")}`);
// 添加一个任务试试 await printHelloQueue.add(config.printHello, {}, {});}
- 新建运行入口
src/modules/xx/tasks/index.ts
import run from "./run";
run(true);
- 注册到
bull-board
,可以通过后台查看任务列表相关信息
import { registerQueues } from "/src/modules/ui/queues";import { printHelloQueue } from "./queue";
registerQueues(printHelloQueue);
单独打包
由于主应用代码会有多实例,任务相应的也会启动多实例,并不是我们想要的,所以需要将入口单独打包成一个 js,单独运行。
在 package.json
的 targets
新增:
... "print-hello-worker": { "source": "./src/modules/xx/tasks/index.ts", "distDir": "./dist/xx/", "engines": { "node": "^12.0.0" } }...
再使用 yarn parcel build --target print-hello-worker
命令打包,输出的打包文件位置在 dist/xx/index.js
踩坑
- 如果需要移除
repeat
任务,需要queue.getRepeatableJobs()
然后queue.removeRepeatableByKey(i.key)
移除,但是不会移除等待中的状态,所以最好之前再调用下try { await queue.drain(true) } catch(e) {}
; pause
方法只能 pause 整个queue
,不能 pause 某个具体的任务;