public commit

This commit is contained in:
dswbx
2024-11-16 12:01:47 +01:00
commit 90f80c4280
582 changed files with 49291 additions and 0 deletions

81
app/src/flows/AppFlows.ts Normal file
View File

@@ -0,0 +1,81 @@
import { type Static, transformObject } from "core/utils";
import { Flow, HttpTrigger } from "flows";
import { Hono } from "hono";
import { Module } from "modules/Module";
import { TASKS, flowsConfigSchema } from "./flows-schema";
export type AppFlowsSchema = Static<typeof flowsConfigSchema>;
export type TAppFlowSchema = AppFlowsSchema["flows"][number];
export type TAppFlowTriggerSchema = TAppFlowSchema["trigger"];
export type { TAppFlowTaskSchema } from "./flows-schema";
export class AppFlows extends Module<typeof flowsConfigSchema> {
private flows: Record<string, Flow> = {};
override async build() {
//console.log("building flows", this.config);
const flows = transformObject(this.config.flows, (flowConfig, name) => {
return Flow.fromObject(name, flowConfig as any, TASKS);
});
this.flows = flows;
const hono = new Hono();
hono.get("/", async (c) => {
const flowsInfo = transformObject(this.flows, (flow) => this.getFlowInfo(flow));
return c.json(flowsInfo);
});
hono.get("/flow/:name", async (c) => {
const name = c.req.param("name");
return c.json(this.flows[name]?.toJSON());
});
hono.get("/flow/:name/run", async (c) => {
const name = c.req.param("name");
const flow = this.flows[name]!;
const execution = flow.createExecution();
const start = performance.now();
await execution.start();
const time = performance.now() - start;
const errors = execution.getErrors();
return c.json({
success: errors.length === 0,
time,
errors,
response: execution.getResponse(),
flow: this.getFlowInfo(flow),
logs: execution.logs
});
});
this.ctx.server.route(this.config.basepath, hono);
// register flows
for (const [name, flow] of Object.entries(this.flows)) {
const trigger = flow.trigger;
switch (true) {
case trigger instanceof HttpTrigger:
await trigger.register(flow, this.ctx.server);
break;
}
}
this.setBuilt();
}
getSchema() {
return flowsConfigSchema;
}
private getFlowInfo(flow: Flow) {
return {
...flow.toJSON(),
tasks: flow.tasks.length,
connections: flow.connections
};
}
}

View File

@@ -0,0 +1,20 @@
import { Flow } from "../flows/Flow";
import { FetchTask } from "../tasks/presets/FetchTask";
import { LogTask } from "../tasks/presets/LogTask";
const first = new LogTask("First", { delay: 1000 });
const second = new LogTask("Second", { delay: 1000 });
const third = new LogTask("Long Third", { delay: 2500 });
const fourth = new FetchTask("Fetch Something", {
url: "https://jsonplaceholder.typicode.com/todos/1",
});
const fifth = new LogTask("Task 4", { delay: 500 }); // without connection
const simpleFetch = new Flow("simpleFetch", [first, second, third, fourth, fifth]);
simpleFetch.task(first).asInputFor(second);
simpleFetch.task(first).asInputFor(third);
simpleFetch.task(fourth).asOutputFor(third);
simpleFetch.setRespondingTask(fourth);
export { simpleFetch };

View File

@@ -0,0 +1,84 @@
import { Const, type Static, StringRecord, Type, transformObject } from "core/utils";
import { TaskMap, TriggerMap } from "flows";
export const TASKS = {
...TaskMap
} as const;
export const TRIGGERS = TriggerMap;
const taskSchemaObject = transformObject(TASKS, (task, name) => {
return Type.Object(
{
type: Const(name),
params: task.cls.schema
},
{ title: String(name), additionalProperties: false }
);
});
const taskSchema = Type.Union(Object.values(taskSchemaObject));
export type TAppFlowTaskSchema = Static<typeof taskSchema>;
const triggerSchemaObject = transformObject(TRIGGERS, (trigger, name) => {
return Type.Object(
{
type: Const(name),
config: trigger.cls.schema
},
{ title: String(name), additionalProperties: false }
);
});
const connectionSchema = Type.Object({
source: Type.String(),
target: Type.String(),
config: Type.Object(
{
condition: Type.Optional(
Type.Union([
Type.Object(
{ type: Const("success") },
{ additionalProperties: false, title: "success" }
),
Type.Object(
{ type: Const("error") },
{ additionalProperties: false, title: "error" }
),
Type.Object(
{ type: Const("matches"), path: Type.String(), value: Type.String() },
{ additionalProperties: false, title: "matches" }
)
])
),
max_retries: Type.Optional(Type.Number())
},
{ default: {}, additionalProperties: false }
)
});
// @todo: rework to have fixed ids per task and connections (and preferrably arrays)
// causes issues with canvas
export const flowSchema = Type.Object(
{
trigger: Type.Union(Object.values(triggerSchemaObject)),
tasks: Type.Optional(StringRecord(Type.Union(Object.values(taskSchemaObject)))),
connections: Type.Optional(StringRecord(connectionSchema, { default: {} })),
start_task: Type.Optional(Type.String()),
responding_task: Type.Optional(Type.String())
},
{
additionalProperties: false
}
);
export type TAppFlowSchema = Static<typeof flowSchema>;
export const flowsConfigSchema = Type.Object(
{
basepath: Type.String({ default: "/api/flows" }),
flows: StringRecord(flowSchema, { default: {} })
},
{
default: {},
additionalProperties: false
}
);

