rewrite libsql and cloudflare sqlite's to use the generic adapter

This commit is contained in:
dswbx
2025-07-02 14:02:33 +02:00
parent a9f3a582eb
commit d41fd5541f
16 changed files with 533 additions and 458 deletions

View File

@@ -6,7 +6,10 @@ import { Database } from "bun:sqlite";
describe("BunSqliteConnection", () => {
connectionTestSuite(bunTestRunner, {
makeConnection: () => bunSqlite({ database: new Database(":memory:") }),
makeConnection: () => ({
connection: bunSqlite({ database: new Database(":memory:") }),
dispose: async () => {},
}),
rawDialectDetails: [],
});
});

View File

@@ -1,8 +1,11 @@
import { expect, test, mock, describe } from "bun:test";
import { expect, test, mock, describe, beforeEach, afterEach, afterAll } from "bun:test";
export const bunTestRunner = {
describe,
expect,
test,
mock,
beforeEach,
afterEach,
afterAll,
};

View File

@@ -4,51 +4,30 @@ import { viTestRunner } from "adapter/node/vitest";
import { connectionTestSuite } from "data/connection/connection-test-suite";
import { Miniflare } from "miniflare";
import { d1Sqlite } from "./D1Connection";
import { sql } from "kysely";
describe("d1Sqlite", async () => {
const mf = new Miniflare({
modules: true,
script: "export default { async fetch() { return new Response(null); } }",
d1Databases: ["DB"],
});
const binding = (await mf.getD1Database("DB")) as D1Database;
test("connection", async () => {
const conn = d1Sqlite({ binding });
expect(conn.supports("batching")).toBe(true);
expect(conn.supports("softscans")).toBe(false);
});
test("query details", async () => {
const conn = d1Sqlite({ binding });
const res = await conn.executeQuery(sql`select 1`.compile(conn.kysely));
expect(res.rows).toEqual([{ "1": 1 }]);
expect(res.numAffectedRows).toBe(undefined);
expect(res.insertId).toBe(undefined);
// @ts-expect-error
expect(res.meta.changed_db).toBe(false);
// @ts-expect-error
expect(res.meta.rows_read).toBe(0);
const batchResult = await conn.executeQueries(
sql`select 1`.compile(conn.kysely),
sql`select 2`.compile(conn.kysely),
);
// rewrite to get index
for (const [index, result] of batchResult.entries()) {
expect(result.rows).toEqual([{ [String(index + 1)]: index + 1 }]);
expect(result.numAffectedRows).toBe(undefined);
expect(result.insertId).toBe(undefined);
// @ts-expect-error
expect(result.meta.changed_db).toBe(false);
}
});
connectionTestSuite(viTestRunner, {
makeConnection: () => d1Sqlite({ binding }),
rawDialectDetails: [],
makeConnection: async () => {
const mf = new Miniflare({
modules: true,
script: "export default { async fetch() { return new Response(null); } }",
d1Databases: ["DB"],
});
const binding = (await mf.getD1Database("DB")) as D1Database;
return {
connection: d1Sqlite({ binding }),
dispose: () => mf.dispose(),
};
},
rawDialectDetails: [
"meta.served_by",
"meta.duration",
"meta.changes",
"meta.changed_db",
"meta.size_after",
"meta.rows_read",
"meta.rows_written",
],
});
});

View File

