refactored EventManager to run asyncs on call only, app defaults to run before response (#129)

* refactored EventManager to run asyncs on call only, app defaults to run before response

* fix tests
This commit is contained in:
dswbx
2025-04-01 11:19:55 +02:00
committed by GitHub
parent 434d56672c
commit 36e4224b33
11 changed files with 244 additions and 64 deletions

View File

@@ -1,6 +1,6 @@
import { describe, expect, mock, test } from "bun:test";
import type { ModuleBuildContext } from "../../src";
import { type App, createApp } from "../../src/App";
import { App, createApp } from "../../src/App";
import * as proto from "../../src/data/prototype";
describe("App", () => {
@@ -51,4 +51,87 @@ describe("App", () => {
expect(todos[0]?.title).toBe("ctx");
expect(todos[1]?.title).toBe("api");
});
test("lifecycle events are triggered", async () => {
const firstBoot = mock(() => null);
const configUpdate = mock(() => null);
const appBuilt = mock(() => null);
const appRequest = mock(() => null);
const beforeResponse = mock(() => null);
const app = createApp();
app.emgr.onEvent(
App.Events.AppFirstBoot,
(event) => {
expect(event).toBeInstanceOf(App.Events.AppFirstBoot);
expect(event.params.app.version()).toBe(app.version());
firstBoot();
},
"sync",
);
app.emgr.onEvent(
App.Events.AppBuiltEvent,
(event) => {
expect(event).toBeInstanceOf(App.Events.AppBuiltEvent);
expect(event.params.app.version()).toBe(app.version());
appBuilt();
},
"sync",
);
app.emgr.onEvent(
App.Events.AppConfigUpdatedEvent,
() => {
configUpdate();
},
"sync",
);
app.emgr.onEvent(
App.Events.AppRequest,
(event) => {
expect(event).toBeInstanceOf(App.Events.AppRequest);
expect(event.params.app.version()).toBe(app.version());
expect(event.params.request).toBeInstanceOf(Request);
appRequest();
},
"sync",
);
app.emgr.onEvent(
App.Events.AppBeforeResponse,
(event) => {
expect(event).toBeInstanceOf(App.Events.AppBeforeResponse);
expect(event.params.app.version()).toBe(app.version());
expect(event.params.response).toBeInstanceOf(Response);
beforeResponse();
},
"sync",
);
await app.build();
expect(firstBoot).toHaveBeenCalled();
expect(appBuilt).toHaveBeenCalled();
//expect(configUpdate).toHaveBeenCalled();
expect(appRequest).not.toHaveBeenCalled();
expect(beforeResponse).not.toHaveBeenCalled();
});
test("emgr exec modes", async () => {
const called = mock(() => null);
const app = createApp({
options: {
asyncEventsMode: "sync",
},
});
// register async listener
app.emgr.onEvent(App.Events.AppFirstBoot, async () => {
called();
});
await app.build();
await app.server.request(new Request("http://localhost"));
// expect async listeners to be executed sync after request
expect(called).toHaveBeenCalled();
});
});

View File

@@ -70,6 +70,9 @@ describe("EventManager", async () => {
new SpecialEvent({ foo: "bar" });
new InformationalEvent();
// execute asyncs
await emgr.executeAsyncs();
expect(call).toHaveBeenCalledTimes(2);
expect(delayed).toHaveBeenCalled();
});
@@ -80,15 +83,11 @@ describe("EventManager", async () => {
call();
return Promise.all(p);
};
const emgr = new EventManager(
{ InformationalEvent },
{
asyncExecutor,
},
);
const emgr = new EventManager({ InformationalEvent });
emgr.onEvent(InformationalEvent, async () => {});
await emgr.emit(new InformationalEvent());
await emgr.executeAsyncs(asyncExecutor);
expect(call).toHaveBeenCalled();
});
@@ -125,6 +124,9 @@ describe("EventManager", async () => {
const e2 = await emgr.emit(new ReturnEvent({ foo: "bar" }));
expect(e2.returned).toBe(true);
expect(e2.params.foo).toBe("bar-1-0");
await emgr.executeAsyncs();
expect(onInvalidReturn).toHaveBeenCalled();
expect(asyncEventCallback).toHaveBeenCalled();
});