View File

@@ -0,0 +1,248 @@
import { Event, EventManager, type ListenerHandler } from "core/events";
import type { EmitsEvents } from "core/events";
import type { Task, TaskResult } from "../tasks/Task";
import type { Flow } from "./Flow";
export type TaskLog = TaskResult & {
task: Task;
end: Date;
};
export type InputsMap = Map<string, TaskResult>;
export class ExecutionEvent extends Event<{
task: Task;
result?: TaskResult;
end?: Date;
}> {
static override slug = "flow-execution-event";
task() {
return this.params.task;
}
getState() {
if (this.succeeded()) return "success";
if (this.failed()) return "failed";
if (this.isStart()) return "running";
return "idle";
}
isStart() {
return this.params.end === undefined;
}
isEnd() {
return !this.isStart();
}
succeeded() {
return this.isEnd() && this.params.result?.success;
}
failed() {
return this.isEnd() && !this.params.result?.success;
}
}
export class ExecutionState extends Event<{ execution: Execution; state: "started" | "ended" }> {
static override slug = "flow-execution-state";
}
type UnionFromRecord<T> = T[keyof T];
type ExecutionEvents = UnionFromRecord<typeof Execution.Events>;
export class Execution implements EmitsEvents {
flow: Flow;
started_at?: Date;
finished_at?: Date;
logs: TaskLog[] = [];
inputs: InputsMap = new Map();
// next tasks to execute
protected queue: Task[] = [];
emgr: EventManager;
static Events = { ExecutionEvent, ExecutionState };
constructor(flow: Flow) {
this.flow = flow;
this.logs = [];
this.queue = [this.flow.startTask];
this.emgr = new EventManager(Execution.Events);
}
subscribe(handler: ListenerHandler<ExecutionEvent | ExecutionState>) {
this.emgr.onAny(handler as any);
}
async onDone(task: Task, result: TaskResult) {
//console.log("Execution: resolved", task.name, result.success);
const end = new Date();
this.logs.push({ ...result, task, end });
this.inputs.set(task.name, result);
// if responding task completed
if (this.flow.respondingTask === task) {
this.queue = [];
return;
}
// clear task from queue
this.queue = this.queue.filter((t) => t !== task);
// check outgoing tasks and add to queue if all in-tasks are finished
/*console.log(
"Out tasks that matches",
this.flow
.task(task)
.getOutConnections(result)
.map((c) => [c.target.name, c.max_retries])
);*/
const nextTasks = this.flow
.task(task)
.getOutConnections(result)
.filter((c) => {
const t = c.target;
// @todo: potentially filter on "end" instead of "success"
// @todo: behaves weird
const target_runs = this.logs.filter((log) => log.task === t && log.success).length;
// max retry is set to the IN connection
const max_retries =
this.flow
.task(t)
.getInConnections()
.find((c) => c.source === task)?.max_retries ?? 0;
/*console.log(`tried ${task.name}->${t.name}`, {
target_runs,
max_retries
});*/
if (target_runs > max_retries) {
//console.log("*** Task reached max retries", t.name);
throw new Error(
`Task "${t.name}" reached max retries (${target_runs}/${max_retries})`
);
}
/*console.log(
"tasks?",
this.flow
.task(t)
.getInTasks(true)
.map((t) => t.name)
);*/
return this.flow
.task(t)
.getInTasks(true) // only lower
.every((t) => this.logs.some((log) => log.task === t && log.end !== undefined));
})
.map((c) => c.target);
/*console.log(
"--- next tasks",
nextTasks.map((t) => t.name)
);*/
//console.log("------");
this.queue.push(...nextTasks);
//await new Promise((resolve) => setTimeout(resolve, 1000));
}
__getLastTaskLog(task: Task) {
for (let i = this.logs.length - 1; i >= 0; i--) {
if (this.logs[i]?.task === task) {
return this.logs[i];
}
}
return null;
}
private async run() {
const tasks = this.queue;
if (tasks.length === 0) {
return;
}
//const promises = tasks.map((t) => t.run());
const promises = tasks.map(async (t) => {
await this.emgr.emit(new ExecutionEvent({ task: t }));
const result = await t.run(this.inputs);
await this.emgr.emit(new ExecutionEvent({ task: t, result, end: new Date() }));
await this.onDone(t, result);
return result;
});
try {
await Promise.all(promises);
return this.run();
} catch (e) {
console.log("RuntimeExecutor: error", e);
// for now just throw
// biome-ignore lint/complexity/noUselessCatch: @todo: add error task on flow
throw e;
}
}
async start(input?: any) {
await this.emgr.emit(new ExecutionState({ execution: this, state: "started" }));
// set initial input
this.inputs.set("flow", {
start: new Date(),
output: input, // @todo: remove
error: undefined,
success: true,
params: input
});
//graceful && (await new Promise((resolve) => setTimeout(resolve, 100)));
this.started_at = new Date();
await this.run();
this.finished_at = new Date();
await this.emgr.emit(new ExecutionState({ execution: this, state: "ended" }));
}
finished(): boolean {
return this.finished_at !== undefined;
}
errorCount(): number {
return this.logs.filter((log) => !log.success).length;
}
hasErrors(): boolean {
return this.errorCount() > 0;
}
getErrorLogs(): TaskLog[] {
return this.logs.filter((log) => !log.success);
}
getErrors(): any[] {
return this.getErrorLogs().map((log) => log.error);
}
getResponse() {
let respondingTask = this.flow.respondingTask;
if (!respondingTask) {
respondingTask = this.flow.tasks[this.flow.tasks.length - 1];
}
const lastLog = this.__getLastTaskLog(respondingTask!);
if (!lastLog) {
return;
}
return lastLog.output;
}
}