@@ -1,138 +0,0 @@
import {
SqliteAdapter,
SqliteIntrospector,
SqliteQueryCompiler,
type CompiledQuery,
type DatabaseConnection,
type DatabaseIntrospector,
type Dialect,
type Driver,
type Kysely,
type QueryCompiler,
type QueryResult,
} from "kysely";
/**
* Config for the D1 dialect. Pass your D1 instance to this object that you bound in `wrangler.toml`.
*/
export interface D1DialectConfig {
database: D1Database;
}
/**
* D1 dialect that adds support for [Cloudflare D1][0] in [Kysely][1].
* The constructor takes the instance of your D1 database that you bound in `wrangler.toml`.
*
* ```typescript
* new D1Dialect({
* database: env.DB,
* })
* ```
*
* [0]: https://blog.cloudflare.com/introducing-d1/
* [1]: https://github.com/koskimas/kysely
*/
export class D1Dialect implements Dialect {
#config: D1DialectConfig;
constructor(config: D1DialectConfig) {
this.#config = config;
}
createAdapter() {
return new SqliteAdapter();
}
createDriver(): Driver {
return new D1Driver(this.#config);
}
createQueryCompiler(): QueryCompiler {
return new SqliteQueryCompiler();
}
createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new SqliteIntrospector(db);
}
}
class D1Driver implements Driver {
#config: D1DialectConfig;
constructor(config: D1DialectConfig) {
this.#config = config;
}
async init(): Promise<void> {}
async acquireConnection(): Promise<DatabaseConnection> {
return new D1Connection(this.#config);
}
async beginTransaction(conn: D1Connection): Promise<void> {
return await conn.beginTransaction();
}
async commitTransaction(conn: D1Connection): Promise<void> {
return await conn.commitTransaction();
}
async rollbackTransaction(conn: D1Connection): Promise<void> {
return await conn.rollbackTransaction();
}
async releaseConnection(_conn: D1Connection): Promise<void> {}
async destroy(): Promise<void> {}
}
class D1Connection implements DatabaseConnection {
#config: D1DialectConfig;
constructor(config: D1DialectConfig) {
this.#config = config;
}
async executeQuery<O>(compiledQuery: CompiledQuery): Promise<QueryResult<O>> {
const results = await this.#config.database
.prepare(compiledQuery.sql)
.bind(...compiledQuery.parameters)
.all();
if (results.error) {
throw new Error(results.error);
}
const numAffectedRows = results.meta.changes > 0 ? BigInt(results.meta.changes) : undefined;
return {
insertId:
results.meta.last_row_id === undefined || results.meta.last_row_id === null
? undefined
: BigInt(results.meta.last_row_id),
rows: (results?.results as O[]) || [],
numAffectedRows,
// @ts-ignore deprecated in kysely >= 0.23, keep for backward compatibility.
numUpdatedOrDeletedRows: numAffectedRows,
};
}
async beginTransaction() {
throw new Error("Transactions are not supported yet.");
}
async commitTransaction() {
throw new Error("Transactions are not supported yet.");
}
async rollbackTransaction() {
throw new Error("Transactions are not supported yet.");
}
// biome-ignore lint/correctness/useYield: <explanation>
async *streamQuery<O>(
_compiledQuery: CompiledQuery,
_chunkSize: number,
): AsyncIterableIterator<QueryResult<O>> {
throw new Error("D1 Driver does not support streaming");
}
}

View File

@@ -0,0 +1,83 @@
/// <reference types="@cloudflare/workers-types" />
import {
genericSqlite,
type GenericSqliteConnection,
} from "data/connection/sqlite/GenericSqliteConnection";
import type { QueryResult } from "kysely";
export type D1SqliteConnection = GenericSqliteConnection<D1Database>;
export type DurableObjecSql = DurableObjectState["storage"]["sql"];
export type D1ConnectionConfig<DB extends DurableObjecSql> =
| DurableObjectState
| {
sql: DB;
};
export function doSqlite<DB extends DurableObjecSql>(config: D1ConnectionConfig<DB>) {
const db = "sql" in config ? config.sql : config.storage.sql;
return genericSqlite(
"do-sqlite",
db,
(utils) => {
// must be async to work with the miniflare mock
const getStmt = async (sql: string, parameters?: any[] | readonly any[]) =>
await db.exec(sql, ...(parameters || []));
const mapResult = (
cursor: SqlStorageCursor<Record<string, SqlStorageValue>>,
): QueryResult<any> => {
const numAffectedRows =
cursor.rowsWritten > 0 ? utils.parseBigInt(cursor.rowsWritten) : undefined;
const insertId = undefined;
const obj = {
insertId,
numAffectedRows,
rows: cursor.toArray() || [],
// @ts-ignore
meta: {
rowsWritten: cursor.rowsWritten,
rowsRead: cursor.rowsRead,
databaseSize: db.databaseSize,
},
};
//console.info("mapResult", obj);
return obj;
};
return {
db,
batch: async (stmts) => {
// @todo: maybe wrap in a transaction?
// because d1 implicitly does a transaction on batch
return Promise.all(
stmts.map(async (stmt) => {
return mapResult(await getStmt(stmt.sql, stmt.parameters));
}),
);
},
query: utils.buildQueryFn({
all: async (sql, parameters) => {
const prep = getStmt(sql, parameters);
return mapResult(await prep).rows;
},
run: async (sql, parameters) => {
const prep = getStmt(sql, parameters);
return mapResult(await prep);
},
}),
close: () => {},
};
},
{
supports: {
batching: true,
softscans: false,
},
excludeTables: ["_cf_KV", "_cf_METADATA"],
},
);
}

View File

@@ -0,0 +1,92 @@
/// <reference types="@cloudflare/workers-types" />
import { describe, test, expect } from "vitest";
import { viTestRunner } from "adapter/node/vitest";
import { connectionTestSuite } from "data/connection/connection-test-suite";
import { Miniflare } from "miniflare";
import { doSqlite } from "./DoConnection";
const script = `
import { DurableObject } from "cloudflare:workers";
export class TestObject extends DurableObject {
constructor(ctx, env) {
super(ctx, env);
this.storage = ctx.storage;
}
async exec(sql, ...parameters) {
//return { sql, parameters }
const cursor = this.storage.sql.exec(sql, ...parameters);
return {
rows: cursor.toArray() || [],
rowsWritten: cursor.rowsWritten,
rowsRead: cursor.rowsRead,
databaseSize: this.storage.sql.databaseSize,
}
}
async databaseSize() {
return this.storage.sql.databaseSize;
}
}
export default {
async fetch(request, env) {
const stub = env.TEST_OBJECT.get(env.TEST_OBJECT.idFromName("test"));
return stub.fetch(request);
}
}
`;
describe("doSqlite", async () => {
connectionTestSuite(viTestRunner, {
makeConnection: async () => {
const mf = new Miniflare({
modules: true,
durableObjects: { TEST_OBJECT: { className: "TestObject", useSQLite: true } },
script,
});
const ns = await mf.getDurableObjectNamespace("TEST_OBJECT");
const id = ns.idFromName("test");
const stub = ns.get(id) as unknown as DurableObjectStub<
Rpc.DurableObjectBranded & {
exec: (sql: string, ...parameters: any[]) => Promise<any>;
}
>;
const stubs: any[] = [];
const mock = {
databaseSize: 0,
exec: async function (sql: string, ...parameters: any[]) {
// @ts-ignore
const result = (await stub.exec(sql, ...parameters)) as any;
this.databaseSize = result.databaseSize;
stubs.push(result);
return {
toArray: () => result.rows,
rowsWritten: result.rowsWritten,
rowsRead: result.rowsRead,
};
},
};
return {
connection: doSqlite({ sql: mock as any }),
dispose: async () => {
await Promise.all(
stubs.map((stub) => {
try {
return stub[Symbol.dispose]();
} catch (e) {}
}),
);
await mf.dispose();
},
};
},
rawDialectDetails: ["meta.rowsWritten", "meta.rowsRead", "meta.databaseSize"],
});
});

View File

@@ -1,11 +1,15 @@
import { nodeSqlite } from "./NodeSqliteConnection";
import { DatabaseSync } from "node:sqlite";
import { connectionTestSuite } from "data/connection/connection-test-suite";
import { describe, test, expect } from "vitest";
import { describe } from "vitest";
import { viTestRunner } from "../vitest";
describe("NodeSqliteConnection", () => {
connectionTestSuite({ describe, test, expect } as any, {
makeConnection: () => nodeSqlite({ database: new DatabaseSync(":memory:") }),
connectionTestSuite(viTestRunner, {
makeConnection: () => ({
connection: nodeSqlite({ database: new DatabaseSync(":memory:") }),
dispose: async () => {},
}),
rawDialectDetails: [],
});
});

View File

@@ -1,5 +1,5 @@
import nodeAssert from "node:assert/strict";
import { test, describe } from "node:test";
import { test, describe, beforeEach, afterEach } from "node:test";
import type { Matcher, Test, TestFn, TestRunner } from "core/test";
// Track mock function calls
@@ -97,4 +97,7 @@ export const nodeTestRunner: TestRunner = {
reject: (r) => nodeTestMatcher(r, failMsg),
}),
}),
beforeEach: beforeEach,
afterEach: afterEach,
afterAll: () => {},
};

