move postgres to a separate package

This commit is contained in:
dswbx
2025-03-14 09:42:51 +01:00
parent b1b2e97a5a
commit c4aaa5a90b
20 changed files with 381 additions and 80 deletions

View File

@@ -2,7 +2,6 @@ import {
type AliasableExpression,
type ColumnBuilderCallback,
type ColumnDataType,
type DatabaseIntrospector,
type Expression,
type Kysely,
type KyselyPlugin,
@@ -77,6 +76,9 @@ const CONN_SYMBOL = Symbol.for("bknd:connection");
export abstract class Connection<DB = any> {
kysely: Kysely<DB>;
protected readonly supported = {
batching: false,
};
constructor(
kysely: Kysely<DB>,
@@ -101,13 +103,8 @@ export abstract class Connection<DB = any> {
return this.kysely.introspection as any;
}
supportsBatching(): boolean {
return false;
}
// @todo: add if only first field is used in index
supportsIndices(): boolean {
return false;
supports(feature: keyof typeof this.supported): boolean {
return this.supported[feature] ?? false;
}
async ping(): Promise<boolean> {
@@ -129,7 +126,7 @@ export abstract class Connection<DB = any> {
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
// bypass if no client support
if (!this.supportsBatching()) {
if (!this.supports("batching")) {
const data: any = [];
for (const q of queries) {
const result = await q.execute();
@@ -151,5 +148,8 @@ export abstract class Connection<DB = any> {
}
abstract getFieldSchema(spec: FieldSpec, strict?: boolean): SchemaResponse;
abstract close(): Promise<void>;
async close(): Promise<void> {
// no-op by default
}
}

View File

@@ -1,6 +1,10 @@
import { Connection, type FieldSpec, type SchemaResponse } from "./Connection";
export class DummyConnection extends Connection {
protected override readonly supported = {
batching: true,
};
constructor() {
super(undefined as any);
}

View File

@@ -1,12 +1,14 @@
export { Connection } from "./Connection";
export { BaseIntrospector } from "./BaseIntrospector";
export {
Connection,
type FieldSpec,
type IndexSpec,
type DbFunctions,
type SchemaResponse,
} from "./Connection";
// sqlite
export { LibsqlConnection, type LibSqlCredentials } from "./sqlite/LibsqlConnection";
export { SqliteConnection } from "./sqlite/SqliteConnection";
export { SqliteLocalConnection } from "./sqlite/SqliteLocalConnection";
export { SqliteIntrospector } from "./sqlite/SqliteIntrospector";
// postgres
export { PostgresConnection, type PostgresConnectionConfig } from "./postgres/PostgresConnection";
export { PostgresIntrospector } from "./postgres/PostgresIntrospector";
export { SqliteLocalConnection } from "./sqlite/SqliteLocalConnection";

View File

@@ -1,103 +0,0 @@
import {
Kysely,
PostgresDialect,
type DatabaseIntrospector,
type ColumnDataType,
type ColumnDefinitionBuilder,
ParseJSONResultsPlugin,
} from "kysely";
import pg from "pg";
import { PostgresIntrospector } from "./PostgresIntrospector";
import {
type FieldSpec,
type SchemaResponse,
Connection,
type QB,
} from "data/connection/Connection";
export type PostgresConnectionConfig = pg.PoolConfig;
const plugins = [new ParseJSONResultsPlugin()];
class CustomPostgresDialect extends PostgresDialect {
override createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new PostgresIntrospector(db);
}
}
export class PostgresConnection extends Connection {
private pool: pg.Pool;
constructor(config: PostgresConnectionConfig) {
const pool = new pg.Pool(config);
const kysely = new Kysely({
dialect: new CustomPostgresDialect({
pool,
}),
plugins,
//log: ["query", "error"],
});
super(kysely, {}, plugins);
this.pool = pool;
}
override supportsIndices(): boolean {
return true;
}
override getFieldSchema(spec: FieldSpec): SchemaResponse {
this.validateFieldSpecType(spec.type);
let type: ColumnDataType = spec.primary ? "serial" : spec.type;
switch (spec.type) {
case "blob":
type = "bytea";
break;
case "date":
case "datetime":
// https://www.postgresql.org/docs/17/datatype-datetime.html
type = "timestamp";
break;
case "text":
// https://www.postgresql.org/docs/17/datatype-character.html
type = "varchar";
break;
}
return [
spec.name,
type,
(col: ColumnDefinitionBuilder) => {
if (spec.primary) {
return col.primaryKey();
}
if (spec.references) {
return col
.references(spec.references)
.onDelete(spec.onDelete ?? "set null")
.onUpdate(spec.onUpdate ?? "no action");
}
return spec.nullable ? col : col.notNull();
},
];
}
override supportsBatching(): boolean {
return true;
}
override async close(): Promise<void> {
await this.pool.end();
}
protected override async batch<Queries extends QB[]>(
queries: [...Queries],
): Promise<{
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
return this.kysely.transaction().execute(async (trx) => {
return Promise.all(queries.map((q) => trx.executeQuery(q).then((r) => r.rows)));
}) as any;
}
}

View File

@@ -1,127 +0,0 @@
import { type SchemaMetadata, sql } from "kysely";
import { BaseIntrospector } from "data/connection/BaseIntrospector";
type PostgresSchemaSpec = {
name: string;
type: "VIEW" | "BASE TABLE";
columns: {
name: string;
type: string;
notnull: number;
dflt: string;
pk: boolean;
}[];
indices: {
name: string;
origin: string;
partial: number;
sql: string;
columns: { name: string; seqno: number }[];
}[];
};
export class PostgresIntrospector extends BaseIntrospector {
async getSchemas(): Promise<SchemaMetadata[]> {
const rawSchemas = await this.db
.selectFrom("pg_catalog.pg_namespace")
.select("nspname")
.$castTo<{ nspname: string }>()
.execute();
return rawSchemas.map((it) => ({ name: it.nspname }));
}
async getSchemaSpec() {
const query = sql`
WITH tables_and_views AS (
SELECT table_name AS name,
table_type AS type
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type IN ('BASE TABLE', 'VIEW')
AND table_name NOT LIKE 'pg_%'
AND table_name NOT IN (${this.getExcludedTableNames().join(", ")})
),
columns_info AS (
SELECT table_name AS name,
json_agg(json_build_object(
'name', column_name,
'type', data_type,
'notnull', (CASE WHEN is_nullable = 'NO' THEN true ELSE false END),
'dflt', column_default,
'pk', (SELECT COUNT(*) > 0
FROM information_schema.table_constraints tc
INNER JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_name = c.table_name
AND tc.constraint_type = 'PRIMARY KEY'
AND kcu.column_name = c.column_name)
)) AS columns
FROM information_schema.columns c
WHERE table_schema = 'public'
GROUP BY table_name
),
indices_info AS (
SELECT
t.relname AS table_name,
json_agg(json_build_object(
'name', i.relname,
'origin', pg_get_indexdef(i.oid),
'partial', (CASE WHEN ix.indisvalid THEN false ELSE true END),
'sql', pg_get_indexdef(i.oid),
'columns', (
SELECT json_agg(json_build_object(
'name', a.attname,
'seqno', x.ordinal_position
))
FROM unnest(ix.indkey) WITH ORDINALITY AS x(attnum, ordinal_position)
JOIN pg_attribute a ON a.attnum = x.attnum AND a.attrelid = t.oid
))) AS indices
FROM pg_class t
LEFT JOIN pg_index ix ON t.oid = ix.indrelid
LEFT JOIN pg_class i ON i.oid = ix.indexrelid
WHERE t.relkind IN ('r', 'v') -- r = table, v = view
AND t.relname NOT LIKE 'pg_%'
GROUP BY t.relname
)
SELECT
tv.name,
tv.type,
ci.columns,
ii.indices
FROM tables_and_views tv
LEFT JOIN columns_info ci ON tv.name = ci.name
LEFT JOIN indices_info ii ON tv.name = ii.table_name;
`;
const tables = await this.executeWithPlugins<PostgresSchemaSpec[]>(query);
return tables.map((table) => ({
name: table.name,
isView: table.type === "VIEW",
columns: table.columns.map((col) => {
return {
name: col.name,
dataType: col.type,
isNullable: !col.notnull,
// @todo: check default value on 'nextval' see https://www.postgresql.org/docs/17/datatype-numeric.html#DATATYPE-SERIAL
isAutoIncrementing: true, // just for now
hasDefaultValue: col.dflt != null,
comment: undefined,
};
}),
indices: table.indices.map((index) => ({
name: index.name,
table: table.name,
isUnique: index.sql?.match(/unique/i) != null,
columns: index.columns.map((col) => ({
name: col.name,
order: col.seqno,
})),
})),
}));
}
}

View File

@@ -1,8 +1,8 @@
import { type Client, type Config, type InStatement, createClient } from "@libsql/client";
import { LibsqlDialect } from "@libsql/kysely-libsql";
import { type DatabaseIntrospector, Kysely, ParseJSONResultsPlugin } from "kysely";
import { FilterNumericKeysPlugin } from "data/plugins/FilterNumericKeysPlugin";
import { KyselyPluginRunner } from "data/plugins/KyselyPluginRunner";
import { type DatabaseIntrospector, Kysely, ParseJSONResultsPlugin } from "kysely";
import type { QB } from "../Connection";
import { SqliteConnection } from "./SqliteConnection";
import { SqliteIntrospector } from "./SqliteIntrospector";
@@ -25,6 +25,9 @@ class CustomLibsqlDialect extends LibsqlDialect {
export class LibsqlConnection extends SqliteConnection {
private client: Client;
protected override readonly supported = {
batching: true,
};
constructor(client: Client);
constructor(credentials: LibSqlCredentials);
@@ -53,14 +56,6 @@ export class LibsqlConnection extends SqliteConnection {
this.client = client;
}
override supportsBatching(): boolean {
return true;
}
override supportsIndices(): boolean {
return true;
}
getClient(): Client {
return this.client;
}

View File

@@ -16,10 +16,6 @@ export class SqliteConnection extends Connection {
);
}
override supportsIndices(): boolean {
return true;
}
override getFieldSchema(spec: FieldSpec): SchemaResponse {
this.validateFieldSpecType(spec.type);
let type: ColumnDataType = spec.type;
@@ -47,8 +43,4 @@ export class SqliteConnection extends Connection {
},
] as const;
}
override async close(): Promise<void> {
// no-op
}
}

View File

@@ -1,5 +1,10 @@
import { type DatabaseIntrospector, ParseJSONResultsPlugin, type SqliteDatabase } from "kysely";
import { Kysely, SqliteDialect } from "kysely";
import {
type DatabaseIntrospector,
Kysely,
ParseJSONResultsPlugin,
type SqliteDatabase,
SqliteDialect,
} from "kysely";
import { SqliteConnection } from "./SqliteConnection";
import { SqliteIntrospector } from "./SqliteIntrospector";
@@ -23,8 +28,4 @@ export class SqliteLocalConnection extends SqliteConnection {
super(kysely, {}, plugins);
}
override supportsIndices(): boolean {
return true;
}
}

View File

@@ -49,10 +49,6 @@ export class SchemaManager {
constructor(private readonly em: EntityManager<any>) {}
private getIntrospector() {
if (!this.em.connection.supportsIndices()) {
throw new Error("Indices are not supported by the current connection");
}
return this.em.connection.getIntrospector();
}