214
app/src/flows/flows/Flow.ts Normal file
View File

@@ -0,0 +1,214 @@
import { objectTransform, transformObject } from "core/utils";
import { type TaskMapType, TriggerMap } from "../index";
import type { Task } from "../tasks/Task";
import { Condition, TaskConnection } from "../tasks/TaskConnection";
import { Execution } from "./Execution";
import { FlowTaskConnector } from "./FlowTaskConnector";
import { Trigger } from "./triggers/Trigger";
type Jsoned<T extends { toJSON: () => object }> = ReturnType<T["toJSON"]>;
export class Flow {
name: string;
trigger: Trigger;
/**
* The tasks that are part of the flow
*/
tasks: Task[] = [];
/**
* The connections between tasks
*/
connections: TaskConnection[] = [];
/**
* The task that should mark the flow response.
* If none given, then the flow has no response.
*/
respondingTask?: Task;
startTask: Task;
// sequence of tasks
sequence: Task[][];
constructor(name: string, tasks: Task[], connections?: TaskConnection[], trigger?: Trigger) {
this.name = name;
this.trigger = trigger ?? new Trigger();
tasks.map((t) => this.addTask(t));
this.connections = connections || [];
// defaulting to the first given
this.startTask = tasks[0]!;
this.sequence = this.getSequence();
}
setStartTask(task: Task) {
this.startTask = task;
this.sequence = this.getSequence();
return this;
}
getSequence(sequence: Task[][] = []): Task[][] {
//console.log("queue", queue.map((step) => step.map((t) => t.name)));
// start task
if (sequence.length === 0) {
sequence.push([this.startTask]);
return this.getSequence(sequence);
}
const tasks = sequence[sequence.length - 1];
const nextStep: Task[] = [];
tasks?.forEach((task) => {
const outTasks = this.task(task).getOutTasks();
outTasks.forEach((outTask) => {
// check if task already in one of queue steps
// this is when we have a circle back
if (sequence.some((step) => step.includes(outTask))) {
//console.log("Task already in queue", outTask.name);
return;
}
nextStep.push(outTask);
});
});
// if no next steps, break out
if (nextStep.length === 0) {
return sequence;
}
sequence.push(nextStep);
return this.getSequence(sequence);
}
addTask(task: Task) {
// check if task exists
if (this.tasks.includes(task)) {
throw new Error("Task already defined");
}
if (this.tasks.some((t) => t.name === task.name)) {
throw new Error(`Task with name "${task.name}" already defined. Use a unique name.`);
}
this.tasks.push(task);
return this;
}
setRespondingTask(task: Task) {
// check if task exists
if (!this.tasks.includes(task)) {
throw new Error(`Cannot set task "${task.name}" as responding, not registered.`);
}
this.respondingTask = task;
return this;
}
/*getResponse() {
if (!this.respondingTask) {
return;
}
return this.respondingTask.log.output;
}*/
// @todo: check for existence
addConnection(connection: TaskConnection) {
// check if connection already exists
const exists = this.connections.some((c) => {
return (
c.source === connection.source &&
c.target === connection.target &&
// @todo: should it check for condition at all?
c.condition[0] === connection.condition[0] &&
c.condition[1] === connection.condition[1]
);
});
if (exists) {
throw new Error("Connection already defined");
}
this.connections.push(connection);
return this;
}
task(source: Task) {
return new FlowTaskConnector(this, source);
}
createExecution() {
this.sequence = this.getSequence();
return new Execution(this);
}
/**
* Shorthand for creating and starting an execution
*/
async start(input: any = undefined) {
const execution = this.createExecution();
await execution.start(input);
return execution;
}
toJSON() {
return {
trigger: this.trigger.toJSON(),
tasks: Object.fromEntries(this.tasks.map((t) => [t.name, t.toJSON()])),
connections: Object.fromEntries(this.connections.map((c) => [c.id, c.toJSON()])),
start_task: this.startTask.name,
responding_task: this.respondingTask ? this.respondingTask.name : null
};
}
static fromObject(name: string, obj: Jsoned<Flow>, taskMap: TaskMapType) {
const tasks = transformObject(obj.tasks ?? {}, (obj, name) => {
const taskClass = taskMap[obj.type];
if (!taskClass) {
throw new Error(`Task ${name} not found in taskMap`);
}
try {
const cls = taskClass.cls;
// @ts-ignore
return new cls(name, obj.params);
} catch (e: any) {
console.log("Error creating task", name, obj.type, obj, taskClass);
throw new Error(`Error creating task ${obj.type}: ${e.message}`);
}
});
const connections = transformObject(obj.connections ?? {}, (obj, id) => {
const condition = obj.config.condition
? Condition.fromObject(obj.config.condition)
: undefined;
return new TaskConnection(
tasks[obj.source],
tasks[obj.target],
{ ...obj.config, condition },
id as string
);
});
let trigger: Trigger | undefined;
if (obj.trigger) {
const cls = TriggerMap[obj.trigger.type as any]?.cls;
if (cls) {
trigger = new cls(obj.trigger.config);
}
}
const flow = new Flow(name, Object.values(tasks), Object.values(connections), trigger);
flow.startTask = obj.start_task ? tasks[obj.start_task] : null;
if (obj.responding_task) {
flow.respondingTask = tasks[obj.responding_task];
}
return flow;
}
}

