Skip to content

MQTT Topic Conventions - Developer Guide

Overview

This document provides comprehensive MQTT topic conventions for ABS Platform developers. It consolidates and standardizes all messaging patterns used across the platform, ensuring consistency with the existing EMQX deployment.

Quick Reference

Core Message Types

Type Purpose Pattern Example
dt IoT Data Transfer dt/{dom}/{app}/{rt}/{fl}/{it} dt/arm/bms/ph-mnl1/F7/DEV42
cmd IoT Commands cmd/{dom}/{app}/{rt}/{fl}/{it} cmd/arm/bms/ph-mnl1/F7/DEV42
emit Service Events emit/{dom}/{svc}/{res}/{id}/{evt} emit/abs/service/plan/P123/request
echo Response Confirmation echo/{dom}/{svc}/{res}/{id}/{evt} echo/abs/service/plan/P123/confirmed
call RPC Commands call/{dom}/{svc}/{res}/{id}/{verb} call/abs/payment/plan/P123/process
meta Schema/Config meta/{dom}/{app}/{rt}/{fl}/{it} meta/arm/bms/ph-mnl1/F7/DEV42

1. IoT Device Communication

1.1 Topic Structure

dt/{domain}/{app}/{region}/{fleet}/{item}     # Data transfer
cmd/{domain}/{app}/{region}/{fleet}/{item}    # Commands
meta/{domain}/{app}/{region}/{fleet}/{item}   # Metadata (retained)
state/{domain}/{app}/{region}/{fleet}/{item}  # State shadows (optional)

1.2 Component Definitions

Component Description Examples Notes
domain Business domain arm, abs, ems Asset Relations Management, ABS Platform, Energy Management
app Firmware/app class bms, ssc, gps Battery Management System, Swap Station Controller, GPS tracker
region Geographic/network region ph-mnl1, us-west1 Philippines Manila 1, US West 1
fleet Fleet identifier F7, FLEET_A Short, fixed-length preferred
item Device/asset ID DEV42, BAT001 Unique within fleet, short preferred

1.3 IoT Payload Structure

Data Transfer (dt/) Messages:

{
  "sid": "bms.v1",              // Schema identifier
  "ts": 1730182103,             // Unix timestamp (seconds)
  "seq": 412,                   // Monotonic sequence number
  "d": {                        // Data payload
    "v": 51.72,                 // Voltage
    "t": 32.1,                  // Temperature  
    "soc": 78                   // State of charge
  }
}

Command (cmd/) Messages:

{
  "sid": "cmd.v1",              // Command schema
  "ts": 1730182103,             // Timestamp
  "req": "lock",                // Command verb
  "id": "c-9b7f",               // Correlation ID
  "arg": {"hold_s": 30}         // Command arguments
}

Metadata (meta/) Messages (Retained):

{
  "schema_id": "bms.v1",
  "rev": 3,
  "fields": {
    "v": {"unit": "V", "desc": "Pack voltage", "type": "f32", "min": 0, "max": 120},
    "t": {"unit": "C", "desc": "Pack temperature", "type": "f32", "min": -40, "max": 85},
    "soc": {"unit": "%", "desc": "State of Charge", "type": "u8", "min": 0, "max": 100}
  },
  "wire": ["json", "cbor"]
}

1.4 IoT Examples

# Battery pack telemetry
Topic: dt/arm/bms/ph-mnl1/F7/BAT001
Payload: {"sid":"bms.v1","ts":1730182103,"seq":412,"d":{"v":51.72,"t":32.1,"soc":78}}

# Swap station command
Topic: cmd/arm/ssc/ph-mnl1/F7/STN001  
Payload: {"sid":"cmd.v1","ts":1730182103,"req":"unlock","id":"c-9b7f","arg":{"door_id":1}}

# Schema definition (retained)
Topic: meta/arm/bms/ph-mnl1/F7/BAT001
Payload: {"schema_id":"bms.v1","rev":3,"fields":{...}}

2. Service-to-Service Communication

2.1 Topic Structure

emit/{domain}/{service}/{resource}/{id}/{event}    # Events/requests
echo/{domain}/{service}/{resource}/{id}/{event}    # Confirmations/responses  
call/{domain}/{service}/{resource}/{id}/{verb}     # RPC calls

2.2 Component Definitions

Component Description Examples Notes
domain Business domain abs, arm, odoo Platform/service area
service Service name payment, service, asset Business capability
resource Resource type plan, invoice, battery Entity being operated on
id Resource identifier P123, INV-456, BAT789 Specific instance
event Event/action type request, confirmed, failed What happened

2.3 Service Payload Structure

Standard Service Message:

