added batching for postgres

This commit is contained in:
dswbx
2025-03-07 19:40:58 +01:00
parent 3704cad53a
commit a4cb012ce8
7 changed files with 53 additions and 5 deletions

View File

@@ -151,4 +151,5 @@ export abstract class Connection<DB = any> {
}
abstract getFieldSchema(spec: FieldSpec, strict?: boolean): SchemaResponse;
abstract close(): Promise<void>;
}

View File

@@ -8,7 +8,12 @@ import {
} from "kysely";
import pg from "pg";
import { PostgresIntrospector } from "./PostgresIntrospector";
import { type FieldSpec, type SchemaResponse, Connection } from "data/connection/Connection";
import {
type FieldSpec,
type SchemaResponse,
Connection,
type QB,
} from "data/connection/Connection";
export type PostgresConnectionConfig = pg.PoolConfig;
@@ -21,16 +26,20 @@ class CustomPostgresDialect extends PostgresDialect {
}
export class PostgresConnection extends Connection {
private pool: pg.Pool;
constructor(config: PostgresConnectionConfig) {
const pool = new pg.Pool(config);
const kysely = new Kysely({
dialect: new CustomPostgresDialect({
pool: new pg.Pool(config),
pool,
}),
plugins,
//log: ["query", "error"],
});
super(kysely, {}, plugins);
this.pool = pool;
}
override supportsIndices(): boolean {
@@ -73,4 +82,22 @@ export class PostgresConnection extends Connection {
},
];
}
override supportsBatching(): boolean {
return true;
}
override async close(): Promise<void> {
await this.pool.end();
}
protected override async batch<Queries extends QB[]>(
queries: [...Queries],
): Promise<{
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
return this.kysely.transaction().execute(async (trx) => {
return Promise.all(queries.map((q) => trx.executeQuery(q).then((r) => r.rows)));
}) as any;
}
}

View File

@@ -47,4 +47,8 @@ export class SqliteConnection extends Connection {
},
] as const;
}
override async close(): Promise<void> {
// no-op
}
}

View File

@@ -14,6 +14,7 @@ import {
WithBuilder,
} from "../index";
import { JoinBuilder } from "./JoinBuilder";
import { ensureInt } from "core/utils";
export type RepositoryQB = SelectQueryBuilder<any, any, any>;
@@ -225,8 +226,9 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
data,
meta: {
...payload.meta,
total: _total[0]?.total ?? 0,
count: _count[0]?.count ?? 0, // @todo: better graceful method
// parsing is important since pg returns string
total: ensureInt(_total[0]?.total),
count: ensureInt(_count[0]?.count),
items: result.length,
time,
},

View File

@@ -329,6 +329,7 @@ export class SchemaManager {
if (local_updates === 0) continue;
// iterate through built qbs
// @todo: run in batches
for (const qb of qbs) {
const { sql, parameters } = qb.compile();
statements.push({ sql, parameters });