View File

@@ -0,0 +1,118 @@
import type { Task, TaskResult } from "../tasks/Task";
import { type Condition, TaskConnection } from "../tasks/TaskConnection";
import type { Flow } from "./Flow";
// @todo: make singleton
export class FlowTaskConnector {
flow: Flow;
source: Task;
constructor(flow: Flow, source: Task) {
this.flow = flow;
this.source = source;
}
// helper function to use itself
private task(task: Task) {
return new FlowTaskConnector(this.flow, task);
}
asInputFor(target: Task, condition?: Condition, max_retries?: number) {
const ownDepth = this.getDepth();
const outConnections = this.getOutConnections();
const definedOutConditions = outConnections.map((c) => c.condition);
const hasOutGoingBack = outConnections.some(
(c) => this.task(c.target).getDepth() <= ownDepth
);
if (definedOutConditions.length > 0 && hasOutGoingBack) {
if (this.getOutConnections().some((c) => c.condition.sameAs(condition))) {
throw new Error("Task cannot be connected to a deeper task with the same condition");
}
}
/*const targetDepth = this.task(target).getDepth();
console.log("depth", ownDepth, targetDepth);
// if target has a lower depth
if (targetDepth > 0 && ownDepth >= targetDepth) {
// check for unique out conditions
console.log(
"out conditions",
this.source.name,
this.getOutConnections().map((c) => [c.target.name, c.condition])
);
if (
this.getOutConnections().some(
(c) =>
c.condition[0] === condition[0] &&
c.condition[1] === condition[1]
)
) {
throw new Error(
"Task cannot be connected to a deeper task with the same condition"
);
}
}*/
this.flow.addConnection(new TaskConnection(this.source, target, { condition, max_retries }));
}
asOutputFor(target: Task, condition?: Condition) {
this.task(target).asInputFor(this.source, condition);
//new FlowTaskConnector(this.flow, target).asInputFor(this.source);
//this.flow.addConnection(new TaskConnection(target, this.source));
}
getNext() {
return this.flow.connections.filter((c) => c.source === this.source).map((c) => c.target);
}
getDepth(): number {
return this.flow.getSequence().findIndex((s) => s.includes(this.source));
}
getInConnections(lower_only: boolean = false): TaskConnection[] {
if (lower_only) {
const depth = this.getDepth();
return this.getInConnections().filter(
(c) => c.target === this.source && this.task(c.source).getDepth() < depth
);
}
return this.flow.connections.filter((c) => c.target === this.source);
}
getInTasks(lower_only: boolean = false): Task[] {
if (lower_only) {
const depth = this.getDepth();
return this.getInConnections()
.map((c) => c.source)
.filter((t) => this.task(t).getDepth() < depth);
}
return this.getInConnections().map((c) => c.source);
}
getOutConnections(result?: TaskResult): TaskConnection[] {
if (result) {
return this.flow.connections.filter(
(c) => c.source === this.source && c.condition.isMet(result)
);
}
return this.flow.connections.filter((c) => c.source === this.source);
}
getOutTasks(result?: TaskResult): Task[] {
return this.getOutConnections(result).map((c) => c.target);
}
/*getNextRunnableConnections() {
return this.getOutConnections().filter((c) => c.source.log.success);
}
getNextRunnableTasks() {
return this.getNextRunnableConnections().map((c) => c.target);
}*/
}

