fixed limbo batching issue by disabling batching (#133)

* fixed limbo batching issue by disabling batching

* updated @libsql/client to `0.15.2`
This commit is contained in:
dswbx
2025-04-02 20:19:20 +02:00
committed by GitHub
parent aaae8d9681
commit 75e2b96344
8 changed files with 179 additions and 56 deletions

View File

@@ -6,6 +6,7 @@ import { type DatabaseIntrospector, Kysely, ParseJSONResultsPlugin } from "kysel
import type { QB } from "../Connection";
import { SqliteConnection } from "./SqliteConnection";
import { SqliteIntrospector } from "./SqliteIntrospector";
import { $console } from "core";
export const LIBSQL_PROTOCOLS = ["wss", "https", "libsql"] as const;
export type LibSqlCredentials = Config & {
@@ -33,6 +34,7 @@ export class LibsqlConnection extends SqliteConnection {
constructor(credentials: LibSqlCredentials);
constructor(clientOrCredentials: Client | LibSqlCredentials) {
let client: Client;
let batching_enabled = true;
if (clientOrCredentials && "url" in clientOrCredentials) {
let { url, authToken, protocol } = clientOrCredentials;
if (protocol && LIBSQL_PROTOCOLS.includes(protocol)) {
@@ -42,6 +44,13 @@ export class LibsqlConnection extends SqliteConnection {
}
client = createClient({ url, authToken });
// currently there is an issue in limbo implementation
// that prevents batching from working correctly
if (/\.aws.*turso\.io$/.test(url)) {
$console.warn("Using an Turso AWS endpoint currently disables batching support");
batching_enabled = false;
}
} else {
client = clientOrCredentials;
}
@@ -54,6 +63,7 @@ export class LibsqlConnection extends SqliteConnection {
super(kysely, {}, plugins);
this.client = client;
this.supported.batching = batching_enabled;
}
getClient(): Client {

View File

@@ -27,9 +27,9 @@ export type RepositoryResponse<T = EntityData[]> = RepositoryRawResponse & {
entity: Entity;
data: T;
meta: {
total: number;
count: number;
items: number;
total?: number;
count?: number;
time?: number;
query?: {
sql: string;
@@ -47,6 +47,7 @@ export type RepositoryExistsResponse = RepositoryRawResponse & {
export type RepositoryOptions = {
silent?: boolean;
includeCounts?: boolean;
emgr?: EventManager<any>;
};
@@ -59,13 +60,17 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
constructor(
public em: EntityManager<TBD>,
public entity: Entity,
protected options?: RepositoryOptions,
protected options: RepositoryOptions = {},
) {
this.emgr = options?.emgr ?? new EventManager(MutatorEvents);
}
private cloneFor(entity: Entity) {
return new Repository(this.em, this.em.entity(entity), { emgr: this.emgr });
private cloneFor(entity: Entity, opts: Partial<RepositoryOptions> = {}) {
return new Repository(this.em, this.em.entity(entity), {
...this.options,
...opts,
emgr: this.emgr,
});
}
private get conn() {
@@ -172,28 +177,39 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
if (options.limit) validated.limit = options.limit;
if (options.offset) validated.offset = options.offset;
//$console.debug("Repository: options", { entity: entity.name, options, validated });
return validated;
}
protected async performQuery(qb: RepositoryQB): Promise<RepositoryResponse> {
const entity = this.entity;
protected async executeQb(qb: RepositoryQB) {
const compiled = qb.compile();
if (this.options?.silent !== true) {
$console.debug(`Repository: query\n${compiled.sql}\n`, compiled.parameters);
}
const start = performance.now();
const selector = (as = "count") => this.conn.fn.countAll<number>().as(as);
const countQuery = qb
.clearSelect()
.select(selector())
.clearLimit()
.clearOffset()
.clearGroupBy()
.clearOrderBy();
const totalQuery = this.conn.selectFrom(entity.name).select(selector("total"));
let result: any;
try {
result = await qb.execute();
} catch (e) {
if (this.options?.silent !== true) {
if (e instanceof Error) {
$console.error("[ERROR] Repository.executeQb", e.message);
}
throw e;
}
}
return {
result,
sql: compiled.sql,
parameters: [...compiled.parameters],
};
}
protected async performQuery(qb: RepositoryQB): Promise<RepositoryResponse> {
const entity = this.entity;
const compiled = qb.compile();
const payload = {
entity,
sql: compiled.sql,
@@ -209,13 +225,53 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
},
};
// don't batch (add counts) if `includeCounts` is set to false
// or when explicitly set to true and batching is not supported
if (
this.options?.includeCounts === false ||
(this.options?.includeCounts === true && !this.em.connection.supports("batching"))
) {
const start = performance.now();
const res = await this.executeQb(qb);
const time = Number.parseFloat((performance.now() - start).toFixed(2));
const result = res.result ?? [];
const data = this.em.hydrate(entity.name, result);
return {
...payload,
result,
data,
meta: {
...payload.meta,
total: undefined,
count: undefined,
items: data.length,
time,
},
};
}
if (this.options?.silent !== true) {
$console.debug(`Repository: query\n${compiled.sql}\n`, compiled.parameters);
}
const selector = (as = "count") => this.conn.fn.countAll<number>().as(as);
const countQuery = qb
.clearSelect()
.select(selector())
.clearLimit()
.clearOffset()
.clearGroupBy()
.clearOrderBy();
const totalQuery = this.conn.selectFrom(entity.name).select(selector());
try {
const start = performance.now();
const [_count, _total, result] = await this.em.connection.batchQuery([
countQuery,
totalQuery,
qb,
]);
//$console.log("result", { _count, _total });
const time = Number.parseFloat((performance.now() - start).toFixed(2));
const data = this.em.hydrate(entity.name, result);
@@ -227,7 +283,7 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
meta: {
...payload.meta,
// parsing is important since pg returns string
total: ensureInt(_total[0]?.total),
total: ensureInt(_total[0]?.count),
count: ensureInt(_count[0]?.count),
items: result.length,
time,
@@ -435,8 +491,7 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
qb = WhereBuilder.addClause(qb, options.where);
}
const compiled = qb.compile();
const result = await qb.execute();
const { result, ...compiled } = await this.executeQb(qb);
return {
sql: compiled.sql,
@@ -454,15 +509,9 @@ export class Repository<TBD extends object = DefaultDB, TB extends keyof TBD = a
let qb = this.conn.selectFrom(entity.name).select(selector);
// add mandatory where
qb = WhereBuilder.addClause(qb, options.where);
qb = WhereBuilder.addClause(qb, options.where).limit(1);
// we only need 1
qb = qb.limit(1);
const compiled = qb.compile();
//$console.log("exists query", compiled.sql, compiled.parameters);
const result = await qb.execute();
//$console.log("result", result);
const { result, ...compiled } = await this.executeQb(qb);
return {
sql: compiled.sql,

View File

@@ -6,6 +6,7 @@ import type { ModuleConfigs } from "modules";
import {
BooleanField,
type BooleanFieldConfig,
type Connection,
DateField,
type DateFieldConfig,
Entity,
@@ -171,8 +172,6 @@ export class FieldPrototype {
}
}
//type Entity<Fields extends Record<string, Field<any, any>> = {}> = { name: string; fields: Fields };
export function entity<
EntityName extends string,
Fields extends Record<string, Field<any, any, any>>,
@@ -270,6 +269,10 @@ class EntityManagerPrototype<Entities extends Record<string, Entity>> extends En
) {
super(Object.values(__entities), new DummyConnection(), relations, indices);
}
withConnection(connection: Connection): EntityManager<Schema<Entities>> {
return new EntityManager(this.entities, connection, this.relations.all, this.indices);
}
}
type Chained<R extends Record<string, (...args: any[]) => any>> = {
@@ -326,6 +329,7 @@ export function em<Entities extends Record<string, Entity>>(
entities: e.__entities,
relations,
indices,
proto: e,
toJSON: () =>
e.toJSON() as unknown as Pick<ModuleConfigs["data"], "entities" | "relations" | "indices">,
};