From 36e4224b3367b52d53496515c0bf79476089cb18 Mon Sep 17 00:00:00 2001 From: dswbx Date: Tue, 1 Apr 2025 11:19:55 +0200 Subject: [PATCH] refactored EventManager to run asyncs on call only, app defaults to run before response (#129) * refactored EventManager to run asyncs on call only, app defaults to run before response * fix tests --- app/__test__/app/App.spec.ts | 85 +++++++++++++++- app/__test__/core/EventManager.spec.ts | 14 +-- app/__test__/data/specs/Mutator.spec.ts | 3 + app/__test__/data/specs/Repository.spec.ts | 13 ++- app/__test__/media/Storage.spec.ts | 8 +- app/src/App.ts | 98 +++++++++++++------ .../cloudflare/cloudflare-workers.adapter.ts | 18 +++- app/src/adapter/cloudflare/modes/cached.ts | 14 ++- app/src/adapter/cloudflare/modes/durable.ts | 20 ++-- app/src/adapter/cloudflare/modes/fresh.ts | 17 +++- app/src/core/events/EventManager.ts | 18 ++-- 11 files changed, 244 insertions(+), 64 deletions(-) diff --git a/app/__test__/app/App.spec.ts b/app/__test__/app/App.spec.ts index c5e9794..860258a 100644 --- a/app/__test__/app/App.spec.ts +++ b/app/__test__/app/App.spec.ts @@ -1,6 +1,6 @@ import { describe, expect, mock, test } from "bun:test"; import type { ModuleBuildContext } from "../../src"; -import { type App, createApp } from "../../src/App"; +import { App, createApp } from "../../src/App"; import * as proto from "../../src/data/prototype"; describe("App", () => { @@ -51,4 +51,87 @@ describe("App", () => { expect(todos[0]?.title).toBe("ctx"); expect(todos[1]?.title).toBe("api"); }); + + test("lifecycle events are triggered", async () => { + const firstBoot = mock(() => null); + const configUpdate = mock(() => null); + const appBuilt = mock(() => null); + const appRequest = mock(() => null); + const beforeResponse = mock(() => null); + + const app = createApp(); + + app.emgr.onEvent( + App.Events.AppFirstBoot, + (event) => { + expect(event).toBeInstanceOf(App.Events.AppFirstBoot); + expect(event.params.app.version()).toBe(app.version()); + firstBoot(); + }, + "sync", + ); + app.emgr.onEvent( + App.Events.AppBuiltEvent, + (event) => { + expect(event).toBeInstanceOf(App.Events.AppBuiltEvent); + expect(event.params.app.version()).toBe(app.version()); + appBuilt(); + }, + "sync", + ); + app.emgr.onEvent( + App.Events.AppConfigUpdatedEvent, + () => { + configUpdate(); + }, + "sync", + ); + app.emgr.onEvent( + App.Events.AppRequest, + (event) => { + expect(event).toBeInstanceOf(App.Events.AppRequest); + expect(event.params.app.version()).toBe(app.version()); + expect(event.params.request).toBeInstanceOf(Request); + appRequest(); + }, + "sync", + ); + app.emgr.onEvent( + App.Events.AppBeforeResponse, + (event) => { + expect(event).toBeInstanceOf(App.Events.AppBeforeResponse); + expect(event.params.app.version()).toBe(app.version()); + expect(event.params.response).toBeInstanceOf(Response); + beforeResponse(); + }, + "sync", + ); + + await app.build(); + expect(firstBoot).toHaveBeenCalled(); + expect(appBuilt).toHaveBeenCalled(); + //expect(configUpdate).toHaveBeenCalled(); + expect(appRequest).not.toHaveBeenCalled(); + expect(beforeResponse).not.toHaveBeenCalled(); + }); + + test("emgr exec modes", async () => { + const called = mock(() => null); + const app = createApp({ + options: { + asyncEventsMode: "sync", + }, + }); + + // register async listener + app.emgr.onEvent(App.Events.AppFirstBoot, async () => { + called(); + }); + + await app.build(); + await app.server.request(new Request("http://localhost")); + + // expect async listeners to be executed sync after request + expect(called).toHaveBeenCalled(); + }); }); diff --git a/app/__test__/core/EventManager.spec.ts b/app/__test__/core/EventManager.spec.ts index 995ebfa..3d8b981 100644 --- a/app/__test__/core/EventManager.spec.ts +++ b/app/__test__/core/EventManager.spec.ts @@ -70,6 +70,9 @@ describe("EventManager", async () => { new SpecialEvent({ foo: "bar" }); new InformationalEvent(); + // execute asyncs + await emgr.executeAsyncs(); + expect(call).toHaveBeenCalledTimes(2); expect(delayed).toHaveBeenCalled(); }); @@ -80,15 +83,11 @@ describe("EventManager", async () => { call(); return Promise.all(p); }; - const emgr = new EventManager( - { InformationalEvent }, - { - asyncExecutor, - }, - ); + const emgr = new EventManager({ InformationalEvent }); emgr.onEvent(InformationalEvent, async () => {}); await emgr.emit(new InformationalEvent()); + await emgr.executeAsyncs(asyncExecutor); expect(call).toHaveBeenCalled(); }); @@ -125,6 +124,9 @@ describe("EventManager", async () => { const e2 = await emgr.emit(new ReturnEvent({ foo: "bar" })); expect(e2.returned).toBe(true); expect(e2.params.foo).toBe("bar-1-0"); + + await emgr.executeAsyncs(); + expect(onInvalidReturn).toHaveBeenCalled(); expect(asyncEventCallback).toHaveBeenCalled(); }); diff --git a/app/__test__/data/specs/Mutator.spec.ts b/app/__test__/data/specs/Mutator.spec.ts index 4b3bee7..7110956 100644 --- a/app/__test__/data/specs/Mutator.spec.ts +++ b/app/__test__/data/specs/Mutator.spec.ts @@ -288,14 +288,17 @@ describe("[data] Mutator (Events)", async () => { test("events were fired", async () => { const { data } = await mutator.insertOne({ label: "test" }); + await mutator.emgr.executeAsyncs(); expect(events.has(MutatorEvents.MutatorInsertBefore.slug)).toBeTrue(); expect(events.has(MutatorEvents.MutatorInsertAfter.slug)).toBeTrue(); await mutator.updateOne(data.id, { label: "test2" }); + await mutator.emgr.executeAsyncs(); expect(events.has(MutatorEvents.MutatorUpdateBefore.slug)).toBeTrue(); expect(events.has(MutatorEvents.MutatorUpdateAfter.slug)).toBeTrue(); await mutator.deleteOne(data.id); + await mutator.emgr.executeAsyncs(); expect(events.has(MutatorEvents.MutatorDeleteBefore.slug)).toBeTrue(); expect(events.has(MutatorEvents.MutatorDeleteAfter.slug)).toBeTrue(); }); diff --git a/app/__test__/data/specs/Repository.spec.ts b/app/__test__/data/specs/Repository.spec.ts index 2a42b9e..982187c 100644 --- a/app/__test__/data/specs/Repository.spec.ts +++ b/app/__test__/data/specs/Repository.spec.ts @@ -198,22 +198,27 @@ describe("[data] Repository (Events)", async () => { }); test("events were fired", async () => { - await em.repository(items).findId(1); + const repo = em.repository(items); + await repo.findId(1); + await repo.emgr.executeAsyncs(); expect(events.has(RepositoryEvents.RepositoryFindOneBefore.slug)).toBeTrue(); expect(events.has(RepositoryEvents.RepositoryFindOneAfter.slug)).toBeTrue(); events.clear(); - await em.repository(items).findOne({ id: 1 }); + await repo.findOne({ id: 1 }); + await repo.emgr.executeAsyncs(); expect(events.has(RepositoryEvents.RepositoryFindOneBefore.slug)).toBeTrue(); expect(events.has(RepositoryEvents.RepositoryFindOneAfter.slug)).toBeTrue(); events.clear(); - await em.repository(items).findMany({ where: { id: 1 } }); + await repo.findMany({ where: { id: 1 } }); + await repo.emgr.executeAsyncs(); expect(events.has(RepositoryEvents.RepositoryFindManyBefore.slug)).toBeTrue(); expect(events.has(RepositoryEvents.RepositoryFindManyAfter.slug)).toBeTrue(); events.clear(); - await em.repository(items).findManyByReference(1, "categories"); + await repo.findManyByReference(1, "categories"); + await repo.emgr.executeAsyncs(); expect(events.has(RepositoryEvents.RepositoryFindManyBefore.slug)).toBeTrue(); expect(events.has(RepositoryEvents.RepositoryFindManyAfter.slug)).toBeTrue(); events.clear(); diff --git a/app/__test__/media/Storage.spec.ts b/app/__test__/media/Storage.spec.ts index f493606..1234123 100644 --- a/app/__test__/media/Storage.spec.ts +++ b/app/__test__/media/Storage.spec.ts @@ -1,8 +1,9 @@ import { describe, expect, test } from "bun:test"; -import { type FileBody, Storage, type StorageAdapter } from "../../src/media/storage/Storage"; +import { type FileBody, Storage } from "../../src/media/storage/Storage"; import * as StorageEvents from "../../src/media/storage/events"; +import { StorageAdapter } from "media"; -class TestAdapter implements StorageAdapter { +class TestAdapter extends StorageAdapter { files: Record = {}; getName() { @@ -61,7 +62,7 @@ describe("Storage", async () => { test("uploads a file", async () => { const { meta: { type, size }, - } = await storage.uploadFile("hello", "world.txt"); + } = await storage.uploadFile("hello" as any, "world.txt"); expect({ type, size }).toEqual({ type: "text/plain", size: 0 }); }); @@ -71,6 +72,7 @@ describe("Storage", async () => { }); test("events were fired", async () => { + await storage.emgr.executeAsyncs(); expect(events.has(StorageEvents.FileUploadedEvent.slug)).toBeTrue(); expect(events.has(StorageEvents.FileDeletedEvent.slug)).toBeTrue(); // @todo: file access must be tested in controllers diff --git a/app/src/App.ts b/app/src/App.ts index ac0ea1d..b4e0ea5 100644 --- a/app/src/App.ts +++ b/app/src/App.ts @@ -4,9 +4,10 @@ import { Event } from "core/events"; import { Connection, type LibSqlCredentials, LibsqlConnection } from "data"; import type { Hono } from "hono"; import { + ModuleManager, type InitialModuleConfigs, type ModuleBuildContext, - ModuleManager, + type ModuleConfigs, type ModuleManagerOptions, type Modules, } from "modules/ModuleManager"; @@ -16,6 +17,7 @@ import { SystemController } from "modules/server/SystemController"; // biome-ignore format: must be there import { Api, type ApiOptions } from "Api"; +import type { ServerEnv } from "modules/Controller"; export type AppPlugin = (app: App) => Promise | void; @@ -29,12 +31,25 @@ export class AppBuiltEvent extends AppEvent { export class AppFirstBoot extends AppEvent { static override slug = "app-first-boot"; } -export const AppEvents = { AppConfigUpdatedEvent, AppBuiltEvent, AppFirstBoot } as const; +export class AppRequest extends AppEvent<{ request: Request }> { + static override slug = "app-request"; +} +export class AppBeforeResponse extends AppEvent<{ request: Request; response: Response }> { + static override slug = "app-before-response"; +} +export const AppEvents = { + AppConfigUpdatedEvent, + AppBuiltEvent, + AppFirstBoot, + AppRequest, + AppBeforeResponse, +} as const; export type AppOptions = { plugins?: AppPlugin[]; seed?: (ctx: ModuleBuildContext & { app: App }) => Promise; manager?: Omit; + asyncEventsMode?: "sync" | "async" | "none"; }; export type CreateAppConfig = { connection?: @@ -70,35 +85,9 @@ export class App { this.modules = new ModuleManager(connection, { ...(options?.manager ?? {}), initial: _initialConfig, - onUpdated: async (key, config) => { - // if the EventManager was disabled, we assume we shouldn't - // respond to events, such as "onUpdated". - // this is important if multiple changes are done, and then build() is called manually - if (!this.emgr.enabled) { - $console.warn("App config updated, but event manager is disabled, skip."); - return; - } - - $console.log("App config updated", key); - // @todo: potentially double syncing - await this.build({ sync: true }); - await this.emgr.emit(new AppConfigUpdatedEvent({ app: this })); - }, - onFirstBoot: async () => { - $console.log("App first boot"); - this.trigger_first_boot = true; - }, - onServerInit: async (server) => { - server.use(async (c, next) => { - c.set("app", this); - await next(); - - try { - // gracefully add the app id - c.res.headers.set("X-bknd-id", this._id); - } catch (e) {} - }); - }, + onUpdated: this.onUpdated.bind(this), + onFirstBoot: this.onFirstBoot.bind(this), + onServerInit: this.onServerInit.bind(this), }); this.modules.ctx().emgr.registerEvents(AppEvents); } @@ -213,6 +202,53 @@ export class App { return new Api({ host: "http://localhost", ...(options ?? {}), fetcher }); } + + async onUpdated(module: Module, config: ModuleConfigs[Module]) { + // if the EventManager was disabled, we assume we shouldn't + // respond to events, such as "onUpdated". + // this is important if multiple changes are done, and then build() is called manually + if (!this.emgr.enabled) { + $console.warn("App config updated, but event manager is disabled, skip."); + return; + } + + $console.log("App config updated", module); + // @todo: potentially double syncing + await this.build({ sync: true }); + await this.emgr.emit(new AppConfigUpdatedEvent({ app: this })); + } + + async onFirstBoot() { + $console.log("App first boot"); + this.trigger_first_boot = true; + } + + async onServerInit(server: Hono) { + server.use(async (c, next) => { + c.set("app", this); + await this.emgr.emit(new AppRequest({ app: this, request: c.req.raw })); + await next(); + + try { + // gracefully add the app id + c.res.headers.set("X-bknd-id", this._id); + } catch (e) {} + + await this.emgr.emit( + new AppBeforeResponse({ app: this, request: c.req.raw, response: c.res }), + ); + + // execute collected async events (async by default) + switch (this.options?.asyncEventsMode ?? "async") { + case "sync": + await this.emgr.executeAsyncs(); + break; + case "async": + this.emgr.executeAsyncs(); + break; + } + }); + } } export function createApp(config: CreateAppConfig = {}) { diff --git a/app/src/adapter/cloudflare/cloudflare-workers.adapter.ts b/app/src/adapter/cloudflare/cloudflare-workers.adapter.ts index 7483d52..dec7d09 100644 --- a/app/src/adapter/cloudflare/cloudflare-workers.adapter.ts +++ b/app/src/adapter/cloudflare/cloudflare-workers.adapter.ts @@ -9,6 +9,7 @@ import { getBinding } from "./bindings"; import { getCached } from "./modes/cached"; import { getDurable } from "./modes/durable"; import { getFresh, getWarm } from "./modes/fresh"; +import type { CreateAppConfig } from "App"; export type CloudflareBkndConfig = FrameworkBkndConfig> & { mode?: "warm" | "fresh" | "cache" | "durable"; @@ -32,8 +33,14 @@ export type Context = { ctx: ExecutionContext; }; +export const constants = { + exec_async_event_id: "cf_register_waituntil", + cache_endpoint: "/__bknd/cache", + do_endpoint: "/__bknd/do", +}; + let media_registered: boolean = false; -export function makeCfConfig(config: CloudflareBkndConfig, context: Context) { +export function makeCfConfig(config: CloudflareBkndConfig, context: Context): CreateAppConfig { if (!media_registered) { registerMedia(context.env as any); media_registered = true; @@ -61,7 +68,14 @@ export function makeCfConfig(config: CloudflareBkndConfig, context: Context) { } } - return appConfig; + return { + ...appConfig, + options: { + ...appConfig.options, + // if not specified explicitly, disable it to use ExecutionContext's waitUntil + asyncEventsMode: config.options?.asyncEventsMode ?? "none", + }, + }; } export function serve(config: CloudflareBkndConfig = {}) { diff --git a/app/src/adapter/cloudflare/modes/cached.ts b/app/src/adapter/cloudflare/modes/cached.ts index c126ff7..78d8eb4 100644 --- a/app/src/adapter/cloudflare/modes/cached.ts +++ b/app/src/adapter/cloudflare/modes/cached.ts @@ -1,6 +1,6 @@ import { App } from "bknd"; import { createRuntimeApp } from "bknd/adapter"; -import { type CloudflareBkndConfig, type Context, makeCfConfig } from "../index"; +import { type CloudflareBkndConfig, constants, type Context, makeCfConfig } from "../index"; export async function getCached(config: CloudflareBkndConfig, { env, ctx, ...args }: Context) { const { kv } = config.bindings?.(env)!; @@ -19,13 +19,23 @@ export async function getCached(config: CloudflareBkndConfig, { env, ctx, ...arg ...makeCfConfig(config, { env, ctx, ...args }), initialConfig, onBuilt: async (app) => { - app.module.server.client.get("/__bknd/cache", async (c) => { + app.module.server.client.get(constants.cache_endpoint, async (c) => { await kv.delete(key); return c.json({ message: "Cache cleared" }); }); await config.onBuilt?.(app); }, beforeBuild: async (app) => { + app.emgr.onEvent( + App.Events.AppBeforeResponse, + async (event) => { + ctx.waitUntil(event.params.app.emgr.executeAsyncs()); + }, + { + mode: "sync", + id: constants.exec_async_event_id, + }, + ); app.emgr.onEvent( App.Events.AppConfigUpdatedEvent, async ({ params: { app } }) => { diff --git a/app/src/adapter/cloudflare/modes/durable.ts b/app/src/adapter/cloudflare/modes/durable.ts index 63fce34..369d451 100644 --- a/app/src/adapter/cloudflare/modes/durable.ts +++ b/app/src/adapter/cloudflare/modes/durable.ts @@ -1,7 +1,7 @@ import { DurableObject } from "cloudflare:workers"; -import type { App, CreateAppConfig } from "bknd"; +import { App, type CreateAppConfig } from "bknd"; import { createRuntimeApp, makeConfig } from "bknd/adapter"; -import type { CloudflareBkndConfig, Context } from "../index"; +import { type CloudflareBkndConfig, type Context, constants } from "../index"; export async function getDurable(config: CloudflareBkndConfig, ctx: Context) { const { dobj } = config.bindings?.(ctx.env)!; @@ -67,7 +67,17 @@ export class DurableBkndApp extends DurableObject { this.app = await createRuntimeApp({ ...config, onBuilt: async (app) => { - app.modules.server.get("/__do", async (c) => { + app.emgr.onEvent( + App.Events.AppBeforeResponse, + async (event) => { + this.ctx.waitUntil(event.params.app.emgr.executeAsyncs()); + }, + { + mode: "sync", + id: constants.exec_async_event_id, + }, + ); + app.modules.server.get(constants.do_endpoint, async (c) => { // @ts-ignore const context: any = c.req.raw.cf ? c.req.raw.cf : c.env.cf; return c.json({ @@ -92,7 +102,6 @@ export class DurableBkndApp extends DurableObject { this.keepAlive(options.keepAliveSeconds); } - console.log("id", this.id); const res = await this.app!.fetch(request); const headers = new Headers(res.headers); headers.set("X-BuildTime", buildtime.toString()); @@ -109,16 +118,13 @@ export class DurableBkndApp extends DurableObject { async beforeBuild(app: App) {} protected keepAlive(seconds: number) { - console.log("keep alive for", seconds); if (this.interval) { - console.log("clearing, there is a new"); clearInterval(this.interval); } let i = 0; this.interval = setInterval(() => { i += 1; - //console.log("keep-alive", i); if (i === seconds) { console.log("cleared"); clearInterval(this.interval); diff --git a/app/src/adapter/cloudflare/modes/fresh.ts b/app/src/adapter/cloudflare/modes/fresh.ts index b13c537..2d34d88 100644 --- a/app/src/adapter/cloudflare/modes/fresh.ts +++ b/app/src/adapter/cloudflare/modes/fresh.ts @@ -1,12 +1,25 @@ -import type { App } from "bknd"; +import { App } from "bknd"; import { createRuntimeApp } from "bknd/adapter"; -import { type CloudflareBkndConfig, type Context, makeCfConfig } from "../index"; +import { type CloudflareBkndConfig, type Context, makeCfConfig, constants } from "../index"; export async function makeApp(config: CloudflareBkndConfig, ctx: Context) { return await createRuntimeApp( { ...makeCfConfig(config, ctx), adminOptions: config.html ? { html: config.html } : undefined, + onBuilt: async (app) => { + app.emgr.onEvent( + App.Events.AppBeforeResponse, + async (event) => { + ctx.ctx.waitUntil(event.params.app.emgr.executeAsyncs()); + }, + { + mode: "sync", + id: constants.exec_async_event_id, + }, + ); + await config.onBuilt?.(app); + }, }, ctx, ); diff --git a/app/src/core/events/EventManager.ts b/app/src/core/events/EventManager.ts index c2de8c9..1c20e58 100644 --- a/app/src/core/events/EventManager.ts +++ b/app/src/core/events/EventManager.ts @@ -22,6 +22,7 @@ export class EventManager< protected events: EventClass[] = []; protected listeners: EventListener[] = []; enabled: boolean = true; + protected asyncs: (() => Promise)[] = []; constructor( events?: RegisteredEvents, @@ -29,7 +30,6 @@ export class EventManager< listeners?: EventListener[]; onError?: (event: Event, e: unknown) => void; onInvalidReturn?: (event: Event, e: InvalidEventReturn) => void; - asyncExecutor?: typeof Promise.all; }, ) { if (events) { @@ -176,9 +176,15 @@ export class EventManager< this.events.forEach((event) => this.onEvent(event, handler, config)); } - protected executeAsyncs(promises: (() => Promise)[]) { - const executor = this.options?.asyncExecutor ?? ((e) => Promise.all(e)); - executor(promises.map((p) => p())).then(() => void 0); + protected collectAsyncs(promises: (() => Promise)[]) { + this.asyncs.push(...promises); + } + + async executeAsyncs(executor: typeof Promise.all = (e) => Promise.all(e)): Promise { + if (this.asyncs.length === 0) return; + const asyncs = [...this.asyncs]; + this.asyncs = []; + await executor(asyncs.map((p) => p())); } async emit>(event: Actual): Promise { @@ -209,8 +215,8 @@ export class EventManager< return !listener.once; }); - // execute asyncs - this.executeAsyncs(asyncs); + // collect asyncs + this.collectAsyncs(asyncs); // execute syncs let _event: Actual = event;