View File

@@ -0,0 +1,28 @@
import type { Task } from "../../tasks/Task";
export class RuntimeExecutor {
async run(
nextTasks: () => Task[],
onDone?: (task: Task, result: Awaited<ReturnType<Task["run"]>>) => void
) {
const tasks = nextTasks();
if (tasks.length === 0) {
return;
}
//const promises = tasks.map((t) => t.run());
const promises = tasks.map(async (t) => {
const result = await t.run();
onDone?.(t, result);
return result;
});
try {
await Promise.all(promises);
} catch (e) {
console.log("RuntimeExecutor: error", e);
}
return this.run(nextTasks, onDone);
}
}

View File

@@ -0,0 +1,41 @@
import type { EventManager } from "core/events";
import { Type } from "core/utils";
import type { Flow } from "../Flow";
import { Trigger } from "./Trigger";
export class EventTrigger extends Trigger<typeof EventTrigger.schema> {
override type = "event";
static override schema = Type.Composite([
Trigger.schema,
Type.Object({
event: Type.String()
// add match
})
]);
override async register(flow: Flow, emgr: EventManager<any>) {
if (!emgr.eventExists(this.config.event)) {
throw new Error(`Event ${this.config.event} is not registered.`);
}
emgr.on(
this.config.event,
async (event) => {
console.log("event", event);
/*if (!this.match(event)) {
return;
}*/
const execution = flow.createExecution();
this.executions.push(execution);
try {
await execution.start(event.params);
} catch (e) {
console.error(e);
}
},
this.config.mode
);
}
}

View File

@@ -0,0 +1,49 @@
import { StringEnum, Type } from "core/utils";
import type { Context, Hono } from "hono";
import type { Flow } from "../Flow";
import { Trigger } from "./Trigger";
const httpMethods = ["GET", "POST", "PUT", "PATCH", "DELETE"] as const;
export class HttpTrigger extends Trigger<typeof HttpTrigger.schema> {
override type = "http";
static override schema = Type.Composite([
Trigger.schema,
Type.Object(
{
path: Type.String({ pattern: "^/.*$" }),
method: StringEnum(httpMethods, { default: "GET" }),
response_type: StringEnum(["json", "text", "html"], { default: "json" })
}
//{ additionalProperties: false }
)
]);
override async register(flow: Flow, hono: Hono<any>) {
const method = this.config.method.toLowerCase() as any;
hono[method](this.config.path, async (c: Context) => {
const params = c.req.raw;
const respond = c[this.config.response_type] as any;
const execution = flow.createExecution();
this.executions.push(execution);
if (this.config.mode === "sync") {
await execution.start(params);
const response = execution.getResponse();
const errors = execution.getErrors();
if (errors.length > 0) {
return c.json({ success: false, errors });
}
return respond(response);
}
execution.start(params);
return c.json({ success: true });
});
//console.log("--registered flow", flow.name, "on", method, this.config.path);
}
}

View File

