From 41dba7b516880098ae923d45f2065875cb6c70da Mon Sep 17 00:00:00 2001 From: Meier Lukas Date: Wed, 3 Jul 2024 20:31:06 +0200 Subject: [PATCH] fix: controller is already closed trpc subscription observable error (#743) --- packages/api/src/router/cron-jobs.ts | 8 ++++++++ packages/api/src/router/log.ts | 7 +++++++ packages/api/src/router/user.ts | 12 ------------ packages/api/src/router/widgets/app.ts | 5 +++++ packages/api/src/router/widgets/media-server.ts | 6 ++++++ packages/api/src/router/widgets/smart-home.ts | 7 +++++++ 6 files changed, 33 insertions(+), 12 deletions(-) diff --git a/packages/api/src/router/cron-jobs.ts b/packages/api/src/router/cron-jobs.ts index 598f25d5f..feb88682e 100644 --- a/packages/api/src/router/cron-jobs.ts +++ b/packages/api/src/router/cron-jobs.ts @@ -21,14 +21,22 @@ export const cronJobsRouter = createTRPCRouter({ }), subscribeToStatusUpdates: publicProcedure.subscription(() => { return observable((emit) => { + let isConnectionClosed = false; + for (const job of jobGroup.getJobRegistry().values()) { const channel = createCronJobStatusChannel(job.name); channel.subscribe((data) => { + if (isConnectionClosed) return; + emit.next(data); }); } logger.info("A tRPC client has connected to the cron job status updates procedure"); + + return () => { + isConnectionClosed = true; + }; }); }), }); diff --git a/packages/api/src/router/log.ts b/packages/api/src/router/log.ts index ac46ba9a3..1586e4239 100644 --- a/packages/api/src/router/log.ts +++ b/packages/api/src/router/log.ts @@ -9,10 +9,17 @@ import { createTRPCRouter, publicProcedure } from "../trpc"; export const logRouter = createTRPCRouter({ subscribe: publicProcedure.subscription(() => { return observable((emit) => { + let isConnectionClosed = false; + loggingChannel.subscribe((data) => { + if (isConnectionClosed) return; emit.next(data); }); logger.info("A tRPC client has connected to the logging procedure"); + + return () => { + isConnectionClosed = true; + }; }); }), }); diff --git a/packages/api/src/router/user.ts b/packages/api/src/router/user.ts index d73afa8c8..b82c8f14f 100644 --- a/packages/api/src/router/user.ts +++ b/packages/api/src/router/user.ts @@ -1,11 +1,9 @@ import { TRPCError } from "@trpc/server"; -import { observable } from "@trpc/server/observable"; import { createSaltAsync, hashPasswordAsync } from "@homarr/auth"; import type { Database } from "@homarr/db"; import { and, createId, eq, schema } from "@homarr/db"; import { groupMembers, groupPermissions, groups, invites, users } from "@homarr/db/schema/sqlite"; -import { exampleChannel } from "@homarr/redis"; import { validation, z } from "@homarr/validation"; import { createTRPCRouter, protectedProcedure, publicProcedure } from "../trpc"; @@ -232,16 +230,6 @@ export const userRouter = createTRPCRouter({ }) .where(eq(users.id, input.userId)); }), - setMessage: publicProcedure.input(z.string()).mutation(async ({ input }) => { - await exampleChannel.publishAsync({ message: input }); - }), - test: publicProcedure.subscription(() => { - return observable<{ message: string }>((emit) => { - exampleChannel.subscribe((message) => { - emit.next(message); - }); - }); - }), }); const createUserAsync = async (db: Database, input: z.infer) => { diff --git a/packages/api/src/router/widgets/app.ts b/packages/api/src/router/widgets/app.ts index 8bfc51868..142fe1972 100644 --- a/packages/api/src/router/widgets/app.ts +++ b/packages/api/src/router/widgets/app.ts @@ -27,14 +27,19 @@ export const appRouter = createTRPCRouter({ const pingResult = await sendPingRequestAsync(input.url); return observable<{ url: string; statusCode: number } | { url: string; error: string }>((emit) => { + let isConnectionClosed = false; + emit.next({ url: input.url, ...pingResult }); pingChannel.subscribe((message) => { + if (isConnectionClosed) return; + // Only emit if same url if (message.url !== input.url) return; emit.next(message); }); return () => { + isConnectionClosed = true; void pingUrlChannel.removeAsync(input.url); }; }); diff --git a/packages/api/src/router/widgets/media-server.ts b/packages/api/src/router/widgets/media-server.ts index da43e8d20..4eba38250 100644 --- a/packages/api/src/router/widgets/media-server.ts +++ b/packages/api/src/router/widgets/media-server.ts @@ -25,15 +25,21 @@ export const mediaServerRouter = createTRPCRouter({ .unstable_concat(createManyIntegrationMiddleware("jellyfin", "plex")) .subscription(({ ctx }) => { return observable<{ integrationId: string; data: StreamSession[] }>((emit) => { + let isConnectionClosed = false; + for (const integration of ctx.integrations) { const channel = createItemAndIntegrationChannel("mediaServer", integration.id); void channel.subscribeAsync((sessions) => { + if (isConnectionClosed) return; emit.next({ integrationId: integration.id, data: sessions, }); }); } + return () => { + isConnectionClosed = true; + }; }); }), }); diff --git a/packages/api/src/router/widgets/smart-home.ts b/packages/api/src/router/widgets/smart-home.ts index fa3ffb415..8ba8b9e56 100644 --- a/packages/api/src/router/widgets/smart-home.ts +++ b/packages/api/src/router/widgets/smart-home.ts @@ -13,12 +13,19 @@ export const smartHomeRouter = createTRPCRouter({ entityId: string; state: string; }>((emit) => { + let isConnectionClosed = false; + homeAssistantEntityState.subscribe((message) => { + if (isConnectionClosed) return; if (message.entityId !== input.entityId) { return; } emit.next(message); }); + + return () => { + isConnectionClosed = true; + }; }); }), switchEntity: publicProcedure