feat: batch schema manager statements

run all schema modification queries in a single batch/transaction, to enable automatic rollbacks, and to stay within cloudflare's subrequest limits in free plan.
This commit is contained in:
dswbx
2025-09-24 14:48:45 +02:00
parent 832eb6ac31
commit 06d7558c3c
4 changed files with 56 additions and 38 deletions

View File

@@ -1,5 +1,5 @@
// eslint-disable-next-line import/no-unresolved // eslint-disable-next-line import/no-unresolved
import { afterAll, describe, expect, test } from "bun:test"; import { afterAll, describe, expect, spyOn, test } from "bun:test";
import { randomString } from "core/utils"; import { randomString } from "core/utils";
import { Entity, EntityManager } from "data/entities"; import { Entity, EntityManager } from "data/entities";
import { TextField, EntityIndex } from "data/fields"; import { TextField, EntityIndex } from "data/fields";
@@ -268,4 +268,39 @@ describe("SchemaManager tests", async () => {
const diffAfter = await em.schema().getDiff(); const diffAfter = await em.schema().getDiff();
expect(diffAfter.length).toBe(0); expect(diffAfter.length).toBe(0);
}); });
test("returns statements", async () => {
const amount = 5;
const entities = new Array(amount)
.fill(0)
.map(() => new Entity(randomString(16), [new TextField("text")]));
const em = new EntityManager(entities, dummyConnection);
const statements = await em.schema().sync({ force: true });
expect(statements.length).toBe(amount);
expect(statements.every((stmt) => Object.keys(stmt).join(",") === "sql,parameters")).toBe(
true,
);
});
test("batches statements", async () => {
const { dummyConnection } = getDummyConnection();
const entities = new Array(20)
.fill(0)
.map(() => new Entity(randomString(16), [new TextField("text")]));
const em = new EntityManager(entities, dummyConnection);
const spy = spyOn(em.connection, "executeQueries");
const statements = await em.schema().sync();
expect(statements.length).toBe(entities.length);
expect(statements.every((stmt) => Object.keys(stmt).join(",") === "sql,parameters")).toBe(
true,
);
await em.schema().sync({ force: true });
expect(spy).toHaveBeenCalledTimes(1);
const tables = await em.connection.kysely
.selectFrom("sqlite_master")
.where("type", "=", "table")
.selectAll()
.execute();
expect(tables.length).toBe(entities.length + 1); /* 1+ for sqlite_sequence */
});
}); });

View File

