📄 7.2 Pipeline C — Enterprise Real-Time UNS (MQTT)

Purpose

Pipeline C provides the Enterprise Real-Time Unified Namespace (UNS), enabling event-driven, near real-time distribution of contextualized OT data to enterprise consumers using MQTT.

This pipeline is intended for real-time awareness and integration, not historical analysis. Snowflake remains the system of record for replay and analytics.

Design Intent

Pipeline C is designed around the following principles:

  • UNS backbone first

    Establish a stable, enterprise-wide topic backbone that scales across sites and shops.

  • Flexible branching

    Allow downstream topic structures to evolve without breaking the enterprise backbone.

  • Decoupled Pub/Sub

    Producers (HighByte) publish once; multiple consumers subscribe independently.

  • Contextualized publishing

    Only governed, contextualized data (post-standardization/enrichment) is published to enterprise topics.

Role of HighByte in Pipeline C

HighByte acts as the publishing authority for Pipeline C, responsible for:

  • Selecting which contextualized data is published to MQTT

  • Enforcing the enterprise UNS backbone

  • Applying consistent payload contracts (timestamp, quality, model/version metadata where needed)

  • Publishing inference results (from Pipeline J) back into the UNS for enterprise visibility

This ensures all real-time consumers observe consistent semantics.
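The payload contract can be illustrated with a small check. This is a minimal sketch, not HighByte functionality: the function name validate_envelope and the choice of required fields are assumptions, taken from the fields that every example payload in this section happens to carry.

```python
from datetime import datetime

# Assumption: these envelope fields are treated as mandatory because every
# example payload in this section includes them.
REQUIRED_FIELDS = ("event_ts", "asset_path", "machine_type", "data", "quality", "source")


def validate_envelope(payload: dict) -> list:
    """Return a list of contract violations; an empty list means the payload passes."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in payload]
    if problems:
        return problems
    try:
        # Older Python versions reject a trailing "Z" in fromisoformat(), so normalize it.
        datetime.fromisoformat(payload["event_ts"].replace("Z", "+00:00"))
    except (AttributeError, ValueError):
        problems.append("event_ts is not an ISO 8601 timestamp")
    if not isinstance(payload["data"], dict):
        problems.append("data must be an object")
    return problems
```

A consumer or a test harness could run such a check on each received message and route violations to a dead-letter topic or a log, depending on how strictly the contract is enforced.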

MQTT Broker & Transport

  • Broker: AWS IoT Core (MQTT)

  • Pattern: Publish / Subscribe

  • Delivery: Near real-time, event-driven

AWS IoT Core provides a scalable enterprise backbone for multi-consumer distribution.

Namespace Backbone and Topic Structure

Fixed enterprise backbone

Pipeline C standardizes the mandatory topic prefix as:

[company_cd]/[plant_cd]/[shop_cd]/[line_cd]/[station_cd]/[function_cd]/[ce_cd]/data

This backbone is the governed routing layer aligned with ISA-95.

Everything after {cell} is flexible and use-case dependent.
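As an illustration of how a publisher might assemble a backbone topic, the sketch below joins the seven backbone codes and a trailing branch segment. The function build_topic, the level list, and the sample station/function/cell values are hypothetical; the last backbone level is assumed to be ce_cd, as in the example topics of this section.

```python
# Assumption: backbone level names follow the placeholders used in this section.
BACKBONE_LEVELS = (
    "company_cd", "plant_cd", "shop_cd", "line_cd",
    "station_cd", "function_cd", "ce_cd",
)


def build_topic(codes: dict, branch: str = "data") -> str:
    """Join the backbone codes and a branch segment into an MQTT topic name."""
    segments = []
    for level in BACKBONE_LEVELS:
        value = codes.get(level, "")
        # "/" separates levels and "+" / "#" are wildcards, so none may appear in a code.
        if not value or any(ch in value for ch in "/+#"):
            raise ValueError(f"invalid value for {level}: {value!r}")
        segments.append(value)
    segments.append(branch)
    return "/".join(segments)


# Hypothetical codes for the last three levels, for illustration only.
example_codes = {
    "company_cd": "tmt", "plant_cd": "banpho", "shop_cd": "welding",
    "line_cd": "deckline_zone", "station_cd": "station_x",
    "function_cd": "function_x", "ce_cd": "ce_x",
}
print(build_topic(example_codes, "status"))
```

Rejecting empty or wildcard-bearing codes at build time keeps malformed topics from reaching the broker, which is one way the governed backbone could be enforced in code.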

Flexible branching after {cell}

After the backbone, the topic can branch to represent:

  • Asset identity (optional, if multiple assets exist within a cell)

  • Data category (state, event, metric, decision)

  • Domain (quality, maintenance, energy, etc.)

  • Versioning (optional contract versioning)

Example topic patterns (illustrative)

1) Asset state updates

tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/data

Payload (JSON)

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/data",
"machine_type": "[function_cd]",
"data": {
"error_log": "",
"error_status": 0,
"execution_program": "PRG_001",
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1,
"joint_angle_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_command_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_deviation": [0.01, 0.02, 0.01, 0.00, 0.03, 0.01, 0.02],
"joint_encoder_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_speed_value": [0.5, 0.3, 0.2, 0.1, 0.4, 0.2, 0.1],
"monitor_speed": 0.4,
"motor_current": [18.6, 14.2, 12.1, 8.4, 11.3, 9.7, 7.2],
"motor_current_cmd_arms": [20.0, 15.0, 13.0, 9.0, 12.0, 10.0, 8.0],
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0,
"transformation_value": [277.2, 2428.8, 874.8, 0.0, 0.0, 0.0, 0]
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}

Intent

  • Event-driven state update

  • Near real-time operational visibility

  • Not meant for history or ML training

2) /status — 10 state/IO variables

Topic: tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/status

Trigger: On any state/IO tag change

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"error_status": 0,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1,
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
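The on-change trigger used by /status can be sketched as follows. This only illustrates the idea of publishing when at least one tag value differs from the previous snapshot; it is not how HighByte implements its triggers, and the names changed_tags and OnChangePublisher are hypothetical.

```python
_MISSING = object()  # sentinel so a tag seen for the first time always counts as changed


def changed_tags(previous: dict, current: dict) -> list:
    """Return the sorted names of tags whose value differs from the previous snapshot."""
    return sorted(
        name for name, value in current.items()
        if previous.get(name, _MISSING) != value
    )


class OnChangePublisher:
    """Calls publish(snapshot) only when at least one tag value has changed."""

    def __init__(self, publish):
        self._publish = publish  # any callable that accepts the snapshot dict
        self._last = {}

    def update(self, snapshot: dict) -> list:
        changed = changed_tags(self._last, snapshot)
        if changed:
            self._publish(dict(snapshot))
        self._last = dict(snapshot)
        return changed


sent = []
publisher = OnChangePublisher(sent.append)
publisher.update({"error_status": 0, "io_robot_cycle_start": True})   # first snapshot: published
publisher.update({"error_status": 0, "io_robot_cycle_start": True})   # unchanged: suppressed
publisher.update({"error_status": 0, "io_robot_cycle_start": False})  # one tag changed: published
```

The same pattern covers the /counter and /alarms triggers by restricting the snapshot to the relevant tags, for example only error_status for /alarms.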


3) /counter — 4 execution variables

Topic: tmt/banpho/welding/deckline_zone/[station_cd]/[function_cd]/[ce_cd]/counter

Trigger: On any counter tag change

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"execution_program": "PRG_001",
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}


4) /alarms — 2 fault variables

Topic: tmt/banpho/welding/{line}/{machine_id}/alarms

Source: alarms instance — reference view of data

Trigger: On error_status change

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"error_status": 1,
"error_log": "E-001 Overload detected J3"
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}


5) /metadata — 8 machine registry variables

Topic: tmt/banpho/welding/{line}/{machine_id}/metadata

Source: metadata instance — populated from Machine Registry at startup

Trigger: On metadata update (typically once at startup; periodic 60s refresh)

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"machine_id": "w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"line": "deckline_zone",
"hotcode": "DFR2-1",
"site": "ban_pho",
"shop": "welding",
"rank": "1",
"active_flag": true
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}


6) /feature — 24 variables grouped for ML input

Topic: tmt/banpho/welding/{line}/{machine_id}/feature

Source: feature namespace — Pipeline Feature Aggregate output

Trigger: On Pipeline Feature Aggregate evaluation cycle

Why feature_version is here: This payload is the direct input to ML inference. Greengrass must validate that the feature set it receives matches the version used to train the model. See §4.2 for full explanation.

Why feature_event_id is here: This ID is the traceability key between the feature event and the resulting inference. It is generated at the time the /feature payload is produced by HighByte (Option A — see §6.4 open decision) and carried forward into ml/inference, ML_LANDING.FEATURE_EVENTS, and PRED_LANDING.INFERENCE_EVENTS. This enables a direct JOIN without relying on Snowflake-side ID generation.

Grouped structure: The 24 raw OPC variables are grouped into state_features (12 tg_onchange tags — robot state and IO signals) and kinematic_features (12 tg_fast_1s tags — joint angles, speeds, currents, and motion parameters). This grouping reflects the OPC collection rate boundary and makes the ML signal type explicit for both Greengrass (inference) and Snowflake (training). The same grouping is applied identically in the Snowflake ML_LANDING PAYLOAD — see §6.3.

{
"event_ts": "2026-03-06T10:00:00.000Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"feature_version": "v1.0",
"feature_event_id": "TBD",
"data": {
"state_features": {
"error_log": "",
"error_status": 0,
"execution_program": "PRG_001",
"io_internal_signal": false,
"io_robot_cycle_start": true,
"io_robot_during_tip_forming": false,
"io_robot_home_position": false,
"io_robot_motor_power_on": true,
"io_robot_operation_completed": false,
"motor_power_state": 1,
"operating_mode": 2,
"program_execution_state": 1
},
"kinematic_features": {
"joint_angle_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_command_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_deviation": [0.01, 0.02, 0.01, 0.00, 0.03, 0.01, 0.02],
"joint_encoder_value": [12.1, -19.0, -32.3, 0.0, -76.8, 16.0, 75.4],
"joint_speed_value": [0.5, 0.3, 0.2, 0.1, 0.4, 0.2, 0.1],
"monitor_speed": 0.4,
"motor_current": [18.6, 14.2, 12.1, 8.4, 11.3, 9.7, 7.2],
"motor_current_cmd_arms": [20.0, 15.0, 13.0, 9.0, 12.0, 10.0, 8.0],
"program_accuracy": 100.0,
"program_always_speed": 100.0,
"program_speed": 100.0,
"transformation_value": [277.2, 2428.8, 874.8, 0.0, 0.0, 0.0, 0]
}
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
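The grouping and the version check described for /feature can be sketched as below. The tag lists are copied from the example payload; the function names group_features and feature_version_matches are hypothetical, and the sketch does not represent the actual HighByte or Greengrass logic.

```python
# The 12 on-change tags (robot state and IO signals), as listed in the example payload.
STATE_TAGS = (
    "error_log", "error_status", "execution_program",
    "io_internal_signal", "io_robot_cycle_start", "io_robot_during_tip_forming",
    "io_robot_home_position", "io_robot_motor_power_on", "io_robot_operation_completed",
    "motor_power_state", "operating_mode", "program_execution_state",
)

# The 12 fast (1 s) tags: joint angles, speeds, currents, and motion parameters.
KINEMATIC_TAGS = (
    "joint_angle_value", "joint_command_value", "joint_deviation",
    "joint_encoder_value", "joint_speed_value", "monitor_speed",
    "motor_current", "motor_current_cmd_arms", "program_accuracy",
    "program_always_speed", "program_speed", "transformation_value",
)


def group_features(flat: dict) -> dict:
    """Split a flat dict of the 24 variables into the two feature groups."""
    missing = [tag for tag in STATE_TAGS + KINEMATIC_TAGS if tag not in flat]
    if missing:
        raise KeyError(f"missing tags: {missing}")
    return {
        "state_features": {tag: flat[tag] for tag in STATE_TAGS},
        "kinematic_features": {tag: flat[tag] for tag in KINEMATIC_TAGS},
    }


def feature_version_matches(payload: dict, trained_version: str) -> bool:
    """True when the payload's feature_version equals the version the model was trained on."""
    return payload.get("feature_version") == trained_version
```

An inference component could call feature_version_matches before scoring and skip or flag payloads whose version differs, rather than running the model on a mismatched feature set.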


7) /ml — inference result

Topic: tmt/banpho/welding/{line}/{machine_id}/ml

Source: ml instance — written by Pipeline ML Result (#4) after ml/decide received

Trigger: After each Greengrass inference cycle completes

feature_event_id: Included in the /ml payload to enable a direct JOIN between the inference result and the originating feature event in Snowflake ML_LANDING.FEATURE_EVENTS. See §4.4 open decision for ID generation options.

{
"event_ts": "2026-03-06T10:00:00.001Z",
"asset_path": "tmt/banpho/welding/deckline_zone/w_rbt_651",
"machine_type": "kawasaki_robot_controller",
"data": {
"inference_ts": "2026-03-06T10:00:00.001Z",
"model_name": "robot_health_classifier",
"model_version": "v1.0",
"prediction": "DEFECT_RISK",
"score": 0.87,
"threshold": 0.75,
"decision": "ALERT",
"feature_event_id": "TBD"
},
"quality": "GOOD",
"quality_detail": { "stale_tags": [], "bad_tags": [] },
"source": "HighByte"
}
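The traceability that feature_event_id provides can be illustrated with an in-memory join. This is a sketch in Python rather than the Snowflake JOIN itself; the function name and the sample IDs ("fe-001", "fe-002") are hypothetical, since the ID format is still an open decision. It assumes the field positions shown in the examples: top-level in /feature, inside data in /ml.

```python
def join_on_feature_event_id(feature_events: list, inference_events: list) -> list:
    """Pair each inference result with the feature event that produced it."""
    # /feature payloads carry feature_event_id at the top level.
    features_by_id = {event["feature_event_id"]: event for event in feature_events}
    joined = []
    for inference in inference_events:
        # /ml payloads carry feature_event_id inside "data".
        key = inference["data"]["feature_event_id"]
        feature = features_by_id.get(key)
        if feature is not None:
            joined.append((feature, inference))
    return joined


# Hypothetical records, trimmed to the fields the join needs.
features = [
    {"feature_event_id": "fe-001", "feature_version": "v1.0"},
    {"feature_event_id": "fe-002", "feature_version": "v1.0"},
]
inferences = [
    {"data": {"feature_event_id": "fe-002", "decision": "ALERT"}},
    {"data": {"feature_event_id": "fe-999", "decision": "OK"}},  # no matching feature event
]
pairs = join_on_feature_event_id(features, inferences)
```

Because the ID is generated once and carried through unchanged, the pairing needs no timestamp matching or windowing, which is the benefit the feature_event_id note describes.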


Typical Consumers

Pipeline C supports enterprise consumers such as:

  • Real-time dashboards

  • Alerting / notification services

  • Event-driven applications

  • Integration services that require near real-time state and decision visibility

Consumers subscribe to only the topics they need.
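Selective subscription relies on MQTT topic filters, where "+" matches exactly one level and "#" matches all remaining levels. The sketch below shows the matching rule itself so that filter behaviour can be reasoned about offline; the function name topic_matches and the sample station/function/cell codes are hypothetical, and special handling of topics beginning with "$" is omitted.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True when an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; it matches the
            # parent level itself and everything beneath it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


# A dashboard interested only in status messages from the welding shop:
status_filter = "tmt/banpho/welding/+/+/+/+/status"
print(topic_matches(status_filter, "tmt/banpho/welding/deckline_zone/st_x/fn_x/ce_x/status"))
print(topic_matches(status_filter, "tmt/banpho/welding/deckline_zone/st_x/fn_x/ce_x/alarms"))
```

Because the backbone has a fixed depth, single-level wildcards can target one branch across all lines and stations, while a trailing "#" subscribes to everything under a given plant or shop.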

Relationship to Other Pipelines

  • Pipeline B (Snowflake):

    Pipeline C is real-time; Pipeline B is historical and replayable. They are complementary.

  • Pipeline J (On-Prem Inference):

    Inference results are fed back into Pipeline C as decisions/events, enabling real-time operational visibility and response.

What Pipeline C Does Not Do

To avoid misuse:

  • Not a historical store

  • Not a replay mechanism

  • Not the feature engineering layer

  • Not a replacement for Snowflake or historians

Pipeline C is strictly real-time enterprise distribution.

Governance & Change Control

  • The UNS backbone is strictly governed

  • Topic branches beyond {cell}:

    • May evolve over time

    • Must be documented (topic catalogue)

    • Should follow controlled versioning when payload contracts change

  • Publishing rights are controlled to avoid multiple “truth publishers”

Summary

Pipeline C establishes a governed enterprise UNS backbone ({enterprise}/{site}/{area}/{line}/{cell}) for real-time MQTT distribution, while allowing flexible branching for evolving operational, analytics-adjacent, and AI decision use cases.