
BSS Agent Pub/Sub Pattern Implementation

Overview

The BSS Agent v2 uses four pub/sub patterns and chooses among them by the criticality and response requirements of each operation. It follows the ABS Platform's MQTT topic conventions.

Pub/Sub Pattern Selection by Criticality

🔴 Critical Sequencing: call/rtrn Pattern

Used for operations requiring guaranteed responses and strict sequencing:
  • Payment status verification (financial transactions)
  • Service activation/deactivation (safety-critical operations)
  • Asset allocation confirmation (resource management)

🟡 Weak Coupling: emit/echo Pattern

Used for asynchronous operations with eventual consistency:
  • Service intent notifications (customer journey)
  • Usage reporting to billing systems (accounting)
  • Location action triggers (physical operations)

🔵 Informational: meta/stat Pattern

Used for configuration, monitoring, and telemetry data:
  • Agent health monitoring (stat/)
  • Service configuration updates (meta/)
  • Performance metrics reporting (stat/)

⚡ Real-time Commands: cmd/dt Pattern

Used for direct device/system control and data transfer:
  • IoT device commands (cmd/)
  • Sensor data ingestion (dt/)
  • Fleet coordination signals (cmd/)
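The four families can be recognized from the first topic level alone. Below is a minimal sketch, assuming a hypothetical classifyTopic() helper that is not part of the agent. It maps that prefix to the category names used in this section.

```javascript
// Hypothetical helper: map the first topic level to the pattern family and
// category described above. Names and return shape are illustrative only.
const PATTERN_FAMILIES = {
  call: { family: 'call/rtrn', category: 'Critical Sequencing' },
  rtrn: { family: 'call/rtrn', category: 'Critical Sequencing' },
  emit: { family: 'emit/echo', category: 'Weak Coupling' },
  echo: { family: 'emit/echo', category: 'Weak Coupling' },
  meta: { family: 'meta/stat', category: 'Informational' },
  stat: { family: 'meta/stat', category: 'Informational' },
  cmd: { family: 'cmd/dt', category: 'Real-time Commands' },
  dt: { family: 'cmd/dt', category: 'Real-time Commands' },
};

function classifyTopic(topic) {
  const prefix = topic.split('/')[0];
  // Own-property check so inherited names such as "toString" are rejected
  if (!Object.prototype.hasOwnProperty.call(PATTERN_FAMILIES, prefix)) {
    throw new Error(`Unknown pattern prefix in topic: ${topic}`);
  }
  return { prefix, ...PATTERN_FAMILIES[prefix] };
}
```

For example, `classifyTopic('emit/abs/service_usage/plan1/swap')` yields the `emit/echo` family in the Weak Coupling category.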

External Requests by Pub/Sub Pattern

🔴 Critical Operations: call/rtrn Pattern

1. Payment Status Verification

Function: checkOdooPaymentStatus() (called when payment data is stale)
  • Pattern: call/rtrn (guaranteed response required)
  • Call Topic: call/abs/payment_query/{planId}/status_check
  • Return Topic: rtrn/abs/payment_query/{planId}/status_response
  • Timeout: 5 seconds (financial transaction)
  • Retries: 3 attempts
  • Criticality: Financial verification requires a guaranteed response
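The timeout-and-retry contract for this call can be sketched as follows. The sketch assumes a hypothetical sendCall transport function that publishes on the call topic and resolves with the payload received on the matching rtrn topic. It retries immediately with no backoff, because this section does not say whether the agent waits between attempts.

```javascript
// Reject if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms} ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Hypothetical call/rtrn wrapper: each attempt waits at most `timeoutMs`
// for the rtrn payload; after `attempts` failures the call fails.
// `sendCall(request)` is an assumed transport, not an existing API.
async function callWithRetry(sendCall, request, { timeoutMs = 5000, attempts = 3 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await withTimeout(sendCall(request), timeoutMs);
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  throw new Error(`call failed after ${attempts} attempts: ${lastError.message}`);
}
```

With the defaults, a payment query that never answers fails after at most about 15 seconds (3 × 5 s).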

🟡 Medium Operations: emit/echo Pattern

2. Service Intent Signal Emission

Function: emitServiceIntentSignal() (W2 Workflow)
  • Pattern: emit/echo (eventual consistency acceptable)
  • Emit Topic: emit/abs/service_intent/{customerId}/{targetLocationId}
  • Echo Topic: echo/abs/service_intent/{customerId}_{targetLocationId}/confirmed
  • Timeout: 30 seconds
  • Purpose: Customer journey notifications

3. Fleet Allocation Signal

Function: sendAssetAllocationSignal() (W4 Workflow)
  • Pattern: emit/echo (asynchronous ARM coordination)
  • Emit Topic: emit/abs/fleet_allocation/{targetFleet}/{sequenceType}
  • Echo Topic: echo/abs/fleet_allocation/{targetFleet}/completed
  • Timeout: 300 seconds (ARM processing time)
  • Purpose: Asset allocation with ARM

4. Odoo Usage Reporting

Function: reportServiceUsageToOdoo()
  • Pattern: emit/echo (billing system integration)
  • Emit Topic: emit/abs/service_usage/{servicePlanId}/{usageType}
  • Echo Topic: echo/abs/service_usage/{servicePlanId}/{usageType}/billing_processed
  • Timeout: 60 seconds
  • Purpose: Usage-based billing triggers

5. Location Action Triggers

Function: triggerLocationActions()
  • Pattern: emit/echo (physical operations coordination)
  • Emit Topic: emit/abs/location_actions/{locationId}/request
  • Echo Topic: echo/abs/location_actions/{locationId}/completed
  • Timeout: 45 seconds
  • Purpose: Physical location operations
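The four emit/echo operations above differ only in their topic templates and timeouts, so they fit in one table. Below is a sketch that assumes a hypothetical EMIT_ECHO_OPERATIONS registry and a fillTopic() helper. The templates and timeouts are copied from the function descriptions. fillTopic() rejects parameter values containing `/`, `+` or `#`, because those would add topic levels or act as MQTT wildcards.

```javascript
// Hypothetical registry; templates and timeouts come from the descriptions above.
const EMIT_ECHO_OPERATIONS = {
  SERVICE_INTENT: {
    emit: 'emit/abs/service_intent/{customerId}/{targetLocationId}',
    echo: 'echo/abs/service_intent/{customerId}_{targetLocationId}/confirmed',
    timeoutSec: 30,
  },
  FLEET_ALLOCATION: {
    emit: 'emit/abs/fleet_allocation/{targetFleet}/{sequenceType}',
    echo: 'echo/abs/fleet_allocation/{targetFleet}/completed',
    timeoutSec: 300,
  },
  SERVICE_USAGE: {
    emit: 'emit/abs/service_usage/{servicePlanId}/{usageType}',
    echo: 'echo/abs/service_usage/{servicePlanId}/{usageType}/billing_processed',
    timeoutSec: 60,
  },
  LOCATION_ACTIONS: {
    emit: 'emit/abs/location_actions/{locationId}/request',
    echo: 'echo/abs/location_actions/{locationId}/completed',
    timeoutSec: 45,
  },
};

// Replace each {name} placeholder, failing on missing values or on values
// that would inject extra topic levels or wildcards.
function fillTopic(template, params) {
  return template.replace(/\{(\w+)\}/g, (_, name) => {
    const value = params[name];
    if (value === undefined) throw new Error(`missing topic parameter: ${name}`);
    const text = String(value);
    if (/[\/+#]/.test(text)) throw new Error(`invalid characters in ${name}: ${text}`);
    return text;
  });
}
```

Because the emit and echo topics are filled from the same parameters, they cannot drift apart for a given request.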

🔵 Informational Operations: meta/stat Pattern

6. Agent Health Monitoring

Function: reportAgentHealth()
  • Pattern: stat/ (no response expected)
  • Stat Topic: stat/abs/agent_health/{planId}/metrics
  • Purpose: Performance monitoring and telemetry
  • Data: Execution metrics, sync status, health score

Implementation Details

Critical Design Principle: NO BLOCKING WAITS

The emit/echo pattern MUST NOT block while waiting for echo responses. Instead, it uses an event-driven continuation pattern:

// ❌ WRONG: Blocking wait for echo
// const response = await mqttClient.emitAndWaitForEcho(...);

// ✅ CORRECT: Emit and continue, handle echo separately
mqttClient.emit(emitPayload);
// Agent continues immediately with current state
return { signals: ['REQUEST_EMITTED'], metadata: {...} };

// Echo handled separately when received:
// Topic: echo/abs/fleet_allocation/{fleetId}/completed
// → Triggers new agent execution with echo data

Event-Driven State Management

  1. Emit Phase: Agent emits request and returns immediately
  2. State Persistence: Current state saved with correlation tracking
  3. Echo Phase: When echo arrives, triggers new agent execution
  4. State Continuation: Agent resumes from saved state with echo data
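The four steps can be sketched with an in-memory store. The sketch assumes a hypothetical PendingOperationStore class. In the agent these records live in agent_state.pending_operations. Here timeout_at is an epoch-millisecond number rather than the ISO string used elsewhere in this document. claim() removes a record as it returns it, so a duplicate echo finds nothing to resume.

```javascript
// Hypothetical in-memory model of emit -> persist -> echo -> continue.
class PendingOperationStore {
  constructor(now = () => Date.now()) {
    this.now = now; // injectable clock, useful for tests
    this.pending = new Map(); // correlation_id -> pending operation record
  }

  // Steps 1-2: record the request; the caller returns without waiting.
  register(correlationId, operation, request, timeoutMs) {
    this.pending.set(correlationId, {
      correlation_id: correlationId,
      operation,
      original_request: request,
      timeout_at: this.now() + timeoutMs,
    });
  }

  // Steps 3-4: hand the record to exactly one echo. Unknown, duplicate,
  // or expired echoes get undefined (expired records stay for cleanup).
  claim(correlationId) {
    const op = this.pending.get(correlationId);
    if (!op || this.now() >= op.timeout_at) return undefined;
    this.pending.delete(correlationId);
    return op;
  }

  // Remove and return every record whose timeout has passed.
  takeExpired() {
    const expired = [];
    for (const [id, op] of this.pending) {
      if (this.now() >= op.timeout_at) {
        expired.push(op);
        this.pending.delete(id);
      }
    }
    return expired;
  }
}
```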

Correlation ID State Tracking

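Correlation IDs let an echo be matched to its original request even though the agent keeps nothing in memory between executions. Each pending operation is persisted in agent state under its correlation ID: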

// Emit phase - save pending operation state
const correlationId = `fleet_${targetFleet}_${customerId}_${Date.now()}`;
const pendingState = {
  correlation_id: correlationId,
  operation: 'FLEET_ALLOCATION',
  original_request: requestData,
  timestamp: new Date().toISOString(),
  timeout_at: new Date(Date.now() + 300000).toISOString()
};

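// Persist under the correlation ID so the echo handler can find it later.
// The same correlation_id must be included in the emitted payload so the
// external system can return it in the echo.
agent_state.pending_operations[correlationId] = pendingState;
// Return immediately without blocking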

Echo Processing Pattern

// When echo received via MQTT listener:
// Topic: echo/abs/fleet_allocation/{fleetId}/completed
const handleFleetAllocationEcho = async (echoPayload) => {
  const correlationId = echoPayload.correlation_id;
  const pendingOp = agent_state.pending_operations[correlationId];

  // Ignore unknown, already-handled, or expired operations
  // (expired ones are handled by the timeout cleanup)
  if (!pendingOp || isExpired(pendingOp)) {
    return;
  }

  // Remove the pending operation before awaiting, so a redelivered echo
  // arriving during executeAgent() is not processed a second time
  delete agent_state.pending_operations[correlationId];

  // Process echo and update state
  const result = processFleetAllocationResponse(echoPayload);

  // Trigger new agent execution with echo results
  await executeAgent({
    action: 'PROCESS_FLEET_ALLOCATION_ECHO',
    echo_data: result,
    original_request: pendingOp.original_request
  });
};

Weak Coupling Benefits

  1. Fault Tolerance: External system failures don't block agent execution
  2. Eventual Consistency: Systems synchronize when available
  3. Non-Blocking: Agent continues processing while awaiting responses
  4. Scalability: Supports asynchronous processing patterns

Correlation ID Pattern

All emit/echo pairs use correlation IDs for request tracking:

const correlationId = `${operation}_${planId}_${timestamp}`;
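IDs built only from operation, plan and timestamp can collide when two requests for the same plan are created in the same millisecond. The sketch below keeps the documented prefix and appends a UUID from Node's `crypto.randomUUID()`. The suffix and the makeCorrelationId() name are assumptions, not the agent's current format.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: documented `${operation}_${planId}_${timestamp}` prefix
// plus a random UUID suffix (an assumption) so IDs created in the same
// millisecond for the same plan remain unique.
function makeCorrelationId(operation, planId, timestamp = Date.now()) {
  return `${operation}_${planId}_${timestamp}_${randomUUID()}`;
}
```

Handlers should treat the result as an opaque key and not split it on underscores, because operation names and plan IDs may contain underscores themselves.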

Timeout Strategy

  • Service Intent: 30 seconds (customer journey notifications)
  • Location Actions: 45 seconds (physical operations)
  • ARM Fleet Allocation: 300 seconds (asset resolution complexity)
  • Odoo Billing: 60 seconds (billing processing)
  • Odoo Payment Queries: 5 seconds per attempt, 3 attempts (quick status lookup)

Echo Topic Structure

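Echo topics follow the pattern echo/{application}/{context}/{identifier}/{result_type}. The service_usage echo adds a {usageType} level before the result type: echo/abs/service_usage/{planId}/{usageType}/billing_processed.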
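Echo topics with this structure can be parsed with a small helper. The sketch assumes a hypothetical parseEchoTopic() function. It returns every level between the context and the result type as a list, so the longer service_usage topic parses too. A helper such as extractPlanIdFromTopic() could then read `identifiers[0]`.

```javascript
// Hypothetical parser for echo/{application}/{context}/{identifier...}/{result_type}.
function parseEchoTopic(topic) {
  const levels = topic.split('/');
  if (levels[0] !== 'echo' || levels.length < 5) {
    throw new Error(`not an echo topic: ${topic}`);
  }
  return {
    application: levels[1],
    context: levels[2],
    identifiers: levels.slice(3, -1), // one level, or two for service_usage
    resultType: levels[levels.length - 1],
  };
}
```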

MQTT Topic Summary by Pattern

🔴 Critical: call/rtrn Topics

# Call Topics (ABS → External Systems)
call/abs/payment_query/{planId}/status_check

# Return Topics (External Systems → ABS)
rtrn/abs/payment_query/{planId}/status_response

🟡 Medium: emit/echo Topics

# Emit Topics (ABS → External Systems)
emit/abs/service_intent/{customerId}/{locationId}
emit/abs/fleet_allocation/{fleetId}/{sequenceType}
emit/abs/service_usage/{planId}/{usageType}
emit/abs/location_actions/{locationId}/request

# Echo Topics (External Systems → ABS)
echo/abs/service_intent/{customerId}_{locationId}/confirmed
echo/abs/fleet_allocation/{fleetId}/completed
echo/abs/service_usage/{planId}/{usageType}/billing_processed
echo/abs/location_actions/{locationId}/completed

🔵 Informational: stat Topics

# Stat Topics (ABS → Monitoring Systems)
stat/abs/agent_health/{planId}/metrics

Pattern Selection Criteria

| Pattern | Criticality | Response Required | Timeout | Use Cases |
|---|---|---|---|---|
| call/rtrn | 🔴 Critical | Guaranteed | 3-15s | Payment verification, safety operations |
| emit/echo | 🟡 Medium | Expected | 30-300s | Customer journey, billing, coordination |
| stat | 🔵 Info | No | N/A | Health monitoring, telemetry |
| meta | 🔵 Info | No | N/A | Configuration updates |

Mock Implementation Notes

The current implementation includes mock responses for development and testing:
  • All external requests simulate successful delivery
  • Echo responses are mocked for immediate testing
  • Comments mark where the real MQTT client integration goes for production deployment
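For tests, a mock client only needs to record publishes and, optionally, answer an emit with an echo on a later tick. The sketch below uses Node's EventEmitter. MockMqttClient, the echoResponder callback, and the 'message' event with (topic, payload) arguments are assumptions, not the project's actual mock.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the MQTT client during development and tests.
class MockMqttClient extends EventEmitter {
  // echoResponder(topic, payload) returns [echoTopic, echoPayload] or null
  constructor(echoResponder) {
    super();
    this.published = [];
    this.echoResponder = echoResponder;
  }

  publish(topic, payload) {
    this.published.push({ topic, payload });
    const echo = this.echoResponder && this.echoResponder(topic, payload);
    if (echo) {
      // Deliver on a later tick so callers cannot rely on a synchronous echo
      setImmediate(() => this.emit('message', echo[0], echo[1]));
    }
  }
}
```

Delivering the echo asynchronously keeps tests honest about the no-blocking rule: code that expects the echo before publish() returns will fail.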

Production Implementation

MQTT Listener Integration

For production deployment, the MQTT listener must be configured to route echo responses to agent execution:

// MQTT Listener Configuration Example
const echoTopicHandlers = {
  'echo/abs/fleet_allocation/+/completed': (topic, payload) => {
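    const fleetId = extractFleetIdFromTopic(topic);
    const correlationId = payload.correlation_id;

    // Trigger new agent execution for echo processing; pass the correlation ID
    // so the agent can look up the matching pending operation
    executeAgent({
      plan_id: payload.service_plan_id,
      action: 'PROCESS_FLEET_ALLOCATION_ECHO',
      correlation_id: correlationId,
      fleet_id: fleetId,
      echo_data: payload
    });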
  },

  'echo/abs/service_usage/+/+/billing_processed': (topic, payload) => {
    executeAgent({
      plan_id: extractPlanIdFromTopic(topic),
      action: 'PROCESS_ODOO_BILLING_ECHO',
      echo_data: payload
    });
  }
};
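The handler keys above use MQTT's single-level wildcard `+`. The sketch below shows the matching a listener needs to route an incoming topic to these handlers. It assumes hypothetical topicMatches() and dispatch() helpers. `+` matches exactly one level, and a trailing `#` matches all remaining levels, including the parent level. The special treatment of `$`-prefixed topics is left out.

```javascript
// Hypothetical MQTT topic-filter matcher ('+' = one level, trailing '#' = rest).
function topicMatches(filter, topic) {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return i === f.length - 1; // '#' is only valid as the last level
    if (i >= t.length) return false;
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  return f.length === t.length;
}

// Call every handler whose filter matches; returns how many handlers ran.
function dispatch(handlers, topic, payload) {
  let matched = 0;
  for (const [filter, handler] of Object.entries(handlers)) {
    if (topicMatches(filter, topic)) {
      handler(topic, payload);
      matched++;
    }
  }
  return matched;
}
```

With this in place, the listener's message callback can be as small as `dispatch(echoTopicHandlers, topic, payload)`.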

State Persistence Requirements

  1. Pending Operations Storage: Agent state must persist pending_operations between executions
  2. Correlation ID Tracking: Enable echo responses to find original requests
  3. Timeout Management: Clean up expired pending operations
  4. State Consistency: Ensure agent state updates are atomic
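Within one Node.js process, requirement 4 can be approached by queueing executions per plan, so their read-modify-write cycles never interleave (for example, an echo and a timeout for the same plan). The sketch assumes a hypothetical KeyedSerializer class. With several agent instances, the state store's own transactional or conditional update would still be needed.

```javascript
// Hypothetical helper: run async tasks one at a time per key (e.g. plan ID).
// Serializes only within this Node.js process.
class KeyedSerializer {
  constructor() {
    this.tails = new Map(); // key -> promise that settles when the last queued task does
  }

  run(key, task) {
    const previous = this.tails.get(key) || Promise.resolve();
    // Start `task` only after every earlier task for this key has settled
    const result = previous.then(() => task());
    // The tail never rejects, so one failed task does not block later ones
    const tail = result.catch(() => {});
    this.tails.set(key, tail);
    // Forget the key once no newer task has been queued behind this one
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}
```

A caller would wrap each execution, for instance `serializer.run(planId, () => executeAgent(request))`, and still handle the returned promise's rejection.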

Timeout and Cleanup Strategy

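// Timeout cleanup handler (separate process)
const cleanupExpiredOperations = async () => {
  const expiredOps = findExpiredPendingOperations();

  for (const op of expiredOps) {
    // Remove first so the next run does not report the same operation again
    delete agent_state.pending_operations[op.correlation_id];
    try {
      await executeAgent({
        plan_id: op.service_plan_id,
        action: 'HANDLE_ECHO_TIMEOUT',
        timeout_data: op
      });
    } catch (err) {
      // One failed timeout handler must not stop cleanup of the others
      console.error(`Timeout handling failed for ${op.correlation_id}`, err);
    }
  }
};

// Run cleanup every minute; skip a tick while the previous run is still active
let cleanupRunning = false;
setInterval(() => {
  if (cleanupRunning) return;
  cleanupRunning = true;
  cleanupExpiredOperations().finally(() => { cleanupRunning = false; });
}, 60000);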

Architecture Compliance

✅ Weak Coupling: External systems communicate via MQTT only
✅ Eventual Consistency: No blocking external calls
✅ Fault Tolerance: Agent continues operation during external system failures
✅ Scalability: Supports asynchronous, distributed processing
✅ Federation Principles: Clean separation between ABS and external systems

This implementation ensures the BSS Agent maintains high availability while integrating with external systems through loosely coupled, event-driven architecture.