{
  "sid": "evt.v1",              // Schema version
  "ts": 1730182103,             // Unix timestamp
  "trace": "abc-123",           // Trace ID for distributed tracing
  "correlation_id": "corr-456", // For request/response pairing
  "d": {                        // Event-specific data
    "amount": 199.0,
    "currency": "USD",
    "status": "completed"
  }
}

2.4 Emit/Echo Pattern

The emit/echo pattern provides reliable request-response semantics:

Request (emit):

Topic: emit/abs/service/plan/P123/access_request
Payload: {
  "sid": "evt.v1",
  "ts": 1730182103,
  "trace": "trace-123",
  "correlation_id": "req-456",
  "d": {
    "customer_id": "CUST001",
    "station_id": "STN001",
    "requested_action": "battery_swap"
  }
}

Response (echo):

Topic: echo/abs/service/plan/P123/access_granted  
Payload: {
  "sid": "evt.v1",
  "ts": 1730182104,
  "trace": "trace-123", 
  "correlation_id": "req-456",
  "d": {
    "access_granted": true,
    "session_id": "sess-789",
    "expires_at": "2025-01-01T13:00:00Z"
  }
}

2.5 Service Examples

# Payment processing
Topic: emit/odoo/payment/invoice/INV-12345/process
Payload: {"sid":"evt.v1","ts":1730182103,"trace":"abc-123","d":{"amount":199.0,"ccy":"USD"}}

# Payment confirmation
Topic: echo/odoo/payment/invoice/INV-12345/completed
Payload: {"sid":"evt.v1","ts":1730182104,"trace":"abc-123","d":{"status":"paid","ref":"PAY-456"}}

# Service state change
Topic: emit/abs/fsm/plan/P123/state_transition
Payload: {"sid":"evt.v1","ts":1730182103,"d":{"from":"INITIAL","to":"ACTIVE","trigger":"CONTRACT_SIGNED"}}

3. ABS Platform Specific Conventions

3.1 ABS Message Envelope

All ABS platform messages MUST use this standardized envelope:

{
  "timestamp": "2025-01-01T12:00:00Z",    // ISO 8601 timestamp
  "plan_id": "bss-plan-001",              // Service plan ID
  "tenant_id": "tenant-xyz",              // Multi-tenant ID
  "correlation_id": "corr-123",           // Request correlation
  "idempotency_key": "idem-abc",          // Idempotency protection
  "actor": {                              // Who initiated this
    "type": "customer|agent|system",
    "id": "cust-123"
  },
  "data": {                               // Command/event specific payload
    // Event-specific data here
  }
}

3.2 ABS Topic Patterns

3.2.1 Customer Interactions

# Customer service request
emit/abs/customer/plan/{plan_id}/service_request
echo/abs/customer/plan/{plan_id}/service_granted

# Customer payment
emit/abs/customer/plan/{plan_id}/payment_submitted  
echo/abs/customer/plan/{plan_id}/payment_confirmed

3.2.2 Agent Communications

# Agent calculations
emit/abs/agent/{agent_type}/{plan_id}/calculation_request
echo/abs/agent/{agent_type}/{plan_id}/calculation_result

# Agent state changes
emit/abs/agent/{agent_type}/{plan_id}/state_changed

3.2.3 FSM Events

# FSM transitions
emit/abs/fsm/{cycle}/{plan_id}/transition
echo/abs/fsm/{cycle}/{plan_id}/transition_applied

# FSM outputs
emit/abs/fsm/{cycle}/{plan_id}/output_signal

3.3 ABS Domain Mapping

ABS Domain MQTT Domain Description
Battery Swap Service bss Battery swap operations
Charger Rental Service crs Charger rental operations
Fleet Management fms Fleet tracking and management
Energy Management ems Energy trading and optimization
Asset Relations arm IoT asset management

4. MQTT v5 Properties Usage

4.1 Standard Properties

Property Usage Example
message-expiry-interval Message TTL 300 (5 minutes)
response-topic RPC response route echo/abs/service/plan/P123/response
correlation-data Request correlation Binary correlation ID

4.2 User Properties

Property Purpose Example Required
trace_id Distributed tracing trace-abc-123 Recommended
tenant Multi-tenancy tenant-xyz If applicable
application Source application abs-platform Recommended
priority Message priority high\|normal\|low Optional

4.3 Example MQTT v5 Usage

// Publishing with properties
client.publish('emit/abs/service/plan/P123/request', payload, {
  qos: 1,
  properties: {
    messageExpiryInterval: 300,
    responseTopic: 'echo/abs/service/plan/P123/response',
    correlationData: Buffer.from('corr-123'),
    userProperties: {
      trace_id: 'trace-abc-123',
      tenant: 'tenant-xyz',
      application: 'abs-platform'
    }
  }
});