@@ -0,0 +1,35 @@
import { type Static, StringEnum, Type, parse } from "core/utils";
import type { Execution } from "../Execution";
import type { Flow } from "../Flow";
export class Trigger<Schema extends typeof Trigger.schema = typeof Trigger.schema> {
// @todo: remove this
executions: Execution[] = [];
type = "manual";
config: Static<Schema>;
static schema = Type.Object(
{
mode: StringEnum(["sync", "async"], { default: "async" })
}
//{ additionalProperties: false }
);
constructor(config?: Partial<Static<Schema>>) {
const schema = (this.constructor as typeof Trigger).schema;
// @ts-ignore for now
this.config = parse(schema, config ?? {});
}
async register(flow: Flow, ...args: any[]): Promise<void> {
// @todo: remove this
this.executions.push(await flow.start());
}
toJSON() {
return {
type: this.type,
config: this.config
};
}
}

View File

@@ -0,0 +1,13 @@
import { EventTrigger } from "./EventTrigger";
import { HttpTrigger } from "./HttpTrigger";
import { Trigger } from "./Trigger";
export { Trigger, EventTrigger, HttpTrigger };
//export type TriggerMapType = { [key: string]: { cls: typeof Trigger } };
export const TriggerMap = {
manual: { cls: Trigger },
event: { cls: EventTrigger },
http: { cls: HttpTrigger }
} as const;
export type TriggerMapType = typeof TriggerMap;

41
app/src/flows/index.ts Normal file
View File

@@ -0,0 +1,41 @@
import { FetchTask } from "./tasks/presets/FetchTask";
import { LogTask } from "./tasks/presets/LogTask";
import { RenderTask } from "./tasks/presets/RenderTask";
import { SubFlowTask } from "./tasks/presets/SubFlowTask";
export { Flow } from "./flows/Flow";
export {
Execution,
type TaskLog,
type InputsMap,
ExecutionState,
ExecutionEvent
} from "./flows/Execution";
export { RuntimeExecutor } from "./flows/executors/RuntimeExecutor";
export { FlowTaskConnector } from "./flows/FlowTaskConnector";
export {
Trigger,
EventTrigger,
HttpTrigger,
TriggerMap,
type TriggerMapType
} from "./flows/triggers";
import { Task } from "./tasks/Task";
export { type TaskResult, type TaskRenderProps } from "./tasks/Task";
export { TaskConnection, Condition } from "./tasks/TaskConnection";
// test
//export { simpleFetch } from "./examples/simple-fetch";
//export type TaskMapType = { [key: string]: { cls: typeof Task<any> } };
export const TaskMap = {
fetch: { cls: FetchTask },
log: { cls: LogTask },
render: { cls: RenderTask },
subflow: { cls: SubFlowTask }
} as const;
export type TaskMapType = typeof TaskMap;
export { Task, FetchTask, LogTask, RenderTask, SubFlowTask };

View File