@@ -5,8 +5,8 @@ import { adapterTestSuite } from "adapter/adapter-test-suite";
import { bunTestRunner } from "adapter/bun/test"; import { bunTestRunner } from "adapter/bun/test";
import { type CloudflareBkndConfig, createApp } from "./cloudflare-workers.adapter"; import { type CloudflareBkndConfig, createApp } from "./cloudflare-workers.adapter";
/* beforeAll(disableConsoleLog); beforeAll(disableConsoleLog);
afterAll(enableConsoleLog); */ afterAll(enableConsoleLog);
describe("cf adapter", () => { describe("cf adapter", () => {
const DB_URL = ":memory:"; const DB_URL = ":memory:";

View File

@@ -34,7 +34,6 @@ export class EntityManager<TBD extends object = DefaultDB> {
private _entities: Entity[] = []; private _entities: Entity[] = [];
private _relations: EntityRelation[] = []; private _relations: EntityRelation[] = [];
private _indices: EntityIndex[] = []; private _indices: EntityIndex[] = [];
private _schema?: SchemaManager;
readonly emgr: EventManager<typeof EntityManager.Events>; readonly emgr: EventManager<typeof EntityManager.Events>;
static readonly Events = { ...MutatorEvents, ...RepositoryEvents }; static readonly Events = { ...MutatorEvents, ...RepositoryEvents };
@@ -249,11 +248,7 @@ export class EntityManager<TBD extends object = DefaultDB> {
} }
schema() { schema() {
if (!this._schema) { return new SchemaManager(this);
this._schema = new SchemaManager(this);
}
return this._schema;
} }
// @todo: centralize and add tests // @todo: centralize and add tests

View File

@@ -247,20 +247,16 @@ export class SchemaManager {
async sync(config: { force?: boolean; drop?: boolean } = { force: false, drop: false }) { async sync(config: { force?: boolean; drop?: boolean } = { force: false, drop: false }) {
const diff = await this.getDiff(); const diff = await this.getDiff();
let updates: number = 0;
const statements: { sql: string; parameters: readonly unknown[] }[] = []; const statements: { sql: string; parameters: readonly unknown[] }[] = [];
const schema = this.em.connection.kysely.schema; const schema = this.em.connection.kysely.schema;
const qbs: { compile(): CompiledQuery; execute(): Promise<void> }[] = [];
for (const table of diff) { for (const table of diff) {
const qbs: { compile(): CompiledQuery; execute(): Promise<void> }[] = [];
let local_updates: number = 0;
const addFieldSchemas = this.collectFieldSchemas(table.name, table.columns.add); const addFieldSchemas = this.collectFieldSchemas(table.name, table.columns.add);
const dropFields = table.columns.drop; const dropFields = table.columns.drop;
const dropIndices = table.indices.drop; const dropIndices = table.indices.drop;
if (table.isDrop) { if (table.isDrop) {
updates++;
local_updates++;
if (config.drop) { if (config.drop) {
qbs.push(schema.dropTable(table.name)); qbs.push(schema.dropTable(table.name));
} }
@@ -268,8 +264,6 @@ export class SchemaManager {
let createQb = schema.createTable(table.name); let createQb = schema.createTable(table.name);
// add fields // add fields
for (const fieldSchema of addFieldSchemas) { for (const fieldSchema of addFieldSchemas) {
updates++;
local_updates++;
// @ts-ignore // @ts-ignore
createQb = createQb.addColumn(...fieldSchema); createQb = createQb.addColumn(...fieldSchema);
} }
@@ -280,8 +274,6 @@ export class SchemaManager {
if (addFieldSchemas.length > 0) { if (addFieldSchemas.length > 0) {
// add fields // add fields
for (const fieldSchema of addFieldSchemas) { for (const fieldSchema of addFieldSchemas) {
updates++;
local_updates++;
// @ts-ignore // @ts-ignore
qbs.push(schema.alterTable(table.name).addColumn(...fieldSchema)); qbs.push(schema.alterTable(table.name).addColumn(...fieldSchema));
} }
@@ -291,8 +283,6 @@ export class SchemaManager {
if (config.drop && dropFields.length > 0) { if (config.drop && dropFields.length > 0) {
// drop fields // drop fields
for (const column of dropFields) { for (const column of dropFields) {
updates++;
local_updates++;
qbs.push(schema.alterTable(table.name).dropColumn(column)); qbs.push(schema.alterTable(table.name).dropColumn(column));
} }
} }
@@ -310,35 +300,33 @@ export class SchemaManager {
qb = qb.unique(); qb = qb.unique();
} }
qbs.push(qb); qbs.push(qb);
local_updates++;
updates++;
} }
// drop indices // drop indices
if (config.drop) { if (config.drop) {
for (const index of dropIndices) { for (const index of dropIndices) {
qbs.push(schema.dropIndex(index)); qbs.push(schema.dropIndex(index));
local_updates++;
updates++;
} }
} }
}
if (local_updates === 0) continue; if (qbs.length > 0) {
statements.push(
...qbs.map((qb) => {
const { sql, parameters } = qb.compile();
return { sql, parameters };
}),
);
// iterate through built qbs $console.debug(
// @todo: run in batches "[SchemaManager]",
for (const qb of qbs) { `${qbs.length} statements\n${statements.map((stmt) => stmt.sql).join(";\n")}`,
const { sql, parameters } = qb.compile(); );
statements.push({ sql, parameters });
if (config.force) { try {
try { await this.em.connection.executeQueries(...qbs);
$console.debug("[SchemaManager]", sql); } catch (e) {
await qb.execute(); throw new Error(`Failed to execute batch: ${String(e)}`);
} catch (e) {
throw new Error(`Failed to execute query: ${sql}: ${(e as any).message}`);
}
}
} }
} }