Functions That Make Up the UNS Pipeline
A complete walkthrough of the 12 functions that form the fn-uns data pipeline — from machine data capture through processing, reporting, and AI-powered analysis. Written for developers, IT engineers, and manufacturing engineers.
The Big Picture
The fn-uns pipeline is made up of 12 independent functions organised into 4 layers. Each function runs in its own Docker container, does one job, and communicates through shared infrastructure — never through direct calls to other functions.
How It Works
The pipeline follows a simple principle: data flows through infrastructure, not through function calls. Machines publish to MQTT. Functions read from MQTT or Valkey, process data, and write to PostgreSQL. Reporting functions read from PostgreSQL. AI reads from everything.
This decoupled architecture means:
- ✓ Independent deployment — update one function without touching others
- ✓ Language freedom — Go and Node.js side by side, use what fits
- ✓ Failure isolation — if uns-stoppage crashes, state tracking continues
- ✓ Easy to extend — add a new function that reads the same data
- ✓ Testable in isolation — each function has its own Docker Compose
What This Means for You
Manufacturing Engineer
Each function maps to a real manufacturing concept you already know — machine status, production runs, stoppages, KPIs. The pipeline turns raw machine signals into the metrics you use daily: utilisation, availability, MTBF, throughput. No black box — every calculation is visible and auditable.
Developer
12 microservices, each scaffolded with fnkit, each with its own Dockerfile and docker-compose.yml. Go for performance-critical paths (state tracking, historian), Node.js for flexibility (cache reader, stoppage classification, AI). All functions follow the same patterns: Valkey config, auto-create tables, JSON responses.
IT Engineer
3 pieces of shared infrastructure: MQTT broker, Valkey cache, PostgreSQL database. 12 stateless containers on a single Docker network. No service mesh, no API gateway, no orchestrator. Each container is independently deployable and restartable. Monitoring is straightforward — check container health and database connectivity.
Layer 1: Capture
The capture layer has one job: get machine data off the factory floor and into a fast, queryable cache. Two functions handle this — a simulator (or your real machines) and a topic monitor.
Machine Simulator. Publishes realistic CNC machine data to MQTT every 3 seconds. Simulates 4 machines (Haas VF-2, DMG Mori NLX 2500, Mazak Integrex i-200, Okuma LB3000 EX II) across 2 production areas. Each machine publishes 3 topics: /status, /program, /tool.
In production, replace with OPC UA gateway, Sparkplug B, or a custom edge agent. The rest of the pipeline works identically.
Topic Monitor. Subscribes to v1.0/# — the entire UNS namespace. Every message is cached in Valkey with current/previous value tracking. This is the foundation — all other functions read from this cache.
Maintains a SET of all discovered topics, the latest payload, the previous payload, and metadata (last_updated, message count, first_seen).
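The current/previous pattern can be sketched in Python (the real uns-framework is written in Go; the cache structure and key layout below are illustrative stand-ins, not the actual Valkey schema):

```python
import time

# In-memory stand-in for Valkey: a SET of topics, current and previous
# payloads per topic, and per-topic metadata. Key layout is illustrative.
cache = {"topics": set(), "current": {}, "previous": {}, "meta": {}}

def on_message(topic: str, payload: dict) -> None:
    """Cache a message, shifting the old current value to previous."""
    cache["topics"].add(topic)
    if topic in cache["current"]:
        cache["previous"][topic] = cache["current"][topic]
    cache["current"][topic] = payload
    meta = cache["meta"].setdefault(
        topic, {"first_seen": time.time(), "message_count": 0}
    )
    meta["message_count"] += 1
    meta["last_updated"] = time.time()

on_message("v1.0/enterprise/site1/area1/cnc-01/status", {"state": "ACTIVE"})
on_message("v1.0/enterprise/site1/area1/cnc-01/status", {"state": "ALARM"})
print(cache["previous"]["v1.0/enterprise/site1/area1/cnc-01/status"])
# {'state': 'ACTIVE'}
```

Keeping the previous value alongside the current one is what lets downstream functions detect changes without holding state of their own.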
Manufacturing Engineer
This is where your machine signals enter the digital world. Every status change, program update, and tool measurement is captured. The simulator mimics real CNC behaviour — ACTIVE, IDLE, SETUP, ALARM, WAITING, OFFLINE — so you can validate the entire pipeline before connecting real equipment.
Developer
uns-framework is written in Go for performance — it handles every MQTT message with minimal latency. The current/previous pattern in Valkey enables change detection downstream without any function needing to maintain its own state. Check uns:meta:* for message counts and timing.
IT Engineer
Two containers plus the MQTT broker and Valkey. uns-sim is optional in production (replaced by real PLCs). uns-framework is the only MQTT subscriber that writes to Valkey — single point of ingestion. Monitor Valkey memory usage and MQTT connection status.
Topics Published by uns-sim
v1.0/enterprise/site1/area1/cnc-01/status → { "state": "ACTIVE", "spindle_speed": 8500, ... }
v1.0/enterprise/site1/area1/cnc-01/program → { "name": "PART-A-001", "parts_made": 42, ... }
v1.0/enterprise/site1/area1/cnc-01/tool → { "id": "T01", "life_remaining": 78, ... }
v1.0/enterprise/site1/area1/cnc-02/status ...
v1.0/enterprise/site1/area1/cnc-02/program ...
v1.0/enterprise/site1/area1/cnc-02/tool ...
v1.0/enterprise/site1/area2/cnc-03/status ...
v1.0/enterprise/site1/area2/cnc-03/program ...
v1.0/enterprise/site1/area2/cnc-03/tool ...
v1.0/enterprise/site1/area2/cnc-04/status ...
v1.0/enterprise/site1/area2/cnc-04/program ...
v1.0/enterprise/site1/area2/cnc-04/tool ...
12 topics total — 4 machines × 3 data tags — published every 3 seconds
Layer 2: Process
Seven functions read from the capture layer and write structured records to PostgreSQL. They run in parallel — each watching for different things in the same data.
The 7 Processing Functions
Cache Reader API. Reads all cached UNS topics from Valkey and returns JSON. Compares current vs previous values to detect which topics have changed since the last read. Returns both the full topic list and a filtered changes-only list.
Change Logger. Compares current vs previous values. When any value changes, inserts a complete snapshot row into PostgreSQL — all values, not just the changed ones. Unchanged values are copied forward. This gives you a full picture at any point in time.
HTTP-triggered, polled on a timer. May miss rapid changes between polls.
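The snapshot-with-copy-forward idea can be sketched in a few lines of Python (the column names are illustrative, not the real table schema):

```python
# Sketch of the uns-log snapshot pattern: when any value for a machine
# changes, write a row containing ALL values, copying unchanged values
# forward from the last row. Field names here are illustrative.
def build_snapshot(last_row: dict, current: dict):
    """Return a full snapshot row if anything changed, else None."""
    merged = {**last_row, **current}   # copy unchanged values forward
    return merged if merged != last_row else None

last = {"state": "ACTIVE", "parts_made": 41, "tool": "T01"}
# Only parts_made arrived this poll; state and tool are carried forward.
row = build_snapshot(last, {"parts_made": 42})
print(row)  # {'state': 'ACTIVE', 'parts_made': 42, 'tool': 'T01'}
print(build_snapshot(last, {"parts_made": 41}))  # None: no change, no row
```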
MQTT Historian. Subscribes directly to MQTT topics and logs every value change to PostgreSQL in real time. In-memory change detection skips duplicate values. Unlike uns-log, it is event-driven rather than polled, so no changes are ever missed — this captures the complete history.
Supports MQTT wildcards (+ single level, # multi-level) for flexible topic selection.
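MQTT wildcard matching follows standard MQTT semantics; a minimal matcher looks like this (a sketch, not the historian's actual code):

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Match an MQTT topic filter: '+' spans one level, '#' the rest."""
    p_parts, t_parts = pattern.split("/"), topic.split("/")
    for i, p in enumerate(p_parts):
        if p == "#":                      # multi-level: matches all remaining levels
            return True
        if i >= len(t_parts):
            return False
        if p != "+" and p != t_parts[i]:  # '+' matches exactly one level
            return False
    return len(p_parts) == len(t_parts)

assert topic_matches("v1.0/#", "v1.0/enterprise/site1/area1/cnc-01/status")
assert topic_matches("v1.0/+/+/+/+/status",
                     "v1.0/enterprise/site1/area1/cnc-01/status")
assert not topic_matches("v1.0/+/status",
                         "v1.0/enterprise/site1/area1/cnc-01/status")
```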
State Duration Tracker. Reads /status topics, maintains an in-memory state tracker per machine. When a machine transitions from one state to another, logs the completed state with its exact duration. This is the foundation for all time-based KPIs — utilisation, availability, MTBF, MTTR.
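The per-machine tracker can be sketched as a single (state, entered_at) pair per machine, where a transition closes the old state (a simplified model of uns-state, which is written in Go):

```python
# Sketch of uns-state's in-memory tracker: one (state, entered_at) pair
# per machine; a transition closes the old state and logs its duration.
current = {}     # machine -> (state, entered_at)
completed = []   # rows that would be written to PostgreSQL

def observe(machine: str, state: str, ts: float) -> None:
    prev = current.get(machine)
    if prev and prev[0] != state:        # transition: log the completed state
        completed.append({"machine": machine, "state": prev[0],
                          "start": prev[1], "duration_s": ts - prev[1]})
    if prev is None or prev[0] != state:
        current[machine] = (state, ts)   # same-state updates keep entered_at

observe("cnc-01", "ACTIVE", 0.0)
observe("cnc-01", "ACTIVE", 3.0)    # repeated status, no transition
observe("cnc-01", "ALARM", 10.0)    # closes ACTIVE after 10 s
print(completed)
# [{'machine': 'cnc-01', 'state': 'ACTIVE', 'start': 0.0, 'duration_s': 10.0}]
```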
Stoppage Classifier. Reads non-ACTIVE states from the uns_state table and auto-classifies each stoppage (IDLE→NO_WORK, ALARM→FAULT, SETUP→SETUP, etc.). Provides a POST endpoint for operators to override the auto-classification with the real reason — e.g. changing FAULT to TOOL_BREAK.
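The auto-classification plus operator-override logic can be sketched like this (the WAITING and OFFLINE mappings extend the "etc." in the text and are assumptions):

```python
# Sketch of uns-stoppage's auto-classification. IDLE->NO_WORK,
# ALARM->FAULT, and SETUP->SETUP come from the text; the WAITING and
# OFFLINE entries are assumed for illustration.
AUTO_REASON = {"IDLE": "NO_WORK", "ALARM": "FAULT", "SETUP": "SETUP",
               "WAITING": "WAITING", "OFFLINE": "OFFLINE"}

def classify(state: str, override: str = None) -> dict:
    """Auto-classify a non-ACTIVE state; an operator override wins."""
    reason = override or AUTO_REASON.get(state, "UNKNOWN")
    return {"state": state, "reason": reason, "overridden": override is not None}

print(classify("ALARM"))                         # auto-classified as FAULT
print(classify("ALARM", override="TOOL_BREAK"))  # operator corrected the reason
```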
Production Run Logger. Reads /program topics from Valkey, tracks active production runs per machine. When a run completes (parts reached target), the program changes, or the machine stops, logs the run with throughput metrics — parts made, target, cycle time, duration.
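The three run-completion triggers can be sketched as a single check (which states keep a run open is an assumption here):

```python
# Sketch of uns-productivity's run-completion logic: a run closes when
# parts reach target, the program changes, or the machine stops.
def run_complete(run: dict, program: str, parts: int, state: str):
    """Return the completion reason, or None if the run is still active."""
    if parts >= run["target"]:
        return "TARGET_REACHED"
    if program != run["program"]:
        return "PROGRAM_CHANGED"
    if state not in ("ACTIVE", "SETUP"):   # states that keep a run open: assumed
        return "MACHINE_STOPPED"
    return None

run = {"program": "PART-A-001", "target": 50}
assert run_complete(run, "PART-A-001", 42, "ACTIVE") is None
assert run_complete(run, "PART-A-001", 50, "ACTIVE") == "TARGET_REACHED"
assert run_complete(run, "PART-B-002", 42, "ACTIVE") == "PROGRAM_CHANGED"
assert run_complete(run, "PART-A-001", 42, "OFFLINE") == "MACHINE_STOPPED"
```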
Manual Data Entry. HTTP API for operators to submit data that machines can't provide automatically — scrap counts with reasons, quality measurements, free-text notes, value corrections, and manual downtime reasons. The human layer of the pipeline.
uns-historian vs uns-log
These two functions both log data to PostgreSQL, but they serve different purposes:
uns-log — Snapshot Logger
HTTP-triggered, polled on a timer. Captures a complete snapshot of all values when any value changes. Good for "what did everything look like at this moment?" queries. May miss rapid changes between polls. Each row contains all tags for a machine.
uns-historian — Real-Time Logger
MQTT-triggered, captures every individual change as it happens. No values missed. Good for "show me every state transition" queries. Each row contains a single value for a single topic. Powers the Grafana dashboards.
State Machine Tracking
uns-state tracks machine state transitions; each completed state is logged with its exact duration.
Layer 3: Report
Two functions consume the PostgreSQL tables and present manufacturing metrics — one as a JSON API, one as visual dashboards.
KPI Reporter. Queries all PostgreSQL tables and computes manufacturing KPIs on demand. Pure read, no config needed — just SQL and JSON. Supports filtering by machine, area, and time range.
Computes: Utilisation %, Availability %, Throughput, Stoppage Pareto, MTBF, MTTR, Scrap totals.
5 Grafana Dashboards. Machine Overview, Production, Tooling, Status Log, and KPI Overview. Queries PostgreSQL directly using SQL with LEAD() and LAG() window functions for accurate duration-based metrics and deduplication.
Consistent colour mapping: ACTIVE=green, IDLE=yellow, ALARM=red, OFFLINE=gray, SETUP=blue, WAITING=orange.
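The LEAD()/LAG() idea translates directly: LAG compares each row with its predecessor to drop consecutive duplicates, and LEAD takes the next row's timestamp to compute a duration. A Python equivalent of that query logic (a sketch, not the dashboards' actual SQL):

```python
# Status rows as (state, timestamp). LAG()-style dedup drops repeated
# states; LEAD()-style pairing computes each row's duration as the next
# timestamp minus its own.
rows = [("ACTIVE", 0), ("ACTIVE", 3), ("ALARM", 10), ("ACTIVE", 25)]

deduped = [r for i, r in enumerate(rows)
           if i == 0 or r[0] != rows[i - 1][0]]          # LAG(): skip repeats

durations = [(state, nxt_ts - ts) for (state, ts), (_, nxt_ts)
             in zip(deduped, deduped[1:])]               # LEAD(): next - current
print(durations)  # [('ACTIVE', 10), ('ALARM', 15)]
```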
KPI Calculations
| KPI | Source Table | Calculation | Who Uses It |
|---|---|---|---|
| Utilisation % | uns_state | ACTIVE time / total tracked time | Production managers, shift leads |
| Availability % | uns_state + uns_stoppage | (total - unplanned stops) / total | Maintenance, operations |
| Throughput | uns_productivity | Parts/hour, target attainment % | Production planning |
| Stoppage Pareto | uns_stoppage | Top reasons ranked by total duration | Continuous improvement |
| MTBF | uns_state | Average ACTIVE run before ALARM | Maintenance planning |
| MTTR | uns_state | Average ALARM duration | Maintenance response |
| Scrap | uns_input | Total rejected parts per machine | Quality, production |
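The time-based formulas in the table can be worked through on toy uns_state-style rows (the real calculations run as SQL inside uns-kpi; treating only ALARM time as unplanned downtime is an assumption for this sketch):

```python
# The table's formulas over (state, duration_s) rows, sketched in Python.
rows = [("ACTIVE", 3600), ("IDLE", 600), ("ACTIVE", 1800), ("ALARM", 300)]

total = sum(d for _, d in rows)
active = sum(d for s, d in rows if s == "ACTIVE")
alarms = [d for s, d in rows if s == "ALARM"]
unplanned = sum(alarms)   # assumption: only ALARM counts as unplanned

utilisation = 100 * active / total              # ACTIVE / total tracked time
availability = 100 * (total - unplanned) / total
mttr = sum(alarms) / len(alarms)                # mean ALARM duration
mtbf = active / len(alarms)                     # mean ACTIVE time per failure
print(round(utilisation, 1), round(availability, 1), mtbf, mttr)
# 85.7 95.2 5400.0 300.0
```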
Manufacturing Engineer
These are the metrics you already track — but now they're computed automatically from real machine data, not from manual spreadsheets. The stoppage pareto tells you exactly where to focus improvement efforts. MTBF and MTTR give you maintenance scheduling data.
Developer
uns-kpi is a pure read function — no state, no config, no side effects. It runs SQL queries against PostgreSQL and returns JSON. The Grafana dashboards use window functions (LEAD, LAG) for accurate time-based calculations. All dashboard JSON is version-controlled.
IT Engineer
Both reporting functions are read-only — they never write to the database. Grafana connects directly to PostgreSQL with a read-only user. uns-kpi is stateless and can be scaled horizontally. Dashboard provisioning is automated via Docker Compose.
Layer 4: AI
The AI layer reads from everything — live data from Valkey, historical data from PostgreSQL — assembles a rich context, and sends it to an AI provider for analysis.
AI-Powered Analysis. Reads live machine data from Valkey and historical data from PostgreSQL, assembles a rich context for the chosen analysis type, sends it to an AI provider (OpenAI or Anthropic), publishes structured results back to MQTT, and stores a full audit trail in PostgreSQL.
7 built-in analysis types: anomaly detection, shift summaries, smart alerting, predictive maintenance, production optimisation, root cause analysis, and custom prompts.
Analysis Types
| Type | Purpose | Data Sources |
|---|---|---|
| anomaly | Detect abnormal machine behaviour | Live status, state history, baseline stats, alarms, tool data |
| shift-summary | Generate shift handover report | Live status, states, stoppages, production, scrap, operator notes |
| smart-alert | Contextual alert triage | Live status, recent alarms, stoppages, tool condition, production |
| predictive | Tool wear / maintenance prediction | Tool data, tool history, alarms, tool-related stoppages, baseline |
| optimisation | Machine-program assignment | Live status, production aggregates, scrap rates, reliability |
| root-cause | Explain why an alarm happened | State timeline, stoppages, tool condition, sensor history, notes |
| custom | User-defined prompt | Configurable — select which data sources to include |
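Per-type context assembly can be sketched as a source list per analysis type plus a builder that gathers everything into one structured context. The source names below mirror the table; the fetcher is a stand-in for the real Valkey/PostgreSQL queries:

```python
# Sketch: each analysis type declares its data sources; the context
# builder fetches them all into one structure for the AI prompt.
SOURCES = {
    "anomaly": ["live_status", "state_history", "baseline", "alarms", "tool"],
    "root-cause": ["state_timeline", "stoppages", "tool", "sensors", "notes"],
}

def fetch(source: str, machine: str) -> dict:
    return {"source": source, "machine": machine}   # would query Valkey/PostgreSQL

def build_context(analysis: str, machine: str) -> dict:
    return {"analysis": analysis,
            "data": {s: fetch(s, machine) for s in SOURCES[analysis]}}

ctx = build_context("anomaly", "cnc-01")
print(sorted(ctx["data"]))
# ['alarms', 'baseline', 'live_status', 'state_history', 'tool']
```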
Manufacturing Engineer
The shift-summary replaces manual handover notes. The anomaly detector catches patterns humans miss — like a machine that's technically running but cycling abnormally. Predictive maintenance tells you when to change a tool before it breaks. Root cause analysis explains why an alarm happened, not just that it happened.
Developer
Each analysis type has its own system prompt and data context assembly. The context builder queries Valkey and PostgreSQL, computes aggregates, and assembles everything into a structured prompt. Responses are structured JSON. Every call is logged with token counts, latency, and the complete response for debugging and cost tracking.
IT Engineer
Requires an API key for OpenAI or Anthropic — set via environment variable. All AI calls are audited in PostgreSQL (uns_ai table). Results are published back to MQTT, so they flow through the same infrastructure as machine data. Monitor token usage and API costs via the audit trail.
End-to-End Data Flow
Here's what happens when a single machine event — say, cnc-01 transitions from ACTIVE to ALARM — flows through the entire pipeline:
1. uns-sim (or the real machine) publishes the new status to MQTT.
2. uns-framework caches it in Valkey, shifting the old value to previous.
3. uns-historian logs the change to PostgreSQL in real time.
4. uns-state closes the completed ACTIVE state and logs its duration.
5. uns-stoppage picks up the non-ACTIVE state and auto-classifies it as FAULT, awaiting any operator override.
6. uns-kpi and the Grafana dashboards reflect the new stoppage on their next query.
7. uns-ai can pull the event into an anomaly or root-cause analysis.
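That flow can be condensed into a toy end-to-end pass with in-memory stand-ins for Valkey and PostgreSQL (all structures and field names are illustrative):

```python
# Toy end-to-end pass for a cnc-01 ACTIVE -> ALARM event. Each comment
# names the function whose role the line stands in for.
valkey, historian, states, stoppages = {}, [], [], []
tracked = {"cnc-01": ("ACTIVE", 0.0)}    # uns-state's in-memory tracker

def machine_event(machine: str, state: str, ts: float) -> None:
    topic = f"v1.0/enterprise/site1/area1/{machine}/status"
    valkey[topic] = {"state": state}                   # uns-framework caches it
    historian.append((topic, state, ts))               # uns-historian logs it
    prev_state, entered = tracked[machine]
    if prev_state != state:                            # uns-state closes ACTIVE
        states.append({"machine": machine, "state": prev_state,
                       "duration_s": ts - entered})
        tracked[machine] = (state, ts)
    if state == "ALARM":                               # uns-stoppage classifies
        stoppages.append({"machine": machine, "reason": "FAULT"})

machine_event("cnc-01", "ALARM", 120.0)
print(states)     # completed ACTIVE state with a 120 s duration
print(stoppages)  # [{'machine': 'cnc-01', 'reason': 'FAULT'}]
```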
Infrastructure Map
All 12 functions share 3 pieces of infrastructure. Here's exactly which function uses which:
| Function | MQTT Broker | Valkey Cache | PostgreSQL | External |
|---|---|---|---|---|
| uns-sim | ✓ publish | — | — | — |
| uns-framework | ✓ subscribe | ✓ write | — | — |
| uns-cache | — | ✓ read | — | — |
| uns-log | — | ✓ read | ✓ write | — |
| uns-historian | ✓ subscribe | ✓ config | ✓ write | — |
| uns-state | — | ✓ read | ✓ write | — |
| uns-stoppage | — | — | ✓ read/write | — |
| uns-productivity | — | ✓ read | ✓ write | — |
| uns-input | — | — | ✓ write | — |
| uns-kpi | — | — | ✓ read | — |
| uns-dashboard | — | — | ✓ read | — |
| uns-ai | ✓ publish | ✓ read | ✓ read/write | AI API |
Deployment Order
Functions have dependencies — deploy them in this order:
Phase 1 — Infrastructure (required first)
├── MQTT Broker (Mosquitto / EMQX)
├── Valkey Cache
└── PostgreSQL

Phase 2 — Capture (needs MQTT + Valkey)
├── uns-framework    ← must be running before anything reads Valkey
└── uns-sim          ← or your real machines / edge agents

Phase 3 — Process (needs Valkey + PostgreSQL)
├── uns-cache        ← reads Valkey only
├── uns-log          ← reads Valkey, writes PostgreSQL
├── uns-historian    ← subscribes MQTT, writes PostgreSQL
├── uns-state        ← reads Valkey, writes PostgreSQL
├── uns-productivity ← reads Valkey, writes PostgreSQL
├── uns-input        ← writes PostgreSQL (independent)
└── uns-stoppage     ← reads uns_state table (deploy after uns-state)

Phase 4 — Report (needs PostgreSQL tables populated)
├── uns-kpi          ← reads all tables
└── uns-dashboard    ← reads all tables via Grafana

Phase 5 — AI (needs everything)
└── uns-ai           ← reads Valkey + PostgreSQL, calls AI API
Quick Start — Full Pipeline
# Start infrastructure (if not already running via fnkit)
# fnkit handles MQTT, Valkey, and PostgreSQL automatically

# Phase 2: Capture
cd uns-framework && cp .env.example .env && docker compose up -d && cd ..
cd uns-sim && cp .env.example .env && docker compose up -d && cd ..

# Phase 3: Process
cd uns-cache && cp .env.example .env && docker compose up -d && cd ..
cd uns-log && cp .env.example .env && docker compose up -d && cd ..
cd uns-historian && docker compose up -d && cd ..
cd uns-state && cp .env.example .env && docker compose up -d && cd ..
cd uns-productivity && cp .env.example .env && docker compose up -d && cd ..
cd uns-input && cp .env.example .env && docker compose up -d && cd ..
cd uns-stoppage && cp .env.example .env && docker compose up -d && cd ..

# Phase 4: Report
cd uns-kpi && cp .env.example .env && docker compose up -d && cd ..
cd uns-dashboard && cp .env.example .env && docker compose up -d && cd ..

# Phase 5: AI (optional — requires API key)
cd uns-ai && cp .env.example .env && docker compose up -d && cd ..
Next Steps
| Resource | Description |
|---|---|
| Components Overview | Quick reference table of all 12 functions with links to individual docs |
| Quick Start | Get the full pipeline running in under 5 minutes |
| Architecture | System architecture, infrastructure layout, and design decisions |
| Code Patterns | Implementation patterns shared across all functions |
| Manufacturing KPIs | Deep dive into KPI calculations and what they mean |
| Extending | How to add your own functions to the pipeline |
| Database Schema | All PostgreSQL table definitions and relationships |
Guide Version: 1.0 · Applies To: fn-uns pipeline (all 12 functions)
Last updated March 2026.