mirror of
https://github.com/ajnart/homarr.git
synced 2026-02-28 09:21:00 +01:00
refactor: add request handlers for centralized cached requests (#1504)
* feat: add object base64 hash method * chore: add script to add package * feat: add request-handler package * wip: add request handlers for all jobs and widget api procedures * wip: remove errors shown in logs, add missing decryption for secrets in cached-request-job-handler * wip: highly improve request handler, add request handlers for calendar, media-server, indexer-manager and more, add support for multiple inputs from job handler creator * refactor: move media-server requests to request-handler, add invalidation logic for dns-hole and media requests * refactor: remove unused integration item middleware * feat: add invalidation to switch entity action of smart-home * fix: lint issues * chore: use integration-kind-by-category instead of union for request-handlers * fix: build not working for tasks and websocket * refactor: add more logs * refactor: readd timestamp logic for diconnect status * fix: lint and typecheck issue * chore: address pull request feedback
This commit is contained in:
@@ -150,83 +150,6 @@ export const createManyIntegrationMiddleware = <TKind extends IntegrationKind>(
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Creates a middleware that provides the integrations and their items in the context that are of the specified kinds and have the specified item
|
||||
* It also ensures that the user has permission to perform the specified action on the integrations
|
||||
* @param action query for showing data or interact for mutating data
|
||||
* @param kinds kinds of integrations that are supported
|
||||
* @returns middleware that can be used with trpc
|
||||
* @example publicProcedure.unstable_concat(createManyIntegrationOfOneItemMiddleware("query", "piHole", "homeAssistant")).query(...)
|
||||
* @throws TRPCError NOT_FOUND if the integration for the item was not found
|
||||
* @throws TRPCError FORBIDDEN if the user does not have permission to perform the specified action on at least one of the specified integrations
|
||||
*/
|
||||
export const createManyIntegrationOfOneItemMiddleware = <TKind extends IntegrationKind>(
|
||||
action: IntegrationAction,
|
||||
...kinds: AtLeastOneOf<TKind> // Ensure at least one kind is provided
|
||||
) => {
|
||||
return publicProcedure
|
||||
.input(z.object({ integrationIds: z.array(z.string()).min(1), itemId: z.string() }))
|
||||
.use(async ({ ctx, input, next }) => {
|
||||
const dbIntegrations = await ctx.db.query.integrations.findMany({
|
||||
where: and(inArray(integrations.id, input.integrationIds), inArray(integrations.kind, kinds)),
|
||||
with: {
|
||||
secrets: true,
|
||||
items: {
|
||||
with: {
|
||||
item: {
|
||||
with: {
|
||||
section: {
|
||||
columns: {
|
||||
boardId: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
userPermissions: true,
|
||||
groupPermissions: true,
|
||||
},
|
||||
});
|
||||
|
||||
const offset = input.integrationIds.length - dbIntegrations.length;
|
||||
if (offset !== 0) {
|
||||
throw new TRPCError({
|
||||
code: "NOT_FOUND",
|
||||
message: `${offset} of the specified integrations not found or not of kinds ${kinds.join(",")}`,
|
||||
});
|
||||
}
|
||||
|
||||
await throwIfActionIsNotAllowedAsync(action, ctx.db, dbIntegrations, ctx.session);
|
||||
|
||||
const dbIntegrationWithItem = dbIntegrations.filter((integration) =>
|
||||
integration.items.some((item) => item.itemId === input.itemId),
|
||||
);
|
||||
|
||||
if (dbIntegrationWithItem.length === 0) {
|
||||
throw new TRPCError({
|
||||
code: "NOT_FOUND",
|
||||
message: "Integrations for item were not found",
|
||||
});
|
||||
}
|
||||
|
||||
return next({
|
||||
ctx: {
|
||||
integrations: dbIntegrationWithItem.map(
|
||||
({ secrets, kind, groupPermissions: _ignore1, userPermissions: _ignore2, ...rest }) => ({
|
||||
...rest,
|
||||
kind: kind as TKind,
|
||||
decryptedSecrets: secrets.map((secret) => ({
|
||||
...secret,
|
||||
value: decryptSecret(secret.value),
|
||||
})),
|
||||
}),
|
||||
),
|
||||
},
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* Throws a TRPCError FORBIDDEN if the user does not have permission to perform the specified action on at least one of the specified integrations
|
||||
* @param action action to perform
|
||||
|
||||
@@ -1,20 +1,24 @@
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import type { CalendarEvent } from "@homarr/integrations/types";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { radarrReleaseTypes } from "@homarr/integrations/types";
|
||||
import { calendarMonthRequestHandler } from "@homarr/request-handler/calendar";
|
||||
import { z } from "@homarr/validation";
|
||||
|
||||
import { createManyIntegrationOfOneItemMiddleware } from "../../middlewares/integration";
|
||||
import { createManyIntegrationMiddleware } from "../../middlewares/integration";
|
||||
import { createTRPCRouter, publicProcedure } from "../../trpc";
|
||||
|
||||
export const calendarRouter = createTRPCRouter({
|
||||
findAllEvents: publicProcedure
|
||||
.unstable_concat(createManyIntegrationOfOneItemMiddleware("query", ...getIntegrationKindsByCategory("calendar")))
|
||||
.query(async ({ ctx }) => {
|
||||
const result = await Promise.all(
|
||||
ctx.integrations.flatMap(async (integration) => {
|
||||
const cache = createItemAndIntegrationChannel<CalendarEvent[]>("calendar", integration.id);
|
||||
return await cache.getAsync();
|
||||
.input(z.object({ year: z.number(), month: z.number(), releaseType: z.array(z.enum(radarrReleaseTypes)) }))
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("calendar")))
|
||||
.query(async ({ ctx, input }) => {
|
||||
const results = await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const innerHandler = calendarMonthRequestHandler.handler(integration, input);
|
||||
const { data } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
|
||||
return data;
|
||||
}),
|
||||
);
|
||||
return result.filter((item) => item !== null).flatMap((item) => item.data);
|
||||
return results.flat();
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -2,31 +2,33 @@ import { observable } from "@trpc/server/observable";
|
||||
|
||||
import type { Modify } from "@homarr/common/types";
|
||||
import type { Integration } from "@homarr/db/schema/sqlite";
|
||||
import type { IntegrationKindByCategory, WidgetKind } from "@homarr/definitions";
|
||||
import type { IntegrationKindByCategory } from "@homarr/definitions";
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import { integrationCreator } from "@homarr/integrations";
|
||||
import type { DnsHoleSummary } from "@homarr/integrations/types";
|
||||
import { controlsInputSchema } from "@homarr/integrations/types";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { z } from "@homarr/validation";
|
||||
import { dnsHoleRequestHandler } from "@homarr/request-handler/dns-hole";
|
||||
|
||||
import { createManyIntegrationMiddleware, createOneIntegrationMiddleware } from "../../middlewares/integration";
|
||||
import { createTRPCRouter, publicProcedure } from "../../trpc";
|
||||
|
||||
export const dnsHoleRouter = createTRPCRouter({
|
||||
summary: publicProcedure
|
||||
.input(z.object({ widgetKind: z.enum(["dnsHoleSummary", "dnsHoleControls"]) }))
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("dnsHole")))
|
||||
.query(async ({ input: { widgetKind }, ctx }) => {
|
||||
.query(async ({ ctx }) => {
|
||||
const results = await Promise.all(
|
||||
ctx.integrations.map(async ({ decryptedSecrets: _, ...integration }) => {
|
||||
const channel = createItemAndIntegrationChannel<DnsHoleSummary>(widgetKind, integration.id);
|
||||
const { data: summary, timestamp } = (await channel.getAsync()) ?? { data: null, timestamp: new Date(0) };
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const innerHandler = dnsHoleRequestHandler.handler(integration, {});
|
||||
const { data, timestamp } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
|
||||
return {
|
||||
integration,
|
||||
timestamp,
|
||||
summary,
|
||||
integration: {
|
||||
id: integration.id,
|
||||
name: integration.name,
|
||||
kind: integration.kind,
|
||||
updatedAt: timestamp,
|
||||
},
|
||||
summary: data,
|
||||
};
|
||||
}),
|
||||
);
|
||||
@@ -34,22 +36,19 @@ export const dnsHoleRouter = createTRPCRouter({
|
||||
}),
|
||||
|
||||
subscribeToSummary: publicProcedure
|
||||
.input(z.object({ widgetKind: z.enum(["dnsHoleSummary", "dnsHoleControls"]) }))
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("dnsHole")))
|
||||
.subscription(({ input: { widgetKind }, ctx }) => {
|
||||
.subscription(({ ctx }) => {
|
||||
return observable<{
|
||||
integration: Modify<Integration, { kind: IntegrationKindByCategory<"dnsHole"> }>;
|
||||
timestamp: Date;
|
||||
summary: DnsHoleSummary;
|
||||
}>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integrationWithSecrets of ctx.integrations) {
|
||||
const { decryptedSecrets: _, ...integration } = integrationWithSecrets;
|
||||
const channel = createItemAndIntegrationChannel<DnsHoleSummary>(widgetKind as WidgetKind, integration.id);
|
||||
const unsubscribe = channel.subscribe((summary) => {
|
||||
const innerHandler = dnsHoleRequestHandler.handler(integrationWithSecrets, {});
|
||||
const unsubscribe = innerHandler.subscribe((summary) => {
|
||||
emit.next({
|
||||
integration,
|
||||
timestamp: new Date(),
|
||||
summary,
|
||||
});
|
||||
});
|
||||
@@ -68,6 +67,12 @@ export const dnsHoleRouter = createTRPCRouter({
|
||||
.mutation(async ({ ctx: { integration } }) => {
|
||||
const client = integrationCreator(integration);
|
||||
await client.enableAsync();
|
||||
|
||||
const innerHandler = dnsHoleRequestHandler.handler(integration, {});
|
||||
// We need to wait for the integration to be enabled before invalidating the cache
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(() => void innerHandler.invalidateAsync().then(resolve), 1000);
|
||||
});
|
||||
}),
|
||||
|
||||
disable: publicProcedure
|
||||
@@ -76,5 +81,11 @@ export const dnsHoleRouter = createTRPCRouter({
|
||||
.mutation(async ({ ctx: { integration }, input }) => {
|
||||
const client = integrationCreator(integration);
|
||||
await client.disableAsync(input.duration);
|
||||
|
||||
const innerHandler = dnsHoleRequestHandler.handler(integration, {});
|
||||
// We need to wait for the integration to be disabled before invalidating the cache
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(() => void innerHandler.invalidateAsync().then(resolve), 1000);
|
||||
});
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -6,7 +6,7 @@ import type { IntegrationKindByCategory } from "@homarr/definitions";
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import type { DownloadClientJobsAndStatus } from "@homarr/integrations";
|
||||
import { downloadClientItemSchema, integrationCreator } from "@homarr/integrations";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { downloadClientRequestHandler } from "@homarr/request-handler/downloads";
|
||||
import { z } from "@homarr/validation";
|
||||
|
||||
import type { IntegrationAction } from "../../middlewares/integration";
|
||||
@@ -21,12 +21,18 @@ export const downloadsRouter = createTRPCRouter({
|
||||
.unstable_concat(createDownloadClientIntegrationMiddleware("query"))
|
||||
.query(async ({ ctx }) => {
|
||||
return await Promise.all(
|
||||
ctx.integrations.map(async ({ decryptedSecrets: _, ...integration }) => {
|
||||
const channel = createItemAndIntegrationChannel<DownloadClientJobsAndStatus>("downloads", integration.id);
|
||||
const { data, timestamp } = (await channel.getAsync()) ?? { data: null, timestamp: new Date(0) };
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const innerHandler = downloadClientRequestHandler.handler(integration, {});
|
||||
|
||||
const { data, timestamp } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
|
||||
return {
|
||||
integration,
|
||||
timestamp,
|
||||
integration: {
|
||||
id: integration.id,
|
||||
name: integration.name,
|
||||
kind: integration.kind,
|
||||
updatedAt: timestamp,
|
||||
},
|
||||
data,
|
||||
};
|
||||
}),
|
||||
@@ -37,17 +43,15 @@ export const downloadsRouter = createTRPCRouter({
|
||||
.subscription(({ ctx }) => {
|
||||
return observable<{
|
||||
integration: Modify<Integration, { kind: IntegrationKindByCategory<"downloadClient"> }>;
|
||||
timestamp: Date;
|
||||
data: DownloadClientJobsAndStatus;
|
||||
}>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integrationWithSecrets of ctx.integrations) {
|
||||
const { decryptedSecrets: _, ...integration } = integrationWithSecrets;
|
||||
const channel = createItemAndIntegrationChannel<DownloadClientJobsAndStatus>("downloads", integration.id);
|
||||
const unsubscribe = channel.subscribe((data) => {
|
||||
const innerHandler = downloadClientRequestHandler.handler(integrationWithSecrets, {});
|
||||
const unsubscribe = innerHandler.subscribe((data) => {
|
||||
emit.next({
|
||||
integration,
|
||||
timestamp: new Date(),
|
||||
data,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { observable } from "@trpc/server/observable";
|
||||
|
||||
import type { HealthMonitoring } from "@homarr/integrations";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { systemInfoRequestHandler } from "@homarr/request-handler/health-monitoring";
|
||||
|
||||
import { createManyIntegrationMiddleware } from "../../middlewares/integration";
|
||||
import { createTRPCRouter, publicProcedure } from "../../trpc";
|
||||
@@ -12,14 +12,14 @@ export const healthMonitoringRouter = createTRPCRouter({
|
||||
.query(async ({ ctx }) => {
|
||||
return await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const channel = createItemAndIntegrationChannel<HealthMonitoring>("healthMonitoring", integration.id);
|
||||
const { data: healthInfo, timestamp } = (await channel.getAsync()) ?? { data: null, timestamp: new Date(0) };
|
||||
const innerHandler = systemInfoRequestHandler.handler(integration, {});
|
||||
const { data, timestamp } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
|
||||
return {
|
||||
integrationId: integration.id,
|
||||
integrationName: integration.name,
|
||||
healthInfo,
|
||||
timestamp,
|
||||
healthInfo: data,
|
||||
updatedAt: timestamp,
|
||||
};
|
||||
}),
|
||||
);
|
||||
@@ -31,8 +31,8 @@ export const healthMonitoringRouter = createTRPCRouter({
|
||||
return observable<{ integrationId: string; healthInfo: HealthMonitoring; timestamp: Date }>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integration of ctx.integrations) {
|
||||
const channel = createItemAndIntegrationChannel<HealthMonitoring>("healthMonitoring", integration.id);
|
||||
const unsubscribe = channel.subscribe((healthInfo) => {
|
||||
const innerHandler = systemInfoRequestHandler.handler(integration, {});
|
||||
const unsubscribe = innerHandler.subscribe((healthInfo) => {
|
||||
emit.next({
|
||||
integrationId: integration.id,
|
||||
healthInfo,
|
||||
|
||||
@@ -5,7 +5,7 @@ import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import { integrationCreator } from "@homarr/integrations";
|
||||
import type { Indexer } from "@homarr/integrations/types";
|
||||
import { logger } from "@homarr/log";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { indexerManagerRequestHandler } from "@homarr/request-handler/indexer-manager";
|
||||
|
||||
import type { IntegrationAction } from "../../middlewares/integration";
|
||||
import { createManyIntegrationMiddleware } from "../../middlewares/integration";
|
||||
@@ -20,14 +20,8 @@ export const indexerManagerRouter = createTRPCRouter({
|
||||
.query(async ({ ctx }) => {
|
||||
const results = await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const client = integrationCreator(integration);
|
||||
const indexers = await client.getIndexersAsync().catch((err) => {
|
||||
logger.error("indexer-manager router - ", err);
|
||||
throw new TRPCError({
|
||||
code: "INTERNAL_SERVER_ERROR",
|
||||
message: `Failed to fetch indexers for ${integration.name} (${integration.id})`,
|
||||
});
|
||||
});
|
||||
const innerHandler = indexerManagerRequestHandler.handler(integration, {});
|
||||
const { data: indexers } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
|
||||
return {
|
||||
integrationId: integration.id,
|
||||
@@ -43,11 +37,11 @@ export const indexerManagerRouter = createTRPCRouter({
|
||||
.subscription(({ ctx }) => {
|
||||
return observable<{ integrationId: string; indexers: Indexer[] }>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integration of ctx.integrations) {
|
||||
const channel = createItemAndIntegrationChannel<Indexer[]>("indexerManager", integration.id);
|
||||
const unsubscribe = channel.subscribe((indexers) => {
|
||||
for (const integrationWithSecrets of ctx.integrations) {
|
||||
const innerHandler = indexerManagerRequestHandler.handler(integrationWithSecrets, {});
|
||||
const unsubscribe = innerHandler.subscribe((indexers) => {
|
||||
emit.next({
|
||||
integrationId: integration.id,
|
||||
integrationId: integrationWithSecrets.id,
|
||||
indexers,
|
||||
});
|
||||
});
|
||||
@@ -60,7 +54,6 @@ export const indexerManagerRouter = createTRPCRouter({
|
||||
};
|
||||
});
|
||||
}),
|
||||
|
||||
testAllIndexers: publicProcedure
|
||||
.unstable_concat(createIndexerManagerIntegrationMiddleware("interact"))
|
||||
.mutation(async ({ ctx }) => {
|
||||
|
||||
@@ -1,53 +1,110 @@
|
||||
import { observable } from "@trpc/server/observable";
|
||||
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import type { MediaRequestList, MediaRequestStats } from "@homarr/integrations";
|
||||
import { integrationCreator } from "@homarr/integrations";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { integrationCreator, MediaRequestStatus } from "@homarr/integrations";
|
||||
import type { MediaRequest } from "@homarr/integrations/types";
|
||||
import { mediaRequestListRequestHandler } from "@homarr/request-handler/media-request-list";
|
||||
import { mediaRequestStatsRequestHandler } from "@homarr/request-handler/media-request-stats";
|
||||
import { z } from "@homarr/validation";
|
||||
|
||||
import {
|
||||
createManyIntegrationOfOneItemMiddleware,
|
||||
createOneIntegrationMiddleware,
|
||||
} from "../../middlewares/integration";
|
||||
import { createManyIntegrationMiddleware, createOneIntegrationMiddleware } from "../../middlewares/integration";
|
||||
import { createTRPCRouter, protectedProcedure, publicProcedure } from "../../trpc";
|
||||
|
||||
export const mediaRequestsRouter = createTRPCRouter({
|
||||
getLatestRequests: publicProcedure
|
||||
.unstable_concat(
|
||||
createManyIntegrationOfOneItemMiddleware("query", ...getIntegrationKindsByCategory("mediaRequest")),
|
||||
)
|
||||
.query(async ({ input }) => {
|
||||
return await Promise.all(
|
||||
input.integrationIds.map(async (integrationId) => {
|
||||
const channel = createItemAndIntegrationChannel<MediaRequestList>("mediaRequests-requestList", integrationId);
|
||||
return await channel.getAsync();
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("mediaRequest")))
|
||||
.query(async ({ ctx }) => {
|
||||
const results = await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const innerHandler = mediaRequestListRequestHandler.handler(integration, {});
|
||||
const { data } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
return {
|
||||
integration: {
|
||||
id: integration.id,
|
||||
name: integration.name,
|
||||
kind: integration.kind,
|
||||
},
|
||||
data,
|
||||
};
|
||||
}),
|
||||
);
|
||||
return results
|
||||
.flatMap(({ data, integration }) => data.map((request) => ({ ...request, integrationId: integration.id })))
|
||||
.sort(({ status: statusA }, { status: statusB }) => {
|
||||
if (statusA === MediaRequestStatus.PendingApproval) {
|
||||
return -1;
|
||||
}
|
||||
if (statusB === MediaRequestStatus.PendingApproval) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
}),
|
||||
subscribeToLatestRequests: publicProcedure
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("mediaRequest")))
|
||||
.subscription(({ ctx }) => {
|
||||
return observable<{
|
||||
integrationId: string;
|
||||
requests: MediaRequest[];
|
||||
}>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integrationWithSecrets of ctx.integrations) {
|
||||
const { decryptedSecrets: _, ...integration } = integrationWithSecrets;
|
||||
const innerHandler = mediaRequestListRequestHandler.handler(integrationWithSecrets, {});
|
||||
const unsubscribe = innerHandler.subscribe((requests) => {
|
||||
emit.next({
|
||||
integrationId: integration.id,
|
||||
requests,
|
||||
});
|
||||
});
|
||||
unsubscribes.push(unsubscribe);
|
||||
}
|
||||
return () => {
|
||||
unsubscribes.forEach((unsubscribe) => {
|
||||
unsubscribe();
|
||||
});
|
||||
};
|
||||
});
|
||||
}),
|
||||
getStats: publicProcedure
|
||||
.unstable_concat(
|
||||
createManyIntegrationOfOneItemMiddleware("query", ...getIntegrationKindsByCategory("mediaRequest")),
|
||||
)
|
||||
.query(async ({ input }) => {
|
||||
return await Promise.all(
|
||||
input.integrationIds.map(async (integrationId) => {
|
||||
const channel = createItemAndIntegrationChannel<MediaRequestStats>(
|
||||
"mediaRequests-requestStats",
|
||||
integrationId,
|
||||
);
|
||||
return await channel.getAsync();
|
||||
.unstable_concat(createManyIntegrationMiddleware("query", ...getIntegrationKindsByCategory("mediaRequest")))
|
||||
.query(async ({ ctx }) => {
|
||||
const results = await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const innerHandler = mediaRequestStatsRequestHandler.handler(integration, {});
|
||||
const { data } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
return {
|
||||
integration: {
|
||||
id: integration.id,
|
||||
name: integration.name,
|
||||
kind: integration.kind,
|
||||
},
|
||||
data,
|
||||
};
|
||||
}),
|
||||
);
|
||||
return {
|
||||
stats: results.flatMap((result) => result.data.stats),
|
||||
users: results
|
||||
.map((result) => result.data.users.map((user) => ({ ...user, integration: result.integration })))
|
||||
.flat()
|
||||
.sort(({ requestCount: countA }, { requestCount: countB }) => countB - countA),
|
||||
integrations: results.map((result) => result.integration),
|
||||
};
|
||||
}),
|
||||
answerRequest: protectedProcedure
|
||||
.unstable_concat(createOneIntegrationMiddleware("interact", ...getIntegrationKindsByCategory("mediaRequest")))
|
||||
.input(z.object({ requestId: z.number(), answer: z.enum(["approve", "decline"]) }))
|
||||
.mutation(async ({ ctx: { integration }, input }) => {
|
||||
const integrationInstance = integrationCreator(integration);
|
||||
const innerHandler = mediaRequestListRequestHandler.handler(integration, {});
|
||||
|
||||
if (input.answer === "approve") {
|
||||
await integrationInstance.approveRequestAsync(input.requestId);
|
||||
await innerHandler.invalidateAsync();
|
||||
return;
|
||||
}
|
||||
await integrationInstance.declineRequestAsync(input.requestId);
|
||||
await innerHandler.invalidateAsync();
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -2,7 +2,7 @@ import { observable } from "@trpc/server/observable";
|
||||
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import type { StreamSession } from "@homarr/integrations";
|
||||
import { createItemAndIntegrationChannel } from "@homarr/redis";
|
||||
import { mediaServerRequestHandler } from "@homarr/request-handler/media-server";
|
||||
|
||||
import type { IntegrationAction } from "../../middlewares/integration";
|
||||
import { createManyIntegrationMiddleware } from "../../middlewares/integration";
|
||||
@@ -17,11 +17,11 @@ export const mediaServerRouter = createTRPCRouter({
|
||||
.query(async ({ ctx }) => {
|
||||
return await Promise.all(
|
||||
ctx.integrations.map(async (integration) => {
|
||||
const channel = createItemAndIntegrationChannel<StreamSession[]>("mediaServer", integration.id);
|
||||
const data = await channel.getAsync();
|
||||
const innerHandler = mediaServerRequestHandler.handler(integration, {});
|
||||
const { data } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
return {
|
||||
integrationId: integration.id,
|
||||
sessions: data?.data ?? [],
|
||||
sessions: data,
|
||||
};
|
||||
}),
|
||||
);
|
||||
@@ -32,8 +32,9 @@ export const mediaServerRouter = createTRPCRouter({
|
||||
return observable<{ integrationId: string; data: StreamSession[] }>((emit) => {
|
||||
const unsubscribes: (() => void)[] = [];
|
||||
for (const integration of ctx.integrations) {
|
||||
const channel = createItemAndIntegrationChannel<StreamSession[]>("mediaServer", integration.id);
|
||||
const unsubscribe = channel.subscribe((sessions) => {
|
||||
const innerHandler = mediaServerRequestHandler.handler(integration, {});
|
||||
|
||||
const unsubscribe = innerHandler.subscribe((sessions) => {
|
||||
emit.next({
|
||||
integrationId: integration.id,
|
||||
data: sessions,
|
||||
|
||||
@@ -2,7 +2,7 @@ import { observable } from "@trpc/server/observable";
|
||||
|
||||
import { getIntegrationKindsByCategory } from "@homarr/definitions";
|
||||
import { integrationCreator } from "@homarr/integrations";
|
||||
import { homeAssistantEntityState } from "@homarr/redis";
|
||||
import { smartHomeEntityStateRequestHandler } from "@homarr/request-handler/smart-home-entity-state";
|
||||
import { z } from "@homarr/validation";
|
||||
|
||||
import type { IntegrationAction } from "../../middlewares/integration";
|
||||
@@ -13,29 +13,45 @@ const createSmartHomeIntegrationMiddleware = (action: IntegrationAction) =>
|
||||
createOneIntegrationMiddleware(action, ...getIntegrationKindsByCategory("smartHomeServer"));
|
||||
|
||||
export const smartHomeRouter = createTRPCRouter({
|
||||
subscribeEntityState: publicProcedure.input(z.object({ entityId: z.string() })).subscription(({ input }) => {
|
||||
return observable<{
|
||||
entityId: string;
|
||||
state: string;
|
||||
}>((emit) => {
|
||||
const unsubscribe = homeAssistantEntityState.subscribe((message) => {
|
||||
if (message.entityId !== input.entityId) {
|
||||
return;
|
||||
}
|
||||
emit.next(message);
|
||||
});
|
||||
entityState: publicProcedure
|
||||
.input(z.object({ entityId: z.string() }))
|
||||
.unstable_concat(createSmartHomeIntegrationMiddleware("query"))
|
||||
.query(async ({ ctx: { integration }, input }) => {
|
||||
const innerHandler = smartHomeEntityStateRequestHandler.handler(integration, { entityId: input.entityId });
|
||||
const { data } = await innerHandler.getCachedOrUpdatedDataAsync({ forceUpdate: false });
|
||||
return data;
|
||||
}),
|
||||
subscribeEntityState: publicProcedure
|
||||
.unstable_concat(createSmartHomeIntegrationMiddleware("query"))
|
||||
.input(z.object({ entityId: z.string() }))
|
||||
.subscription(({ input, ctx }) => {
|
||||
return observable<{
|
||||
entityId: string;
|
||||
state: string;
|
||||
}>((emit) => {
|
||||
const innerHandler = smartHomeEntityStateRequestHandler.handler(ctx.integration, {
|
||||
entityId: input.entityId,
|
||||
});
|
||||
const unsubscribe = innerHandler.subscribe((state) => {
|
||||
emit.next({ state, entityId: input.entityId });
|
||||
});
|
||||
|
||||
return () => {
|
||||
unsubscribe();
|
||||
};
|
||||
});
|
||||
}),
|
||||
return () => {
|
||||
unsubscribe();
|
||||
};
|
||||
});
|
||||
}),
|
||||
switchEntity: publicProcedure
|
||||
.unstable_concat(createSmartHomeIntegrationMiddleware("interact"))
|
||||
.input(z.object({ entityId: z.string() }))
|
||||
.mutation(async ({ ctx: { integration }, input }) => {
|
||||
const client = integrationCreator(integration);
|
||||
return await client.triggerToggleAsync(input.entityId);
|
||||
const success = await client.triggerToggleAsync(input.entityId);
|
||||
|
||||
const innerHandler = smartHomeEntityStateRequestHandler.handler(integration, { entityId: input.entityId });
|
||||
await innerHandler.invalidateAsync();
|
||||
|
||||
return success;
|
||||
}),
|
||||
executeAutomation: publicProcedure
|
||||
.unstable_concat(createSmartHomeIntegrationMiddleware("interact"))
|
||||
|
||||
Reference in New Issue
Block a user