ABS Platform Messaging & Queue Architecture
1. Overview
This document defines the messaging architecture for the ABS Platform, focusing on how Z-Agents, customers, and IoT assets communicate through a hybrid MQTT + embedded queue system.
2. Architecture Intent
2.1 Core Principles
- MQTT for Transport: All external communications use MQTT with GATT topic naming
- Embedded Queues: Simple FIFO buffers within each Service_Plan object
- Alternating Processing: Fair distribution between external and internal messages
- Service_Plan Isolation: Each plan manages its own message queues independently
2.2 High-Level Flow
External Entities (IoT/ovAPP) → MQTT Broker → Message Handler → Service_Plan Queues → Event Tick Processor → Z-Agents
3. GATT Topic Naming Convention
3.1 Topic Structure
gatt/{domain}/{service_plan_id}/{entity_type}/{entity_id}/{intent}/{action}
3.2 Domain Examples
- gatt/abs/ - Asset-Based Subscription (general)
- gatt/bss/ - Battery Swap Service
- gatt/crs/ - Charger Rental Service
3.3 Entity Types
- customer - Customer interactions (ovAPP)
- asset - IoT asset communications
- agent - Z-Agent communications
- service_plan - Service plan events
- system - System-level events
3.4 Intent Categories
- request - Request-response pattern
- response - Response to requests
- signal - State change signals
- notification - Informational messages
- command - Direct commands
3.5 Example Topics
# Customer requests battery swap
gatt/abs/bss-plan-001/customer/cust-123/request/battery_swap
# Asset reports ready status
gatt/abs/bss-plan-001/asset/battery-456/signal/ready_for_swap
# Payment agent signals quota exceeded
gatt/abs/bss-plan-001/agent/payment-agent-001/signal/quota_exceeded
# Internal agent communication (adds a second agent/{entity_id} pair to the base structure in 3.1)
gatt/abs/bss-plan-001/agent/payment-agent-001/agent/quota-agent-002/request/check_quota
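The base structure in section 3.1 can be parsed with a simple split. The following is a minimal sketch, not a prescribed implementation: `ParsedGattTopic` and `parseGattTopic` are hypothetical names, and the parser deliberately accepts only the seven-segment base form, so the longer agent-to-agent example would need an extension.

```typescript
// Hypothetical parsed form of a GATT topic; field names follow section 3.1.
interface ParsedGattTopic {
  domain: string;
  servicePlanId: string;
  entityType: string;
  entityId: string;
  intent: string;
  action: string;
}

// Returns null when the topic does not match the seven-segment base structure
// gatt/{domain}/{service_plan_id}/{entity_type}/{entity_id}/{intent}/{action}.
function parseGattTopic(topic: string): ParsedGattTopic | null {
  const parts = topic.split('/');
  const malformed =
    parts.length !== 7 || parts[0] !== 'gatt' || parts.some((p) => p.length === 0);
  if (malformed) {
    return null;
  }
  // Safe after the length check above.
  const [, domain, servicePlanId, entityType, entityId, intent, action] =
    parts as [string, string, string, string, string, string, string];
  return { domain, servicePlanId, entityType, entityId, intent, action };
}

const parsedExample = parseGattTopic(
  'gatt/abs/bss-plan-001/customer/cust-123/request/battery_swap',
);
```

Returning `null` instead of throwing lets a message handler count and skip malformed topics without interrupting the subscription loop.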
4. Service_Plan Queue Structure
4.1 Queue Types
Each Service_Plan contains two FIFO message buffers:
External Queue:
- Purpose: Messages from external entities (customers, IoT assets)
- Source: MQTT messages filtered by Service_Plan ID
- Processing: Handled by event tick processor
Internal Queue:
- Purpose: Messages between Z-Agents within the same Service_Plan
- Source: Agent-generated messages, system events
- Processing: Handled by event tick processor
4.2 Queue Characteristics
- Type: Simple FIFO buffers (arrays)
- Persistence: Embedded within Service_Plan object
- Size Limits: Configurable per Service_Plan
- Overflow Behavior: Drop oldest messages (FIFO)
- Processing: Alternating read (external → internal → external...)
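The drop-oldest overflow behavior can be illustrated with a small helper. This is a sketch under stated assumptions: `pushWithDropOldest` is a hypothetical name, and returning the number of evicted items is one possible way to feed the drop counters.

```typescript
// Appends an item, then evicts from the front until the buffer fits maxSize.
// Returns the number of evicted (dropped) items so callers can update counters.
function pushWithDropOldest<T>(queue: T[], item: T, maxSize: number): number {
  const limit = Math.max(0, maxSize); // guard against negative limits
  queue.push(item);
  let dropped = 0;
  while (queue.length > limit) {
    queue.shift(); // the oldest message sits at the front of the array
    dropped++;
  }
  return dropped;
}

// Usage: a buffer limited to three messages receives four.
const buffer: string[] = [];
let droppedTotal = 0;
for (const msg of ['m1', 'm2', 'm3', 'm4']) {
  droppedTotal += pushWithDropOldest(buffer, msg, 3);
}
// buffer now holds m2, m3, m4 and droppedTotal is 1
```

`Array.prototype.shift` is linear in the array length, which is usually acceptable at the suggested queue sizes; a ring buffer is an alternative if profiling shows otherwise.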
4.3 Suggested Data Structure
// Conceptual structure - implementation details left to developers
interface ServicePlanMessaging {
  // Queue Buffers
  external_queue: GATTMessage[];
  internal_queue: GATTMessage[];

  // Configuration
  max_buffer_size: number;
  priority: 'low' | 'median' | 'high'; // DEFAULT: 'median'

  // Processing State
  processing_round: 'external' | 'internal';
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;

  // Statistics
  external_messages_processed: number;
  internal_messages_processed: number;
  external_messages_dropped: number;
  internal_messages_dropped: number;
  processing_time_ms: number;
}
5. Message Handler Architecture
5.1 Responsibilities
- MQTT Subscription: Listen to relevant GATT topics
- Message Filtering: Route messages to appropriate Service_Plan
- Queue Management: Push messages to external or internal queues
- Topic Parsing: Parse GATT topics to extract routing information
5.2 Message Routing Logic
1. Parse GATT topic to extract service_plan_id
2. Determine if message is external or internal
3. Find corresponding Service_Plan
4. Push to appropriate queue (external or internal)
5. Handle queue overflow if necessary
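The five routing steps above might look like the following. This is an illustrative sketch only: `Message`, `PlanQueues`, and `routeMessage` are hypothetical names, the classifier is passed in as a parameter, and a single `max_buffer_size` is applied to both queues as in the structure from section 4.3.

```typescript
interface Message { topic: string; payload: unknown; }
type QueueKind = 'external' | 'internal';

// Hypothetical subset of the Service_Plan messaging state needed for routing.
interface PlanQueues {
  external_queue: Message[];
  internal_queue: Message[];
  max_buffer_size: number;
  external_messages_dropped: number;
  internal_messages_dropped: number;
}

type RouteResult = 'queued' | 'invalid_topic' | 'unknown_plan';

function routeMessage(
  plans: Map<string, PlanQueues>,
  message: Message,
  classify: (topic: string) => QueueKind,
): RouteResult {
  // Step 1: service_plan_id is the third segment of gatt/{domain}/{service_plan_id}/...
  const parts = message.topic.split('/');
  const servicePlanId = parts[2];
  if (parts[0] !== 'gatt' || servicePlanId === undefined || servicePlanId === '') {
    return 'invalid_topic';
  }
  // Step 2: decide whether the message is external or internal.
  const kind = classify(message.topic);
  // Step 3: find the corresponding Service_Plan.
  const plan = plans.get(servicePlanId);
  if (plan === undefined) return 'unknown_plan';
  // Step 4: push to the appropriate queue.
  const queue = kind === 'external' ? plan.external_queue : plan.internal_queue;
  queue.push(message);
  // Step 5: on overflow, drop the oldest message and count the drop.
  if (queue.length > plan.max_buffer_size) {
    queue.shift();
    if (kind === 'external') plan.external_messages_dropped++;
    else plan.internal_messages_dropped++;
  }
  return 'queued';
}
```

Returning a result code rather than throwing keeps the handler loop running when a topic is malformed or a plan is unknown; how such cases are logged is left open here.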
5.3 External vs Internal Classification
External Messages:
- Topics containing customer/ or asset/
- Messages from IoT devices
- Messages from ovAPP
Internal Messages:
- Topics containing agent/ or service_plan/
- Messages between Z-Agents
- System-generated messages
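One way to apply this classification is to look at the entity type segment rather than searching the whole topic string, which avoids false matches when an entity ID happens to contain "asset" or "agent". The sketch below is an assumption-laden illustration: `classifyTopic` is a hypothetical name, and treating the `system` entity type as internal is inferred from "System-generated messages" above.

```typescript
type QueueKind = 'external' | 'internal';

const EXTERNAL_ENTITY_TYPES = new Set(['customer', 'asset']);
// Assumption: 'system' topics carry system-generated messages and are internal.
const INTERNAL_ENTITY_TYPES = new Set(['agent', 'service_plan', 'system']);

// Classifies by the {entity_type} segment of
// gatt/{domain}/{service_plan_id}/{entity_type}/...
// Returns null for unknown entity types so the caller can decide how to handle them.
function classifyTopic(topic: string): QueueKind | null {
  const entityType = topic.split('/')[3];
  if (entityType === undefined) return null;
  if (EXTERNAL_ENTITY_TYPES.has(entityType)) return 'external';
  if (INTERNAL_ENTITY_TYPES.has(entityType)) return 'internal';
  return null;
}
```

With this approach, `gatt/abs/bss-plan-001/asset/battery-456/signal/ready_for_swap` is external and `gatt/abs/bss-plan-001/agent/payment-agent-001/signal/quota_exceeded` is internal.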
6. Event Tick Processing
6.1 Processing Pattern
Clock Tick (100ms) → Check Service_Plan Queues → Process Messages → Throttle → Next Tick
Processing Logic:
- Clock-Driven: System clock provides the base 100ms tick interval
- Queue-Driven: Only Service_Plans with unprocessed messages are processed
- Throttled: Each Service_Plan gets a time-limited processing opportunity per tick
- Priority-Based: Processing frequency depends on Service_Plan priority
6.2 Processing Steps
- Clock Tick: System clock triggers every 100ms
- Queue Check: Identify Service_Plans with unprocessed messages
- Priority Filter: Apply priority-based processing frequency
- Message Processing: Process messages from both queues (alternating)
- Throttling: Ensure fair distribution across all Service_Plans
- Cleanup: Update statistics and state
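The alternating read in the Message Processing step can be sketched as follows. `AlternatingQueues` and `takeNextAlternating` are hypothetical names, and falling back to the other queue when the preferred one is empty is an assumption, since the document does not specify that case.

```typescript
interface AlternatingQueues<T> {
  external_queue: T[];
  internal_queue: T[];
  processing_round: 'external' | 'internal';
}

// Takes the next message from the queue whose round it is. If that queue is
// empty, falls back to the other queue and leaves the round unchanged, so the
// empty queue is still preferred on the next call.
function takeNextAlternating<T>(q: AlternatingQueues<T>): T | undefined {
  const preferExternal = q.processing_round === 'external';
  const first = preferExternal ? q.external_queue : q.internal_queue;
  const second = preferExternal ? q.internal_queue : q.external_queue;
  if (first.length > 0) {
    q.processing_round = preferExternal ? 'internal' : 'external';
    return first.shift();
  }
  if (second.length > 0) {
    return second.shift();
  }
  return undefined; // both queues are empty
}

// Usage: three external messages and one internal message.
const demoQueues: AlternatingQueues<string> = {
  external_queue: ['e1', 'e2', 'e3'],
  internal_queue: ['i1'],
  processing_round: 'external',
};
const processingOrder: string[] = [];
let nextMessage = takeNextAlternating(demoQueues);
while (nextMessage !== undefined) {
  processingOrder.push(nextMessage);
  nextMessage = takeNextAlternating(demoQueues);
}
// processingOrder is e1, i1, e2, e3
```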
6.3 Priority-Based Processing
// Conceptual priority configuration - implementation details flexible
enum ServicePlanPriority {
  LOW = 'low',       // Process every 3rd tick (300ms)
  MEDIAN = 'median', // Process every tick (100ms) - DEFAULT
  HIGH = 'high'      // Process every tick (100ms) + interrupt capability
}

interface ServicePlanTickConfig {
  priority: ServicePlanPriority;
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;
}
6.4 Error Handling
- Processing Failures: Log error, continue with next message
- Queue Empty: Skip processing for that round
- Agent Errors: Handle gracefully, don't block queue processing
- Clock Interrupts: Handle forced processing for high-priority Service_Plans
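The first three rules can be illustrated with a drain loop that isolates each handler call. This is a minimal sketch: `drainQueue` and its parameters are hypothetical, and the handler is synchronous here for brevity, whereas the agent examples elsewhere in this document use async handlers.

```typescript
interface QueuedMessage { topic: string; payload: unknown; }

// Drains a queue while isolating failures: a handler error is logged and
// counted, and processing continues with the next message.
function drainQueue(
  queue: QueuedMessage[],
  handler: (message: QueuedMessage) => void,
  logError: (message: QueuedMessage, error: unknown) => void,
): { processed: number; failed: number } {
  let processed = 0;
  let failed = 0;
  // An empty queue skips the loop entirely ("Queue Empty: Skip processing").
  while (queue.length > 0) {
    const message = queue.shift();
    if (message === undefined) break;
    try {
      handler(message);
      processed++;
    } catch (error) {
      logError(message, error);
      failed++;
    }
  }
  return { processed, failed };
}
```

Whether a failed message is discarded, retried, or moved to a separate holding area is not defined in this document; the sketch simply discards it after logging.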
7. Z-Agent Integration
7.1 Message Publishing
Z-Agents can publish messages to the internal queue of their Service_Plan:
// Conceptual example - implementation details flexible
class PaymentAgent {
  async signalQuotaExceeded(): Promise<void> {
    const message = {
      topic: `gatt/abs/${this.servicePlanId}/agent/${this.id}/signal/quota_exceeded`,
      payload: { current_usage: this.state.credits_used },
      priority: 'high'
    };
    // Publish to internal queue
    await this.publishToInternalQueue(message);
  }
}
7.2 Message Consumption
Z-Agents receive messages through event tick processing:
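// Conceptual example - implementation details flexible
class QuotaAgent {
  async handleMessage(message: GATTMessage): Promise<void> {
    // Match on the sender's agent ID segment. A check for '/payment/' would never
    // match a topic such as .../agent/payment-agent-001/signal/quota_exceeded.
    // Assumes payment agent IDs start with "payment-", as in section 3.5.
    if (message.topic.includes('/agent/payment-')) {
      await this.handlePaymentSignal(message);
    }
  }
}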
8. Event Tick System
8.1 Clock-Driven Processing
The event tick system runs on a fixed clock and, on each tick, processes only those Service_Plans that have queued messages and are due according to their priority:
// Conceptual tick system - implementation details flexible
class EventTickProcessor {
  private clockInterval: number = 100; // 100ms base tick
  private globalTickCounter: number = 0;
  private servicePlans: Map<string, ServicePlan> = new Map();

  startClock(): void {
    setInterval(() => {
      this.globalTickCounter++;
      this.processAllServicePlans();
    }, this.clockInterval);
  }

  private processAllServicePlans(): void {
    for (const servicePlan of this.servicePlans.values()) {
      if (this.shouldProcessServicePlan(servicePlan)) {
        this.processServicePlanMessages(servicePlan);
      }
    }
  }

  private shouldProcessServicePlan(servicePlan: ServicePlan): boolean {
    // Check if Service_Plan has unprocessed messages
    const hasMessages = servicePlan.messaging.external_queue.length > 0 ||
      servicePlan.messaging.internal_queue.length > 0;
    if (!hasMessages) return false;

    // Apply priority-based processing frequency
    const ticksSinceLastProcess = this.globalTickCounter - servicePlan.messaging.last_processed_tick;
    switch (servicePlan.messaging.priority) {
      case 'low':
        return ticksSinceLastProcess >= 3; // Every 300ms
      case 'median':
        return ticksSinceLastProcess >= 1; // Every 100ms (default)
      case 'high':
        return ticksSinceLastProcess >= 1 || servicePlan.messaging.interrupt_pending;
      default:
        return ticksSinceLastProcess >= 1;
    }
  }
}
8.2 Interrupt-Driven Processing
High-priority Service_Plans can trigger immediate processing:
// Conceptual interrupt system - implementation details flexible
class ServicePlanInterruptManager {
  triggerInterrupt(servicePlanId: string): void {
    const servicePlan = this.getServicePlan(servicePlanId);
    if (servicePlan && servicePlan.messaging.priority === 'high') {
      servicePlan.messaging.interrupt_pending = true;
      // Force immediate processing
      this.processServicePlanImmediately(servicePlan);
    }
  }

  private processServicePlanImmediately(servicePlan: ServicePlan): void {
    try {
      // Process messages immediately, bypassing clock tick
      this.processServicePlanMessages(servicePlan);
    } finally {
      // Clear the flag even if processing throws, so it cannot stay set indefinitely
      servicePlan.messaging.interrupt_pending = false;
    }
  }
}
8.3 Throttling and Fair Distribution
The system ensures fair processing across all Service_Plans:
// Conceptual throttling logic - implementation details flexible
// globalTickCounter, hasUnprocessedMessages and processNextMessage are assumed
// to be provided elsewhere (for example, shared with EventTickProcessor)
class ServicePlanThrottler {
  private maxProcessingTimePerTick: number = 50; // 50ms max per Service_Plan per tick
  private totalProcessingTime: number = 0;

  private processServicePlanMessages(servicePlan: ServicePlan): void {
    const startTime = Date.now();

    // Process messages with time limit
    while (this.hasUnprocessedMessages(servicePlan) &&
           (Date.now() - startTime) < this.maxProcessingTimePerTick) {
      this.processNextMessage(servicePlan);
    }

    // Update processing statistics
    servicePlan.messaging.last_processed_tick = this.globalTickCounter;
    servicePlan.messaging.processing_time_ms = Date.now() - startTime;
  }
}
9. Configuration Guidelines
9.1 Queue Size Limits
Suggested Defaults:
- External Queue: 1000 messages per Service_Plan
- Internal Queue: 500 messages per Service_Plan
- Configurable: Per Service_Plan based on expected traffic
9.2 Event Tick Configuration
Suggested Defaults:
- Base Clock Frequency: Every 100ms
- Priority Processing Frequencies:
  - LOW: Every 300ms (3 ticks)
  - MEDIAN: Every 100ms (1 tick) - DEFAULT
  - HIGH: Every 100ms (1 tick) + interrupt capability
- Max Processing Time: 50ms per Service_Plan per tick
- Configurable: Per Service_Plan based on business requirements
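The suggested defaults from sections 9.1 and 9.2 could be collected in a single configuration object. The object shape and names below (`MESSAGING_DEFAULTS`, `effectiveIntervalMs`) are hypothetical; only the numeric values come from this document.

```typescript
type Priority = 'low' | 'median' | 'high';

// Values taken from sections 9.1 and 9.2; the structure itself is illustrative.
const MESSAGING_DEFAULTS = {
  baseTickMs: 100,
  maxProcessingMsPerTick: 50,
  defaultPriority: 'median' as Priority,
  // Number of base ticks between processing opportunities for each priority.
  ticksPerProcess: { low: 3, median: 1, high: 1 } as Record<Priority, number>,
  externalQueueLimit: 1000,
  internalQueueLimit: 500,
};

// Effective processing interval in milliseconds for a given priority.
function effectiveIntervalMs(priority: Priority): number {
  return MESSAGING_DEFAULTS.ticksPerProcess[priority] * MESSAGING_DEFAULTS.baseTickMs;
}
```

Per-Service_Plan overrides could be layered on top of these defaults, for example by spreading a partial override object over a copy of `MESSAGING_DEFAULTS`.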
9.3 MQTT Configuration
- QoS Levels: Use QoS 1 for reliable delivery
- Retain Messages: Disable for real-time messaging
- Clean Sessions: Enable for fresh state management
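These settings might be expressed as plain option objects. The sketch below assumes a client whose option names resemble those of MQTT.js (`clean`, `qos`, `retain`); the surrounding structure and the `planTopicFilter` helper are hypothetical. The filter uses the standard MQTT wildcards, `+` for a single level and `#` for all remaining levels.

```typescript
// Hypothetical settings object; no client library is imported here.
const mqttSettings = {
  connect: { clean: true },                    // Clean Sessions: enabled
  subscribe: { qos: 1 as const },              // QoS 1 for reliable delivery
  publish: { qos: 1 as const, retain: false }, // Retain disabled for real-time messaging
};

// Builds a subscription filter that matches every topic of one Service_Plan
// across all domains: gatt/{any domain}/{service_plan_id}/{anything below}.
function planTopicFilter(servicePlanId: string): string {
  return `gatt/+/${servicePlanId}/#`;
}

const exampleFilter = planTopicFilter('bss-plan-001');
```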
10. Monitoring & Observability
10.1 Key Metrics
- Queue Sizes: Current messages in external/internal queues
- Processing Rates: Messages processed per second
- Drop Rates: Messages dropped due to overflow
- Processing Latency: Time from queue entry to processing
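Two of these metrics can be derived from simple counters and timestamps. The following sketch rests on assumptions: it defines the drop rate as dropped messages divided by all messages that left the queue (processed plus dropped), which this document does not specify, and the function names are hypothetical.

```typescript
interface QueueCounters { processed: number; dropped: number; }

// Fraction of messages lost to overflow, under the definition assumed above.
// Returns 0 when no messages have been seen yet.
function dropRate(counters: QueueCounters): number {
  const total = counters.processed + counters.dropped;
  return total === 0 ? 0 : counters.dropped / total;
}

// Processing latency: time from queue entry to processing, in milliseconds.
// Clamped at 0 in case the two timestamps come from slightly skewed clocks.
function processingLatencyMs(enqueuedAtMs: number, processedAtMs: number): number {
  return Math.max(0, processedAtMs - enqueuedAtMs);
}
```

Measuring latency this way requires each message to carry its enqueue timestamp, which would be an addition to whatever message shape an implementation chooses.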
10.2 Health Checks
- Queue Health: Monitor for stuck or overflowing queues
- Processing Health: Monitor for failed message processing
- MQTT Health: Monitor connection and subscription status
11. Implementation Considerations
11.1 Technology Choices
- MQTT Broker: Existing infrastructure (Mosquitto/AWS IoT)
- Queue Storage: In-memory arrays within Service_Plan objects
- Persistence: Service_Plan serialization includes queue state
- Processing: Synchronous or asynchronous based on requirements
11.2 Scalability Considerations
- Horizontal Scaling: Each Service_Plan is independent
- Vertical Scaling: Adjust queue sizes and processing frequency
- Load Distribution: MQTT topics naturally distribute load
11.3 Security Considerations
- MQTT Authentication: Use existing authentication mechanisms
- Topic Authorization: Validate topic access per Service_Plan
- Message Validation: Validate message structure and content
12. Migration Strategy
12.1 Phase 1: Foundation
- Implement basic queue structure in Service_Plan
- Create MQTT message handler
- Implement event tick processor
12.2 Phase 2: Integration
- Integrate with existing Z-Agents
- Add message publishing capabilities
- Implement monitoring and metrics
12.3 Phase 3: Optimization
- Tune queue sizes and processing frequency
- Add advanced error handling
- Implement performance optimizations
13. Success Criteria
13.1 Functional Requirements
- ✅ Messages are routed to correct Service_Plan queues
- ✅ Alternating processing between external and internal queues
- ✅ Queue overflow handled gracefully
- ✅ Z-Agents can publish and consume messages
13.2 Performance Requirements
- ✅ Message processing latency < 500ms
- ✅ Queue overflow rate < 1%
- ✅ System handles expected message volume
- ✅ No message loss under normal conditions
13.3 Operational Requirements
- ✅ Monitoring and alerting in place
- ✅ Error handling and recovery mechanisms
- ✅ Configuration management for queue parameters
- ✅ Documentation for operations team
Note: This document provides architectural intent and design guidance. Implementation details, specific code patterns, and technology choices are left to the development team based on their expertise and project requirements.