7.2 Pipeline C — Enterprise Real-Time UNS (MQTT)
Purpose
Pipeline C provides the Enterprise Real-Time Unified Namespace (UNS), enabling event-driven, near real-time distribution of contextualized OT data to enterprise consumers using MQTT.
This pipeline is intended for real-time awareness and integration, not historical analysis. Snowflake remains the system of record for replay and analytics.

Design Intent
Pipeline C is designed around the following principles:
UNS backbone first
Establish a stable, enterprise-wide topic backbone that scales across sites and shops.
Flexible branching
Allow downstream topic structures to evolve without breaking the enterprise backbone.
Decoupled Pub/Sub
Producers (HighByte) publish once; multiple consumers subscribe independently.
Contextualized publishing
Only governed, contextualized data (post-standardization/enrichment) is published to enterprise topics.
Role of HighByte in Pipeline C
HighByte acts as the publishing authority for Pipeline C, responsible for:
Selecting which contextualized data is published to MQTT
Enforcing the enterprise UNS backbone
Applying consistent payload contracts (timestamp, quality, model/version metadata where needed)
Publishing inference results (from Pipeline J) back into the UNS for enterprise visibility
This ensures all real-time consumers observe consistent semantics.
MQTT Broker & Transport
Broker: AWS IoT Core (MQTT)
Pattern: Publish / Subscribe
Delivery: Near real-time, event-driven
AWS IoT Core provides a scalable enterprise backbone for multi-consumer distribution.
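The event-driven delivery pattern can be illustrated with a report-by-exception check: a message is published only when the values for a topic differ from the last ones sent. The sketch below is a minimal illustration of that pattern, not a description of how HighByte implements its triggers. The class name `ChangeDetector` and the topic string are hypothetical.

```python
from typing import Any, Dict


class ChangeDetector:
    """Remembers the last values seen per topic and reports whether they changed."""

    def __init__(self) -> None:
        self._last: Dict[str, Dict[str, Any]] = {}

    def changed(self, topic: str, values: Dict[str, Any]) -> bool:
        """Return True and store the values if they differ from the last ones seen."""
        if self._last.get(topic) == values:
            return False
        self._last[topic] = dict(values)  # copy so later mutation by the caller is not visible
        return True


detector = ChangeDetector()
topic = "example/site/shop/line/station/function/ce/status"  # hypothetical topic

for sample in ({"error_status": 0}, {"error_status": 0}, {"error_status": 1}):
    if detector.changed(topic, sample):
        print("publish", topic, sample)  # a real publisher would send to the broker here
```

Only the first and third samples are published; the unchanged second sample is suppressed.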
Namespace Backbone and Topic Structure

Fixed enterprise backbone
Pipeline C standardizes the mandatory topic prefix as:
[company_cd]/[plant_cd]/[shop_cd]/[line_cd]/[station_cd]/[function_cd]/[ce_cd]/data
This backbone is the governed routing layer aligned with ISA-95.
Everything after {cell} is flexible and use-case dependent.
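As an illustration of the fixed backbone, the sketch below assembles a topic from the seven backbone codes and rejects values that would break MQTT topic syntax (empty levels, or levels containing `/`, `+`, or `#`). The function name `build_topic` and the sample code values are hypothetical. The last backbone level is written as `ce_cd`, as in the example topics.

```python
from typing import List, Mapping

# Backbone levels in governed order.
BACKBONE_LEVELS = (
    "company_cd", "plant_cd", "shop_cd", "line_cd",
    "station_cd", "function_cd", "ce_cd",
)


def _check_level(name: str, value: str) -> str:
    """Reject empty levels and characters reserved by MQTT topic syntax."""
    if not value or any(ch in value for ch in "/+#"):
        raise ValueError(f"invalid or missing topic level: {name}")
    return value


def build_topic(codes: Mapping[str, str], leaf: str = "data") -> str:
    """Join the backbone codes and a final branch level into one topic string."""
    levels: List[str] = [_check_level(n, codes.get(n, "")) for n in BACKBONE_LEVELS]
    levels.append(_check_level("leaf", leaf))
    return "/".join(levels)


# Hypothetical code values, for illustration only.
codes = {
    "company_cd": "tmt", "plant_cd": "banpho", "shop_cd": "welding",
    "line_cd": "deckline_zone", "station_cd": "st_01",
    "function_cd": "robot", "ce_cd": "ce_01",
}
print(build_topic(codes))
print(build_topic(codes, "status"))
```

Keeping the level order in one constant means a publisher cannot accidentally reorder or skip a backbone level.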
Flexible branching after {cell}
After the backbone, the topic can branch to represent:
Asset identity (optional, if multiple assets exist within a cell)
Data category (state, event, metric, decision)
Domain (quality, maintenance, energy, etc.)
Versioning (optional contract versioning)
Example topic patterns (illustrative)
1) Asset state updates
Topic: tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/data
Payload (JSON)
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/data",
"machine_type": "[function_cd]",
"data": {
"error_log": "",
"error_status": 0,
"execution_program": "PRG_001",
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1,
"joint_angle_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_command_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_deviation": [0.01, 0.02, 0.01, 0.00, 0.03, 0.01, 0.02],
"joint_encoder_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_speed_value": [0.5, 0.3, 0.2, 0.1, 0.4, 0.2, 0.1],
"monitor_speed": 0.4,
"motor_current": [18.6, 14.2, 12.1, 8.4, 11.3, 9.7, 7.2],
"motor_current_cmd_arms": [20.0, 15.0, 13.0, 9.0, 12.0, 10.0, 8.0],
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0,
"transformation_value": [277.2, 2428.8, 874.8, 0.0, 0.0, 0.0, 0]
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
Intent
Event-driven state update
Near real-time operational visibility
Not meant for history or ML training
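A consumer that relies on the payload contract may want to check the envelope before using a message. The sketch below is one possible check, assuming the seven top-level keys seen in the example payload (`event_ts`, `asset_path`, `machine_type`, `data`, `quality`, `quality_detail`, `source`) are all mandatory. That assumption, and the function name `envelope_problems`, are illustrative rather than part of the governed contract.

```python
import json
from datetime import datetime
from typing import List

# Assumed mandatory envelope keys, taken from the example payloads.
REQUIRED_KEYS = (
    "event_ts", "asset_path", "machine_type", "data",
    "quality", "quality_detail", "source",
)


def envelope_problems(raw: str) -> List[str]:
    """Return a list of problems found in a raw JSON payload (empty if none)."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(msg, dict):
        return ["payload is not a JSON object"]

    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in msg]

    ts = msg.get("event_ts")
    if isinstance(ts, str):
        try:
            # Map the trailing "Z" to an explicit offset so older Python versions parse it.
            datetime.fromisoformat(ts.replace("Z", "+00:00"))
        except ValueError:
            problems.append("event_ts is not an ISO 8601 timestamp")
    elif "event_ts" in msg:
        problems.append("event_ts is not a string")

    if "data" in msg and not isinstance(msg["data"], dict):
        problems.append("data is not an object")
    return problems


good = json.dumps({
    "event_ts": "2026-03-06T10:00:00.000Z",
    "asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
    "machine_type": "kawasaki_robot_controller",
    "data": {"error_status": 0},
    "quality": "GOOD",
    "quality_detail": {"stale_tags": [], "bad_tags": []},
    "source": "HighByte",
})
print(envelope_problems(good))
```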
2) /status — 10 state/IO variables
Topic: tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/status
Trigger: On any state/IO tag change
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"error_status": 0,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1,
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
3) /counter — 4 execution variables
Topic: tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/counter
Trigger: On any counter tag change
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"execution_program": "PRG_001",
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
4) /alarms — 2 fault variables
Topic: tmt/banpho/welding/{line}/{machine_id}/alarms
Source: alarms instance — reference view of data
Trigger: On error_status change
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"error_status": 1,
"error_log": "E-001 Overload detected J3"
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
5) /metadata — 8 machine registry variables
Topic: tmt/banpho/welding/{line}/{machine_id}/metadata
Source: metadata instance — populated from Machine Registry at startup
Trigger: On metadata update (typically once at startup; periodic 60s refresh)
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"machine_id": "w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"line": "deckline_zone",
"hotcode": "DFR2-1",
"site": "ban_pho",
"shop": "welding",
"rank": "1",
"active_flag": true
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
6) /feature — 24 variables grouped for ML input
Topic: tmt/banpho/welding/{line}/{machine_id}/feature
Source: feature namespace — Pipeline
{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"feature_version": "v1.0",
"feature_event_id": "TBD",
"data": {
"state_features": {
"error_log": "",
"error_status": 0,
"execution_program": "PRG_001",
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1
},
"kinematic_features": {
"joint_angle_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_command_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_deviation": [0.01, 0.02, 0.01, 0.00, 0.03, 0.01, 0.02],
"joint_encoder_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_speed_value": [0.5, 0.3, 0.2, 0.1, 0.4, 0.2, 0.1],
"monitor_speed": 0.4,
"motor_current": [18.6, 14.2, 12.1, 8.4, 11.3, 9.7, 7.2],
"motor_current_cmd_arms": [20.0, 15.0, 13.0, 9.0, 12.0, 10.0, 8.0],
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0,
"transformation_value": [277.2, 2428.8, 874.8, 0.0, 0.0, 0.0, 0]
}
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
7) /ml — inference result
Topic: tmt/banpho/welding/{line}/{machine_id}/ml
Source: ml instance — written by Pipeline ML
{
"event_ts": "2026-03-06T10:00:00.001Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"inference_ts": "2026-03-06T10:00:00.001Z",
"model_name": "robot_health_classifier",
"model_version": "v1.0",
"prediction": "DEFECT_RISK",
"score": 0.87,
"threshold": 0.75,
"decision": "ALERT",
"feature_event_id": "TBD"
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
Typical Consumers
Pipeline C supports enterprise consumers such as:
Real-time dashboards
Alerting / notification services
Event-driven applications
Integration services that require near real-time state and decision visibility
Consumers subscribe to only the topics they need.
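Selective subscription relies on MQTT topic filters, where `+` matches exactly one level and `#` (last level only) matches the parent level and everything beneath it. The sketch below shows how a filter selects topics using a small pure-Python matcher. The function name `topic_matches` and the sample topics are hypothetical, and special handling of `$`-prefixed system topics is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing "#", filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topics on the welding shop backbone.
topics = [
    "tmt/banpho/welding/deckline_zone/st_01/robot/ce_01/status",
    "tmt/banpho/welding/deckline_zone/st_01/robot/ce_01/alarms",
    "tmt/banpho/welding/deckline_zone/st_02/robot/ce_07/alarms",
]

# An alerting service that only cares about alarms, on any station and cell.
alarm_filter = "tmt/banpho/welding/deckline_zone/+/+/+/alarms"
print([t for t in topics if topic_matches(alarm_filter, t)])
```

A dashboard for the whole shop could instead use a filter such as `tmt/banpho/welding/#`, which matches all three sample topics.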
Relationship to Other Pipelines
Pipeline B (Snowflake):
Pipeline C is real-time; Pipeline B is historical and replayable. They are complementary.
Pipeline J (On-Prem Inference):
Inference results are fed back into Pipeline C as decisions/events, enabling real-time operational visibility and response.
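The feedback of inference results can be sketched as wrapping a model output in the same envelope used by the other topics. The example below is illustrative only: the function name `build_ml_payload`, the rule that a score at or above the threshold yields `"ALERT"`, and the fallback decision value `"NONE"` are assumptions, not part of the documented contract.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def build_ml_payload(
    asset_path: str,
    machine_type: str,
    model_name: str,
    model_version: str,
    prediction: str,
    score: float,
    threshold: float,
    feature_event_id: str,
    now: datetime,
) -> Dict[str, Any]:
    """Wrap one inference result in the shared payload envelope."""
    # Millisecond-precision UTC timestamp with a trailing "Z", as in the examples.
    ts = now.astimezone(timezone.utc).isoformat(timespec="milliseconds")
    ts = ts.replace("+00:00", "Z")
    # Assumed decision rule; "NONE" is a hypothetical placeholder value.
    decision = "ALERT" if score >= threshold else "NONE"
    return {
        "event_ts": ts,
        "asset_path": asset_path,
        "machine_type": machine_type,
        "data": {
            "inference_ts": ts,
            "model_name": model_name,
            "model_version": model_version,
            "prediction": prediction,
            "score": score,
            "threshold": threshold,
            "decision": decision,
            "feature_event_id": feature_event_id,
        },
        "quality": "GOOD",
        "quality_detail": {"stale_tags": [], "bad_tags": []},
        "source": "HighByte",
    }


payload = build_ml_payload(
    asset_path="tmt/banpho/welding/deckline_zone/w_rbt_651",
    machine_type="kawasaki_robot_controller",
    model_name="robot_health_classifier",
    model_version="v1.0",
    prediction="DEFECT_RISK",
    score=0.87,
    threshold=0.75,
    feature_event_id="TBD",
    now=datetime(2026, 3, 6, 10, 0, 0, 1000, tzinfo=timezone.utc),
)
print(payload["event_ts"], payload["data"]["decision"])
```

Carrying `feature_event_id` through lets a consumer relate a decision back to the feature message that produced it.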
What Pipeline C Does Not Do
To avoid misuse:
Not a historical store
Not a replay mechanism
Not the feature engineering layer
Not a replacement for Snowflake or historians
Pipeline C is strictly real-time enterprise distribution.
Governance & Change Control
The UNS backbone is strictly governed
Topic branches beyond {cell}:
Are allowed to evolve
Must be documented in the topic catalogue
Should follow controlled versioning when payload contracts change
Publishing rights are controlled to avoid multiple “truth publishers”
Summary
Pipeline C establishes a governed enterprise UNS backbone ({enterprise}/{site}/{area}/{line}/{cell}) for real-time MQTT distribution, while allowing flexible branching for evolving operational, analytics-adjacent, and AI decision use cases.