feat: add simple app ping (#580)

* feat: add simple ping

* refactor: make ping run on server and show errors

* fix: format issues

* fix: missing translation for enabled ping option for app

* refactor: remove ping queue as no longer needed

* chore: address pull request feedback

* test: add some unit tests

* fix: format issues

* fix: deepsource issues
This commit is contained in:
Meier Lukas
2024-06-08 17:33:16 +02:00
committed by GitHub
parent 3dca787fa7
commit d7ecdf5567
25 changed files with 542 additions and 22 deletions

View File

@@ -1,13 +1,15 @@
import { iconsUpdaterJob } from "~/jobs/icons-updater";
import { analyticsJob } from "./jobs/analytics";
import { pingJob } from "./jobs/ping";
import { queuesJob } from "./jobs/queue";
import { createJobGroup } from "./lib/cron-job/group";
export const jobs = createJobGroup({
// Add your jobs here:
analytics: analyticsJob,
iconsUpdater: iconsUpdaterJob,
ping: pingJob,
// This job is used to process queues.
queues: queuesJob,
iconsUpdater: iconsUpdaterJob,
analytics: analyticsJob,
});

View File

@@ -0,0 +1,25 @@
import { logger } from "@homarr/log";
import { sendPingRequestAsync } from "@homarr/ping";
import { pingChannel, pingUrlChannel } from "@homarr/redis";
import { EVERY_MINUTE } from "~/lib/cron-job/constants";
import { createCronJob } from "~/lib/cron-job/creator";
export const pingJob = createCronJob(EVERY_MINUTE).withCallback(async () => {
const urls = await pingUrlChannel.getAllAsync();
for (const url of new Set(urls)) {
const pingResult = await sendPingRequestAsync(url);
if ("statusCode" in pingResult) {
logger.debug(`executed ping for url ${url} with status code ${pingResult.statusCode}`);
} else {
logger.error(`Executing ping for url ${url} failed with error: ${pingResult.error}`);
}
await pingChannel.publishAsync({
url,
...pingResult,
});
}
});

View File

@@ -1,6 +1,7 @@
import cron from "node-cron";
import type { MaybePromise } from "@homarr/common/types";
import { logger } from "@homarr/log";
interface CreateCronJobOptions {
runOnStart?: boolean;
@@ -9,11 +10,22 @@ interface CreateCronJobOptions {
export const createCronJob = (cronExpression: string, options: CreateCronJobOptions = { runOnStart: false }) => {
return {
withCallback: (callback: () => MaybePromise<void>) => {
const catchingCallbackAsync = async () => {
try {
await callback();
} catch (error) {
logger.error(
`apps/tasks/src/lib/cron-job/creator.ts: The callback of a cron job failed, expression ${cronExpression}, with error:`,
error,
);
}
};
if (options.runOnStart) {
void callback();
void catchingCallbackAsync();
}
const task = cron.schedule(cronExpression, () => void callback(), {
const task = cron.schedule(cronExpression, () => void catchingCallbackAsync(), {
scheduled: false,
});
return {

View File

@@ -41,15 +41,14 @@ export const createQueueClient = <TQueues extends Queues>(queues: TQueues) => {
};
return acc;
},
{} as Record<
keyof TQueues,
(
data: z.infer<TQueues[keyof TQueues]["_input"]>,
{} as {
[key in keyof TQueues]: (
data: z.infer<TQueues[key]["_input"]>,
props: {
executionDate?: Date;
} | void,
) => Promise<void>
>,
) => Promise<void>;
},
),
};
};

View File

@@ -1,3 +1,4 @@
import { logger } from "@homarr/log";
import { queueChannel } from "@homarr/redis";
import { queueRegistry } from "~/queues";
@@ -14,7 +15,18 @@ export const queueWorkerAsync = async () => {
for (const execution of executions) {
const queue = queueRegistry.get(execution.name);
if (!queue) continue;
await queue.callback(execution.data);
try {
await queue.callback(execution.data);
} catch (err) {
logger.error(
`apps/tasks/src/lib/queue/worker.ts: Error occured when executing queue ${execution.name} with data`,
execution.data,
"and error:",
err,
);
}
await queueChannel.markAsDoneAsync(execution._id);
}
};