From 06d7558c3c451a99c6166dc0d07e67bc7c761207 Mon Sep 17 00:00:00 2001 From: dswbx Date: Wed, 24 Sep 2025 14:48:45 +0200 Subject: [PATCH] feat: batch schema manager statements run all schema modification queries in a single batch/transaction, to enable automatic rollbacks, and to stay within cloudflare's subrequest limits in free plan. --- app/__test__/data/specs/SchemaManager.spec.ts | 37 ++++++++++++++- .../cloudflare-workers.adapter.spec.ts | 4 +- app/src/data/entities/EntityManager.ts | 7 +-- app/src/data/schema/SchemaManager.ts | 46 +++++++------------ 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/app/__test__/data/specs/SchemaManager.spec.ts b/app/__test__/data/specs/SchemaManager.spec.ts index 679010b..e35df15 100644 --- a/app/__test__/data/specs/SchemaManager.spec.ts +++ b/app/__test__/data/specs/SchemaManager.spec.ts @@ -1,5 +1,5 @@ // eslint-disable-next-line import/no-unresolved -import { afterAll, describe, expect, test } from "bun:test"; +import { afterAll, describe, expect, spyOn, test } from "bun:test"; import { randomString } from "core/utils"; import { Entity, EntityManager } from "data/entities"; import { TextField, EntityIndex } from "data/fields"; @@ -268,4 +268,39 @@ describe("SchemaManager tests", async () => { const diffAfter = await em.schema().getDiff(); expect(diffAfter.length).toBe(0); }); + + test("returns statements", async () => { + const amount = 5; + const entities = new Array(amount) + .fill(0) + .map(() => new Entity(randomString(16), [new TextField("text")])); + const em = new EntityManager(entities, dummyConnection); + const statements = await em.schema().sync({ force: true }); + expect(statements.length).toBe(amount); + expect(statements.every((stmt) => Object.keys(stmt).join(",") === "sql,parameters")).toBe( + true, + ); + }); + + test("batches statements", async () => { + const { dummyConnection } = getDummyConnection(); + const entities = new Array(20) + .fill(0) + .map(() => new Entity(randomString(16), [new TextField("text")])); + const em = new EntityManager(entities, dummyConnection); + const spy = spyOn(em.connection, "executeQueries"); + const statements = await em.schema().sync(); + expect(statements.length).toBe(entities.length); + expect(statements.every((stmt) => Object.keys(stmt).join(",") === "sql,parameters")).toBe( + true, + ); + await em.schema().sync({ force: true }); + expect(spy).toHaveBeenCalledTimes(1); + const tables = await em.connection.kysely + .selectFrom("sqlite_master") + .where("type", "=", "table") + .selectAll() + .execute(); + expect(tables.length).toBe(entities.length + 1); /* 1+ for sqlite_sequence */ + }); }); diff --git a/app/src/adapter/cloudflare/cloudflare-workers.adapter.spec.ts b/app/src/adapter/cloudflare/cloudflare-workers.adapter.spec.ts index 65477b6..6cb0f90 100644 --- a/app/src/adapter/cloudflare/cloudflare-workers.adapter.spec.ts +++ b/app/src/adapter/cloudflare/cloudflare-workers.adapter.spec.ts @@ -5,8 +5,8 @@ import { adapterTestSuite } from "adapter/adapter-test-suite"; import { bunTestRunner } from "adapter/bun/test"; import { type CloudflareBkndConfig, createApp } from "./cloudflare-workers.adapter"; -/* beforeAll(disableConsoleLog); -afterAll(enableConsoleLog); */ +beforeAll(disableConsoleLog); +afterAll(enableConsoleLog); describe("cf adapter", () => { const DB_URL = ":memory:"; diff --git a/app/src/data/entities/EntityManager.ts b/app/src/data/entities/EntityManager.ts index 36168f8..033d51a 100644 --- a/app/src/data/entities/EntityManager.ts +++ b/app/src/data/entities/EntityManager.ts @@ -34,7 +34,6 @@ export class EntityManager { private _entities: Entity[] = []; private _relations: EntityRelation[] = []; private _indices: EntityIndex[] = []; - private _schema?: SchemaManager; readonly emgr: EventManager; static readonly Events = { ...MutatorEvents, ...RepositoryEvents }; @@ -249,11 +248,7 @@ export class EntityManager { } schema() { - if (!this._schema) { - this._schema = new SchemaManager(this); - } - - return this._schema; + return new SchemaManager(this); } // @todo: centralize and add tests diff --git a/app/src/data/schema/SchemaManager.ts b/app/src/data/schema/SchemaManager.ts index ab2d15f..a0618f4 100644 --- a/app/src/data/schema/SchemaManager.ts +++ b/app/src/data/schema/SchemaManager.ts @@ -247,20 +247,16 @@ export class SchemaManager { async sync(config: { force?: boolean; drop?: boolean } = { force: false, drop: false }) { const diff = await this.getDiff(); - let updates: number = 0; const statements: { sql: string; parameters: readonly unknown[] }[] = []; const schema = this.em.connection.kysely.schema; + const qbs: { compile(): CompiledQuery; execute(): Promise }[] = []; for (const table of diff) { - const qbs: { compile(): CompiledQuery; execute(): Promise }[] = []; - let local_updates: number = 0; const addFieldSchemas = this.collectFieldSchemas(table.name, table.columns.add); const dropFields = table.columns.drop; const dropIndices = table.indices.drop; if (table.isDrop) { - updates++; - local_updates++; if (config.drop) { qbs.push(schema.dropTable(table.name)); } @@ -268,8 +264,6 @@ export class SchemaManager { let createQb = schema.createTable(table.name); // add fields for (const fieldSchema of addFieldSchemas) { - updates++; - local_updates++; // @ts-ignore createQb = createQb.addColumn(...fieldSchema); } @@ -280,8 +274,6 @@ export class SchemaManager { if (addFieldSchemas.length > 0) { // add fields for (const fieldSchema of addFieldSchemas) { - updates++; - local_updates++; // @ts-ignore qbs.push(schema.alterTable(table.name).addColumn(...fieldSchema)); } @@ -291,8 +283,6 @@ export class SchemaManager { if (config.drop && dropFields.length > 0) { // drop fields for (const column of dropFields) { - updates++; - local_updates++; qbs.push(schema.alterTable(table.name).dropColumn(column)); } } @@ -310,35 +300,33 @@ export class SchemaManager { qb = qb.unique(); } qbs.push(qb); - local_updates++; - updates++; } // drop indices if (config.drop) { for (const index of dropIndices) { qbs.push(schema.dropIndex(index)); - local_updates++; - updates++; } } + } - if (local_updates === 0) continue; + if (qbs.length > 0) { + statements.push( + ...qbs.map((qb) => { + const { sql, parameters } = qb.compile(); + return { sql, parameters }; + }), + ); - // iterate through built qbs - // @todo: run in batches - for (const qb of qbs) { - const { sql, parameters } = qb.compile(); - statements.push({ sql, parameters }); + $console.debug( + "[SchemaManager]", + `${qbs.length} statements\n${statements.map((stmt) => stmt.sql).join(";\n")}`, + ); - if (config.force) { - try { - $console.debug("[SchemaManager]", sql); - await qb.execute(); - } catch (e) { - throw new Error(`Failed to execute query: ${sql}: ${(e as any).message}`); - } - } + try { + await this.em.connection.executeQueries(...qbs); + } catch (e) { + throw new Error(`Failed to execute batch: ${String(e)}`); } }