connection: rewrote query execution, batching, added generic sqlite, added node/bun sqlite, aligned repo/mutator results

This commit is contained in:
dswbx
2025-06-12 09:02:18 +02:00
parent 88419548c7
commit 6c2e579596
40 changed files with 990 additions and 649 deletions

View File

@@ -2,12 +2,15 @@ import {
type AliasableExpression,
type ColumnBuilderCallback,
type ColumnDataType,
type Compilable,
type CompiledQuery,
type DatabaseIntrospector,
type Dialect,
type Expression,
type Kysely,
type KyselyPlugin,
type OnModifyForeignAction,
type QueryResult,
type RawBuilder,
type SelectQueryBuilder,
type SelectQueryNode,
@@ -15,7 +18,8 @@ import {
sql,
} from "kysely";
import type { BaseIntrospector, BaseIntrospectorConfig } from "./BaseIntrospector";
import type { Constructor } from "core";
import type { Constructor, DB } from "core";
import { KyselyPluginRunner } from "data/plugins/KyselyPluginRunner";
export type QB = SelectQueryBuilder<any, any, any>;
@@ -75,22 +79,44 @@ export type DbFunctions = {
>;
};
export type ConnQuery = CompiledQuery | Compilable;
export type ConnQueryResult<T extends ConnQuery> = T extends CompiledQuery<infer R>
? QueryResult<R>
: T extends Compilable<infer R>
? QueryResult<R>
: never;
export type ConnQueryResults<T extends ConnQuery[]> = {
[K in keyof T]: ConnQueryResult<T[K]>;
};
const CONN_SYMBOL = Symbol.for("bknd:connection");
export abstract class Connection<DB = any> {
export type Features = {
batching: boolean;
softscans: boolean;
};
export abstract class Connection<Client = unknown> {
abstract name: string;
protected initialized = false;
kysely: Kysely<DB>;
protected readonly supported = {
protected pluginRunner: KyselyPluginRunner;
protected readonly supported: Partial<Features> = {
batching: false,
softscans: true,
};
kysely: Kysely<DB>;
client!: Client;
constructor(
kysely: Kysely<DB>,
kysely: Kysely<any>,
public fn: Partial<DbFunctions> = {},
protected plugins: KyselyPlugin[] = [],
) {
this.kysely = kysely;
this[CONN_SYMBOL] = true;
this.pluginRunner = new KyselyPluginRunner(plugins);
}
// @todo: consider moving constructor logic here, required by sqlocal
@@ -121,30 +147,46 @@ export abstract class Connection<DB = any> {
return res.rows.length > 0;
}
protected async batch<Queries extends QB[]>(
queries: [...Queries],
): Promise<{
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
throw new Error("Batching not supported");
protected async transformResultRows(result: any[]): Promise<any[]> {
return await this.pluginRunner.transformResultRows(result);
}
async batchQuery<Queries extends QB[]>(
queries: [...Queries],
): Promise<{
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
// bypass if no client support
if (!this.supports("batching")) {
const data: any = [];
for (const q of queries) {
const result = await q.execute();
data.push(result);
}
return data;
}
/**
* Execute a query and return the result including all metadata
* returned from the dialect.
*/
async executeQueries<O extends ConnQuery[]>(...qbs: O): Promise<ConnQueryResults<O>> {
return Promise.all(qbs.map(async (qb) => await this.kysely.executeQuery(qb))) as any;
}
return await this.batch(queries);
async executeQuery<O extends ConnQuery>(qb: O): Promise<ConnQueryResult<O>> {
const res = await this.executeQueries(qb);
return res[0] as any;
}
protected getCompiled(...qbs: ConnQuery[]): CompiledQuery[] {
return qbs.map((qb) => {
if ("compile" in qb) {
return qb.compile();
}
return qb;
});
}
protected async withTransformedRows<
Key extends string = "rows",
O extends { [K in Key]: any[] }[] = [],
>(result: O, _key?: Key): Promise<O> {
return (await Promise.all(
result.map(async (row) => {
const key = _key ?? "rows";
const { [key]: rows, ...r } = row;
return {
...r,
rows: await this.transformResultRows(rows),
};
}),
)) as any;
}
protected validateFieldSpecType(type: string): type is FieldSpec["type"] {