
Functions That Make Up the UNS Pipeline

A complete walkthrough of the 12 functions that form the fn-uns data pipeline — from machine data capture through processing, reporting, and AI-powered analysis. Written for developers, IT engineers, and manufacturing engineers.

The Big Picture

The fn-uns pipeline is made up of 12 independent functions organised into 4 layers. Each function runs in its own Docker container, does one job, and communicates through shared infrastructure — never through direct calls to other functions.

The UNS pipeline — 12 functions, 4 layers:

Layer 1 — Capture: uns-sim (Node.js, MQTT publish) → MQTT Broker (Mosquitto / EMQX) → uns-framework (Go, MQTT subscribe) → Valkey cache (uns:data:*, uns:prev:*)
Layer 2 — Process: uns-cache, uns-log, uns-historian, uns-state, uns-stoppage, uns-productivity, uns-input → PostgreSQL (6 tables); uns-input also accepts operator HTTP POSTs
Layer 3 — Report: uns-kpi (reads all tables → JSON KPI response) · uns-dashboard (→ 5 Grafana dashboards)
Layer 4 — AI: uns-ai (reads Valkey + PostgreSQL → AI provider → publishes to MQTT + stores audit trail)
Key insight: No function calls another function directly. They all read from and write to shared infrastructure — MQTT, Valkey, and PostgreSQL. This means you can deploy, update, or replace any function independently.

How It Works

The pipeline follows a simple principle: data flows through infrastructure, not through function calls. Machines publish to MQTT. Functions read from MQTT or Valkey, process data, and write to PostgreSQL. Reporting functions read from PostgreSQL. AI reads from everything.

This decoupled architecture means any function can be developed, deployed, updated, or replaced without touching the others.

What This Means for You

🔧

Manufacturing Engineer

Each function maps to a real manufacturing concept you already know — machine status, production runs, stoppages, KPIs. The pipeline turns raw machine signals into the metrics you use daily: utilisation, availability, MTBF, throughput. No black box — every calculation is visible and auditable.

💻

Developer

12 microservices, each scaffolded with fnkit, each with its own Dockerfile and docker-compose.yml. Go for performance-critical paths (state tracking, historian), Node.js for flexibility (cache reader, stoppage classification, AI). All functions follow the same patterns: Valkey config, auto-create tables, JSON responses.

🖥️

IT Engineer

3 pieces of shared infrastructure: MQTT broker, Valkey cache, PostgreSQL database. 12 stateless containers on a single Docker network. No service mesh, no API gateway, no orchestrator. Each container is independently deployable and restartable. Monitoring is straightforward — check container health and database connectivity.

Layer 1: Capture

CAPTURE — Get machine data into the system

The capture layer has one job: get machine data off the factory floor and into a fast, queryable cache. Two functions handle this — a simulator (or your real machines) and a topic monitor.

uns-sim (4 CNC machines: cnc-01, cnc-02, cnc-03, cnc-04) → MQTT publish → MQTT Broker (12 topics under v1.0/enterprise/site1/#) → subscribe v1.0/# → uns-framework (Go · MQTT subscriber · current + previous change tracking) → Valkey (uns:topics SET; uns:data:*, uns:prev:*, uns:meta:* STRINGs)
uns-sim
Node.js

Machine Simulator. Publishes realistic CNC machine data to MQTT every 3 seconds. Simulates 4 machines (Haas VF-2, DMG Mori NLX 2500, Mazak Integrex i-200, Okuma LB3000 EX II) across 2 production areas. Each machine publishes 3 topics: /status, /program, /tool.

In production, replace uns-sim with an OPC UA gateway, Sparkplug B, or a custom edge agent. The rest of the pipeline works identically.

Machine → MQTT Broker (12 topics, 3s interval)
uns-framework
Go

Topic Monitor. Subscribes to v1.0/# — the entire UNS namespace. Every message is cached in Valkey with current/previous value tracking. This is the foundation — all other functions read from this cache.

Maintains a SET of all discovered topics, the latest payload, the previous payload, and metadata (last_updated, message count, first_seen).

MQTT Broker → Valkey (uns:data:*, uns:prev:*, uns:meta:*)
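The caching pattern above can be sketched in Node.js, with a plain Map standing in for Valkey (the real function is Go; the key names and metadata fields follow the article, everything else is an assumption):

```javascript
const cache = new Map(); // stands in for Valkey

function onMessage(topic, payload, now = Date.now()) {
  const dataKey = `uns:data:${topic}`;

  // Shift the current value to uns:prev:* before overwriting, so that
  // downstream functions can detect changes without keeping state.
  if (cache.has(dataKey)) cache.set(`uns:prev:${topic}`, cache.get(dataKey));
  cache.set(dataKey, payload);

  // Maintain the SET of discovered topics and per-topic metadata.
  const topics = cache.get('uns:topics') ?? new Set();
  topics.add(topic);
  cache.set('uns:topics', topics);

  const meta = cache.get(`uns:meta:${topic}`) ?? { first_seen: now, message_count: 0 };
  meta.message_count += 1;
  meta.last_updated = now;
  cache.set(`uns:meta:${topic}`, meta);
}

const topic = 'v1.0/enterprise/site1/area1/cnc-01/status';
onMessage(topic, '{"state":"ACTIVE"}');
onMessage(topic, '{"state":"ALARM"}');
// uns:data:* now holds the ALARM payload, uns:prev:* the ACTIVE one
```

The current/previous shift is the whole trick: it happens once, at the single point of ingestion, so no other function needs its own change-tracking state.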
🔧

Manufacturing Engineer

This is where your machine signals enter the digital world. Every status change, program update, and tool measurement is captured. The simulator mimics real CNC behaviour — ACTIVE, IDLE, SETUP, ALARM, WAITING, OFFLINE — so you can validate the entire pipeline before connecting real equipment.

💻

Developer

uns-framework is written in Go for performance — it handles every MQTT message with minimal latency. The current/previous pattern in Valkey enables change detection downstream without any function needing to maintain its own state. Check uns:meta:* for message counts and timing.

🖥️

IT Engineer

Two containers plus the MQTT broker and Valkey. uns-sim is optional in production (replaced by real PLCs). uns-framework is the only MQTT subscriber that writes to Valkey — single point of ingestion. Monitor Valkey memory usage and MQTT connection status.

Topics Published by uns-sim

v1.0/enterprise/site1/area1/cnc-01/status   → { "state": "ACTIVE", "spindle_speed": 8500, ... }
v1.0/enterprise/site1/area1/cnc-01/program   → { "name": "PART-A-001", "parts_made": 42, ... }
v1.0/enterprise/site1/area1/cnc-01/tool      → { "id": "T01", "life_remaining": 78, ... }
v1.0/enterprise/site1/area1/cnc-02/status    ...
v1.0/enterprise/site1/area1/cnc-02/program   ...
v1.0/enterprise/site1/area1/cnc-02/tool      ...
v1.0/enterprise/site1/area2/cnc-03/status    ...
v1.0/enterprise/site1/area2/cnc-03/program   ...
v1.0/enterprise/site1/area2/cnc-03/tool      ...
v1.0/enterprise/site1/area2/cnc-04/status    ...
v1.0/enterprise/site1/area2/cnc-04/program   ...
v1.0/enterprise/site1/area2/cnc-04/tool      ...

12 topics total — 4 machines × 3 data tags — published every 3 seconds
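Code that consumes these topics usually needs the namespace levels individually. A minimal sketch, assuming the version/enterprise/site/area/machine/tag layout shown above (`parseTopic` is a hypothetical helper, not part of the pipeline):

```javascript
// Split a UNS topic into its namespace levels.
function parseTopic(topic) {
  const [version, enterprise, site, area, machine, tag] = topic.split('/');
  return { version, enterprise, site, area, machine, tag };
}

const parts = parseTopic('v1.0/enterprise/site1/area2/cnc-03/status');
// parts.area === 'area2', parts.machine === 'cnc-03', parts.tag === 'status'
```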

Layer 2: Process

PROCESS — Transform raw data into structured records

Seven functions read from the capture layer and write structured records to PostgreSQL. They run in parallel — each watching for different things in the same data.

Valkey and MQTT feed seven processing functions, which write to PostgreSQL:

  uns-cache         Change detection   Node.js · HTTP
  uns-log           Snapshot logger    Go · HTTP
  uns-historian     Real-time logger   Go · MQTT
  uns-state         Duration tracker   Go · HTTP
  uns-stoppage      Classify reasons   Node.js · HTTP · reads the uns_state table
  uns-productivity  Run tracking       Go · HTTP
  uns-input         Operator entry     Node.js · HTTP · operator POST

PostgreSQL tables: uns_log, uns_historian, uns_state, uns_stoppage, uns_productivity, uns_input.
Trigger types — HTTP (polled on a timer): uns-cache, uns-log, uns-state, uns-stoppage, uns-productivity, uns-input. MQTT (real-time): uns-historian.

The 7 Processing Functions

uns-cache
Node.js

Cache Reader API. Reads all cached UNS topics from Valkey and returns JSON. Compares current vs previous values to detect which topics have changed since the last read. Returns both the full topic list and a filtered changes-only list.

Valkey → JSON response { topic_count, changed_count, topics[], changes[] }
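The diff itself is simple. A sketch of the comparison in Node.js (the response fields follow the shape above; the implementation details are assumptions):

```javascript
// Compare the current snapshot against the previous one and report
// which topics changed. A topic seen for the first time (no previous
// value) also counts as changed.
function diffTopics(data, prev) {
  const topics = Object.keys(data);
  const changes = topics.filter((t) => prev[t] !== data[t]);
  return { topic_count: topics.length, changed_count: changes.length, topics, changes };
}

const result = diffTopics(
  { 'cnc-01/status': 'ALARM', 'cnc-01/tool': 'T01' }, // current
  { 'cnc-01/status': 'ACTIVE', 'cnc-01/tool': 'T01' }, // previous
);
// result.changes === ['cnc-01/status']
```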
uns-log
Go

Change Logger. Compares current vs previous values. When any value changes, inserts a complete snapshot row into PostgreSQL — all values, not just the changed ones. Unchanged values are copied forward. This gives you a full picture at any point in time.

HTTP-triggered, polled on a timer. May miss rapid changes between polls.

Valkey → PostgreSQL (uns_log table) — snapshot on change
uns-historian
Go

MQTT Historian. Subscribes directly to MQTT topics and logs every single value change to PostgreSQL in real time. In-memory change detection skips duplicate values. Unlike uns-log, it never misses values between polls — this is the complete history.

Supports MQTT wildcards (+ single level, # multi-level) for flexible topic selection.

MQTT Broker → PostgreSQL (uns_historian table) — every change, real-time
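The wildcard semantics are standard MQTT: `+` matches exactly one level, `#` matches everything from that level down. A generic sketch of filter matching (this illustrates MQTT semantics, not uns-historian's actual code):

```javascript
// Return true when an MQTT topic filter matches a concrete topic.
function topicMatches(filter, topic) {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return true;            // multi-level wildcard
    if (i >= t.length) return false;          // filter longer than topic
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  return f.length === t.length;               // no unmatched trailing levels
}

topicMatches('v1.0/#', 'v1.0/enterprise/site1/area1/cnc-01/status');              // true
topicMatches('v1.0/+/+/+/+/status', 'v1.0/enterprise/site1/area1/cnc-01/status'); // true
topicMatches('v1.0/+/status', 'v1.0/enterprise/site1/area1/cnc-01/status');       // false
```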
uns-state
Go

State Duration Tracker. Reads /status topics, maintains an in-memory state tracker per machine. When a machine transitions from one state to another, logs the completed state with its exact duration. This is the foundation for all time-based KPIs — utilisation, availability, MTBF, MTTR.

Valkey /status → PostgreSQL (uns_state table) — state + duration_s + next_state
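The transition logic can be sketched in Node.js (the real function is Go; the record shape mirrors the uns_state columns named above, everything else is an assumption):

```javascript
const tracker = new Map(); // machine → { state, since }
const completed = [];      // rows destined for the uns_state table

function onStatus(machine, state, now) {
  const current = tracker.get(machine);
  if (current && current.state !== state) {
    // Transition: log the finished state with its exact duration.
    completed.push({
      machine,
      state: current.state,
      duration_s: (now - current.since) / 1000,
      next_state: state,
    });
  }
  if (!current || current.state !== state) {
    tracker.set(machine, { state, since: now });
  }
}

onStatus('cnc-01', 'ACTIVE', 0);
onStatus('cnc-01', 'ACTIVE', 60000);  // same state: nothing logged
onStatus('cnc-01', 'ALARM', 120000);  // logs ACTIVE with duration_s = 120
```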
uns-stoppage
Node.js

Stoppage Classifier. Reads non-ACTIVE states from the uns_state table and auto-classifies each stoppage (IDLE→NO_WORK, ALARM→FAULT, SETUP→SETUP, etc.). Provides a POST endpoint for operators to override the auto-classification with the real reason — e.g. changing FAULT to TOOL_BREAK.

PostgreSQL (uns_state) → PostgreSQL (uns_stoppage) + operator overrides
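A minimal sketch of the classify-then-override flow (only the three mappings named above come from the article; the field names and fallback are assumptions):

```javascript
// Auto-classification map: machine state → default stoppage reason.
const AUTO_REASON = { IDLE: 'NO_WORK', ALARM: 'FAULT', SETUP: 'SETUP' };

function classify(stateRow) {
  return { ...stateRow, reason: AUTO_REASON[stateRow.state] ?? 'UNKNOWN', overridden: false };
}

// Operator POST replaces the auto-classification with the real cause.
function overrideReason(stoppage, reason) {
  return { ...stoppage, reason, overridden: true };
}

let stoppage = classify({ machine: 'cnc-01', state: 'ALARM', duration_s: 300 });
// stoppage.reason === 'FAULT'
stoppage = overrideReason(stoppage, 'TOOL_BREAK');
// stoppage.reason === 'TOOL_BREAK', stoppage.overridden === true
```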
uns-productivity
Go

Production Run Logger. Reads /program topics from Valkey, tracks active production runs per machine. When a run completes (parts reached target), the program changes, or the machine stops, logs the run with throughput metrics — parts made, target, cycle time, duration.

Valkey /program → PostgreSQL (uns_productivity) — run completion events
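The three completion conditions reduce to a single predicate. A sketch, assuming /program snapshot fields `name`, `parts_made`, and `target` (the real function is Go and these names are guesses):

```javascript
// True when the active run should be closed and logged.
function runComplete(run, snap, machineStopped) {
  return (
    snap.parts_made >= run.target || // parts reached target
    snap.name !== run.program ||     // program changed on the machine
    machineStopped                   // machine stopped mid-run
  );
}

const run = { program: 'PART-A-001', target: 50 };
runComplete(run, { name: 'PART-A-001', parts_made: 42 }, false); // false — still running
runComplete(run, { name: 'PART-A-001', parts_made: 50 }, false); // true — target reached
runComplete(run, { name: 'PART-B-002', parts_made: 0 }, false);  // true — program changed
```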
uns-input
Node.js

Manual Data Entry. HTTP API for operators to submit data that machines can't provide automatically — scrap counts with reasons, quality measurements, free-text notes, value corrections, and manual downtime reasons. The human layer of the pipeline.

Operator HTTP POST → PostgreSQL (uns_input) — scrap, notes, quality, adjustments
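Server-side validation for such a POST might look like this hypothetical sketch — the accepted types follow the text, but the field names and rules are assumptions, not the real uns-input schema:

```javascript
// Entry types described in the article (names are assumptions).
const TYPES = new Set(['scrap', 'quality', 'note', 'adjustment', 'downtime_reason']);

function validateInput(body) {
  const errors = [];
  if (!TYPES.has(body.type)) errors.push(`unknown type: ${body.type}`);
  if (!body.machine) errors.push('machine is required');
  if (body.type === 'scrap') {
    // Scrap entries need a positive count and a reason code.
    if (!Number.isInteger(body.count) || body.count < 1) errors.push('count must be a positive integer');
    if (!body.reason) errors.push('reason is required for scrap');
  }
  return { ok: errors.length === 0, errors };
}

validateInput({ type: 'scrap', machine: 'cnc-01', count: 3, reason: 'TOOL_MARK' }); // ok: true
```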

uns-historian vs uns-log

These two functions both log data to PostgreSQL, but they serve different purposes:

uns-log — Snapshot Logger

HTTP-triggered, polled on a timer. Captures a complete snapshot of all values when any value changes. Good for "what did everything look like at this moment?" queries. May miss rapid changes between polls. Each row contains all tags for a machine.

uns-historian — Real-Time Logger

MQTT-triggered, captures every individual change as it happens. No values missed. Good for "show me every state transition" queries. Each row contains a single value for a single topic. Powers the Grafana dashboards.

State Machine Animation

uns-state tracks transitions like this — each completed state is logged with its exact duration:

ACTIVE (120 seconds, logged to uns_state) → IDLE (45 seconds, logged) → SETUP (90 seconds, logged) → ACTIVE (running, in progress)
Why duration matters. Manufacturing KPIs are time-based. You don't count how many times a machine was ACTIVE — you measure how long it was ACTIVE. uns-state gives you exact durations in seconds, which feed directly into utilisation %, availability %, MTBF, and MTTR calculations.

Layer 3: Report

REPORT — Turn structured data into actionable metrics

Two functions consume the PostgreSQL tables and present manufacturing metrics — one as a JSON API, one as visual dashboards.

PostgreSQL (uns_state, uns_stoppage, uns_productivity, uns_input, uns_historian, uns_log) → SQL queries → uns-kpi (Go · HTTP · read-only) → JSON KPI response (e.g. Utilisation 87.2%, Availability 94.1%, MTBF 142 min, MTTR 8.3 min) · uns-dashboard (Grafana) → 5 dashboards (Machine, Production, Tooling, Status Log, KPI)
uns-kpi
Go

KPI Reporter. Queries all PostgreSQL tables and computes manufacturing KPIs on demand. Pure read, no config needed — just SQL and JSON. Supports filtering by machine, area, and time range.

Computes: Utilisation %, Availability %, Throughput, Stoppage Pareto, MTBF, MTTR, Scrap totals.

PostgreSQL (all tables) → JSON { utilisation, availability, mtbf, mttr, throughput, stoppages }
uns-dashboard
Grafana

5 Grafana Dashboards. Machine Overview, Production, Tooling, Status Log, and KPI Overview. Queries PostgreSQL directly using SQL with LEAD() and LAG() window functions for accurate duration-based metrics and deduplication.

Consistent colour mapping: ACTIVE=green, IDLE=yellow, ALARM=red, OFFLINE=gray, SETUP=blue, WAITING=orange.

PostgreSQL (uns_historian, uns_log) → 5 dashboards, 51 panels total
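The LEAD() idea — each row's duration is the gap to the next row's timestamp — can be illustrated outside SQL. A sketch in Node.js over historian-style rows (the row shape is assumed):

```javascript
// rows: value changes sorted by ts ascending (seconds since some epoch).
// The last row has no successor, so its duration is null (state still open),
// mirroring what LEAD() returns for the final row.
function withDurations(rows) {
  return rows.map((row, i) => ({
    ...row,
    duration_s: i + 1 < rows.length ? rows[i + 1].ts - row.ts : null,
  }));
}

const rows = withDurations([
  { ts: 0, value: 'ACTIVE' },
  { ts: 120, value: 'IDLE' },
  { ts: 165, value: 'SETUP' },
]);
// rows[0].duration_s === 120, rows[1].duration_s === 45, rows[2].duration_s === null
```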

KPI Calculations

| KPI | Source Table | Calculation | Who Uses It |
| --- | --- | --- | --- |
| Utilisation % | uns_state | ACTIVE time / total tracked time | Production managers, shift leads |
| Availability % | uns_state + uns_stoppage | (total - unplanned stops) / total | Maintenance, operations |
| Throughput | uns_productivity | Parts/hour, target attainment % | Production planning |
| Stoppage Pareto | uns_stoppage | Top reasons ranked by total duration | Continuous improvement |
| MTBF | uns_state | Average ACTIVE run before ALARM | Maintenance planning |
| MTTR | uns_state | Average ALARM duration | Maintenance response |
| Scrap | uns_input | Total rejected parts per machine | Quality, production |
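The time-based KPIs in the table reduce to sums and averages over uns_state-style rows. A worked sketch (the column names follow the article; the exact production formulas may differ):

```javascript
function kpis(rows) {
  const sum = (rs) => rs.reduce((s, r) => s + r.duration_s, 0);
  const total = sum(rows);
  const active = sum(rows.filter((r) => r.state === 'ACTIVE'));
  const alarms = rows.filter((r) => r.state === 'ALARM');
  // MTBF: average length of an ACTIVE run that ended in an ALARM.
  const failures = rows.filter((r) => r.state === 'ACTIVE' && r.next_state === 'ALARM');
  return {
    utilisation_pct: (active / total) * 100,
    mtbf_min: failures.length ? sum(failures) / failures.length / 60 : null,
    mttr_min: alarms.length ? sum(alarms) / alarms.length / 60 : null,
  };
}

const report = kpis([
  { state: 'ACTIVE', duration_s: 3600, next_state: 'ALARM' },
  { state: 'ALARM', duration_s: 600, next_state: 'ACTIVE' },
  { state: 'ACTIVE', duration_s: 1800, next_state: 'IDLE' },
]);
// report.utilisation_pct === 90, report.mtbf_min === 60, report.mttr_min === 10
```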
🔧

Manufacturing Engineer

These are the metrics you already track — but now they're computed automatically from real machine data, not from manual spreadsheets. The stoppage pareto tells you exactly where to focus improvement efforts. MTBF and MTTR give you maintenance scheduling data.

💻

Developer

uns-kpi is a pure read function — no state, no config, no side effects. It runs SQL queries against PostgreSQL and returns JSON. The Grafana dashboards use window functions (LEAD, LAG) for accurate time-based calculations. All dashboard JSON is version-controlled.

🖥️

IT Engineer

Both reporting functions are read-only — they never write to the database. Grafana connects directly to PostgreSQL with a read-only user. uns-kpi is stateless and can be scaled horizontally. Dashboard provisioning is automated via Docker Compose.

Layer 4: AI

AI — Intelligent analysis across the entire pipeline

The AI layer reads from everything — live data from Valkey, historical data from PostgreSQL — assembles a rich context, and sends it to an AI provider for analysis.

uns-ai context assembly pipeline: Valkey (live data) + PostgreSQL (history) → uns-ai (context assembly + prompt) → API call → AI provider (OpenAI / Anthropic) → publish to MQTT (v1.0/.../ai/anomaly) + PostgreSQL audit trail (uns_ai table)
uns-ai
Node.js

AI-Powered Analysis. Reads live machine data from Valkey and historical data from PostgreSQL, assembles a rich context for the chosen analysis type, sends it to an AI provider (OpenAI or Anthropic), publishes structured results back to MQTT, and stores a full audit trail in PostgreSQL.

7 built-in analysis types: anomaly detection, shift summaries, smart alerting, predictive maintenance, production optimisation, root cause analysis, and custom prompts.

Valkey + PostgreSQL → AI Provider → MQTT + PostgreSQL (uns_ai audit trail)
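Context assembly might look like the following sketch — the per-type source selection, prompt wording, and all field names are illustrative assumptions, not uns-ai's real code:

```javascript
// Which data sources feed each analysis type (subset shown, assumed).
const SOURCES = {
  anomaly: ['status', 'state_history', 'alarms'],
  'shift-summary': ['status', 'stoppages', 'production', 'notes'],
};

function buildContext(type, live, history) {
  const context = { analysis: type, machine: live.machine };
  for (const source of SOURCES[type] ?? []) {
    // Live status comes from the cache; everything else from the database.
    context[source] = source === 'status' ? live.status : history[source];
  }
  return {
    system: `You are a manufacturing analyst. Perform a ${type} analysis and respond with structured JSON.`,
    user: JSON.stringify(context),
  };
}

const prompt = buildContext(
  'anomaly',
  { machine: 'cnc-01', status: { state: 'ALARM' } },
  { state_history: [{ state: 'ACTIVE', duration_s: 120 }], alarms: ['SPINDLE_OVERLOAD'] },
);
```

The structured `user` payload is what makes the responses auditable: the exact context sent to the provider can be stored alongside the reply in the uns_ai table.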

Analysis Types

| Type | Purpose | Data Sources |
| --- | --- | --- |
| anomaly | Detect abnormal machine behaviour | Live status, state history, baseline stats, alarms, tool data |
| shift-summary | Generate shift handover report | Live status, states, stoppages, production, scrap, operator notes |
| smart-alert | Contextual alert triage | Live status, recent alarms, stoppages, tool condition, production |
| predictive | Tool wear / maintenance prediction | Tool data, tool history, alarms, tool-related stoppages, baseline |
| optimisation | Machine-program assignment | Live status, production aggregates, scrap rates, reliability |
| root-cause | Explain why an alarm happened | State timeline, stoppages, tool condition, sensor history, notes |
| custom | User-defined prompt | Configurable — select which data sources to include |
🔧

Manufacturing Engineer

The shift-summary replaces manual handover notes. The anomaly detector catches patterns humans miss — like a machine that's technically running but cycling abnormally. Predictive maintenance tells you when to change a tool before it breaks. Root cause analysis explains why an alarm happened, not just that it happened.

💻

Developer

Each analysis type has its own system prompt and data context assembly. The context builder queries Valkey and PostgreSQL, computes aggregates, and assembles everything into a structured prompt. Responses are structured JSON. Every call is logged with token counts, latency, and the complete response for debugging and cost tracking.

🖥️

IT Engineer

Requires an API key for OpenAI or Anthropic — set via environment variable. All AI calls are audited in PostgreSQL (uns_ai table). Results are published back to MQTT, so they flow through the same infrastructure as machine data. Monitor token usage and API costs via the audit trail.

End-to-End Data Flow

Here's what happens when a single machine event — say, cnc-01 transitions from ACTIVE to ALARM — flows through the entire pipeline:

Golden path — one event through the pipeline:

1. cnc-01 publishes status: ALARM. uns-sim → MQTT topic v1.0/enterprise/site1/area1/cnc-01/status
2. uns-framework caches the new value. Valkey: uns:data:* = ALARM, uns:prev:* = ACTIVE, uns:meta:* updated
3. uns-historian logs the change immediately. PostgreSQL: INSERT INTO uns_historian (topic, value) — real-time, no delay
4. uns-state detects the ACTIVE → ALARM transition. PostgreSQL: INSERT INTO uns_state (state=ACTIVE, duration_s=120, next_state=ALARM)
5. uns-stoppage auto-classifies ALARM → FAULT (unplanned). PostgreSQL: INSERT INTO uns_stoppage — the operator can override to TOOL_BREAK later
6. uns-kpi recalculates: utilisation drops, MTTR starts. The next API call returns updated KPIs reflecting the alarm event
7. uns-ai detects an anomaly and publishes a root cause analysis. MQTT: v1.0/.../ai/anomaly — "Tool T03 offset trending +0.02mm over 4 hours before alarm"
Total time from machine event to AI insight: seconds. The entire pipeline runs continuously — no batch processing, no overnight jobs, no manual intervention.

Infrastructure Map

All 12 functions share 3 pieces of infrastructure. Here's exactly which function uses which:

| Function | MQTT Broker | Valkey Cache | PostgreSQL | External |
| --- | --- | --- | --- | --- |
| uns-sim | ✓ publish | | | |
| uns-framework | ✓ subscribe | ✓ write | | |
| uns-cache | | ✓ read | | |
| uns-log | | ✓ read | ✓ write | |
| uns-historian | ✓ subscribe | ✓ config | ✓ write | |
| uns-state | | ✓ read | ✓ write | |
| uns-stoppage | | | ✓ read/write | |
| uns-productivity | | ✓ read | ✓ write | |
| uns-input | | | ✓ write | |
| uns-kpi | | | ✓ read | |
| uns-dashboard | | | ✓ read | |
| uns-ai | ✓ publish | ✓ read | ✓ read/write | AI API |
All containers run on a single Docker network (fnkit-network). Infrastructure services are addressed by container name: fnkit-mqtt, fnkit-cache, fnkit-db. No service discovery, no DNS configuration — Docker handles it.

Deployment Order

Functions have dependencies — deploy them in this order:

Phase 1 — Infrastructure (required first)
  ├── MQTT Broker (Mosquitto / EMQX)
  ├── Valkey Cache
  └── PostgreSQL

Phase 2 — Capture (needs MQTT + Valkey)
  ├── uns-framework     ← must be running before anything reads Valkey
  └── uns-sim           ← or your real machines / edge agents

Phase 3 — Process (needs Valkey + PostgreSQL)
  ├── uns-cache         ← reads Valkey only
  ├── uns-log           ← reads Valkey, writes PostgreSQL
  ├── uns-historian     ← subscribes MQTT, writes PostgreSQL
  ├── uns-state         ← reads Valkey, writes PostgreSQL
  ├── uns-productivity  ← reads Valkey, writes PostgreSQL
  ├── uns-input         ← writes PostgreSQL (independent)
  └── uns-stoppage      ← reads uns_state table (deploy after uns-state)

Phase 4 — Report (needs PostgreSQL tables populated)
  ├── uns-kpi           ← reads all tables
  └── uns-dashboard     ← reads all tables via Grafana

Phase 5 — AI (needs everything)
  └── uns-ai            ← reads Valkey + PostgreSQL, calls AI API
uns-framework must be running before any processing function. It's the only function that writes to Valkey. If it's not running, the cache is empty and all downstream functions have nothing to read.

Quick Start — Full Pipeline

# Start infrastructure (if not already running via fnkit)
# fnkit handles MQTT, Valkey, and PostgreSQL automatically

# Phase 2: Capture
cd uns-framework && cp .env.example .env && docker compose up -d && cd ..
cd uns-sim && cp .env.example .env && docker compose up -d && cd ..

# Phase 3: Process
cd uns-cache && cp .env.example .env && docker compose up -d && cd ..
cd uns-log && cp .env.example .env && docker compose up -d && cd ..
cd uns-historian && docker compose up -d && cd ..
cd uns-state && cp .env.example .env && docker compose up -d && cd ..
cd uns-productivity && cp .env.example .env && docker compose up -d && cd ..
cd uns-input && cp .env.example .env && docker compose up -d && cd ..
cd uns-stoppage && cp .env.example .env && docker compose up -d && cd ..

# Phase 4: Report
cd uns-kpi && cp .env.example .env && docker compose up -d && cd ..
cd uns-dashboard && cp .env.example .env && docker compose up -d && cd ..

# Phase 5: AI (optional — requires API key)
cd uns-ai && cp .env.example .env && docker compose up -d && cd ..

Next Steps

| Resource | Description |
| --- | --- |
| Components Overview | Quick reference table of all 12 functions with links to individual docs |
| Quick Start | Get the full pipeline running in under 5 minutes |
| Architecture | System architecture, infrastructure layout, and design decisions |
| Code Patterns | Implementation patterns shared across all functions |
| Manufacturing KPIs | Deep dive into KPI calculations and what they mean |
| Extending | How to add your own functions to the pipeline |
| Database Schema | All PostgreSQL table definitions and relationships |

Guide Version: 1.0 · Applies To: fn-uns pipeline (all 12 functions)

Last updated March 2026.