View File

@@ -1,5 +1,5 @@
import type { TestFn, TestRunner, Test } from "core/test";
import { describe, test, expect, vi } from "vitest";
import { describe, test, expect, vi, beforeEach, afterEach, afterAll } from "vitest";
function vitestTest(label: string, fn: TestFn, options?: any) {
return test(label, fn as any);
@@ -47,4 +47,7 @@ export const viTestRunner: TestRunner = {
test: vitestTest,
expect: vitestExpect as any,
mock: (fn) => vi.fn(fn),
beforeEach: beforeEach,
afterEach: afterEach,
afterAll: afterAll,
};

View File

@@ -1,3 +1,5 @@
import type { MaybePromise } from "core/types";
export type Matcher<T = unknown> = {
toEqual: (expected: T, failMsg?: string) => void;
toBe: (expected: T, failMsg?: string) => void;
@@ -16,7 +18,7 @@ export interface Test {
skipIf: (condition: boolean) => (label: string, fn: TestFn) => void;
}
export type TestRunner = {
describe: (label: string, asyncFn: () => Promise<void>) => void;
describe: (label: string, asyncFn: () => MaybePromise<void>) => void;
test: Test;
mock: <T extends (...args: any[]) => any>(fn: T) => T | any;
expect: <T = unknown>(
@@ -26,6 +28,9 @@ export type TestRunner = {
resolves: Matcher<Awaited<T>>;
rejects: Matcher<Awaited<T>>;
};
beforeEach: (fn: () => MaybePromise<void>) => void;
afterEach: (fn: () => MaybePromise<void>) => void;
afterAll: (fn: () => MaybePromise<void>) => void;
};
export async function retry<T>(

View File

@@ -1,5 +1,9 @@
import type { TestRunner } from "core/test";
import { Connection, type FieldSpec } from "./Connection";
import { getPath } from "core/utils";
import * as proto from "data/prototype";
import { createApp } from "App";
import type { MaybePromise } from "core/types";
// @todo: add various datatypes: string, number, boolean, object, array, null, undefined, date, etc.
// @todo: add toDriver/fromDriver tests on all types and fields
@@ -10,77 +14,92 @@ export function connectionTestSuite(
makeConnection,
rawDialectDetails,
}: {
makeConnection: () => Connection;
makeConnection: () => MaybePromise<{
connection: Connection;
dispose: () => MaybePromise<void>;
}>;
rawDialectDetails: string[];
},
) {
const { test, expect, describe } = testRunner;
const { test, expect, describe, beforeEach, afterEach, afterAll } = testRunner;
test("pings", async () => {
const connection = makeConnection();
const res = await connection.ping();
expect(res).toBe(true);
});
describe("base", () => {
let ctx: Awaited<ReturnType<typeof makeConnection>>;
beforeEach(async () => {
ctx = await makeConnection();
});
afterEach(async () => {
await ctx.dispose();
});
test("initializes", async () => {
const connection = makeConnection();
await connection.init();
// @ts-expect-error
expect(connection.initialized).toBe(true);
expect(connection.client).toBeDefined();
});
test("pings", async () => {
const res = await ctx.connection.ping();
expect(res).toBe(true);
});
test("isConnection", async () => {
const connection = makeConnection();
expect(Connection.isConnection(connection)).toBe(true);
});
test("getFieldSchema", async () => {
const c = makeConnection();
const specToNode = (spec: FieldSpec) => {
test("initializes", async () => {
await ctx.connection.init();
// @ts-expect-error
const schema = c.kysely.schema.createTable("test").addColumn(...c.getFieldSchema(spec));
return schema.toOperationNode();
};
expect(ctx.connection.initialized).toBe(true);
expect(ctx.connection.client).toBeDefined();
});
{
// primary
const node = specToNode({
type: "integer",
name: "id",
primary: true,
});
const col = node.columns[0]!;
expect(col.primaryKey).toBe(true);
expect(col.notNull).toBe(true);
}
test("isConnection", async () => {
expect(Connection.isConnection(ctx.connection)).toBe(true);
});
{
// normal
const node = specToNode({
type: "text",
name: "text",
});
const col = node.columns[0]!;
expect(!col.primaryKey).toBe(true);
expect(!col.notNull).toBe(true);
}
test("getFieldSchema", async () => {
const specToNode = (spec: FieldSpec) => {
const schema = ctx.connection.kysely.schema
.createTable("test")
// @ts-expect-error
.addColumn(...ctx.connection.getFieldSchema(spec));
return schema.toOperationNode();
};
{
// nullable (expect to be same as normal)
const node = specToNode({
type: "text",
name: "text",
nullable: true,
});
const col = node.columns[0]!;
expect(!col.primaryKey).toBe(true);
expect(!col.notNull).toBe(true);
}
{
// primary
const node = specToNode({
type: "integer",
name: "id",
primary: true,
});
const col = node.columns[0]!;
expect(col.primaryKey).toBe(true);
expect(col.notNull).toBe(true);
}
{
// normal
const node = specToNode({
type: "text",
name: "text",
});
const col = node.columns[0]!;
expect(!col.primaryKey).toBe(true);
expect(!col.notNull).toBe(true);
}
{
// nullable (expect to be same as normal)
const node = specToNode({
type: "text",
name: "text",
nullable: true,
});
const col = node.columns[0]!;
expect(!col.primaryKey).toBe(true);
expect(!col.notNull).toBe(true);
}
});
});
describe("schema", async () => {
const connection = makeConnection();
const { connection, dispose } = await makeConnection();
afterAll(async () => {
await dispose();
});
const fields = [
{
type: "integer",
@@ -118,14 +137,16 @@ export function connectionTestSuite(
const qb = connection.kysely.selectFrom("test").selectAll();
const res = await connection.executeQuery(qb);
expect(res.rows).toEqual([expected]);
expect(rawDialectDetails.every((detail) => detail in res)).toBe(true);
expect(rawDialectDetails.every((detail) => getPath(res, detail) !== undefined)).toBe(true);
{
const res = await connection.executeQueries(qb, qb);
expect(res.length).toBe(2);
res.map((r) => {
expect(r.rows).toEqual([expected]);
expect(rawDialectDetails.every((detail) => detail in r)).toBe(true);
expect(rawDialectDetails.every((detail) => getPath(r, detail) !== undefined)).toBe(
true,
);
});
}
});
@@ -187,4 +208,146 @@ export function connectionTestSuite(
},
]);
});
describe("integration", async () => {
let ctx: Awaited<ReturnType<typeof makeConnection>>;
beforeEach(async () => {
ctx = await makeConnection();
});
afterEach(async () => {
await ctx.dispose();
});
test("should create app and ping", async () => {
const app = createApp({
connection: ctx.connection,
});
await app.build();
expect(app.version()).toBeDefined();
expect(await app.em.ping()).toBe(true);
});
test("should create a basic schema", async () => {
const schema = proto.em(
{
posts: proto.entity("posts", {
title: proto.text().required(),
content: proto.text(),
}),
comments: proto.entity("comments", {
content: proto.text(),
}),
},
(fns, s) => {
fns.relation(s.comments).manyToOne(s.posts);
fns.index(s.posts).on(["title"], true);
},
);
const app = createApp({
connection: ctx.connection,
initialConfig: {
data: schema.toJSON(),
},
});
await app.build();
expect(app.em.entities.length).toBe(2);
expect(app.em.entities.map((e) => e.name)).toEqual(["posts", "comments"]);
const api = app.getApi();
expect(
(
await api.data.createMany("posts", [
{
title: "Hello",
content: "World",
},
{
title: "Hello 2",
content: "World 2",
},
])
).data,
).toEqual([
{
id: 1,
title: "Hello",
content: "World",
},
{
id: 2,
title: "Hello 2",
content: "World 2",
},
] as any);
// try to create an existing
expect(
(
await api.data.createOne("posts", {
title: "Hello",
})
).ok,
).toBe(false);
// add a comment to a post
await api.data.createOne("comments", {
content: "Hello",
posts_id: 1,
});
// and then query using a `with` property
const result = await api.data.readMany("posts", { with: ["comments"] });
expect(result.length).toBe(2);
expect(result[0]?.comments?.length).toBe(1);
expect(result[0]?.comments?.[0]?.content).toBe("Hello");
expect(result[1]?.comments?.length).toBe(0);
});
test("should support uuid", async () => {
const schema = proto.em(
{
posts: proto.entity(
"posts",
{
title: proto.text().required(),
content: proto.text(),
},
{
primary_format: "uuid",
},
),
comments: proto.entity("comments", {
content: proto.text(),
}),
},
(fns, s) => {
fns.relation(s.comments).manyToOne(s.posts);
fns.index(s.posts).on(["title"], true);
},
);
const app = createApp({
connection: ctx.connection,
initialConfig: {
data: schema.toJSON(),
},
});
await app.build();
const config = app.toJSON();
// @ts-expect-error
expect(config.data.entities?.posts.fields?.id.config?.format).toBe("uuid");
const em = app.em;
const mutator = em.mutator(em.entity("posts"));
const data = await mutator.insertOne({ title: "Hello", content: "World" });
expect(data.data.id).toBeString();
expect(String(data.data.id).length).toBe(36);
});
});
}

View File

@@ -61,7 +61,7 @@ export class GenericSqliteConnection<DB = unknown> extends SqliteConnection<DB>
override async executeQueries<O extends ConnQuery[]>(...qbs: O): Promise<ConnQueryResults<O>> {
const executor = await this.getExecutor();
if (!executor.batch) {
console.warn("Batching is not supported by this database");
//$console.debug("Batching is not supported by this database");
return super.executeQueries(...qbs);
}

View File

@@ -68,32 +68,34 @@ export class SqliteIntrospector extends BaseIntrospector {
return tables.map((table) => ({
name: table.name,
isView: table.type === "view",
columns: table.columns.map((col) => {
const autoIncrementCol = table.sql
?.split(/[\(\),]/)
?.find((it) => it.toLowerCase().includes("autoincrement"))
?.trimStart()
?.split(/\s+/)?.[0]
?.replace(/["`]/g, "");
columns:
table.columns?.map((col) => {
const autoIncrementCol = table.sql
?.split(/[\(\),]/)
?.find((it) => it.toLowerCase().includes("autoincrement"))
?.trimStart()
?.split(/\s+/)?.[0]
?.replace(/["`]/g, "");
return {
name: col.name,
dataType: col.type,
isNullable: !col.notnull,
isAutoIncrementing: col.name === autoIncrementCol,
hasDefaultValue: col.dflt_value != null,
comment: undefined,
};
}),
indices: table.indices.map((index) => ({
name: index.name,
table: table.name,
isUnique: index.sql?.match(/unique/i) != null,
columns: index.columns.map((col) => ({
name: col.name,
order: col.seqno,
})),
})),
return {
name: col.name,
dataType: col.type,
isNullable: !col.notnull,
isAutoIncrementing: col.name === autoIncrementCol,
hasDefaultValue: col.dflt_value != null,
comment: undefined,
};
}) ?? [],
indices:
table.indices?.map((index) => ({
name: index.name,
table: table.name,
isUnique: index.sql?.match(/unique/i) != null,
columns: index.columns.map((col) => ({
name: col.name,
order: col.seqno,
})),
})) ?? [],
}));
}
}

View File

@@ -1,12 +1,15 @@
import { connectionTestSuite } from "../../connection-test-suite";
import { LibsqlConnection } from "./LibsqlConnection";
import { libsql } from "./LibsqlConnection";
import { bunTestRunner } from "adapter/bun/test";
import { describe } from "bun:test";
import { createClient } from "@libsql/client";
describe("LibsqlConnection", () => {
connectionTestSuite(bunTestRunner, {
makeConnection: () => new LibsqlConnection(createClient({ url: ":memory:" })),
rawDialectDetails: ["rowsAffected", "lastInsertRowid"],
makeConnection: () => ({
connection: libsql(createClient({ url: ":memory:" })),
dispose: async () => {},
}),
rawDialectDetails: [],
});
});

View File

@@ -1,9 +1,13 @@
import type { Client, Config, InStatement } from "@libsql/client";
import type { Client, Config, ResultSet } from "@libsql/client";
import { createClient } from "libsql-stateless-easy";
import { LibsqlDialect } from "./LibsqlDialect";
import { FilterNumericKeysPlugin } from "data/plugins/FilterNumericKeysPlugin";
import { type ConnQuery, type ConnQueryResults, SqliteConnection } from "bknd/data";
import {
genericSqlite,
type GenericSqliteConnection,
} from "data/connection/sqlite/GenericSqliteConnection";
import type { QueryResult } from "kysely";
export type LibsqlConnection = GenericSqliteConnection<Client>;
export type LibSqlCredentials = Config;
function getClient(clientOrCredentials: Client | LibSqlCredentials): Client {
@@ -15,39 +19,50 @@ function getClient(clientOrCredentials: Client | LibSqlCredentials): Client {
return clientOrCredentials as Client;
}
export class LibsqlConnection extends SqliteConnection<Client> {
override name = "libsql";
protected override readonly supported = {
batching: true,
softscans: true,
};
export function libsql(config: LibSqlCredentials | Client) {
const db = getClient(config);
constructor(clientOrCredentials: Client | LibSqlCredentials) {
const client = getClient(clientOrCredentials);
super({
excludeTables: ["libsql_wasm_func_table"],
dialect: LibsqlDialect,
dialectArgs: [{ client }],
additionalPlugins: [new FilterNumericKeysPlugin()],
});
this.client = client;
}
override async executeQueries<O extends ConnQuery[]>(...qbs: O): Promise<ConnQueryResults<O>> {
const compiled = this.getCompiled(...qbs);
const stms: InStatement[] = compiled.map((q) => {
return {
sql: q.sql,
args: q.parameters as any[],
return genericSqlite(
"libsql",
db,
(utils) => {
const mapResult = (result: ResultSet): QueryResult<any> => ({
insertId: result.lastInsertRowid,
numAffectedRows: BigInt(result.rowsAffected),
rows: result.rows,
});
const execute = async (sql: string, parameters?: any[] | readonly any[]) => {
const result = await db.execute({ sql, args: [...(parameters || [])] });
return mapResult(result);
};
});
return this.withTransformedRows(await this.client.batch(stms)) as any;
}
}
export function libsql(credentials: Client | LibSqlCredentials): LibsqlConnection {
return new LibsqlConnection(credentials);
return {
db,
batch: async (stmts) => {
const results = await db.batch(
stmts.map(({ sql, parameters }) => ({
sql,
args: parameters as any[],
})),
);
return results.map(mapResult);
},
query: utils.buildQueryFn({
all: async (sql, parameters) => {
return (await execute(sql, parameters)).rows;
},
run: execute,
}),
close: () => db.close(),
};
},
{
supports: {
batching: true,
softscans: true,
},
additionalPlugins: [new FilterNumericKeysPlugin()],
excludeTables: ["libsql_wasm_func_table"],
},
);
}

View File

@@ -1,145 +0,0 @@
import type { Client, Transaction, InValue } from "@libsql/client";
import {
SqliteAdapter,
SqliteIntrospector,
SqliteQueryCompiler,
type Kysely,
type Dialect,
type DialectAdapter,
type Driver,
type DatabaseIntrospector,
type QueryCompiler,
type TransactionSettings,
type DatabaseConnection,
type QueryResult,
type CompiledQuery,
} from "kysely";
export type LibsqlDialectConfig = {
client: Client;
};
export class LibsqlDialect implements Dialect {
#config: LibsqlDialectConfig;
constructor(config: LibsqlDialectConfig) {
this.#config = config;
}
createAdapter(): DialectAdapter {
return new SqliteAdapter();
}
createDriver(): Driver {
let client: Client;
let closeClient: boolean;
if ("client" in this.#config) {
client = this.#config.client;
closeClient = false;
} else {
throw new Error("Please specify either `client` or `url` in the LibsqlDialect config");
}
return new LibsqlDriver(client, closeClient);
}
createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new SqliteIntrospector(db);
}
createQueryCompiler(): QueryCompiler {
return new SqliteQueryCompiler();
}
}
export class LibsqlDriver implements Driver {
client: Client;
#closeClient: boolean;
constructor(client: Client, closeClient: boolean) {
this.client = client;
this.#closeClient = closeClient;
}
async init(): Promise<void> {}
async acquireConnection(): Promise<LibsqlConnection> {
return new LibsqlConnection(this.client);
}
async beginTransaction(
connection: LibsqlConnection,
_settings: TransactionSettings,
): Promise<void> {
await connection.beginTransaction();
}
async commitTransaction(connection: LibsqlConnection): Promise<void> {
await connection.commitTransaction();
}
async rollbackTransaction(connection: LibsqlConnection): Promise<void> {
await connection.rollbackTransaction();
}
async releaseConnection(_conn: LibsqlConnection): Promise<void> {}
async destroy(): Promise<void> {
if (this.#closeClient) {
this.client.close();
}
}
}
export class LibsqlConnection implements DatabaseConnection {
client: Client;
#transaction?: Transaction;
constructor(client: Client) {
this.client = client;
}
async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
const target = this.#transaction ?? this.client;
const result = await target.execute({
sql: compiledQuery.sql,
args: compiledQuery.parameters as Array<InValue>,
});
return {
insertId: result.lastInsertRowid,
numAffectedRows: BigInt(result.rowsAffected),
rows: result.rows as Array<R>,
};
}
async beginTransaction() {
if (this.#transaction) {
throw new Error("Transaction already in progress");
}
this.#transaction = await this.client.transaction();
}
async commitTransaction() {
if (!this.#transaction) {
throw new Error("No transaction to commit");
}
await this.#transaction.commit();
this.#transaction = undefined;
}
async rollbackTransaction() {
if (!this.#transaction) {
throw new Error("No transaction to rollback");
}
await this.#transaction.rollback();
this.#transaction = undefined;
}
// biome-ignore lint/correctness/useYield: <explanation>
async *streamQuery<R>(
_compiledQuery: CompiledQuery,
_chunkSize: number,
): AsyncIterableIterator<QueryResult<R>> {
throw new Error("Libsql Driver does not support streaming yet");
}
}