@@ -0,0 +1,235 @@
import type { StaticDecode, TSchema } from "@sinclair/typebox";
import type { NodeProps } from "@xyflow/react";
import { BkndError, SimpleRenderer } from "core";
import { type Static, type TObject, Type, Value, parse, ucFirst } from "core/utils";
import type { ExecutionEvent, InputsMap } from "../flows/Execution";
//type InstanceOf<T> = T extends new (...args: any) => infer R ? R : never;
export type TaskResult<Output = any> = {
start: Date;
output?: Output;
error?: any;
success: boolean;
params: any;
};
/*export type TaskRenderProps<T extends Task = Task> = NodeProps<{
task: T;
state: { i: number; isStartTask: boolean; isRespondingTask; event: ExecutionEvent | undefined };
}>;*/
export type TaskRenderProps<T extends Task = Task> = any;
export function dynamic<Type extends TSchema>(
type: Type,
parse?: (val: any | string) => Static<Type>
) {
const guessDecode = (val: unknown): Static<Type> => {
if (typeof val === "string") {
switch (type.type) {
case "object":
case "array":
return JSON.parse(val);
case "number":
return Number.parseInt(val);
case "boolean":
return val === "true" || val === "1";
}
}
return val as Static<Type>;
};
const decode = (val: unknown): Static<Type> => {
if (typeof val === "string") {
return parse ? parse(val) : guessDecode(val);
}
return val as Static<Type>;
};
const title = type.title ?? type.type ? ucFirst(type.type) : "Raw";
return (
Type.Transform(Type.Union([{ title, ...type }, Type.String({ title: "Template" })]))
.Decode(decode)
// @ts-ignore
.Encode((val) => val)
);
}
export abstract class Task<Params extends TObject = TObject, Output = unknown> {
abstract type: string;
name: string;
/**
* The schema of the task's parameters.
*/
static schema = Type.Object({});
/**
* The task's parameters.
*/
_params: Static<Params>;
constructor(name: string, params?: Static<Params>) {
if (typeof name !== "string") {
throw new Error(`Task name must be a string, got ${typeof name}`);
}
// @todo: should name be easier for object access?
this.name = name;
const schema = (this.constructor as typeof Task).schema;
if (
schema === Task.schema &&
typeof params !== "undefined" &&
Object.keys(params).length > 0
) {
throw new Error(
`Task "${name}" has no schema defined but params passed: ${JSON.stringify(params)}`
);
}
// @todo: string enums fail to validate
this._params = parse(schema, params || {});
/*const validator = new Validator(schema as any);
const _params = Default(schema, params || {});
const result = validator.validate(_params);
if (!result.valid) {
//console.log("---errors", result, { params, _params });
const error = result.errors[0]!;
throw new Error(
`Invalid params for task "${name}.${error.keyword}": "${
error.error
}". Params given: ${JSON.stringify(params)}`
);
}
this._params = _params as Static<Params>;*/
}
get params() {
return this._params as StaticDecode<Params>;
}
protected clone(name: string, params: Static<Params>): Task {
return new (this.constructor as any)(name, params);
}
static async resolveParams<S extends TSchema>(
schema: S,
params: any,
inputs: object = {}
): Promise<StaticDecode<S>> {
const newParams: any = {};
const renderer = new SimpleRenderer(inputs, { strictVariables: true, renderKeys: true });
//console.log("--resolveParams", params);
for (const [key, value] of Object.entries(params)) {
if (value && SimpleRenderer.hasMarkup(value)) {
//console.log("--- has markup", value);
try {
newParams[key] = await renderer.render(value as string);
} catch (e: any) {
// wrap in bknd error for better error display
if (!(e instanceof BkndError)) {
throw new BkndError(
"Failed to resolve param",
{
key,
value,
error: e.message
},
"resolve-params"
);
}
throw e;
}
continue;
} else {
//console.log("-- no markup", key, value);
}
newParams[key] = value;
}
//console.log("--beforeDecode", newParams);
const v = Value.Decode(schema, newParams);
//console.log("--afterDecode", v);
//process.exit();
return v;
}
private async cloneWithResolvedParams(_inputs: Map<string, any>) {
const inputs = Object.fromEntries(_inputs.entries());
//console.log("--clone:inputs", inputs, this.params);
const newParams = await Task.resolveParams(
(this.constructor as any).schema,
this._params,
inputs
);
//console.log("--clone:newParams", this.name, newParams);
return this.clone(this.name, newParams as any);
}
/**
* The internal execution of the flow.
* Wraps the execute() function to gather log results.
*/
async run(inputs: InputsMap = new Map()) {
const start = new Date();
let output: Output | undefined;
let error: any;
let success: boolean;
let params: any;
let time: number;
const starttime = performance.now();
try {
// create a copy with resolved params
const newTask = await this.cloneWithResolvedParams(inputs);
params = newTask.params;
output = (await newTask.execute(inputs)) as any;
success = true;
} catch (e: any) {
success = false;
//status.output = undefined;
if (e instanceof BkndError) {
error = e.toJSON();
} else {
error = {
type: "unknown",
message: (e as any).message
};
}
}
return { start, output, error, success, params, time: performance.now() - starttime };
}
protected error(message: string, details?: Record<string, any>) {
return new BkndError(message, details, "runtime");
}
abstract execute(inputs: Map<string, any>): Promise<Output>;
// that's for react flow's default node
get label() {
return this.name;
}
toJSON() {
return {
type: this.type,
params: this.params
};
}
}

View File

@@ -0,0 +1,106 @@
import { uuid } from "core/utils";
import { get } from "lodash-es";
import type { Task, TaskResult } from "./Task";
type TaskConnectionConfig = {
condition?: Condition;
max_retries?: number;
};
export class TaskConnection {
source: Task;
target: Task;
config: TaskConnectionConfig;
public id: string;
constructor(source: Task, target: Task, config?: TaskConnectionConfig, id?: string) {
this.source = source;
this.target = target;
this.config = config ?? {};
if (!(this.config.condition instanceof Condition)) {
this.config.condition = Condition.default();
}
this.id = id ?? uuid();
}
get condition(): Condition {
return this.config.condition as any;
}
get max_retries(): number {
return this.config.max_retries ?? 0;
}
toJSON() {
return {
source: this.source.name,
target: this.target.name,
config: {
...this.config,
condition: this.config.condition?.toJSON()
}
};
}
}
export class Condition {
private constructor(
public type: "success" | "error" | "matches",
public path: string = "",
public value: any = undefined
) {}
static default() {
return Condition.success();
}
static success() {
return new Condition("success");
}
static error() {
return new Condition("error");
}
static matches(path: string, value: any) {
if (typeof path !== "string" || path.length === 0) {
throw new Error("Invalid path");
}
return new Condition("matches", path, value);
}
isMet(result: TaskResult) {
switch (this.type) {
case "success":
return result.success;
case "error":
return result.success === false;
case "matches":
return get(result.output, this.path) === this.value;
//return this.value === output[this.path];
}
}
sameAs(condition: Condition = Condition.default()) {
return (
this.type === condition.type &&
this.path === condition.path &&
this.value === condition.value
);
}
toJSON() {
return {
type: this.type,
path: this.path.length === 0 ? undefined : this.path,
value: this.value
};
}
static fromObject(obj: ReturnType<Condition["toJSON"]>) {
return new Condition(obj.type, obj.path, obj.value);
}
}

