fix: memory leak caused by many unclosed redis subscriptions (#750)
* fix: memory leak caused by many unclosed redis subscriptions * chore: address pull request feedback
This commit is contained in:
@@ -21,21 +21,22 @@ export const cronJobsRouter = createTRPCRouter({
|
|||||||
}),
|
}),
|
||||||
subscribeToStatusUpdates: publicProcedure.subscription(() => {
|
subscribeToStatusUpdates: publicProcedure.subscription(() => {
|
||||||
return observable<TaskStatus>((emit) => {
|
return observable<TaskStatus>((emit) => {
|
||||||
let isConnectionClosed = false;
|
const unsubscribes: (() => void)[] = [];
|
||||||
|
|
||||||
for (const job of jobGroup.getJobRegistry().values()) {
|
for (const job of jobGroup.getJobRegistry().values()) {
|
||||||
const channel = createCronJobStatusChannel(job.name);
|
const channel = createCronJobStatusChannel(job.name);
|
||||||
channel.subscribe((data) => {
|
const unsubscribe = channel.subscribe((data) => {
|
||||||
if (isConnectionClosed) return;
|
|
||||||
|
|
||||||
emit.next(data);
|
emit.next(data);
|
||||||
});
|
});
|
||||||
|
unsubscribes.push(unsubscribe);
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("A tRPC client has connected to the cron job status updates procedure");
|
logger.info("A tRPC client has connected to the cron job status updates procedure");
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
isConnectionClosed = true;
|
unsubscribes.forEach((unsubscribe) => {
|
||||||
|
unsubscribe();
|
||||||
|
});
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -9,16 +9,13 @@ import { createTRPCRouter, publicProcedure } from "../trpc";
|
|||||||
export const logRouter = createTRPCRouter({
|
export const logRouter = createTRPCRouter({
|
||||||
subscribe: publicProcedure.subscription(() => {
|
subscribe: publicProcedure.subscription(() => {
|
||||||
return observable<LoggerMessage>((emit) => {
|
return observable<LoggerMessage>((emit) => {
|
||||||
let isConnectionClosed = false;
|
const unsubscribe = loggingChannel.subscribe((data) => {
|
||||||
|
|
||||||
loggingChannel.subscribe((data) => {
|
|
||||||
if (isConnectionClosed) return;
|
|
||||||
emit.next(data);
|
emit.next(data);
|
||||||
});
|
});
|
||||||
logger.info("A tRPC client has connected to the logging procedure");
|
logger.info("A tRPC client has connected to the logging procedure");
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
isConnectionClosed = true;
|
unsubscribe();
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -27,19 +27,15 @@ export const appRouter = createTRPCRouter({
|
|||||||
const pingResult = await sendPingRequestAsync(input.url);
|
const pingResult = await sendPingRequestAsync(input.url);
|
||||||
|
|
||||||
return observable<{ url: string; statusCode: number } | { url: string; error: string }>((emit) => {
|
return observable<{ url: string; statusCode: number } | { url: string; error: string }>((emit) => {
|
||||||
let isConnectionClosed = false;
|
|
||||||
|
|
||||||
emit.next({ url: input.url, ...pingResult });
|
emit.next({ url: input.url, ...pingResult });
|
||||||
pingChannel.subscribe((message) => {
|
const unsubscribe = pingChannel.subscribe((message) => {
|
||||||
if (isConnectionClosed) return;
|
|
||||||
|
|
||||||
// Only emit if same url
|
// Only emit if same url
|
||||||
if (message.url !== input.url) return;
|
if (message.url !== input.url) return;
|
||||||
emit.next(message);
|
emit.next(message);
|
||||||
});
|
});
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
isConnectionClosed = true;
|
unsubscribe();
|
||||||
void pingUrlChannel.removeAsync(input.url);
|
void pingUrlChannel.removeAsync(input.url);
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -25,20 +25,21 @@ export const mediaServerRouter = createTRPCRouter({
|
|||||||
.unstable_concat(createManyIntegrationMiddleware("jellyfin", "plex"))
|
.unstable_concat(createManyIntegrationMiddleware("jellyfin", "plex"))
|
||||||
.subscription(({ ctx }) => {
|
.subscription(({ ctx }) => {
|
||||||
return observable<{ integrationId: string; data: StreamSession[] }>((emit) => {
|
return observable<{ integrationId: string; data: StreamSession[] }>((emit) => {
|
||||||
let isConnectionClosed = false;
|
const unsubscribes: (() => void)[] = [];
|
||||||
|
|
||||||
for (const integration of ctx.integrations) {
|
for (const integration of ctx.integrations) {
|
||||||
const channel = createItemAndIntegrationChannel<StreamSession[]>("mediaServer", integration.id);
|
const channel = createItemAndIntegrationChannel<StreamSession[]>("mediaServer", integration.id);
|
||||||
void channel.subscribeAsync((sessions) => {
|
const unsubscribe = channel.subscribe((sessions) => {
|
||||||
if (isConnectionClosed) return;
|
|
||||||
emit.next({
|
emit.next({
|
||||||
integrationId: integration.id,
|
integrationId: integration.id,
|
||||||
data: sessions,
|
data: sessions,
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
unsubscribes.push(unsubscribe);
|
||||||
}
|
}
|
||||||
return () => {
|
return () => {
|
||||||
isConnectionClosed = true;
|
unsubscribes.forEach((unsubscribe) => {
|
||||||
|
unsubscribe();
|
||||||
|
});
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -13,10 +13,7 @@ export const smartHomeRouter = createTRPCRouter({
|
|||||||
entityId: string;
|
entityId: string;
|
||||||
state: string;
|
state: string;
|
||||||
}>((emit) => {
|
}>((emit) => {
|
||||||
let isConnectionClosed = false;
|
const unsubscribe = homeAssistantEntityState.subscribe((message) => {
|
||||||
|
|
||||||
homeAssistantEntityState.subscribe((message) => {
|
|
||||||
if (isConnectionClosed) return;
|
|
||||||
if (message.entityId !== input.entityId) {
|
if (message.entityId !== input.entityId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -24,7 +21,7 @@ export const smartHomeRouter = createTRPCRouter({
|
|||||||
});
|
});
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
isConnectionClosed = true;
|
unsubscribe();
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}),
|
}),
|
||||||
|
|||||||
92
packages/redis/src/lib/channel-subscription-tracker.ts
Normal file
92
packages/redis/src/lib/channel-subscription-tracker.ts
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
import { randomUUID } from "crypto";
|
||||||
|
|
||||||
|
import type { MaybePromise } from "@homarr/common/types";
|
||||||
|
import { logger } from "@homarr/log";
|
||||||
|
|
||||||
|
import { createRedisConnection } from "./connection";
|
||||||
|
|
||||||
|
type SubscriptionCallback = (message: string) => MaybePromise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is used to deduplicate redis subscriptions.
|
||||||
|
* It keeps track of all subscriptions and only subscribes to a channel if there are any subscriptions to it.
|
||||||
|
* It also provides a way to remove the callback from the channel.
|
||||||
|
* It fixes a potential memory leak where the redis client would keep creating new subscriptions to the same channel.
|
||||||
|
* @see https://github.com/homarr-labs/homarr/issues/744
|
||||||
|
*/
|
||||||
|
export class ChannelSubscriptionTracker {
|
||||||
|
private static subscriptions = new Map<string, Map<string, SubscriptionCallback>>();
|
||||||
|
private static redis = createRedisConnection();
|
||||||
|
private static listenerActive = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribes to a channel.
|
||||||
|
* @param channelName name of the channel
|
||||||
|
* @param callback callback function to be called when a message is received
|
||||||
|
* @returns a function to unsubscribe from the channel
|
||||||
|
*/
|
||||||
|
public static subscribe(channelName: string, callback: SubscriptionCallback) {
|
||||||
|
logger.debug(`Adding redis channel callback channel='${channelName}'`);
|
||||||
|
|
||||||
|
// We only want to activate the listener once
|
||||||
|
if (!this.listenerActive) {
|
||||||
|
this.activateListener();
|
||||||
|
this.listenerActive = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
const channelSubscriptions = this.subscriptions.get(channelName) ?? new Map<string, SubscriptionCallback>();
|
||||||
|
const id = randomUUID();
|
||||||
|
|
||||||
|
// If there are no subscriptions to the channel, subscribe to it
|
||||||
|
if (channelSubscriptions.size === 0) {
|
||||||
|
logger.debug(`Subscribing to redis channel channel='${channelName}'`);
|
||||||
|
void this.redis.subscribe(channelName);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Adding redis channel callback channel='${channelName}' id='${id}'`);
|
||||||
|
channelSubscriptions.set(id, callback);
|
||||||
|
|
||||||
|
this.subscriptions.set(channelName, channelSubscriptions);
|
||||||
|
|
||||||
|
// Return a function to unsubscribe
|
||||||
|
return () => {
|
||||||
|
logger.debug(`Removing redis channel callback channel='${channelName}' id='${id}'`);
|
||||||
|
|
||||||
|
const channelSubscriptions = this.subscriptions.get(channelName);
|
||||||
|
if (!channelSubscriptions) return;
|
||||||
|
|
||||||
|
channelSubscriptions.delete(id);
|
||||||
|
|
||||||
|
// If there are no subscriptions to the channel, unsubscribe from it
|
||||||
|
if (channelSubscriptions.size >= 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.debug(`Unsubscribing from redis channel channel='${channelName}'`);
|
||||||
|
void this.redis.unsubscribe(channelName);
|
||||||
|
this.subscriptions.delete(channelName);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Activates the listener for the redis client.
|
||||||
|
*/
|
||||||
|
private static activateListener() {
|
||||||
|
logger.debug("Activating listener");
|
||||||
|
this.redis.on("message", (channel, message) => {
|
||||||
|
const channelSubscriptions = this.subscriptions.get(channel);
|
||||||
|
if (!channelSubscriptions) {
|
||||||
|
logger.warn(`Received message on unknown channel channel='${channel}'`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [id, callback] of channelSubscriptions.entries()) {
|
||||||
|
// Don't log messages from the logging channel as it would create an infinite loop
|
||||||
|
if (channel !== "pubSub:logging") {
|
||||||
|
logger.debug(`Calling subscription callback channel='${channel}' id='${id}'`);
|
||||||
|
}
|
||||||
|
void callback(message);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,9 +4,9 @@ import { createId } from "@homarr/db";
|
|||||||
import type { WidgetKind } from "@homarr/definitions";
|
import type { WidgetKind } from "@homarr/definitions";
|
||||||
import { logger } from "@homarr/log";
|
import { logger } from "@homarr/log";
|
||||||
|
|
||||||
|
import { ChannelSubscriptionTracker } from "./channel-subscription-tracker";
|
||||||
import { createRedisConnection } from "./connection";
|
import { createRedisConnection } from "./connection";
|
||||||
|
|
||||||
const subscriber = createRedisConnection(); // Used for subscribing to channels - after subscribing it can only be used for subscribing
|
|
||||||
const publisher = createRedisConnection();
|
const publisher = createRedisConnection();
|
||||||
const lastDataClient = createRedisConnection();
|
const lastDataClient = createRedisConnection();
|
||||||
|
|
||||||
@@ -31,15 +31,7 @@ export const createSubPubChannel = <TData>(name: string, { persist }: { persist:
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
void subscriber.subscribe(channelName, (err) => {
|
return ChannelSubscriptionTracker.subscribe(channelName, (message) => {
|
||||||
if (!err) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logger.error(`Error with channel '${channelName}': ${err.name} (${err.message})`);
|
|
||||||
});
|
|
||||||
subscriber.on("message", (channel, message) => {
|
|
||||||
if (channel !== channelName) return; // TODO: check if this is necessary - it should be handled by the redis client
|
|
||||||
|
|
||||||
callback(superjson.parse(message));
|
callback(superjson.parse(message));
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
@@ -172,15 +164,9 @@ export const createCacheChannel = <TData>(name: string, cacheDurationMs: number
|
|||||||
export const createItemAndIntegrationChannel = <TData>(kind: WidgetKind, integrationId: string) => {
|
export const createItemAndIntegrationChannel = <TData>(kind: WidgetKind, integrationId: string) => {
|
||||||
const channelName = `item:${kind}:integration:${integrationId}`;
|
const channelName = `item:${kind}:integration:${integrationId}`;
|
||||||
return {
|
return {
|
||||||
subscribeAsync: async (callback: (data: TData) => void) => {
|
subscribe: (callback: (data: TData) => void) => {
|
||||||
await subscriber.subscribe(channelName);
|
return ChannelSubscriptionTracker.subscribe(channelName, (message) => {
|
||||||
subscriber.on("message", (channel, message) => {
|
|
||||||
if (channel !== channelName) {
|
|
||||||
logger.warn(`received message on ${channel} channel but was looking for ${channelName}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
callback(superjson.parse(message));
|
callback(superjson.parse(message));
|
||||||
logger.debug(`sent message on ${channelName}`);
|
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
publishAndUpdateLastStateAsync: async (data: TData) => {
|
publishAndUpdateLastStateAsync: async (data: TData) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user