feat: add nestjs replacement, remove nestjs (#285)
* feat: add nestjs replacement, remove nestjs * fix: format issues * fix: dependency issues * fix: dependency issues * fix: format issue * fix: wrong channel used for logging channel
This commit is contained in:
3
apps/tasks/src/index.ts
Normal file
3
apps/tasks/src/index.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
import { client } from "./queues";
|
||||
|
||||
export const queueClient = client;
|
||||
9
apps/tasks/src/jobs.ts
Normal file
9
apps/tasks/src/jobs.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import { queuesJob } from "./jobs/queue";
|
||||
import { createJobGroup } from "./lib/cron-job/group";
|
||||
|
||||
export const jobs = createJobGroup({
|
||||
// Add your jobs here:
|
||||
|
||||
// This job is used to process queues.
|
||||
queues: queuesJob,
|
||||
});
|
||||
8
apps/tasks/src/jobs/queue.ts
Normal file
8
apps/tasks/src/jobs/queue.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import { EVERY_MINUTE } from "../lib/cron-job/constants";
|
||||
import { createCronJob } from "../lib/cron-job/creator";
|
||||
import { queueWorker } from "../lib/queue/worker";
|
||||
|
||||
// This job processes queues, it runs every minute.
|
||||
export const queuesJob = createCronJob(EVERY_MINUTE).withCallback(async () => {
|
||||
await queueWorker();
|
||||
});
|
||||
5
apps/tasks/src/lib/cron-job/constants.ts
Normal file
5
apps/tasks/src/lib/cron-job/constants.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
export const EVERY_5_SECONDS = "*/5 * * * * *";
|
||||
export const EVERY_MINUTE = "* * * * *";
|
||||
export const EVERY_5_MINUTES = "*/5 * * * *";
|
||||
export const EVERY_10_MINUTES = "*/10 * * * *";
|
||||
export const EVERY_HOUR = "0 * * * *";
|
||||
17
apps/tasks/src/lib/cron-job/creator.ts
Normal file
17
apps/tasks/src/lib/cron-job/creator.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
import cron from "node-cron";
|
||||
|
||||
import type { MaybePromise } from "@homarr/common/types";
|
||||
|
||||
export const createCronJob = (cronExpression: string) => {
|
||||
return {
|
||||
withCallback: (callback: () => MaybePromise<void>) => {
|
||||
const task = cron.schedule(cronExpression, () => void callback(), {
|
||||
scheduled: false,
|
||||
});
|
||||
return {
|
||||
_expression: cronExpression,
|
||||
_task: task,
|
||||
};
|
||||
},
|
||||
};
|
||||
};
|
||||
48
apps/tasks/src/lib/cron-job/group.ts
Normal file
48
apps/tasks/src/lib/cron-job/group.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { objectEntries } from "@homarr/common";
|
||||
|
||||
import type { createCronJob } from "./creator";
|
||||
import { jobRegistry } from "./registry";
|
||||
|
||||
type Jobs = Record<
|
||||
string,
|
||||
ReturnType<ReturnType<typeof createCronJob>["withCallback"]>
|
||||
>;
|
||||
|
||||
export const createJobGroup = <TJobs extends Jobs>(jobs: TJobs) => {
|
||||
for (const [name, job] of objectEntries(jobs)) {
|
||||
if (typeof name !== "string") continue;
|
||||
jobRegistry.set(name, {
|
||||
name,
|
||||
expression: job._expression,
|
||||
active: false,
|
||||
task: job._task,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
start: (name: keyof TJobs) => {
|
||||
const job = jobRegistry.get(name as string);
|
||||
if (!job) return;
|
||||
job.active = true;
|
||||
job.task.start();
|
||||
},
|
||||
startAll: () => {
|
||||
for (const job of jobRegistry.values()) {
|
||||
job.active = true;
|
||||
job.task.start();
|
||||
}
|
||||
},
|
||||
stop: (name: keyof TJobs) => {
|
||||
const job = jobRegistry.get(name as string);
|
||||
if (!job) return;
|
||||
job.active = false;
|
||||
job.task.stop();
|
||||
},
|
||||
stopAll: () => {
|
||||
for (const job of jobRegistry.values()) {
|
||||
job.active = false;
|
||||
job.task.stop();
|
||||
}
|
||||
},
|
||||
};
|
||||
};
|
||||
9
apps/tasks/src/lib/cron-job/registry.ts
Normal file
9
apps/tasks/src/lib/cron-job/registry.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import type cron from "node-cron";
|
||||
|
||||
interface Job {
|
||||
name: string;
|
||||
expression: string;
|
||||
active: boolean;
|
||||
task: cron.ScheduledTask;
|
||||
}
|
||||
export const jobRegistry = new Map<string, Job>();
|
||||
64
apps/tasks/src/lib/queue/client.ts
Normal file
64
apps/tasks/src/lib/queue/client.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { objectEntries, objectKeys } from "@homarr/common";
|
||||
import type { MaybePromise } from "@homarr/common/types";
|
||||
import { queueChannel } from "@homarr/redis";
|
||||
import type { z } from "@homarr/validation";
|
||||
|
||||
import type { createQueue } from "./creator";
|
||||
|
||||
interface Queue<TInput extends z.ZodType = z.ZodType> {
|
||||
name: string;
|
||||
callback: (input: z.infer<TInput>) => MaybePromise<void>;
|
||||
inputValidator: TInput;
|
||||
}
|
||||
|
||||
type Queues = Record<
|
||||
string,
|
||||
ReturnType<ReturnType<typeof createQueue>["withCallback"]>
|
||||
>;
|
||||
|
||||
export const createQueueClient = <TQueues extends Queues>(queues: TQueues) => {
|
||||
const queueRegistry = new Map<string, Queue>();
|
||||
for (const [name, queue] of objectEntries(queues)) {
|
||||
if (typeof name !== "string") continue;
|
||||
queueRegistry.set(name, {
|
||||
name,
|
||||
callback: queue._callback,
|
||||
inputValidator: queue._input,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
queueRegistry,
|
||||
client: objectKeys(queues).reduce(
|
||||
(acc, name) => {
|
||||
acc[name] = async (
|
||||
data: z.infer<TQueues[typeof name]["_input"]>,
|
||||
options,
|
||||
) => {
|
||||
if (typeof name !== "string") return;
|
||||
const queue = queueRegistry.get(name);
|
||||
if (!queue) return;
|
||||
|
||||
await queueChannel.add({
|
||||
name,
|
||||
data,
|
||||
executionDate:
|
||||
typeof options === "object" && options.executionDate
|
||||
? options.executionDate
|
||||
: new Date(),
|
||||
});
|
||||
};
|
||||
return acc;
|
||||
},
|
||||
{} as Record<
|
||||
keyof TQueues,
|
||||
(
|
||||
data: z.infer<TQueues[keyof TQueues]["_input"]>,
|
||||
props: {
|
||||
executionDate?: Date;
|
||||
} | void,
|
||||
) => Promise<void>
|
||||
>,
|
||||
),
|
||||
};
|
||||
};
|
||||
14
apps/tasks/src/lib/queue/creator.ts
Normal file
14
apps/tasks/src/lib/queue/creator.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import type { z } from "zod";
|
||||
|
||||
import type { MaybePromise } from "@homarr/common/types";
|
||||
|
||||
export const createQueue = <TInput extends z.ZodType>(input: TInput) => {
|
||||
return {
|
||||
withCallback: (callback: (data: z.infer<TInput>) => MaybePromise<void>) => {
|
||||
return {
|
||||
_input: input,
|
||||
_callback: callback,
|
||||
};
|
||||
},
|
||||
};
|
||||
};
|
||||
20
apps/tasks/src/lib/queue/worker.ts
Normal file
20
apps/tasks/src/lib/queue/worker.ts
Normal file
@@ -0,0 +1,20 @@
|
||||
import { queueChannel } from "@homarr/redis";
|
||||
|
||||
import { queueRegistry } from "~/queues";
|
||||
|
||||
/**
|
||||
* This function reads all the queue executions that are due and processes them.
|
||||
* Those executions are stored in the redis queue channel.
|
||||
*/
|
||||
export const queueWorker = async () => {
|
||||
const now = new Date();
|
||||
const executions = await queueChannel.filter((item) => {
|
||||
return item.executionDate < now;
|
||||
});
|
||||
for (const execution of executions) {
|
||||
const queue = queueRegistry.get(execution.name);
|
||||
if (!queue) continue;
|
||||
await queue.callback(execution.data);
|
||||
await queueChannel.markAsDone(execution._id);
|
||||
}
|
||||
};
|
||||
3
apps/tasks/src/main.ts
Normal file
3
apps/tasks/src/main.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
import { jobs } from "./jobs";
|
||||
|
||||
jobs.startAll();
|
||||
7
apps/tasks/src/queues.ts
Normal file
7
apps/tasks/src/queues.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
import { createQueueClient } from "./lib/queue/client";
|
||||
import { testQueue } from "./queues/test";
|
||||
|
||||
export const { client, queueRegistry } = createQueueClient({
|
||||
// Add your queues here
|
||||
test: testQueue,
|
||||
});
|
||||
11
apps/tasks/src/queues/test.ts
Normal file
11
apps/tasks/src/queues/test.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { z } from "@homarr/validation";
|
||||
|
||||
import { createQueue } from "~/lib/queue/creator";
|
||||
|
||||
export const testQueue = createQueue(
|
||||
z.object({
|
||||
id: z.string(),
|
||||
}),
|
||||
).withCallback(({ id }) => {
|
||||
console.log(`Test queue with id ${id}`);
|
||||
});
|
||||
Reference in New Issue
Block a user