View File

@@ -0,0 +1,81 @@
import { StringEnum, Type } from "core/utils";
import type { InputsMap } from "../../flows/Execution";
import { Task, dynamic } from "../Task";
const FetchMethods = ["GET", "POST", "PUT", "PATCH", "DELETE"];
export class FetchTask<Output extends Record<string, any>> extends Task<
typeof FetchTask.schema,
Output
> {
type = "fetch";
static override schema = Type.Object({
url: Type.String({
pattern: "^(http|https)://"
}),
//method: Type.Optional(Type.Enum(FetchMethodsEnum)),
//method: Type.Optional(dynamic(Type.String({ enum: FetchMethods, default: "GET" }))),
method: Type.Optional(dynamic(StringEnum(FetchMethods, { default: "GET" }))),
headers: Type.Optional(
dynamic(
Type.Array(
Type.Object({
key: Type.String(),
value: Type.String()
})
),
JSON.parse
)
),
body: Type.Optional(dynamic(Type.String())),
normal: Type.Optional(dynamic(Type.Number(), Number.parseInt))
});
protected getBody(): string | undefined {
const body = this.params.body;
if (!body) return;
if (typeof body === "string") return body;
if (typeof body === "object") return JSON.stringify(body);
throw new Error(`Invalid body type: ${typeof body}`);
}
async execute() {
//console.log(`method: (${this.params.method})`);
if (!FetchMethods.includes(this.params.method ?? "GET")) {
throw this.error("Invalid method", {
given: this.params.method,
valid: FetchMethods
});
}
const body = this.getBody();
const headers = new Headers(this.params.headers?.map((h) => [h.key, h.value]));
/*console.log("[FETCH]", {
url: this.params.url,
method: this.params.method ?? "GET",
headers,
body
});*/
const result = await fetch(this.params.url, {
method: this.params.method ?? "GET",
headers,
body
});
//console.log("fetch:response", result);
if (!result.ok) {
throw this.error("Failed to fetch", {
status: result.status,
statusText: result.statusText
});
}
const data = (await result.json()) as Output;
//console.log("fetch:response:data", data);
return data;
}
}

View File

@@ -0,0 +1,16 @@
import { Type } from "core/utils";
import { Task } from "../Task";
export class LogTask extends Task<typeof LogTask.schema> {
type = "log";
static override schema = Type.Object({
delay: Type.Number({ default: 10 })
});
async execute() {
await new Promise((resolve) => setTimeout(resolve, this.params.delay));
console.log(`[DONE] LogTask: ${this.name}`);
return true;
}
}

View File

@@ -0,0 +1,17 @@
import { Type } from "core/utils";
import { Task } from "../Task";
export class RenderTask<Output extends Record<string, any>> extends Task<
typeof RenderTask.schema,
Output
> {
type = "render";
static override schema = Type.Object({
render: Type.String()
});
async execute() {
return this.params.render as unknown as Output;
}
}

View File

@@ -0,0 +1,40 @@
import { Type } from "core/utils";
import { Flow } from "../../flows/Flow";
import { Task, dynamic } from "../Task";
export class SubFlowTask<Output extends Record<string, any>> extends Task<
typeof SubFlowTask.schema,
Output
> {
type = "subflow";
static override schema = Type.Object({
flow: Type.Any(),
input: Type.Optional(dynamic(Type.Any(), JSON.parse)),
loop: Type.Optional(Type.Boolean())
});
async execute() {
const flow = this.params.flow;
if (!(flow instanceof Flow)) {
throw new Error("Invalid flow provided");
}
if (this.params.loop) {
const _input = Array.isArray(this.params.input) ? this.params.input : [this.params.input];
const results: any[] = [];
for (const input of _input) {
const execution = flow.createExecution();
await execution.start(input);
results.push(await execution.getResponse());
}
return results;
}
const execution = flow.createExecution();
await execution.start(this.params.input);
return execution.getResponse();
}
}