feat: move postgres as part of the main repo

This commit is contained in:
dswbx
2025-10-31 17:13:23 +01:00
parent 5417aa174e
commit 2c7054c317
30 changed files with 310 additions and 403 deletions

View File

@@ -1,3 +1,4 @@
import type { MaybePromise } from "core/types";
import { getRuntimeKey as honoGetRuntimeKey } from "hono/adapter";
/**
@@ -77,3 +78,21 @@ export function threw(fn: () => any, instance?: new (...args: any[]) => Error) {
return true;
}
}
export async function $waitUntil(
message: string,
condition: () => MaybePromise<boolean>,
delay = 100,
maxAttempts = 10,
) {
let attempts = 0;
while (attempts < maxAttempts) {
if (await condition()) {
return;
}
await new Promise((resolve) => setTimeout(resolve, delay));
attempts++;
}
throw new Error(`$waitUntil: "${message}" failed after ${maxAttempts} attempts`);
}

View File

@@ -14,27 +14,31 @@ export function connectionTestSuite(
{
makeConnection,
rawDialectDetails,
disableConsoleLog: _disableConsoleLog = true,
}: {
makeConnection: () => MaybePromise<{
connection: Connection;
dispose: () => MaybePromise<void>;
}>;
rawDialectDetails: string[];
disableConsoleLog?: boolean;
},
) {
const { test, expect, describe, beforeEach, afterEach, afterAll, beforeAll } = testRunner;
beforeAll(() => disableConsoleLog());
afterAll(() => enableConsoleLog());
if (_disableConsoleLog) {
beforeAll(() => disableConsoleLog());
afterAll(() => enableConsoleLog());
}
describe("base", () => {
let ctx: Awaited<ReturnType<typeof makeConnection>>;
beforeEach(async () => {
ctx = await makeConnection();
});
afterEach(async () => {
await ctx.dispose();
});
let ctx: Awaited<ReturnType<typeof makeConnection>>;
beforeEach(async () => {
ctx = await makeConnection();
});
afterEach(async () => {
await ctx.dispose();
});
describe("base", async () => {
test("pings", async () => {
const res = await ctx.connection.ping();
expect(res).toBe(true);
@@ -98,52 +102,54 @@ export function connectionTestSuite(
});
describe("schema", async () => {
const { connection, dispose } = await makeConnection();
afterAll(async () => {
await dispose();
});
const makeSchema = async () => {
const fields = [
{
type: "integer",
name: "id",
primary: true,
},
{
type: "text",
name: "text",
},
{
type: "json",
name: "json",
},
] as const satisfies FieldSpec[];
const fields = [
{
type: "integer",
name: "id",
primary: true,
},
{
type: "text",
name: "text",
},
{
type: "json",
name: "json",
},
] as const satisfies FieldSpec[];
let b = ctx.connection.kysely.schema.createTable("test");
for (const field of fields) {
// @ts-expect-error
b = b.addColumn(...ctx.connection.getFieldSchema(field));
}
await b.execute();
let b = connection.kysely.schema.createTable("test");
for (const field of fields) {
// @ts-expect-error
b = b.addColumn(...connection.getFieldSchema(field));
}
await b.execute();
// add index
await connection.kysely.schema.createIndex("test_index").on("test").columns(["id"]).execute();
// add index
await ctx.connection.kysely.schema
.createIndex("test_index")
.on("test")
.columns(["id"])
.execute();
};
test("executes query", async () => {
await connection.kysely
await makeSchema();
await ctx.connection.kysely
.insertInto("test")
.values({ id: 1, text: "test", json: JSON.stringify({ a: 1 }) })
.execute();
const expected = { id: 1, text: "test", json: { a: 1 } };
const qb = connection.kysely.selectFrom("test").selectAll();
const res = await connection.executeQuery(qb);
const qb = ctx.connection.kysely.selectFrom("test").selectAll();
const res = await ctx.connection.executeQuery(qb);
expect(res.rows).toEqual([expected]);
expect(rawDialectDetails.every((detail) => getPath(res, detail) !== undefined)).toBe(true);
{
const res = await connection.executeQueries(qb, qb);
const res = await ctx.connection.executeQueries(qb, qb);
expect(res.length).toBe(2);
res.map((r) => {
expect(r.rows).toEqual([expected]);
@@ -155,15 +161,21 @@ export function connectionTestSuite(
});
test("introspects", async () => {
const tables = await connection.getIntrospector().getTables({
await makeSchema();
const tables = await ctx.connection.getIntrospector().getTables({
withInternalKyselyTables: false,
});
const clean = tables.map((t) => ({
...t,
columns: t.columns.map((c) => ({
...c,
dataType: undefined,
})),
columns: t.columns
.map((c) => ({
...c,
// ignore data type
dataType: undefined,
// ignore default value if "id"
hasDefaultValue: c.name !== "id" ? c.hasDefaultValue : undefined,
}))
.sort((a, b) => a.name.localeCompare(b.name)),
}));
expect(clean).toEqual([
@@ -176,14 +188,8 @@ export function connectionTestSuite(
dataType: undefined,
isNullable: false,
isAutoIncrementing: true,
hasDefaultValue: false,
},
{
name: "text",
dataType: undefined,
isNullable: true,
isAutoIncrementing: false,
hasDefaultValue: false,
hasDefaultValue: undefined,
comment: undefined,
},
{
name: "json",
@@ -191,25 +197,34 @@ export function connectionTestSuite(
isNullable: true,
isAutoIncrementing: false,
hasDefaultValue: false,
comment: undefined,
},
{
name: "text",
dataType: undefined,
isNullable: true,
isAutoIncrementing: false,
hasDefaultValue: false,
comment: undefined,
},
],
},
]);
expect(await ctx.connection.getIntrospector().getIndices()).toEqual([
{
name: "test_index",
table: "test",
isUnique: false,
columns: [
{
name: "id",
order: 0,
},
],
},
]);
});
expect(await connection.getIntrospector().getIndices()).toEqual([
{
name: "test_index",
table: "test",
isUnique: false,
columns: [
{
name: "id",
order: 0,
},
],
},
]);
});
describe("integration", async () => {

View File

@@ -0,0 +1,32 @@
import { Kysely, PostgresDialect } from "kysely";
import { PostgresIntrospector } from "./PostgresIntrospector";
import { PostgresConnection, plugins } from "./PostgresConnection";
import { customIntrospector } from "bknd";
import $pg from "pg";
export type PgPostgresConnectionConfig = $pg.PoolConfig;
export class PgPostgresConnection extends PostgresConnection<$pg.Pool> {
override name = "pg";
constructor(config: PgPostgresConnectionConfig) {
const pool = new $pg.Pool(config);
const kysely = new Kysely({
dialect: customIntrospector(PostgresDialect, PostgresIntrospector, {
excludeTables: [],
}).create({ pool }),
plugins,
});
super(kysely);
this.client = pool;
}
override async close(): Promise<void> {
await this.client.end();
}
}
export function pg(config: PgPostgresConnectionConfig): PgPostgresConnection {
return new PgPostgresConnection(config);
}

View File

@@ -0,0 +1,89 @@
import {
Connection,
type DbFunctions,
type FieldSpec,
type SchemaResponse,
type ConnQuery,
type ConnQueryResults,
} from "bknd";
import {
ParseJSONResultsPlugin,
type ColumnDataType,
type ColumnDefinitionBuilder,
type Kysely,
type KyselyPlugin,
type SelectQueryBuilder,
} from "kysely";
import { jsonArrayFrom, jsonBuildObject, jsonObjectFrom } from "kysely/helpers/postgres";
export type QB = SelectQueryBuilder<any, any, any>;
export const plugins = [new ParseJSONResultsPlugin()];
export abstract class PostgresConnection<Client = unknown> extends Connection<Client> {
protected override readonly supported = {
batching: true,
softscans: true,
};
constructor(kysely: Kysely<any>, fn?: Partial<DbFunctions>, _plugins?: KyselyPlugin[]) {
super(
kysely,
fn ?? {
jsonArrayFrom,
jsonBuildObject,
jsonObjectFrom,
},
_plugins ?? plugins,
);
}
override getFieldSchema(spec: FieldSpec): SchemaResponse {
this.validateFieldSpecType(spec.type);
let type: ColumnDataType = spec.type;
if (spec.primary) {
if (spec.type === "integer") {
type = "serial";
}
}
switch (spec.type) {
case "blob":
type = "bytea";
break;
case "date":
case "datetime":
// https://www.postgresql.org/docs/17/datatype-datetime.html
type = "timestamp";
break;
case "text":
// https://www.postgresql.org/docs/17/datatype-character.html
type = "varchar";
break;
}
return [
spec.name,
type,
(col: ColumnDefinitionBuilder) => {
if (spec.primary) {
return col.primaryKey().notNull();
}
if (spec.references) {
return col
.references(spec.references)
.onDelete(spec.onDelete ?? "set null")
.onUpdate(spec.onUpdate ?? "no action");
}
return col;
},
];
}
override async executeQueries<O extends ConnQuery[]>(...qbs: O): Promise<ConnQueryResults<O>> {
return this.kysely.transaction().execute(async (trx) => {
return Promise.all(qbs.map((q) => trx.executeQuery(q)));
}) as any;
}
}

View File

@@ -0,0 +1,128 @@
import { type SchemaMetadata, sql } from "kysely";
import { BaseIntrospector } from "bknd";
type PostgresSchemaSpec = {
name: string;
type: "VIEW" | "BASE TABLE";
columns: {
name: string;
type: string;
notnull: number;
dflt: string;
pk: boolean;
}[];
indices: {
name: string;
origin: string;
partial: number;
sql: string;
columns: { name: string; seqno: number }[];
}[];
};
export class PostgresIntrospector extends BaseIntrospector {
async getSchemas(): Promise<SchemaMetadata[]> {
const rawSchemas = await this.db
.selectFrom("pg_catalog.pg_namespace")
.select("nspname")
.$castTo<{ nspname: string }>()
.execute();
return rawSchemas.map((it) => ({ name: it.nspname }));
}
async getSchemaSpec() {
const query = sql`
WITH tables_and_views AS (
SELECT table_name AS name,
table_type AS type
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type IN ('BASE TABLE', 'VIEW')
AND table_name NOT LIKE 'pg_%'
AND table_name NOT IN (${this.getExcludedTableNames().join(", ")})
),
columns_info AS (
SELECT table_name AS name,
json_agg(json_build_object(
'name', column_name,
'type', data_type,
'notnull', (CASE WHEN is_nullable = 'NO' THEN true ELSE false END),
'dflt', column_default,
'pk', (SELECT COUNT(*) > 0
FROM information_schema.table_constraints tc
INNER JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_name = c.table_name
AND tc.constraint_type = 'PRIMARY KEY'
AND kcu.column_name = c.column_name)
)) AS columns
FROM information_schema.columns c
WHERE table_schema = 'public'
GROUP BY table_name
),
indices_info AS (
SELECT
t.relname AS table_name,
json_agg(json_build_object(
'name', i.relname,
'origin', pg_get_indexdef(i.oid),
'partial', (CASE WHEN ix.indisvalid THEN false ELSE true END),
'sql', pg_get_indexdef(i.oid),
'columns', (
SELECT json_agg(json_build_object(
'name', a.attname,
'seqno', x.ordinal_position
))
FROM unnest(ix.indkey) WITH ORDINALITY AS x(attnum, ordinal_position)
JOIN pg_attribute a ON a.attnum = x.attnum AND a.attrelid = t.oid
))) AS indices
FROM pg_class t
LEFT JOIN pg_index ix ON t.oid = ix.indrelid
LEFT JOIN pg_class i ON i.oid = ix.indexrelid
WHERE t.relkind IN ('r', 'v') -- r = table, v = view
AND t.relname NOT LIKE 'pg_%'
GROUP BY t.relname
)
SELECT
tv.name,
tv.type,
ci.columns,
ii.indices
FROM tables_and_views tv
LEFT JOIN columns_info ci ON tv.name = ci.name
LEFT JOIN indices_info ii ON tv.name = ii.table_name;
`;
const tables = await this.executeWithPlugins<PostgresSchemaSpec[]>(query);
return tables.map((table) => ({
name: table.name,
isView: table.type === "VIEW",
columns: table.columns.map((col) => ({
name: col.name,
dataType: col.type,
isNullable: !col.notnull,
isAutoIncrementing: col.dflt?.toLowerCase().includes("nextval") ?? false,
hasDefaultValue: col.dflt != null,
comment: undefined,
})),
indices: table.indices
// filter out db-managed primary key index
.filter((index) => index.name !== `${table.name}_pkey`)
.map((index) => ({
name: index.name,
table: table.name,
isUnique: index.sql?.match(/unique/i) != null,
columns: index.columns.map((col) => ({
name: col.name,
// seqno starts at 1
order: col.seqno - 1,
})),
})),
}));
}
}

View File

@@ -0,0 +1,41 @@
import { Kysely } from "kysely";
import { PostgresIntrospector } from "./PostgresIntrospector";
import { PostgresConnection, plugins } from "./PostgresConnection";
import { customIntrospector } from "bknd";
import { PostgresJSDialect } from "kysely-postgres-js";
import $postgresJs, { type Sql, type Options, type PostgresType } from "postgres";
export type PostgresJsConfig = Options<Record<string, PostgresType>>;
export class PostgresJsConnection extends PostgresConnection<$postgresJs.Sql> {
override name = "postgres-js";
constructor(opts: { postgres: $postgresJs.Sql }) {
const kysely = new Kysely({
dialect: customIntrospector(PostgresJSDialect, PostgresIntrospector, {
excludeTables: [],
}).create({ postgres: opts.postgres }),
plugins,
});
super(kysely);
this.client = opts.postgres;
}
override async close(): Promise<void> {
await this.client.end();
}
}
export function postgresJs(
connectionString: string,
config?: PostgresJsConfig,
): PostgresJsConnection;
export function postgresJs(config: PostgresJsConfig): PostgresJsConnection;
export function postgresJs(
first: PostgresJsConfig | string,
second?: PostgresJsConfig,
): PostgresJsConnection {
const postgres = typeof first === "string" ? $postgresJs(first, second) : $postgresJs(first);
return new PostgresJsConnection({ postgres });
}

View File

@@ -0,0 +1,46 @@
import { customIntrospector, type DbFunctions } from "bknd";
import { Kysely, type Dialect, type KyselyPlugin } from "kysely";
import { plugins, PostgresConnection } from "./PostgresConnection";
import { PostgresIntrospector } from "./PostgresIntrospector";
export type Constructor<T> = new (...args: any[]) => T;
export type CustomPostgresConnection = {
supports?: PostgresConnection["supported"];
fn?: Partial<DbFunctions>;
plugins?: KyselyPlugin[];
excludeTables?: string[];
};
export function createCustomPostgresConnection<
T extends Constructor<Dialect>,
C extends ConstructorParameters<T>[0],
>(
name: string,
dialect: Constructor<Dialect>,
options?: CustomPostgresConnection,
): (config: C) => PostgresConnection {
const supported = {
batching: true,
...((options?.supports ?? {}) as any),
};
return (config: C) =>
new (class extends PostgresConnection {
override name = name;
override readonly supported = supported;
constructor(config: C) {
super(
new Kysely({
dialect: customIntrospector(dialect, PostgresIntrospector, {
excludeTables: options?.excludeTables ?? [],
}).create(config),
plugins: options?.plugins ?? plugins,
}),
options?.fn,
options?.plugins,
);
}
})(config);
}

View File

@@ -0,0 +1,5 @@
export { pg, PgPostgresConnection, type PgPostgresConnectionConfig } from "./PgPostgresConnection";
export { PostgresIntrospector } from "./PostgresIntrospector";
export { PostgresConnection, type QB, plugins } from "./PostgresConnection";
export { postgresJs, PostgresJsConnection, type PostgresJsConfig } from "./PostgresJsConnection";
export { createCustomPostgresConnection } from "./custom";

View File

@@ -13,7 +13,6 @@ import { customIntrospector } from "../Connection";
import { SqliteIntrospector } from "./SqliteIntrospector";
import type { Field } from "data/fields/Field";
// @todo: add pragmas
export type SqliteConnectionConfig<
CustomDialect extends Constructor<Dialect> = Constructor<Dialect>,
> = {

View File

@@ -83,7 +83,7 @@ export class SqliteIntrospector extends BaseIntrospector {
dataType: col.type,
isNullable: !col.notnull,
isAutoIncrementing: col.name === autoIncrementCol,
hasDefaultValue: col.dflt_value != null,
hasDefaultValue: col.name === autoIncrementCol ? true : col.dflt_value != null,
comment: undefined,
};
}) ?? [],

View File

@@ -1,8 +1,9 @@
import { test, describe, expect } from "bun:test";
import { test, describe, expect, beforeAll, afterAll } from "bun:test";
import * as q from "./query";
import { parse as $parse, type ParseOptions } from "bknd/utils";
import type { PrimaryFieldType } from "modules";
import type { Generated } from "kysely";
import { disableConsoleLog, enableConsoleLog } from "core/utils/test";
const parse = (v: unknown, o: ParseOptions = {}) =>
$parse(q.repoQuery, v, {
@@ -15,6 +16,9 @@ const decode = (input: any, output: any) => {
expect(parse(input)).toEqual(output);
};
beforeAll(() => disableConsoleLog());
afterAll(() => enableConsoleLog());
describe("server/query", () => {
test("limit & offset", () => {
//expect(() => parse({ limit: false })).toThrow();

View File

@@ -132,6 +132,8 @@ export type * from "data/entities/Entity";
export type { EntityManager } from "data/entities/EntityManager";
export type { SchemaManager } from "data/schema/SchemaManager";
export type * from "data/entities";
// data connection
export {
BaseIntrospector,
Connection,
@@ -144,9 +146,28 @@ export {
type ConnQuery,
type ConnQueryResults,
} from "data/connection";
// data sqlite
export { SqliteConnection } from "data/connection/sqlite/SqliteConnection";
export { SqliteIntrospector } from "data/connection/sqlite/SqliteIntrospector";
export { SqliteLocalConnection } from "data/connection/sqlite/SqliteLocalConnection";
// data postgres
export {
pg,
PgPostgresConnection,
type PgPostgresConnectionConfig,
} from "data/connection/postgres/PgPostgresConnection";
export { PostgresIntrospector } from "data/connection/postgres/PostgresIntrospector";
export { PostgresConnection } from "data/connection/postgres/PostgresConnection";
export {
postgresJs,
PostgresJsConnection,
type PostgresJsConfig,
} from "data/connection/postgres/PostgresJsConnection";
export { createCustomPostgresConnection } from "data/connection/postgres/custom";
// data prototype
export {
text,
number,