feat(tasks): allow management of job intervals and disabling them (#3408)

This commit is contained in:
Meier Lukas
2025-07-03 20:59:26 +02:00
committed by GitHub
parent 95c8aadb0c
commit 9398dd983c
37 changed files with 5224 additions and 195 deletions

View File

@@ -0,0 +1,109 @@
import { schedule, validate as validateCron } from "node-cron";
import type { IJobManager } from "@homarr/cron-job-api";
import type { jobGroup as cronJobGroup, JobGroupKeys } from "@homarr/cron-jobs";
import type { Database, InferInsertModel } from "@homarr/db";
import { eq } from "@homarr/db";
import { cronJobConfigurations } from "@homarr/db/schema";
import { logger } from "@homarr/log";
export class JobManager implements IJobManager {
constructor(
private db: Database,
private jobGroup: typeof cronJobGroup,
) {}
public async startAsync(name: JobGroupKeys): Promise<void> {
await this.jobGroup.startAsync(name);
}
public async triggerAsync(name: JobGroupKeys): Promise<void> {
await this.jobGroup.runManuallyAsync(name);
}
public async stopAsync(name: JobGroupKeys): Promise<void> {
await this.jobGroup.stopAsync(name);
}
public async updateIntervalAsync(name: JobGroupKeys, cron: string): Promise<void> {
logger.info(`Updating cron job interval name="${name}" expression="${cron}"`);
const job = this.jobGroup.getJobRegistry().get(name);
if (!job) throw new Error(`Job ${name} not found`);
if (job.cronExpression === "never") throw new Error(`Job ${name} cannot be updated as it is set to "never"`);
if (!validateCron(cron)) {
throw new Error(`Invalid cron expression: ${cron}`);
}
await this.updateConfigurationAsync(name, { cronExpression: cron });
await this.jobGroup.getTask(name)?.destroy();
this.jobGroup.setTask(
name,
schedule(cron, () => void job.executeAsync(), {
name,
}),
);
logger.info(`Cron job interval updated name="${name}" expression="${cron}"`);
}
public async disableAsync(name: JobGroupKeys): Promise<void> {
logger.info(`Disabling cron job name="${name}"`);
const job = this.jobGroup.getJobRegistry().get(name);
if (!job) throw new Error(`Job ${name} not found`);
if (job.cronExpression === "never") throw new Error(`Job ${name} cannot be disabled as it is set to "never"`);
await this.updateConfigurationAsync(name, { isEnabled: false });
await this.jobGroup.stopAsync(name);
logger.info(`Cron job disabled name="${name}"`);
}
public async enableAsync(name: JobGroupKeys): Promise<void> {
logger.info(`Enabling cron job name="${name}"`);
await this.updateConfigurationAsync(name, { isEnabled: true });
await this.jobGroup.startAsync(name);
logger.info(`Cron job enabled name="${name}"`);
}
private async updateConfigurationAsync(
name: JobGroupKeys,
configuration: Omit<Partial<InferInsertModel<typeof cronJobConfigurations>>, "name">,
) {
const existingConfig = await this.db.query.cronJobConfigurations.findFirst({
where: (table, { eq }) => eq(table.name, name),
});
logger.debug(
`Updating cron job configuration name="${name}" configuration="${JSON.stringify(configuration)}" exists="${Boolean(existingConfig)}"`,
);
if (existingConfig) {
await this.db
.update(cronJobConfigurations)
// prevent updating the name, as it is the primary key
.set({ ...configuration, name: undefined })
.where(eq(cronJobConfigurations.name, name));
logger.debug(`Cron job configuration updated name="${name}" configuration="${JSON.stringify(configuration)}"`);
return;
}
const job = this.jobGroup.getJobRegistry().get(name);
if (!job) throw new Error(`Job ${name} not found`);
await this.db.insert(cronJobConfigurations).values({
name,
cronExpression: configuration.cronExpression ?? job.cronExpression,
isEnabled: configuration.isEnabled ?? true,
});
logger.debug(`Cron job configuration updated name="${name}" configuration="${JSON.stringify(configuration)}"`);
}
public async getAllAsync(): Promise<
{ name: JobGroupKeys; cron: string; preventManualExecution: boolean; isEnabled: boolean }[]
> {
const configurations = await this.db.query.cronJobConfigurations.findMany();
return [...this.jobGroup.getJobRegistry().entries()].map(([name, job]) => {
const config = configurations.find((config) => config.name === name);
return {
name,
cron: config?.cronExpression ?? job.cronExpression,
preventManualExecution: job.preventManualExecution,
isEnabled: config?.isEnabled ?? true,
};
});
}
}

View File

@@ -1,10 +1,45 @@
// This import has to be the first import in the file so that the agent is overridden before any other modules are imported.
import "./undici-log-agent-override";
import { registerCronJobRunner } from "@homarr/cron-job-runner/register";
import type { FastifyTRPCPluginOptions } from "@trpc/server/adapters/fastify";
import { fastifyTRPCPlugin } from "@trpc/server/adapters/fastify";
import fastify from "fastify";
import type { JobRouter } from "@homarr/cron-job-api";
import { jobRouter } from "@homarr/cron-job-api";
import { CRON_JOB_API_KEY_HEADER, CRON_JOB_API_PATH, CRON_JOB_API_PORT } from "@homarr/cron-job-api/constants";
import { jobGroup } from "@homarr/cron-jobs";
import { db } from "@homarr/db";
import { logger } from "@homarr/log";
import { JobManager } from "./job-manager";
const server = fastify({
maxParamLength: 5000,
});
server.register(fastifyTRPCPlugin, {
prefix: CRON_JOB_API_PATH,
trpcOptions: {
router: jobRouter,
createContext: ({ req }) => ({
manager: new JobManager(db, jobGroup),
apiKey: req.headers[CRON_JOB_API_KEY_HEADER] as string | undefined,
}),
onError({ path, error }) {
logger.error(new Error(`Error in tasks tRPC handler path="${path}"`, { cause: error }));
},
} satisfies FastifyTRPCPluginOptions<JobRouter>["trpcOptions"],
});
void (async () => {
registerCronJobRunner();
await jobGroup.initializeAsync();
await jobGroup.startAllAsync();
try {
await server.listen({ port: CRON_JOB_API_PORT });
logger.info(`Tasks web server started successfully port="${CRON_JOB_API_PORT}"`);
} catch (err) {
logger.error(new Error(`Failed to start tasks web server port="${CRON_JOB_API_PORT}"`, { cause: err }));
process.exit(1);
}
})();