定时任务

BSV 使用的定时任务库是 BullMQ,一款基于 RedisNodeJS 的高级消息队列。

快速开始

假如我们有一个 xx 的模块,需要一个定时执行打印的任务,那么在 src/modules/xx/ 里新建一个 tasks 文件夹,

  1. 新建配置 src/modules/xx/tasks/config.ts:
import config from "/src/config";
import IORedis from "ioredis";
export default {
/** 任务名称 */
printHello: "print_hello_task",
/** 最大尝试多少次 */
// maxAttempts: 3,
/** 并行数量 */
concurrency: 1,
connection: new IORedis({
...config.redis,
enableAutoPipelining: true,
maxRetriesPerRequest: null,
enableReadyCheck: false,
}),
};
  1. 新建 src/modules/xx/tasks/queue.ts
import { Queue } from "bullmq";
import config from "./config";
export const printHelloQueue = new Queue(config.printHello, {
connection: config.connection,
});
  1. 新建 src/modules/xx/tasks/worker.ts
import { Job, Worker } from "bullmq";
import { pingWorker } from "/src/utils/ping-worker";
import { Logger } from "/src/loaders/logger";
import config from "./config";
const worker = new Worker<{ type?: "ping" }>(
config.printHello,
async (job) => {
// 用于检测
if (job.data.type === "ping") {
return pingWorker(config.printHello);
}
// 我们的打印代码
console.log("hello world");
},
{ concurrency: config.concurrency, connection: config.connection }
);
worker.on("completed", (job) => {
Logger.info(`%s job.id(=%s) has completed!`, config.printHello, job.id);
});
worker.on("failed", (job: Job, err: Error) => {
Logger.warn(
`%s job.id(=%s) has failed with %s`,
config.printHello,
job.id,
err.message
);
});
export const printHelloWorker = worker;
  1. 新建 src/modules/xx/tasks/run.ts
import typeormLoader from "/src/loaders/typeorm";
import { Logger } from "/src/loaders/logger";
import { QueueScheduler } from "bullmq";
import config from "./config";
import { printHelloWorker } from "./worker";
import { printHelloQueue } from "./queue";
export default async function run(needSetupTypeorm = false): Promise<void> {
if (needSetupTypeorm) {
// 如果有数据库相关操作需要,需要先初始化 typeorm 相关
await typeormLoader();
}
const options = {
connection: config.connection,
};
// 需要声明 scheduler 来处理定时相关逻辑
new QueueScheduler(config.printHello, options);
const workers = [printHelloWorker];
workers.forEach((i) => {
i.on("failed", (err: Error) => {
Logger.warn(
`name(=%s) failed processing task job; error message: %s`,
i.name,
err.message
);
});
});
Logger.info(`started workers: \n${workers.map((i) => i.name).join("\n")}`);
// 添加一个任务试试
await printHelloQueue.add(config.printHello, {}, {});
}
  1. 新建运行入口 src/modules/xx/tasks/index.ts
import run from "./run";
run(true);
  1. 注册到 bull-board,可以通过后台查看任务列表相关信息
import { registerQueues } from "/src/modules/ui/queues";
import { printHelloQueue } from "./queue";
registerQueues(printHelloQueue);

单独打包

由于主应用代码会有多实例,任务相应的也会启动多实例,并不是我们想要的,所以需要将入口单独打包成一个 js,单独运行。

package.jsontargets 新增:

...
"print-hello-worker": {
"source": "./src/modules/xx/tasks/index.ts",
"distDir": "./dist/xx/",
"engines": {
"node": "^12.0.0"
}
}
...

再使用 yarn parcel build --target print-hello-worker 命令打包,输出的打包文件位置在 dist/xx/index.js

踩坑

  • 如果需要移除 repeat 任务,需要 queue.getRepeatableJobs() 然后 queue.removeRepeatableByKey(i.key) 移除,但是不会移除等待中的状态,所以最好之前再调用下 try { await queue.drain(true) } catch(e) {}
  • pause 方法只能 pause 整个 queue,不能 pause 某个具体的任务;