5. Load Balancing and Scaling

5.1 Shared Subscriptions

Use MQTT v5 shared subscriptions for load balancing:

# Load balanced service processing
$share/abs-workers/emit/abs/+/+/+/+

# Load balanced agent processing  
$share/agent-pool/emit/abs/agent/+/+/+

# Load balanced payment processing
$share/payment-workers/emit/+/payment/+/+/+

5.2 Subscription Patterns

Pattern Purpose Example
+ Single level wildcard emit/abs/+/plan/P123/+
# Multi-level wildcard emit/abs/service/#
$share/{group}/ Load balancing $share/workers/emit/abs/+/+/+/+

6. Quality of Service (QoS)

6.1 QoS Guidelines

Message Type QoS Level Rationale
IoT telemetry (dt/) 0 or 1 Frequent, loss tolerable
IoT commands (cmd/) 1 or 2 Critical, must arrive
Service events (emit/) 1 Important, delivery required
Service responses (echo/) 1 Responses must arrive
RPC calls (call/) 1 or 2 Critical operations
Metadata (meta/) 1 + retain Configuration, must persist

6.2 Retention Guidelines

Topic Type Retain Reason
meta/ Yes Schema definitions needed by new subscribers
state/ Yes Last known state for new subscribers
dt/ No Streaming data, latest only
emit/ No Events, not state
echo/ No Responses, not state

7. Error Handling and Dead Letter Queues

7.1 Error Topics

# Processing errors
ops/error/{original_domain}/{original_service}/{error_type}

# Dead letter queue
ops/dlq/{original_domain}/{original_service}

# System alerts
ops/alert/{severity}/{component}

7.2 Error Payload Structure

{
  "error_id": "err-123",
  "timestamp": "2025-01-01T12:00:00Z",
  "original_topic": "emit/abs/service/plan/P123/request",
  "original_payload": "...",
  "error": {
    "type": "ValidationError",
    "message": "Invalid plan_id format",
    "code": "INVALID_PLAN_ID"
  },
  "retry_count": 3,
  "max_retries": 5
}

8. Developer Implementation Notes

8.1 Topic Builder Usage

DO NOT hand-craft topics. Use the provided topic builder:

import { topicForCommand } from '../core/messaging';

const topic = topicForCommand({
  kind: 'emit',
  application: 'ABS',
  context: 'BSS',
  recipientId: `plan-${planId}`,
  command: 'VALIDATE_SERVICE_ACCESS'
});
// Result: emit/ABS/BSS/plan-123/service_access

8.2 Message Validation

Always validate messages using the envelope validator:

import { validateEnvelope } from '../core/messaging';

const validation = validateEnvelope(incomingMessage);
if (!validation.valid) {
  throw new Error(`Invalid message: ${validation.errors.join(', ')}`);
}

8.3 Correlation Tracking

For request-response patterns, always include correlation IDs:

// Request
const correlationId = generateUUID();
await client.publish(emitTopic, {
  ...payload,
  correlation_id: correlationId
}, {
  properties: {
    responseTopic: echoTopic,
    correlationData: Buffer.from(correlationId)
  }
});

// Response handler  
client.on('message', (topic, payload, packet) => {
  const correlationId = packet.properties?.correlationData?.toString();
  // Match with pending requests
});

9. Migration Notes

9.1 Legacy GATT Format

OLD (deprecated):

gatt/abs/bss-plan-001/customer/cust-123/request/service_access

NEW (current):

emit/abs/customer/plan/P123/service_access

9.2 Compatibility Period

During migration, both formats may be supported temporarily. New development MUST use the new format.


10. Quick Reference Examples

10.1 Common Patterns

# Battery swap request
emit/abs/service/plan/P123/swap_request
echo/abs/service/plan/P123/swap_granted

# Payment processing
emit/odoo/payment/invoice/INV-456/process  
echo/odoo/payment/invoice/INV-456/completed

# IoT battery data
dt/arm/bms/ph-mnl1/F7/BAT001

# IoT station command
cmd/arm/ssc/ph-mnl1/F7/STN001

# Agent calculation
emit/abs/agent/payment/P123/check_quota
echo/abs/agent/payment/P123/quota_result

10.2 Subscription Examples

# Monitor all ABS service events
emit/abs/service/+/+/+

# Monitor specific plan
emit/abs/+/plan/P123/+

# Monitor all payments
emit/+/payment/+/+/+

# Load balanced processing
$share/workers/emit/abs/+/+/+/+

This guide provides comprehensive topic conventions for the ABS Platform. Always refer to this document when implementing MQTT messaging to ensure consistency across the platform.