Data flows
This document describes the key data flows through the KiCI architecture: webhook delivery, job execution, developer-initiated remote runs, dependency caching, re-run and cancel, trace ID propagation, internal event routing, and generic webhook ingestion.
Lock file schema version: The lock file uses schema version 17, which widens per-job init to typed presets (
mise/{ mise }) andautodetection on top of v16’s normalized approval config, v15’s per-job init config, v14’s declarative cache specs, v11’sLockInlineValuefor pure function inline evaluation, v10’s simplified negative patterns (! prefix in repos/paths arrays), v9’s global workflow repos matching, and v8’s runsOn polymorphic type support.
Webhook delivery flow
Section titled “Webhook delivery flow”A webhook event from a provider (e.g., GitHub) travels through three tiers before execution begins.
GitHub --> Platform Relay --> Orchestrator --> Agent 1. Webhook 2. WebSocket 3. Job dispatch POST relay + executionStep by step
Section titled “Step by step”- Provider sends webhook to the Platform relay endpoint.
- Platform routes the webhook to the right orchestrator over WebSocket and forwards the body bytes verbatim. Platform never sees customer HMAC secrets — signature verification happens entirely on the orchestrator after reassembly.
- Orchestrator verifies signature (HMAC-SHA256 against per-source webhook secret, with dual-secret rotation support).
- Orchestrator dedup check against dual-layer
DedupCache(in-memory set +dedup_cacheDB table). - Orchestrator resolves provider by looking up the provider bundle from the
ProviderRegistryusinggetByRoutingKey()(exact match first, falls back to provider type prefix for backward compatibility). Skips processing if the provider is unknown. - Orchestrator normalizes the webhook via the provider’s
WebhookNormalizer(extracts branch, event type, action, sender). - Orchestrator extracts repo and credentials from payload (repository identifier from
repository.full_name, provider credentials such as GitHub installation ID). - Orchestrator handles /kici commands in
issue_commentevents: intercepts/kici approveand/kici rejectapproval commands before trigger matching, delegating tohandleApprovalComment()for security hold management. - Orchestrator resolves trust for PR events (determines lock file source: head vs base branch).
- Orchestrator fetches lock file via the provider’s
LockFileFetcher(cached with LRU). For untrusted PR events, fetches both base and head lock files in parallel; for trusted PRs and pushes, fetches from head SHA. - Orchestrator detects workflow modifications for untrusted PR events by comparing base and head lock files via
detectWorkflowModifications(), applying security holds when non-trusted contributors modify workflow files. - Orchestrator extracts registrations on default-branch pushes: persists registerable workflows (event, schedule, lifecycle triggers) for cluster-wide event matching.
- Orchestrator notifies the event router on default-branch pushes: after the registrations are persisted, emits a
registration.updatedevent viaeventRouter.emit()(if event routing is active). Workflow event subscriptions are the persisted registrations themselves, matched at emit time through the registration index. - Orchestrator fetches changed files via the provider’s
ChangedFilesFetcherfor path-based trigger filtering (skipped when no workflow uses path filters). - Orchestrator matches triggers against lock file using
matchAllWorkflows()from@kici-dev/engine. - Orchestrator checks caches for source tarballs and dependency tarballs.
- Orchestrator dispatches jobs to agents via the job queue and WebSocket.
- Orchestrator persists a delivery row keyed by
(org_id, delivery_id)to its ownevent_log, including a pointer to the gzipped payload in object storage. The orchestrator’s delivery log is surfaced in the dashboard’s Settings → Event log tab. Seewebhook-delivery.md.
Job execution flow
Section titled “Job execution flow”Once the orchestrator has matched triggers and resolved caches, jobs are dispatched to agents.
Orchestrator Agent Sandbox (child process) | | | |-- job.dispatch (WS) ------------>| | | (jobConfig, sourceTarUrl, | | | sourceTarHash, depsUrl, |-- Create sandbox ------->| | depsHash) | (container/bare-metal/ | | | firecracker) | | | |-- Restore .kici/ source (tarball) | | |-- Restore deps (tarball) | | |-- Load workflow (TS loader hook) | | |-- Evaluate rules | | |-- Execute steps | |<-- IPC (step status, ----| | | log lines, events) | |<-- job.status (WS) -------------| | | (step progress, completion) |-- Teardown sandbox ----->| | | |Agent pipeline
Section titled “Agent pipeline”The agent delegates job execution to an ExecutionSandbox (container, bare-metal, or firecracker). The sandbox runs customer code in an isolated child process — never in the agent’s V8 isolate. Four job types are handled: execution jobs (sandbox), build-only jobs (in-process, cache population), init-only jobs (in-process, dynamic field resolution), and DynamicJobFn evaluation jobs (in-process, runtime job generation). See Job execution lifecycle for details.
- Report running — Send
job.status: runningimmediately upon accepting the dispatch - Sandbox selection — Determine execution mode (container, bare-metal, firecracker) from job config and environment
- Sandbox setup — Create and start the execution environment (container:
docker create/start; bare-metal: validate; firecracker: detect) - Context emission — Send
job.contextto orchestrator with runtime details (Node version, OS, arch, sandbox type) - Sandbox execution — The sandbox child process handles the inner pipeline:
.kici/source tarball restore, deps tarball restore, workflow loading (dynamic-import.tsvia the shared TypeScript ESM loader hook), step extraction, rule evaluation, step execution sequentially with timeout and abort support. There is no runtime bundling step — workflow TS is transformed on import, not ahead of time. - IPC callbacks — Step status, log lines, event emissions, and concurrency reports flow from the sandbox to the agent via IPC, then to the orchestrator via WebSocket
- Report — Send final
job.statusback to orchestrator with step results and timing - Cleanup — Tear down sandbox and remove work directory
Remote run flow (kici run remote)
Section titled “Remote run flow (kici run remote)”A developer running kici run remote from a working tree initiates a run through the three-tier relay without a provider webhook. The flow splits into two independent planes: a control plane through the Platform relay, and a data plane that uploads the working-tree overlay straight to object storage.
Developer machine Platform relay Orchestrator Object store | | | | |-- upload-init (control) ----->|--- WS relay ----------->| | | (org, cluster, overlay | |-- mint presigned | | metadata, inline lock) | | PUT URL ---------->| |<-- presigned PUT URL ---------|<--- WS relay -----------| | | | | | |== overlay tarball PUT (data plane) ===========================================>| | | | | |-- trigger (control) --------->|--- WS relay ----------->|-- dispatch jobs | | | | (agents fetch | | | | overlay) | |-- poll logs + status -------->|--- WS relay ----------->| | |<-- log chunks + status -------|<--- WS relay -----------| |Control plane
Section titled “Control plane”Run initiation (upload-init), the trigger, status polling, log retrieval, and cancellation all flow from the developer machine to the Platform, which relays them over its WebSocket connection to the org’s orchestrator. The developer machine never talks to the orchestrator’s HTTP API directly. Logs are delivered by the CLI polling the Platform for log chunks — tracked by a monotonic line cursor — and run status until the run reaches a terminal state; there is no direct streaming socket between the developer machine and the orchestrator.
Data plane
Section titled “Data plane”The working-tree overlay tarball uploads directly from the developer machine to the orchestrator’s object store via a pre-signed PUT URL minted during upload-init. The overlay bytes never pass through the Platform. Because of this split, only the object store needs to be reachable from the developer machine — the orchestrator can sit behind a private network. See Storage layout for the upload-endpoint configuration.
Org anchor
Section titled “Org anchor”The run is dispatched to the developer’s active organization (selected with kici org use, or overridden per-run). The orchestrator anchors its bound organization with a system-managed remote source (routing key remote:<orgId>) that it auto-provisions — no manual webhook source is required, so a zero-source org is immediately routable for remote runs. The Platform forces the run’s routing key to remote:<orgId> server-side; the developer never sets a routing key. When an org has more than one connected orchestrator cluster, the CLI selects the target cluster explicitly (or relies on the per-org default), and a single connected cluster is auto-selected.
Remote runs are offered by the Platform; an orchestrator with no Platform connection cannot serve them. Executing workflow steps on the developer machine with no orchestrator is the separate kici run local path.
Source and dependency caching flow
Section titled “Source and dependency caching flow”KiCI runs two orchestrator-side caches — the source tarball cache (raw .kici/ directory minus node_modules/) and the dependency tarball cache (packed node_modules/). Both use a build-then-execute pattern: the orchestrator checks the caches before dispatching execution jobs, and if the source cache is cold a build agent populates both in one pass.
With the shared TypeScript loader hook plus source tarball, execution agents do not run git clone or compile anything at runtime — they perform exactly two S3 GETs (source + deps) and extract.
Cache miss flow
Section titled “Cache miss flow”When the source cache is cold and the dep cache is also missing:
Webhook | vTrigger Match | vCache Check (source: MISS, deps: MISS) | vBuild Job Dispatch --> Build Agent (kici:role:builder + matching kici:os:/kici:arch:) | | | |-- git clone + checkout SHA | |-- npm ci in .kici/ | |-- Pack .kici/ source (portable tar.gz, excludes node_modules) | |-- Pack .kici/node_modules (portable tar.gz) | |-- Upload source tarball to cache (source/{contentHash}.tar.gz) | |-- Upload deps tarball to cache (deps/{plat}-{arch}/{lockfileHash}.tar.gz) | |-- Upload deps companion .hash file | |-- Report success (cache.upload.complete × 2) | | v vBuild Complete <---------+ | vGet sourceTarUrl + depsUrl from cache (pre-signed S3 GETs) | vExecution Job Dispatch --> Execution Agent | | | |-- Download source tarball (sourceTarUrl) -> extract to workDir/.kici/ | |-- Download deps tarball (depsUrl) -> verify SHA-256 -> extract to .kici/node_modules/ | |-- Register @kici-dev/shared/ts-loader-hook | |-- Verify workflow contentHash against lock file (drift guard) | |-- Dynamic-import workflow .ts | |-- Execute steps | |-- Report result | | v vDone <-----------------------+The execution agent never clones the repo. The source tarball IS the workflow repo’s .kici/ directory.
Cache hit flow
Section titled “Cache hit flow”When both caches have valid entries (the common case after the first run at a commit SHA):
Webhook | vTrigger Match | vCache Check (source: HIT, deps: HIT) | vGet sourceTarUrl + depsUrl from cache (pre-signed S3 GETs) | vExecution Job Dispatch --> Execution Agent | | | |-- Download source tarball (sourceTarUrl) -> extract | |-- Download deps tarball (depsUrl) -> verify SHA-256 -> extract | |-- Register TS loader hook | |-- Dynamic-import workflow .ts | |-- Execute steps | |-- Report result | | v vDone <-----------------------+No build job is dispatched. The execution agent performs exactly two S3 GETs and extracts.
Partial cache hit
Section titled “Partial cache hit”The source cache and dep cache are independent. Four combinations are possible:
| Source | Deps | Behavior |
|---|---|---|
| HIT | HIT | Direct execution dispatch (fastest, two S3 GETs) |
| HIT | MISS | No build job; execution agent falls back to inline npm ci after restoring source |
| MISS | HIT | Build job for source only (agent still packs deps opportunistically); then execution |
| MISS | MISS | Build job for source + deps (single job packs both), then execution |
Dep cache misses alone do not trigger a build job. Deps are platform-specific (deps/{platform}-{arch}/{hash}.tar.gz) so a build job would need a builder agent matching the target platform, which may not exist (e.g., an arm64 builder when only x64 builders are available). When the source cache misses, the dispatched build job piggy-backs dep packing if deps are also missing. A single build job handles both artifacts when both miss, avoiding duplicate builds.
Cross-source / no-contentHash workflows
Section titled “Cross-source / no-contentHash workflows”- Lock files without
contentHash(schema v1) skip the source cache entirely; agents compile from source. Regenerate lock files withkici compileto enable caching. The current lock file schema version is 17. - Cross-source / global-workflow dispatch (a workflow registered against source A fired by a webhook on source B) bypasses both caches. The registration’s lock file entry still carries
contentHash, but the cross-source path always clone-and-installs — the eval temp dir doesn’t ship@kici-dev/sdk. The execution agent still verifiescontentHashagainst the cloned source for drift detection.
Build deduplication
Section titled “Build deduplication”When multiple webhooks trigger simultaneously for the same repository state, the BuildCoordinator coalesces concurrent build requests using a combined key (contentHash:lockfileHash). Only one build job runs; all waiting dispatches share the result.
Graceful degradation
Section titled “Graceful degradation”If cache storage is unavailable or a download fails:
- Source tarball download failure: Hard failure today — the agent does not fall back to
git cloneon the execution path. (The build path is where cloning happens.) In practice this is rare because the same orchestrator that issued the pre-signed URL controls the cache backend. - Dep tarball download failure: Agent falls back to running
npm ci/npm installinline. - Dep tarball hash mismatch: Agent retries the download twice (3 total attempts), then fails the job (no fallback for integrity failures).
- Source tarball drift (extracted
contentHash≠ lock file): Hard failure with “Lock file is out of date: workflow source changed without regenerating kici.lock.json” — see Lock file and drift. - Build failure: Execution is skipped entirely with a “Build failed” check status. Workflows that contain dynamic job entries (DynamicJobFn) are allowed to proceed with their dynamic eval jobs since those compile from source.
- No cache configured: Agent runs inline install for every job (pre-caching behavior).
Cache storage architecture
Section titled “Cache storage architecture”Both source and dep caches use S3CacheStorage as the sole backend. The CacheStorage interface provides a consistent API, but S3 (or any S3-compatible service: SeaweedFS, MinIO, LocalStack) is the only supported implementation.
+------------------+ | CacheStorage | | (interface) | +--------+---------+ | +---------+---------+ | S3CacheStorage | | (AWS S3, SeaweedFS,| | MinIO, LocalStack)| +--------------------+Cache key design
Section titled “Cache key design”Cache keys reflect that source tarballs and deps have different platform characteristics:
- Source:
source/{contentHash}.tar.gz— platform-agnostic. Raw TypeScript source is identical regardless of CPU architecture, so one entry is shared across all platforms.contentHashis the per-workflow hash from the lock file (SHA-256(schemaVersion + ":" + rawSource [+ "\0" + assetDigest])). - Deps:
deps/{platform}-{arch}/{lockfileHash}.tar.gz(e.g.,deps/linux-arm64/def456.tar.gz) — platform-specific. Native dependencies innode_modulesdiffer across architectures, so each platform/arch combination gets its own cache entry.
The orchestrator derives the target platform/arch for dep cache lookups by probing AgentRegistry.findAvailable() with the workflow’s first job’s runsOn labels to find a representative matching agent, then using that agent’s platform and arch. Falls back to linux/x64 if no matching agents are registered.
TTL and eviction (touch-on-read)
Section titled “TTL and eviction (touch-on-read)”Both caches refresh TTL on read via touch-on-read. An entry’s lifetime is reset every time an orchestrator issues a pre-signed GET URL for it. Default TTL is KICI_CACHE_TTL_DAYS=30; entries unused for 30 days expire at the storage level. Actively used sources and deps stay in cache indefinitely as long as they continue to be referenced by inbound webhooks or reruns. See docs/operator/dependency-caching.md for configuration.
For the full per-package bucket and prefix inventory — cache, logs, cold-store, and the observability sidecar buckets — see orchestrator storage layout.
Pre-signed URL upload flow
Section titled “Pre-signed URL upload flow”Agents upload artifacts directly to S3 using pre-signed PUT URLs. This eliminates the orchestrator as a data proxy — only coordination messages flow through WebSocket.
Agent Orchestrator S3 | | | |-- cache.upload.request ------->| | | { type: "source"|"dep", | | | key: "source/..." or | | | "deps/..." } | | | |-- getUploadUrl(key) ----->| | | (PutObject pre-sign) | |<-- cache.upload.response ------| | | { url: "https://s3.../..." } | | | | | |-- HTTP PUT (artifact body) --------------------------->| | (direct S3 upload) | | | | | |-- cache.upload.complete ------>| | | { type, key, depsHash? } |-- initMeta(key) -------->| | | (CopyObject to set | | | TTL metadata) | | |-- put(hashKey) --------->| | | (companion .hash file | | | for deps integrity) |The two-phase metadata approach (upload via PUT then initMeta via CopyObject) works around the limitation that S3 pre-signed URLs cannot include custom metadata headers. For dependency tarballs, the agent also reports the SHA-256 content hash in cache.upload.complete; the orchestrator stores it as a companion .hash file alongside the tarball. When dispatching execution jobs, the orchestrator reads this hash and includes it as depsHash in job.dispatch, enabling agent-side integrity verification on download. Source tarballs do not use a companion .hash file — the workflow contentHash carried in sourceTarHash is used to verify the extracted source against the lock file after extraction, which covers drift end-to-end.
URL delivery (downloads)
Section titled “URL delivery (downloads)”Agents receive pre-signed S3 GET URLs (15-minute expiry) directly in job.dispatch messages. Agents download artifacts from S3, bypassing the orchestrator for all data transfer.
User-facing cache flow
Section titled “User-facing cache flow”The source/dep cache above is internal: the orchestrator owns its keys and decides when to hit or build. The user-facing cache is driven by the workflow author — the declarative cache: { key, paths, restoreKeys? } on a job/step, or the imperative ctx.cache.restore() / ctx.cache.save() API (see SDK caching reference). It reuses the same object-storage backend and the same direct-to-storage presigned-URL transport, but the agent — not the orchestrator — initiates each restore and save over WebSocket.
The agent’s cache module archives paths into a gzipped tarball (computing a SHA-256 over the bytes) and streams downloads back through a checksum-verified extract pipeline. The orchestrator’s UserCache owns the cache/<orgId>/<repoId>/<scope>/<key> namespacing, the immutable first-save check, the restoreKeys prefix scan, the two-phase atomic save, and per-org quota/TTL eviction.
Restore flow
Section titled “Restore flow”Agent Orchestrator (UserCache) Object storage | | | |-- cache.user.restore.request ------->| | | { key, restoreKeys? } |-- exact key in read prefixes ->| | | (isolated: iso/<runId>/ | | | then shared/; trusted: | | | shared/ only) | | |-- restoreKeys prefix scan ---->| | | (newest match wins) | | |-- getUrl(matched) + touch ---->| |<-- cache.user.restore.response ------| | | { hit, matchedKey?, | | | downloadUrl?, tarHash? } | | | | | |-- HTTP GET (tarball body) -------------------------------------->| | (direct download; verify tarHash, extract paths) |The restore resolves the exact key across the ref’s read prefixes first, then each restoreKeys prefix in order (newest matching entry wins). A trusted ref reads only shared/; an untrusted/fork ref reads its own iso/<runId>/ scope and then falls back to shared/. On a hit the response carries a presigned GET URL plus the tarball’s tarHash, which the agent verifies before extracting.
Save flow (two-phase atomic)
Section titled “Save flow (two-phase atomic)”Agent Orchestrator (UserCache) Object storage | | | |-- cache.user.save.request --------->| | | { key } |-- has(final key)? ------------>| | | (immutable: skip if exists) | | |-- getUploadUrl(.tmp-<uuid>) -->| |<-- cache.user.save.response ---------| | | { uploadUrl?, skip } | | | | | |-- HTTP PUT (tarball body) ----------------------------------->| | (direct upload to temp object) | | | | |-- cache.user.save.complete -------->| | | { key, tarHash, sizeBytes } |-- copy(temp -> final) -------->| | |-- delete(temp) --------------->| | |-- initMeta(final) ------------>| | |-- put(.hash) + put(.size) ---->| | |-- enforce per-org quota ------>|The save is immutable and atomic. The orchestrator declines (skip: true) up front if the exact key already exists. Otherwise the agent uploads to a .tmp-<uuid> object via a presigned PUT, then cache.user.save.complete triggers a server-side copy temp→final, a delete of the temp, an initMeta to stamp TTL metadata, and .hash / .size companion writes. Because the final key only appears after the copy, a crashed upload never leaves a corrupt committed entry. The committing save then enforces the per-org byte quota, evicting oldest entries until the org is back under KICI_USER_CACHE_QUOTA_BYTES.
Trust → scope mapping
Section titled “Trust → scope mapping”The orchestrator threads a cacheRefScope onto each job.dispatch. A trusted ref (the repo’s own branches, default branch) maps to the shared write scope; any other ref (a fork PR) maps to isolated, writing to a per-run iso/<runId>/ scope. This is the cache-isolation model: a fork can restore from the trusted shared/ cache but can never write into it, so it cannot poison the entries a trusted branch later restores. The org segment of the key namespace (cache/<orgId>/) is the per-tenant boundary — no tenant can read another tenant’s cache. See orchestrator storage layout for the full prefix map and quota/TTL knobs.
Internal event routing flow
Section titled “Internal event routing flow”Internal events (custom events from ctx.emit() and system events from workflow/job completion) flow through the event router for fan-out delivery to matching workflows.
Step ctx.emit('event-name', payload) | vAgent IPC (fork channel or stdout JSON-lines) | vAgent -> event.emit WS message -> Orchestrator | vEventRouter.emit() |-- CircuitBreaker check (chain depth, rate limit) — fail-fast, in-memory |-- BEGIN TRANSACTION | |-- EventStore.writeWith(tx) -> INSERT into kici_events table | |-- pg_notify('kici_event_channel', eventId) (queued; fires on commit) |-- COMMIT (rollback discards both insert and notify atomically) | vAll Orchestrators LISTEN on 'kici_event_channel' channel | vEventRouter.onNotification(eventId) [private] |-- EventStore.tryLeaseForProcessing(eventId, nodeId, leaseDurationMs) | (atomic UPDATE: claim only if processed=false AND dlq_at IS NULL | AND (claimed_at IS NULL OR claimed_at < NOW() - leaseDurationMs); | increments attempts and records claimed_at/claimed_by atomically) |-- If lease acquired: | |-- processSubscriptions(event): | | |-- If RegistrationIndex available: | | | Look up registrations by trigger type | | | TrustStore.isTrusted() (for cross-repo events) | | | matchAllWorkflows() against registered workflows | | |-- Else (no RegistrationIndex): | | | TrustStore.isTrusted() (for cross-routing-key events) | | | matchAllWorkflows() against in-memory lock file subscriptions | | |-- For each match: onEventMatched(event, lockFile, matchedWorkflows) | | | |-- On success: markProcessed (commits processed=true, clears lease) | |-- On failure (any onEventMatched throws): | |-- If attempts >= maxDispatchAttempts: markDlq('exhausted_retries') | |-- Else: recordDispatchFailure (sets next_retry_at via exponential | backoff with full jitter; clears lease) | vJob dispatch to agents (standard pipeline)At-least-once delivery + DLQ
Section titled “At-least-once delivery + DLQ”Two invariants keep events from being silently lost:
- Cron-fire atomicity:
tryClaimFire(advancescron_last_fired) and the event-row INSERT +pg_notifyexecute inside the same database transaction. If the leader process is killed between the two writes, the transaction rolls back and nolast_fired_atadvance leaks. The next tick re-evaluates and fires cleanly. - Dispatch retries: the lease pattern (
tryLeaseForProcessing) marks an event as in-flight without committing it as processed. When a handler throws, the lease wrapper records the failure, schedules a retry, and on the leader’s retry-scanner tick the event is re-published viapg_notify. AftermaxDispatchAttempts(default 5) the event lands in the DLQ (dlq_atset,dlq_reason='exhausted_retries') and is surfaced via Prometheus (kici_orch_event_dlq_*), Grafana (event-deliverydashboard), and the kici-admin CLI (kici-admin event-dlq {list,count,retry,discard}). - Crash detection: when a node crashes mid-dispatch, its lease ages out
after
leaseDurationMs(default 60 s). The leader’sEventRetryScannerreleases the expired lease and re-publishespg_notifyso a healthy node picks the event up. Each release incrementskici_orch_event_lease_expirations_total— a steady > 0 rate is the visible signal that an orchestrator instance is dying mid-dispatch.
System events
Section titled “System events”The orchestrator auto-emits system events after execution completes:
workflow_complete— emitted when all jobs in a workflow finish (carries workflow name, status, duration)job_complete— emitted when a single job finishes (carries workflow name, job name, status, duration)
These events are stored in the same kici_events table and matched against workflowComplete() and jobComplete() triggers in the lock file.
Event.emit WS protocol
Section titled “Event.emit WS protocol”Agent Orchestrator | | |-- event.emit ----------------->| | { jobId, requestId, | | eventName, payload, | | target? } | | |-- store event | |-- NOTIFY |<-- event.emit.response --------| | { requestId, deliveryId? } | | |Registration extraction flow
Section titled “Registration extraction flow”When code is pushed to the default branch, the orchestrator extracts event-triggered workflows from the lock file and stores them as registrations for cluster-wide event matching.
Git Push to Default Branch==========================
GitHub Webhook -> Platform Relay -> Orchestrator Processor
Processor (on default-branch push): |-- lockFileCache.get() (fetch/cache lock file by blob SHA) |-- extractRegisterableWorkflows(fullLockFile) | |-- For each workflow entry in lock file: | | Check if any trigger type is registerable | | (kici_event, workflow_complete, job_complete, | | generic_webhook, schedule, lifecycle) | |-- Return array of registerable workflows | |-- globalWorkflowPolicy.isWorkflowRepoAllowed() (if policy configured) | |-- Filter out global workflows from repos not on the allow-list | |-- registrationStore.replaceAll(repoIdentifier, workflows, routingKey, credentials, { commitSha }) | |-- BEGIN TRANSACTION | |-- DELETE FROM workflow_registrations WHERE routing_key AND repo_identifier | |-- INSERT new registrations (with commit SHA for lock file pinning) | |-- COMMIT | |-- registrationStore.bumpVersion() | |-- UPDATE registry_versions SET version = version + 1 | |-- registrationIndex.refreshIfNeeded(newVersion) | |-- If local version != remote version: | | Load all registrations from DB | | Rebuild primary index (by customer:repo) | | Rebuild secondary index (by trigger type) | | Update local version | |-- cronScheduler.refreshCache() (defense-in-depth) | |-- eventRouter.emit('registration.updated', { repo, workflows })Cron schedule evaluation flow
Section titled “Cron schedule evaluation flow”Cron schedules are evaluated periodically by the Raft leader only.
Cron Schedule Evaluation========================
CronScheduler (runs every 30 seconds, Raft leader only): |-- registrationIndex.getCronSchedules() |-- For each schedule: | |-- new Cron(cronExpression, { timezone }) | |-- cron.previousRuns(1) -> most recent past scheduled time | |-- Check last-fired cache (prevent double-fire) | |-- If due and not recently fired: | |-- BEGIN TRANSACTION | | |-- cronStore.tryClaimFire(registrationId, previousRun, tx) | | | (atomic DB claim — prevents duplicate fires in | | | multi-orchestrator clusters via WHERE last_fired_at < | | | firedAt guard) | | |-- If claim successful: | | |-- eventRouter.emitInTx(__schedule_fire, tx) | | |-- EventStore.writeWith(tx) -> INSERT kici_events | | |-- pg_notify('kici_event_channel', id) on tx | |-- COMMIT (rollback discards both writes; pg_notify fires on commit) | |-- On commit: | |-- Update local last-fired cache | |-- EventRouter matches against registered workflows | |-- Matched workflows dispatched via standard pipelineRecovery on leader election loads the cron_last_fired table into the
last-fired cache and fires once per missed schedule. Because the claim and
the event-row insert now share a transaction, a crash between the two no
longer leaves last_fired_at advanced with no event row — the rollback
discards both writes and the next tick fires cleanly.
Timing characteristics
Section titled “Timing characteristics”- Tick interval: Hardcoded at 30 s (
evaluationIntervalMsdefaults to30_000inCronSchedulerand is not exposed via orchestrator config or env vars). Changing it requires a code change. - Fire jitter: A schedule due at time
Tfires at the first tick>= T, i.e.0–30 safter the scheduled moment, never before. The event payload’sscheduledAtcarries the cron-computed time (not the dispatch time), so downstream consumers see the intended schedule. - Per-tick concurrency: All schedules are processed serially in a single
forloop on the leader (packages/orchestrator/src/cron/cron-scheduler.ts,evaluate()). Each registration costs one in-memory cron computation plus two DB writes (tryClaimFireupsert +eventStore.write+pg_notify). Throughput is therefore bounded by sequential DB write latency: at ~5–15 ms per registration, 50 schedules firing in the same tick complete in well under a second between the first and last fire. - Recovery semantics: On leader election,
recoverMissedSchedules()callscron.previousRuns(1)per schedule — it fires at most one event per schedule regardless of how long the cluster was leaderless. There is no backfill for multiple missed scheduled instants. - Multi-node deduplication: During cluster startup multiple nodes may transiently self-elect (dormant mode). The atomic
tryClaimFireupsert with alast_fired_at < firedAtWHEREguard ensures only one node’s emit succeeds; losing nodes update their local cache and skip emit. - Sub-minute crons: Supported but bounded by the 30 s tick.
* * * * *fires roughly once per minute with up to 30 s of drift; sub-30-second cadences are not achievable without lowering the interval in code.
Generic webhook flow
Section titled “Generic webhook flow”Generic webhooks from non-GitHub sources follow a parallel ingestion path. The webhook can arrive directly at the orchestrator or be relayed through the Platform.
External Service (ArgoCD, Jenkins, Grafana, etc.) | vPOST /webhook/:orgId/generic/:sourceId | +--> Platform path: | |-- Resolve source by routing key generic:<orgId>:<sourceId> | |-- Relay via WebSocket to orchestrator (see internal/platform/data-flows.md) | v +--> Orchestrator path (direct or via Platform relay): |-- GenericSourceManager.getByOrgAndName(orgId, sourceId) |-- Payload size check (per-source maxPayloadBytes) |-- Rate limit check (per-source rateLimitRpm) |-- Verify signature (HMAC-SHA256, bearer token, IP allowlist, or none) |-- Deduplication check (idempotency key within dedup window) |-- GenericWebhookNormalizer.normalizeEvent() -> SimulatedEvent |-- Match against lock file triggers (genericWebhook type) |-- Dispatch matched jobs to agentsGeneric vs GitHub webhook differences
Section titled “Generic vs GitHub webhook differences”| Aspect | GitHub Webhooks | Generic Webhooks |
|---|---|---|
| Signature | HMAC-SHA256 (always) | Configurable (HMAC, bearer, IP, none) |
| Event type | X-GitHub-Event header | Configurable header or payload field |
| Delivery ID | X-GitHub-Delivery header | Configurable header or auto-generated UUID |
| Lock file | Fetched from repo | Cached from lock file subscription |
| Git operations | Clone, fetch, changed files | None (optional — non-repo workflows supported) |
Database topology
Section titled “Database topology”The orchestrator owns its own PostgreSQL database, with the authoritative execution_runs, execution_jobs, execution_steps, dispatch_queue, dedup_cache, workflow_registrations, environments / scoped_secrets / environment_bindings, agent_tokens, cluster_meta, and related tables. Each orchestrator deployment uses its own KICI_DATABASE_URL; database users are scoped per service.
Execution reporting flow
Section titled “Execution reporting flow”After job execution, results flow back through the tiers:
Agent Orchestrator Platform GitHub | | | | |-- job.status ----------->| | | | (completed/failed) | | | | |-- execution.status ->| | | | (run metadata) |-- upsert | | | | execution_runs | | |-- job.status.forward>| | | | (job metadata) |-- upsert | | | | execution_jobs | | |-- GitHub Checks API ---------------------->| | | (check run update) | | | | | |The orchestrator updates:
- GitHub Check Runs via the Checks API (conclusion, summary, duration)
- Execution runs in the orchestrator’s own database (authoritative source)
- Platform execution status via WebSocket (
execution.statusandjob.status.forwardmessages, which the Platform upserts into its own projection tables)
Re-run and cancel flows
Section titled “Re-run and cancel flows”The dashboard enables users to re-run completed workflows and cancel running workflows. Both flows use a REST-over-WS proxy pattern: the Platform receives a REST request from the dashboard, forwards it to the orchestrator via WebSocket, and returns the orchestrator’s response.
Re-run flow
Section titled “Re-run flow”Dashboard Platform Orchestrator | | | |-- POST /orgs/:id/runs/ ->| | | :runId/rerun (auth) |-- Cooldown check | | | (last_rerun_at < 5s ago?) | | | | | |-- run.rerun.request (WS) --->| | | { runId, triggeredBy } | | | |-- Load original run from DB | | |-- Read webhook payload from storage | | |-- Re-fetch lock file at original SHA | | |-- Dispatch new jobs via Dispatcher | | |-- Record execution with parent_run_id + original_run_id | | |-- execution.status (WS, via callback) | | | { parentRunId, originalRunId, triggeredBy } | |<- run.rerun.response (WS) ---| | | { newRunId } | | | | |<- 200 { newRunId } ------| | | |-- UPDATE last_rerun_at | |-- Navigate to new run | | | | |Key design points:
- Cooldown enforcement: The Platform enforces a 5-second cooldown per original run via the
last_rerun_atcolumn. Rapid re-run attempts receive 429 Too Many Requests. - Payload reuse: The orchestrator reads the original webhook payload from filesystem/object storage and stores a copy for the new run (enabling re-run of re-runs).
- Lock file at original SHA: The lock file is re-fetched at the original commit SHA, ensuring the re-run uses the same workflow definition.
- Lineage tracking: The new run has
parent_run_idpointing to the immediate parent run,original_run_idpointing to the root ancestor run (for chain traversal), andtriggered_byrecording the user identity. - No trigger matching: Re-runs skip deduplication, normalization, and trigger matching. They go directly from lock file parse to job dispatch.
Cancel flow
Section titled “Cancel flow”Dashboard Platform Orchestrator Agent(s) | | | | |-- POST /orgs/:id/runs/ ->| | | | :runId/cancel (auth) | | | | |-- run.cancel.request (WS) -->| | | | { runId, cancelledBy } | | | | |-- Find active jobs | | | | from dispatch queue | | |-- job.cancel (WS) ->| | | | (for each agent) |-- Abort step | | | |-- Cleanup | |<- run.cancel.response (WS) --| | | | { cancelledJobs: N } | | | | |<- job.status -------| |<- 200 { cancelledJobs } -| | (cancelled) | | |-- UPDATE cancelled_by | | | | | |The cancel flow is asynchronous: the orchestrator sends job.cancel to agents and immediately responds with the count. Agents asynchronously abort their current step, clean up, and report job.status: cancelled back to the orchestrator.
Payload storage flow
Section titled “Payload storage flow”Webhook payloads are stored during initial processing and retrieved later for re-runs and the payload viewer.
Webhook arrives Payload retrieved | | v vprocessWebhook() GET /orgs/:id/runs/:runId/payload | | v vlogStorage.append( Platform -> dashboard.payload (WS) executions/{runId}/ | webhook-payload.json, v JSON.stringify(payload) Orchestrator -> logStorage.read() executions/{runId}/ | webhook-payload.json v )Filesystem or object storage | v dashboard.payload.response (WS) { payload: {...} }Event-log payload streaming flow
Section titled “Event-log payload streaming flow”The dashboard’s event-log detail panel reads webhook bodies through a chunked transport so the dashboard can render progress as bytes arrive. The orchestrator slices the payload into 64 KiB chunks and streams them up to the browser.
Lineage query
Section titled “Lineage query”The lineage endpoint (GET /orgs/:customerId/runs/:runId/reruns) returns all runs with parent_run_id matching the given run ID.
Trace ID propagation
Section titled “Trace ID propagation”Every webhook event is assigned a trace ID (requestId) at ingestion. A second ID (runId) is added at dispatch time. Both propagate through the three tiers via WebSocket protocol messages and are automatically injected into every log line using AsyncLocalStorage.
GitHub Platform Orchestrator Agent | | | | |-- webhook -->| | | | |-- generate requestId | | | |-- requestContext.run() | | | | (requestId) | | | | | | | |-- webhook.relay (WS) ->| | | | { ..., requestId } | | | | |-- requestContext.run() | | | | (requestId) | | | | | | | |-- generate runId | | | |-- enrichRequestContext | | | | ({ runId }) | | | | | | | |-- job.dispatch (WS) ->| | | | { ..., requestId } | | | | |-- requestContext.run() | | | | (requestId, runId, | | | | jobId) | | | | | | | |-- log: "Run: X | Trace: Y" | | | |-- execute steps | | | |How it works
Section titled “How it works”-
Platform ingestion: The webhook handler generates a
requestId(UUID) and wraps the entire request inrequestContext.run(). All log lines within this async scope automatically includerequestId. -
WebSocket relay: The
requestIdis included in thewebhook.relaymessage sent to the orchestrator. For cross-instance relay (via Valkey pub/sub), therequestIdis serialized in the notification payload. -
Orchestrator processing: The orchestrator wraps webhook processing in
requestContext.run()using therequestIdfrom the relay message (falling back to a new UUID for backward compatibility). When arunIdis generated for job dispatch, it is enriched into the existing context viaenrichRequestContext(). -
Job dispatch: Both
requestIdandrunIdare included in thejob.dispatchWebSocket message to the agent. -
Agent execution: The agent wraps each
onJobDispatchcallback inrequestContext.run()withrequestId,runId, andjobId. A trace header is printed once at job start. All subsequent log lines carry all trace fields automatically. -
Check run summaries: GitHub Check Run updates include
Trace: <requestId> | Run: <runId>in the summary text, giving operators a direct link from GitHub UI to Loki queries.
Implementation
Section titled “Implementation”Trace propagation uses Node.js AsyncLocalStorage from @kici-dev/shared. A logger format reads the current context and injects fields into every JSON log line — no changes needed at individual call sites.
Tier identification is handled at the infrastructure level: the service Loki label (set by Grafana Alloy from the systemd unit / log source) identifies which service produced the log (platform, orchestrator, agent, etc.). For agent logs forwarded through the orchestrator’s stdout, the parsed JSON also carries an inner service: 'agent' field — query both with {service="orchestrator"} | json | service="agent" to disambiguate.
Output chaining data flow
Section titled “Output chaining data flow”Output chaining allows steps to consume outputs from preceding steps (within a job) and jobs to consume outputs from preceding jobs (across jobs). The data flows through several phases.
Definition time
Section titled “Definition time”When workflow code runs at definition time (step(), job() calls):
step()creates anOutputProxy<T>viacreateStepOutputProxy(stepName)and attaches it as.resultjob()creates anOutputProxy<any>viacreateJobOutputProxy(jobName)and attaches it as.result- The proxy is an ES6
Proxyobject that defers all property access to a module-globalOutputsMap - No outputs exist yet — accessing
.result.fieldbefore execution throws “has not produced outputs yet”
Compile time
Section titled “Compile time”The compiler processes the workflow definition:
- Unnamed steps (bare functions and id-less
step()calls) receive counter IDs:step-1,step-2, etc. - Unnamed jobs (id-less
job()calls with UUID names) receive counter IDs:job-1,job-2, etc. - The lock file records
hasOutputs: truefor steps with Zod output schemas - Step counters are scoped per job; job counters are scoped per workflow
Execution time (local test runner)
Section titled “Execution time (local test runner)”When kici test runs a workflow:
- SDK module resolution: The runner resolves
setStepOutputsMap/setJobOutputsMapfrom the same@kici-dev/sdkmodule instance that the workflow uses (ensures the proxy reads from the same map) - Map injection: Fresh
OutputsMapandStepRefMapare created and injected viasetStepOutputsMap()/setStepRefMap()before each job - Step execution: Each step runs sequentially. If the step returns a value, it is stored in the
OutputsMapkeyed by step name - Bare function normalization: Bare functions in the steps array are assigned counter names and registered in the
StepRefMap(maps function reference to step name) - Proxy resolution: When a subsequent step accesses
stepRef.result.field, the proxy reads from theOutputsMap - ctx.outputsOf(): Resolves step outputs by reference (Step object or bare function). For bare functions, looks up the step name in the
StepRefMap
Cross-job output aggregation
Section titled “Cross-job output aggregation”After each job completes in the local test runner:
- Step outputs from the completed job are aggregated into the
jobOutputsMap - Multi-step jobs: Outputs are nested under step names:
{ stepName: { field: value }, ... } - Single-step jobs (run shorthand): Outputs are flattened directly:
{ field: value }(no step-name nesting) - The
jobOutputsMapis injected viasetJobOutputsMap(), enablingjobRef.result.stepName.fieldorjobRef.result.fieldaccess
IPC transport (agent sandbox)
Section titled “IPC transport (agent sandbox)”In the agent sandbox (remote pipeline execution):
- Step return values are captured and included in
step.completeIPC messages (optionaloutputsfield) - The agent aggregates step outputs and includes them in the
job.completeIPC message - Within-job chaining: The sandbox populates the
OutputsMapas steps complete, so.resultandctx.outputsOf()resolve correctly within a single job - Cross-job chaining: The orchestrator collects plain outputs from completed upstream jobs at dispatch time (querying the DB for jobs listed in
needs), then passes them asupstreamJobOutputsin thejob.dispatchmessage. The sandbox receives this map and populates thejobOutputsMapviasetJobOutputsMap(), enablingctx.jobOutputs()andjobRef.resultaccess across job boundaries. Secret outputs follow a separate encrypted path viaSecretOutputStore.
Within-job output flow
Section titled “Within-job output flow”Step A completes Step B accesses A.result | | v vReturn value Proxy.get('field') | | v vOutputsMap.set('A', val) OutputsMap.get('A') | | v vStored in shared map Returns val.fieldCross-job output flow
Section titled “Cross-job output flow”Job A completes Orchestrator dispatches Job B | | v vOutputs stored in DB Query upstream job outputs (needs) | v job.dispatch includes upstreamJobOutputs | v Sandbox populates jobOutputsMap | v ctx.jobOutputs('A') resolvesBrowser protocol (Platform to dashboard)
Section titled “Browser protocol (Platform to dashboard)”The Platform tier exposes a /ws/browser WebSocket endpoint for dashboard clients (auth, log subscription / streaming / gaps, run / job / step status updates, run.event / job.context for the Summary tab).
See also
Section titled “See also”- Architecture overview — three-tier model and component responsibilities
- Protocol messages — WebSocket message schemas
- Event system internals — event router, registration model, cron scheduler
- State machine — job execution state transitions
- Webhook delivery — detailed webhook processing pipeline
- Operator: dependency caching — configuration guide
- Operator: monitoring & tracing — trace fields and Loki queries
- Operator: event routing & generic webhooks — generic source setup and trust management
- SDK reference: output chaining — user-facing output chaining API