Merge pull request #186 from bknd-io/feat/unify-connections

feat/unify-connections
This commit is contained in:
dswbx
2025-06-13 17:20:05 +02:00
committed by GitHub
86 changed files with 1816 additions and 1779 deletions

View File

@@ -13,30 +13,32 @@ describe("cf adapter", () => {
const DB_URL = ":memory:";
const $ctx = (env?: any, request?: Request, ctx?: ExecutionContext) => ({
request: request ?? (null as any),
env: env ?? { DB_URL },
env: env ?? { url: DB_URL },
ctx: ctx ?? (null as any),
});
it("makes config", async () => {
expect(
makeConfig(
{
connection: { url: DB_URL },
},
$ctx({ DB_URL }),
),
).toEqual({ connection: { url: DB_URL } });
const staticConfig = makeConfig(
{
connection: { url: DB_URL },
initialConfig: { data: { basepath: DB_URL } },
},
$ctx({ DB_URL }),
);
expect(staticConfig.initialConfig).toEqual({ data: { basepath: DB_URL } });
expect(staticConfig.connection).toBeDefined();
expect(
makeConfig(
{
app: (env) => ({
connection: { url: env.DB_URL },
}),
},
$ctx({ DB_URL }),
),
).toEqual({ connection: { url: DB_URL } });
const dynamicConfig = makeConfig(
{
app: (env) => ({
initialConfig: { data: { basepath: env.DB_URL } },
connection: { url: env.DB_URL },
}),
},
$ctx({ DB_URL }),
);
expect(dynamicConfig.initialConfig).toEqual({ data: { basepath: DB_URL } });
expect(dynamicConfig.connection).toBeDefined();
});
adapterTestSuite<CloudflareBkndConfig, CfMakeConfigArgs<any>>(bunTestRunner, {

View File

@@ -9,6 +9,7 @@ import { makeConfig as makeAdapterConfig } from "bknd/adapter";
import type { Context, ExecutionContext } from "hono";
import { $console } from "core";
import { setCookie } from "hono/cookie";
import { sqlite } from "bknd/adapter/sqlite";
export const constants = {
exec_async_event_id: "cf_register_waituntil",
@@ -98,54 +99,70 @@ export function makeConfig<Env extends CloudflareEnv = CloudflareEnv>(
const appConfig = makeAdapterConfig(config, args?.env);
if (args?.env) {
const bindings = config.bindings?.(args?.env);
// if connection instance is given, don't do anything
// other than checking if D1 session is defined
if (D1Connection.isConnection(appConfig.connection)) {
if (config.d1?.session) {
// we cannot guarantee that db was opened with session
throw new Error(
"D1 session don't work when D1 is directly given as connection. Define it in bindings instead.",
);
}
// if connection is given, try to open with unified sqlite adapter
} else if (appConfig.connection) {
appConfig.connection = sqlite(appConfig.connection);
// if connection is not given, but env is set
// try to make D1 from bindings
} else if (args?.env) {
const bindings = config.bindings?.(args?.env);
const sessionHelper = d1SessionHelper(config);
const sessionId = sessionHelper.get(args.request);
let session: D1DatabaseSession | undefined;
let db: D1Database | undefined;
if (!appConfig.connection) {
let db: D1Database | undefined;
if (bindings?.db) {
$console.log("Using database from bindings");
db = bindings.db;
} else if (Object.keys(args).length > 0) {
const binding = getBinding(args.env, "D1Database");
if (binding) {
$console.log(`Using database from env "${binding.key}"`);
db = binding.value;
}
}
// if db is given in bindings, use it
if (bindings?.db) {
$console.log("Using database from bindings");
db = bindings.db;
if (db) {
if (config.d1?.session) {
session = db.withSession(sessionId ?? config.d1?.first);
appConfig.connection = new D1Connection({ binding: session });
} else {
appConfig.connection = new D1Connection({ binding: db });
}
} else {
throw new Error("No database connection given");
// scan for D1Database in args
} else {
const binding = getBinding(args.env, "D1Database");
if (binding) {
$console.log(`Using database from env "${binding.key}"`);
db = binding.value;
}
}
if (config.d1?.session) {
appConfig.options = {
...appConfig.options,
manager: {
...appConfig.options?.manager,
onServerInit: (server) => {
server.use(async (c, next) => {
sessionHelper.set(c, session);
await next();
});
// if db is found, check if session is requested
if (db) {
if (config.d1?.session) {
session = db.withSession(sessionId ?? config.d1?.first);
appConfig.connection = new D1Connection({ binding: session });
appConfig.options = {
...appConfig.options,
manager: {
...appConfig.options?.manager,
onServerInit: (server) => {
server.use(async (c, next) => {
sessionHelper.set(c, session);
await next();
});
},
},
},
};
};
} else {
appConfig.connection = new D1Connection({ binding: db });
}
}
}
if (!D1Connection.isConnection(appConfig.connection)) {
throw new Error("Couldn't find database connection");
}
return appConfig;
}

View File

@@ -1,65 +1,42 @@
/// <reference types="@cloudflare/workers-types" />
import { KyselyPluginRunner, SqliteConnection, SqliteIntrospector } from "bknd/data";
import type { QB } from "data/connection/Connection";
import { type DatabaseIntrospector, Kysely, ParseJSONResultsPlugin } from "kysely";
import { SqliteConnection } from "bknd/data";
import type { ConnQuery, ConnQueryResults } from "data/connection/Connection";
import { D1Dialect } from "kysely-d1";
export type D1ConnectionConfig<DB extends D1Database | D1DatabaseSession = D1Database> = {
binding: DB;
};
class CustomD1Dialect extends D1Dialect {
override createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new SqliteIntrospector(db, {
excludeTables: ["_cf_KV", "_cf_METADATA"],
});
}
}
export class D1Connection<
DB extends D1Database | D1DatabaseSession = D1Database,
> extends SqliteConnection {
> extends SqliteConnection<DB> {
override name = "sqlite-d1";
protected override readonly supported = {
batching: true,
softscans: false,
};
constructor(private config: D1ConnectionConfig<DB>) {
const plugins = [new ParseJSONResultsPlugin()];
const kysely = new Kysely({
dialect: new CustomD1Dialect({ database: config.binding as D1Database }),
plugins,
super({
excludeTables: ["_cf_KV", "_cf_METADATA"],
dialect: D1Dialect,
dialectArgs: [{ database: config.binding as D1Database }],
});
super(kysely, {}, plugins);
}
get client(): DB {
return this.config.binding;
}
override async executeQueries<O extends ConnQuery[]>(...qbs: O): Promise<ConnQueryResults<O>> {
const compiled = this.getCompiled(...qbs);
protected override async batch<Queries extends QB[]>(
queries: [...Queries],
): Promise<{
[K in keyof Queries]: Awaited<ReturnType<Queries[K]["execute"]>>;
}> {
const db = this.config.binding;
const res = await db.batch(
queries.map((q) => {
const { sql, parameters } = q.compile();
compiled.map(({ sql, parameters }) => {
return db.prepare(sql).bind(...parameters);
}),
);
// let it run through plugins
const kyselyPlugins = new KyselyPluginRunner(this.plugins);
const data: any = [];
for (const r of res) {
const rows = await kyselyPlugins.transformResultRows(r.results);
data.push(rows);
}
return data;
return this.withTransformedRows(res, "results") as any;
}
}

View File

@@ -64,7 +64,7 @@ export class DurableBkndApp extends DurableObject {
"type" in config.connection &&
config.connection.type === "libsql"
) {
config.connection.config.protocol = "wss";
//config.connection.config.protocol = "wss";
}
this.app = await createRuntimeApp({

View File

@@ -1,32 +0,0 @@
import { createWriteStream, readFileSync } from "node:fs";
import { test } from "node:test";
import { Miniflare } from "miniflare";
import { StorageR2Adapter } from "./StorageR2Adapter";
import { adapterTestSuite } from "media";
import { nodeTestRunner } from "adapter/node/test";
import path from "node:path";
// https://github.com/nodejs/node/issues/44372#issuecomment-1736530480
console.log = async (message: any) => {
const tty = createWriteStream("/dev/tty");
const msg = typeof message === "string" ? message : JSON.stringify(message, null, 2);
return tty.write(`${msg}\n`);
};
test("StorageR2Adapter", async () => {
const mf = new Miniflare({
modules: true,
script: "export default { async fetch() { return new Response(null); } }",
r2Buckets: ["BUCKET"],
});
const bucket = (await mf.getR2Bucket("BUCKET")) as unknown as R2Bucket;
const adapter = new StorageR2Adapter(bucket);
const basePath = path.resolve(import.meta.dirname, "../../../../__test__/_assets");
const buffer = readFileSync(path.join(basePath, "image.png"));
const file = new File([buffer], "image.png", { type: "image/png" });
await adapterTestSuite(nodeTestRunner, adapter, file);
await mf.dispose();
});

View File

@@ -0,0 +1,32 @@
import { readFileSync } from "node:fs";
import { Miniflare } from "miniflare";
import { StorageR2Adapter } from "./StorageR2Adapter";
import { adapterTestSuite } from "media/storage/adapters/adapter-test-suite";
import path from "node:path";
import { describe, afterAll, test, expect } from "vitest";
import { viTestRunner } from "adapter/node/vitest";
let mf: Miniflare | undefined;
describe("StorageR2Adapter", async () => {
mf = new Miniflare({
modules: true,
script: "export default { async fetch() { return new Response(null); } }",
r2Buckets: ["BUCKET"],
});
const bucket = (await mf?.getR2Bucket("BUCKET")) as unknown as R2Bucket;
test("test", () => {
expect(bucket).toBeDefined();
});
const adapter = new StorageR2Adapter(bucket);
const basePath = path.resolve(import.meta.dirname, "../../../../__test__/_assets");
const buffer = readFileSync(path.join(basePath, "image.png"));
const file = new File([buffer], "image.png", { type: "image/png" });
await adapterTestSuite(viTestRunner, adapter, file);
});
afterAll(async () => {
await mf?.dispose();
});

View File

@@ -0,0 +1,14 @@
import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config";
export default defineWorkersConfig({
test: {
poolOptions: {
workers: {
miniflare: {
compatibilityDate: "2025-06-04",
},
},
},
include: ["**/*.vi-test.ts", "**/*.vitest.ts"],
},
});