View File

@@ -288,14 +288,17 @@ describe("[data] Mutator (Events)", async () => {
test("events were fired", async () => {
const { data } = await mutator.insertOne({ label: "test" });
await mutator.emgr.executeAsyncs();
expect(events.has(MutatorEvents.MutatorInsertBefore.slug)).toBeTrue();
expect(events.has(MutatorEvents.MutatorInsertAfter.slug)).toBeTrue();
await mutator.updateOne(data.id, { label: "test2" });
await mutator.emgr.executeAsyncs();
expect(events.has(MutatorEvents.MutatorUpdateBefore.slug)).toBeTrue();
expect(events.has(MutatorEvents.MutatorUpdateAfter.slug)).toBeTrue();
await mutator.deleteOne(data.id);
await mutator.emgr.executeAsyncs();
expect(events.has(MutatorEvents.MutatorDeleteBefore.slug)).toBeTrue();
expect(events.has(MutatorEvents.MutatorDeleteAfter.slug)).toBeTrue();
});

View File

@@ -198,22 +198,27 @@ describe("[data] Repository (Events)", async () => {
});
test("events were fired", async () => {
await em.repository(items).findId(1);
const repo = em.repository(items);
await repo.findId(1);
await repo.emgr.executeAsyncs();
expect(events.has(RepositoryEvents.RepositoryFindOneBefore.slug)).toBeTrue();
expect(events.has(RepositoryEvents.RepositoryFindOneAfter.slug)).toBeTrue();
events.clear();
await em.repository(items).findOne({ id: 1 });
await repo.findOne({ id: 1 });
await repo.emgr.executeAsyncs();
expect(events.has(RepositoryEvents.RepositoryFindOneBefore.slug)).toBeTrue();
expect(events.has(RepositoryEvents.RepositoryFindOneAfter.slug)).toBeTrue();
events.clear();
await em.repository(items).findMany({ where: { id: 1 } });
await repo.findMany({ where: { id: 1 } });
await repo.emgr.executeAsyncs();
expect(events.has(RepositoryEvents.RepositoryFindManyBefore.slug)).toBeTrue();
expect(events.has(RepositoryEvents.RepositoryFindManyAfter.slug)).toBeTrue();
events.clear();
await em.repository(items).findManyByReference(1, "categories");
await repo.findManyByReference(1, "categories");
await repo.emgr.executeAsyncs();
expect(events.has(RepositoryEvents.RepositoryFindManyBefore.slug)).toBeTrue();
expect(events.has(RepositoryEvents.RepositoryFindManyAfter.slug)).toBeTrue();
events.clear();

View File

@@ -1,8 +1,9 @@
import { describe, expect, test } from "bun:test";
import { type FileBody, Storage, type StorageAdapter } from "../../src/media/storage/Storage";
import { type FileBody, Storage } from "../../src/media/storage/Storage";
import * as StorageEvents from "../../src/media/storage/events";
import { StorageAdapter } from "media";
class TestAdapter implements StorageAdapter {
class TestAdapter extends StorageAdapter {
files: Record<string, FileBody> = {};
getName() {
@@ -61,7 +62,7 @@ describe("Storage", async () => {
test("uploads a file", async () => {
const {
meta: { type, size },
} = await storage.uploadFile("hello", "world.txt");
} = await storage.uploadFile("hello" as any, "world.txt");
expect({ type, size }).toEqual({ type: "text/plain", size: 0 });
});
@@ -71,6 +72,7 @@ describe("Storage", async () => {
});
test("events were fired", async () => {
await storage.emgr.executeAsyncs();
expect(events.has(StorageEvents.FileUploadedEvent.slug)).toBeTrue();
expect(events.has(StorageEvents.FileDeletedEvent.slug)).toBeTrue();
// @todo: file access must be tested in controllers

View File

@@ -4,9 +4,10 @@ import { Event } from "core/events";
import { Connection, type LibSqlCredentials, LibsqlConnection } from "data";
import type { Hono } from "hono";
import {
ModuleManager,
type InitialModuleConfigs,
type ModuleBuildContext,
ModuleManager,
type ModuleConfigs,
type ModuleManagerOptions,
type Modules,
} from "modules/ModuleManager";
@@ -16,6 +17,7 @@ import { SystemController } from "modules/server/SystemController";
// biome-ignore format: must be there
import { Api, type ApiOptions } from "Api";
import type { ServerEnv } from "modules/Controller";
export type AppPlugin = (app: App) => Promise<void> | void;
@@ -29,12 +31,25 @@ export class AppBuiltEvent extends AppEvent {
export class AppFirstBoot extends AppEvent {
static override slug = "app-first-boot";
}
export const AppEvents = { AppConfigUpdatedEvent, AppBuiltEvent, AppFirstBoot } as const;
export class AppRequest extends AppEvent<{ request: Request }> {
static override slug = "app-request";
}
export class AppBeforeResponse extends AppEvent<{ request: Request; response: Response }> {
static override slug = "app-before-response";
}
export const AppEvents = {
AppConfigUpdatedEvent,
AppBuiltEvent,
AppFirstBoot,
AppRequest,
AppBeforeResponse,
} as const;
export type AppOptions = {
plugins?: AppPlugin[];
seed?: (ctx: ModuleBuildContext & { app: App }) => Promise<void>;
manager?: Omit<ModuleManagerOptions, "initial" | "onUpdated" | "seed">;
asyncEventsMode?: "sync" | "async" | "none";
};
export type CreateAppConfig = {
connection?:
@@ -70,35 +85,9 @@ export class App {
this.modules = new ModuleManager(connection, {
...(options?.manager ?? {}),
initial: _initialConfig,
onUpdated: async (key, config) => {
// if the EventManager was disabled, we assume we shouldn't
// respond to events, such as "onUpdated".
// this is important if multiple changes are done, and then build() is called manually
if (!this.emgr.enabled) {
$console.warn("App config updated, but event manager is disabled, skip.");
return;
}
$console.log("App config updated", key);
// @todo: potentially double syncing
await this.build({ sync: true });
await this.emgr.emit(new AppConfigUpdatedEvent({ app: this }));
},
onFirstBoot: async () => {
$console.log("App first boot");
this.trigger_first_boot = true;
},
onServerInit: async (server) => {
server.use(async (c, next) => {
c.set("app", this);
await next();
try {
// gracefully add the app id
c.res.headers.set("X-bknd-id", this._id);
} catch (e) {}
});
},
onUpdated: this.onUpdated.bind(this),
onFirstBoot: this.onFirstBoot.bind(this),
onServerInit: this.onServerInit.bind(this),
});
this.modules.ctx().emgr.registerEvents(AppEvents);
}
@@ -213,6 +202,53 @@ export class App {
return new Api({ host: "http://localhost", ...(options ?? {}), fetcher });
}
async onUpdated<Module extends keyof Modules>(module: Module, config: ModuleConfigs[Module]) {
// if the EventManager was disabled, we assume we shouldn't
// respond to events, such as "onUpdated".
// this is important if multiple changes are done, and then build() is called manually
if (!this.emgr.enabled) {
$console.warn("App config updated, but event manager is disabled, skip.");
return;
}
$console.log("App config updated", module);
// @todo: potentially double syncing
await this.build({ sync: true });
await this.emgr.emit(new AppConfigUpdatedEvent({ app: this }));
}
async onFirstBoot() {
$console.log("App first boot");
this.trigger_first_boot = true;
}
async onServerInit(server: Hono<ServerEnv>) {
server.use(async (c, next) => {
c.set("app", this);
await this.emgr.emit(new AppRequest({ app: this, request: c.req.raw }));
await next();
try {
// gracefully add the app id
c.res.headers.set("X-bknd-id", this._id);
} catch (e) {}
await this.emgr.emit(
new AppBeforeResponse({ app: this, request: c.req.raw, response: c.res }),
);
// execute collected async events (async by default)
switch (this.options?.asyncEventsMode ?? "async") {
case "sync":
await this.emgr.executeAsyncs();
break;
case "async":
this.emgr.executeAsyncs();
break;
}
});
}
}
export function createApp(config: CreateAppConfig = {}) {

View File

@@ -9,6 +9,7 @@ import { getBinding } from "./bindings";
import { getCached } from "./modes/cached";
import { getDurable } from "./modes/durable";
import { getFresh, getWarm } from "./modes/fresh";
import type { CreateAppConfig } from "App";
export type CloudflareBkndConfig<Env = any> = FrameworkBkndConfig<Context<Env>> & {
mode?: "warm" | "fresh" | "cache" | "durable";
@@ -32,8 +33,14 @@ export type Context<Env = any> = {
ctx: ExecutionContext;
};
export const constants = {
exec_async_event_id: "cf_register_waituntil",
cache_endpoint: "/__bknd/cache",
do_endpoint: "/__bknd/do",
};
let media_registered: boolean = false;
export function makeCfConfig(config: CloudflareBkndConfig, context: Context) {
export function makeCfConfig(config: CloudflareBkndConfig, context: Context): CreateAppConfig {
if (!media_registered) {
registerMedia(context.env as any);
media_registered = true;
@@ -61,7 +68,14 @@ export function makeCfConfig(config: CloudflareBkndConfig, context: Context) {
}
}
return appConfig;
return {
...appConfig,
options: {
...appConfig.options,
// if not specified explicitly, disable it to use ExecutionContext's waitUntil
asyncEventsMode: config.options?.asyncEventsMode ?? "none",
},
};
}
export function serve<Env = any>(config: CloudflareBkndConfig<Env> = {}) {

View File

@@ -1,6 +1,6 @@
import { App } from "bknd";
import { createRuntimeApp } from "bknd/adapter";
import { type CloudflareBkndConfig, type Context, makeCfConfig } from "../index";
import { type CloudflareBkndConfig, constants, type Context, makeCfConfig } from "../index";
export async function getCached(config: CloudflareBkndConfig, { env, ctx, ...args }: Context) {
const { kv } = config.bindings?.(env)!;
@@ -19,13 +19,23 @@ export async function getCached(config: CloudflareBkndConfig, { env, ctx, ...arg
...makeCfConfig(config, { env, ctx, ...args }),
initialConfig,
onBuilt: async (app) => {
app.module.server.client.get("/__bknd/cache", async (c) => {
app.module.server.client.get(constants.cache_endpoint, async (c) => {
await kv.delete(key);
return c.json({ message: "Cache cleared" });
});
await config.onBuilt?.(app);
},
beforeBuild: async (app) => {
app.emgr.onEvent(
App.Events.AppBeforeResponse,
async (event) => {
ctx.waitUntil(event.params.app.emgr.executeAsyncs());
},
{
mode: "sync",
id: constants.exec_async_event_id,
},
);
app.emgr.onEvent(
App.Events.AppConfigUpdatedEvent,
async ({ params: { app } }) => {

View File

@@ -1,7 +1,7 @@
import { DurableObject } from "cloudflare:workers";
import type { App, CreateAppConfig } from "bknd";
import { App, type CreateAppConfig } from "bknd";
import { createRuntimeApp, makeConfig } from "bknd/adapter";
import type { CloudflareBkndConfig, Context } from "../index";
import { type CloudflareBkndConfig, type Context, constants } from "../index";
export async function getDurable(config: CloudflareBkndConfig, ctx: Context) {
const { dobj } = config.bindings?.(ctx.env)!;
@@ -67,7 +67,17 @@ export class DurableBkndApp extends DurableObject {
this.app = await createRuntimeApp({
...config,
onBuilt: async (app) => {
app.modules.server.get("/__do", async (c) => {
app.emgr.onEvent(
App.Events.AppBeforeResponse,
async (event) => {
this.ctx.waitUntil(event.params.app.emgr.executeAsyncs());
},
{
mode: "sync",
id: constants.exec_async_event_id,
},
);
app.modules.server.get(constants.do_endpoint, async (c) => {
// @ts-ignore
const context: any = c.req.raw.cf ? c.req.raw.cf : c.env.cf;
return c.json({
@@ -92,7 +102,6 @@ export class DurableBkndApp extends DurableObject {
this.keepAlive(options.keepAliveSeconds);
}
console.log("id", this.id);
const res = await this.app!.fetch(request);
const headers = new Headers(res.headers);
headers.set("X-BuildTime", buildtime.toString());
@@ -109,16 +118,13 @@ export class DurableBkndApp extends DurableObject {
async beforeBuild(app: App) {}
protected keepAlive(seconds: number) {
console.log("keep alive for", seconds);
if (this.interval) {
console.log("clearing, there is a new");
clearInterval(this.interval);
}
let i = 0;
this.interval = setInterval(() => {
i += 1;
//console.log("keep-alive", i);
if (i === seconds) {
console.log("cleared");
clearInterval(this.interval);

View File

@@ -1,12 +1,25 @@
import type { App } from "bknd";
import { App } from "bknd";
import { createRuntimeApp } from "bknd/adapter";
import { type CloudflareBkndConfig, type Context, makeCfConfig } from "../index";
import { type CloudflareBkndConfig, type Context, makeCfConfig, constants } from "../index";
export async function makeApp(config: CloudflareBkndConfig, ctx: Context) {
return await createRuntimeApp(
{
...makeCfConfig(config, ctx),
adminOptions: config.html ? { html: config.html } : undefined,
onBuilt: async (app) => {
app.emgr.onEvent(
App.Events.AppBeforeResponse,
async (event) => {
ctx.ctx.waitUntil(event.params.app.emgr.executeAsyncs());
},
{
mode: "sync",
id: constants.exec_async_event_id,
},
);
await config.onBuilt?.(app);
},
},
ctx,
);

View File

@@ -22,6 +22,7 @@ export class EventManager<
protected events: EventClass[] = [];
protected listeners: EventListener[] = [];
enabled: boolean = true;
protected asyncs: (() => Promise<void>)[] = [];
constructor(
events?: RegisteredEvents,
@@ -29,7 +30,6 @@ export class EventManager<
listeners?: EventListener[];
onError?: (event: Event, e: unknown) => void;
onInvalidReturn?: (event: Event, e: InvalidEventReturn) => void;
asyncExecutor?: typeof Promise.all;
},
) {
if (events) {
@@ -176,9 +176,15 @@ export class EventManager<
this.events.forEach((event) => this.onEvent(event, handler, config));
}
protected executeAsyncs(promises: (() => Promise<void>)[]) {
const executor = this.options?.asyncExecutor ?? ((e) => Promise.all(e));
executor(promises.map((p) => p())).then(() => void 0);
protected collectAsyncs(promises: (() => Promise<void>)[]) {
this.asyncs.push(...promises);
}
async executeAsyncs(executor: typeof Promise.all = (e) => Promise.all(e)): Promise<void> {
if (this.asyncs.length === 0) return;
const asyncs = [...this.asyncs];
this.asyncs = [];
await executor(asyncs.map((p) => p()));
}
async emit<Actual extends Event<any, any>>(event: Actual): Promise<Actual> {
@@ -209,8 +215,8 @@ export class EventManager<
return !listener.once;
});
// execute asyncs
this.executeAsyncs(asyncs);
// collect asyncs
this.collectAsyncs(asyncs);
// execute syncs
let _event: Actual = event;