Queue Implementation Guide

1. Overview

This document provides architectural guidance for implementing the embedded FIFO queue system within Service_Plan objects. The queues handle both external messages (from IoT devices and ovAPP) and internal messages (between Z-Agents).

2. Queue Architecture

2.1 Service_Plan Integration

Each Service_Plan object contains embedded queue structures:

// Conceptual structure - implementation details flexible
interface ServicePlanMessaging {
  // Queue Buffers
  external_queue: GATTMessage[];
  internal_queue: GATTMessage[];

  // Configuration
  max_buffer_size: number;
  priority: 'low' | 'median' | 'high'; // DEFAULT: 'median'

  // Processing State
  processing_round: 'external' | 'internal';
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;

  // Statistics
  external_messages_processed: number;
  internal_messages_processed: number;
  external_messages_dropped: number;
  internal_messages_dropped: number;
  processing_time_ms: number;
}

2.2 Queue Characteristics

  • Type: Simple FIFO buffers (JavaScript arrays)
  • Persistence: Embedded within Service_Plan serialization
  • Size Limits: Configurable per Service_Plan
  • Overflow Behavior: When a queue is full, the oldest message is dropped to make room for the new one
  • Processing: Alternating read pattern
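Because the queues are plain arrays embedded in the Service_Plan, they can travel with whatever serialization the plan already uses. The sketch below assumes JSON serialization and uses hypothetical, trimmed-down `DemoMessage` and `MessagingSnapshot` shapes (neither is defined in this guide). It only illustrates that queue order survives a round trip.

```typescript
// Hypothetical, reduced shapes for illustration only.
interface DemoMessage {
  id: string;
  topic: string;
  payload: unknown;
}

interface MessagingSnapshot {
  external_queue: DemoMessage[];
  internal_queue: DemoMessage[];
  max_buffer_size: number;
}

// Serialize the messaging state together with its queues.
function serializeMessaging(state: MessagingSnapshot): string {
  return JSON.stringify(state);
}

// Restore the state; JSON arrays are ordered, so FIFO order is preserved.
function restoreMessaging(json: string): MessagingSnapshot {
  return JSON.parse(json) as MessagingSnapshot;
}

const original: MessagingSnapshot = {
  external_queue: [
    { id: 'm1', topic: 'demo/a', payload: { v: 1 } },
    { id: 'm2', topic: 'demo/b', payload: { v: 2 } },
  ],
  internal_queue: [],
  max_buffer_size: 1000,
};

const restored = restoreMessaging(serializeMessaging(original));
console.log(restored.external_queue.map(m => m.id).join(',')); // prints "m1,m2"
```

JSON does not round-trip `Date` objects, `undefined` values, or binary payloads unchanged, so a real implementation may need a custom encoding for those fields.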

3. Queue Operations

3.1 Message Enqueue

// Conceptual enqueue logic - implementation details flexible
class ServicePlanQueueManager {
  pushToExternalQueue(servicePlan: ServicePlan, message: GATTMessage): void {
    // Check buffer size limit
    if (servicePlan.messaging.external_queue.length >= servicePlan.messaging.max_buffer_size) {
      // Drop oldest message (FIFO behavior)
      servicePlan.messaging.external_queue.shift();
      servicePlan.messaging.external_messages_dropped++;
    }

    // Add new message to end of queue
    servicePlan.messaging.external_queue.push(message);
  }

  pushToInternalQueue(servicePlan: ServicePlan, message: GATTMessage): void {
    // Similar logic for internal queue
    if (servicePlan.messaging.internal_queue.length >= servicePlan.messaging.max_buffer_size) {
      servicePlan.messaging.internal_queue.shift();
      servicePlan.messaging.internal_messages_dropped++;
    }

    servicePlan.messaging.internal_queue.push(message);
  }
}
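The two push methods above share the same logic, so one option is to factor it into a single helper. The following is a minimal sketch, not part of the original design: `pushBounded` is a hypothetical name, and it assumes `maxSize` is at least 1.

```typescript
// Hypothetical helper: append to a bounded FIFO buffer, evicting the oldest
// entries while the buffer is at or above its limit.
// Returns the number of entries that were dropped.
function pushBounded<T>(queue: T[], item: T, maxSize: number): number {
  let dropped = 0;
  // A loop (rather than a single check) also handles a limit that was
  // lowered after the queue had already grown past it.
  while (queue.length >= maxSize && queue.length > 0) {
    queue.shift();
    dropped++;
  }
  queue.push(item);
  return dropped;
}

const demoQueue: number[] = [];
let totalDropped = 0;
for (const n of [1, 2, 3, 4, 5]) {
  totalDropped += pushBounded(demoQueue, n, 3);
}
console.log(demoQueue.join(','), totalDropped); // prints "3,4,5 2"
```

The caller adds the return value to `external_messages_dropped` or `internal_messages_dropped`, depending on which queue was passed in.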

3.2 Message Dequeue

// Conceptual dequeue logic - implementation details flexible
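class EventTickProcessor {
  private clockInterval: number = 100; // 100ms base tick
  private globalTickCounter: number = 0;
  // Registry of active Service_Plans keyed by plan ID (how it is populated is left to the implementation)
  private servicePlans = new Map<string, ServicePlan>();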

  startClock(): void {
    setInterval(() => {
      this.globalTickCounter++;
      this.processAllServicePlans();
    }, this.clockInterval);
  }

  private processAllServicePlans(): void {
    for (const [planId, servicePlan] of this.servicePlans) {
      if (this.shouldProcessServicePlan(servicePlan)) {
        this.processServicePlanMessages(servicePlan);
      }
    }
  }

  private shouldProcessServicePlan(servicePlan: ServicePlan): boolean {
    // Check if Service_Plan has unprocessed messages
    const hasMessages = servicePlan.messaging.external_queue.length > 0 || 
                       servicePlan.messaging.internal_queue.length > 0;

    if (!hasMessages) return false;

    // Apply priority-based processing frequency
    const ticksSinceLastProcess = this.globalTickCounter - servicePlan.messaging.last_processed_tick;

    switch (servicePlan.messaging.priority) {
      case 'low':
        return ticksSinceLastProcess >= 3; // Every 300ms
      case 'median':
        return ticksSinceLastProcess >= 1; // Every 100ms (default)
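      case 'high':
        // With purely tick-driven polling this behaves like 'median'; interrupt_pending
        // only has an effect if this check can also run between ticks.
        return ticksSinceLastProcess >= 1 || servicePlan.messaging.interrupt_pending;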
      default:
        return ticksSinceLastProcess >= 1;
    }
  }

  private processServicePlanMessages(servicePlan: ServicePlan): void {
    const startTime = Date.now();
    const maxProcessingTime = 50; // 50ms budget per Service_Plan per tick (default; section 4 lists per-plan max_processing_time_ms values)

    while (this.hasUnprocessedMessages(servicePlan) && 
           (Date.now() - startTime) < maxProcessingTime) {

      // Alternate between external and internal queues
      if (servicePlan.messaging.processing_round === 'external') {
        this.processExternalMessage(servicePlan);
        servicePlan.messaging.processing_round = 'internal';
      } else {
        this.processInternalMessage(servicePlan);
        servicePlan.messaging.processing_round = 'external';
      }
    }

    // Update processing statistics
    servicePlan.messaging.last_processed_tick = this.globalTickCounter;
    servicePlan.messaging.processing_time_ms = Date.now() - startTime;
    servicePlan.messaging.interrupt_pending = false;
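  }

  // Helper used by the loop above: true while either queue still holds messages
  private hasUnprocessedMessages(servicePlan: ServicePlan): boolean {
    return servicePlan.messaging.external_queue.length > 0 ||
           servicePlan.messaging.internal_queue.length > 0;
  }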

  private processExternalMessage(servicePlan: ServicePlan): void {
    const message = servicePlan.messaging.external_queue.shift();
    if (message) {
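      // routeToHandler is not defined in this guide; presumably it delegates to
      // MessageRouter.routeMessage (section 5.1). If it returns a Promise, await it or
      // attach a catch handler so that failures are not silently lost.
      this.routeToHandler(message, servicePlan);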
      servicePlan.messaging.external_messages_processed++;
    }
  }

  private processInternalMessage(servicePlan: ServicePlan): void {
    const message = servicePlan.messaging.internal_queue.shift();
    if (message) {
      this.routeToHandler(message, servicePlan);
      servicePlan.messaging.internal_messages_processed++;
    }
  }
}
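To make the priority rules concrete, the sketch below replays the same gating logic over a run of ticks and lists the ticks on which a plan would be processed. `isDue` and `processedTicks` are hypothetical names; the simulation assumes messages are always waiting and that `last_processed_tick` starts at 0.

```typescript
type Priority = 'low' | 'median' | 'high';

// Same thresholds as shouldProcessServicePlan, without the queue check.
function isDue(priority: Priority, ticksSinceLastProcess: number, interruptPending = false): boolean {
  if (priority === 'low') return ticksSinceLastProcess >= 3;
  if (priority === 'high') return ticksSinceLastProcess >= 1 || interruptPending;
  return ticksSinceLastProcess >= 1; // 'median' (default)
}

// Returns the tick numbers (1-based) on which the plan is processed.
function processedTicks(priority: Priority, totalTicks: number): number[] {
  const result: number[] = [];
  let lastProcessed = 0;
  for (let tick = 1; tick <= totalTicks; tick++) {
    if (isDue(priority, tick - lastProcessed)) {
      result.push(tick);
      lastProcessed = tick;
    }
  }
  return result;
}

console.log(processedTicks('low', 9).join(','));    // prints "3,6,9"
console.log(processedTicks('median', 4).join(',')); // prints "1,2,3,4"
console.log(processedTicks('high', 4).join(','));   // prints "1,2,3,4"
```

Under tick-only scheduling, 'high' and 'median' produce the same schedule. The interrupt flag only changes the outcome when the check runs with zero elapsed ticks, for example `isDue('high', 0, true)`, which suggests interrupt handling needs a trigger outside the clock loop.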

4. Configuration Guidelines

4.1 Default Configuration

// Suggested default configuration - adjustable per Service_Plan
const DEFAULT_QUEUE_CONFIG = {
  max_buffer_size: 1000,        // Messages per queue
  priority: 'median',           // Processing priority (low/median/high)
  clock_interval_ms: 100,       // Base clock tick interval
  max_processing_time_ms: 50,   // Max processing time per Service_Plan per tick
  retry_attempts: 3,            // Failed message retries
  retry_delay_ms: 1000,         // Delay between retries
};

4.2 Service_Plan Specific Configuration

// High-traffic Service_Plan (e.g., busy swap station)
const HIGH_TRAFFIC_CONFIG = {
  max_buffer_size: 2000,
  priority: 'high',              // Process every tick + interrupt capability
  max_processing_time_ms: 100,   // More processing time for busy plans
};

// Low-traffic Service_Plan (e.g., new customer)
const LOW_TRAFFIC_CONFIG = {
  max_buffer_size: 500,
  priority: 'low',               // Process every 3rd tick (300ms)
  max_processing_time_ms: 25,    // Less processing time for quiet plans
};

// Standard Service_Plan (default)
const STANDARD_CONFIG = {
  max_buffer_size: 1000,
  priority: 'median',            // Process every tick (100ms) - DEFAULT
  max_processing_time_ms: 50,    // Standard processing time
};
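The per-plan configurations above omit some of the default fields (for example the retry settings). One way to handle this, sketched here under the assumption that overrides are layered on top of the defaults, is an object-spread merge. `QueueConfig` and `resolveQueueConfig` are hypothetical names.

```typescript
type Priority = 'low' | 'median' | 'high';

interface QueueConfig {
  max_buffer_size: number;
  priority: Priority;
  clock_interval_ms: number;
  max_processing_time_ms: number;
  retry_attempts: number;
  retry_delay_ms: number;
}

// Values copied from the suggested defaults in section 4.1.
const QUEUE_DEFAULTS: QueueConfig = {
  max_buffer_size: 1000,
  priority: 'median',
  clock_interval_ms: 100,
  max_processing_time_ms: 50,
  retry_attempts: 3,
  retry_delay_ms: 1000,
};

// Later spreads win, so any field present in overrides replaces the default.
function resolveQueueConfig(overrides: Partial<QueueConfig> = {}): QueueConfig {
  return { ...QUEUE_DEFAULTS, ...overrides };
}

const highTraffic = resolveQueueConfig({
  max_buffer_size: 2000,
  priority: 'high',
  max_processing_time_ms: 100,
});

console.log(highTraffic.max_buffer_size, highTraffic.retry_attempts); // prints "2000 3"
```

Typing `priority` as a union also catches misspelled values at compile time, which a plain object literal would not.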

5. Message Routing

5.1 Handler Registration

// Conceptual handler registration - implementation details flexible
interface MessageHandler {
  id: string;
  name: string;
  canHandle(message: GATTMessage): boolean;
  handle(message: GATTMessage, servicePlan: ServicePlan): Promise<void>;
  priority: number;
}

class MessageRouter {
  private handlers: MessageHandler[] = [];

  registerHandler(handler: MessageHandler): void {
    this.handlers.push(handler);
    // Sort by priority (higher priority first)
    this.handlers.sort((a, b) => b.priority - a.priority);
  }

  async routeMessage(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    for (const handler of this.handlers) {
      if (handler.canHandle(message)) {
        await handler.handle(message, servicePlan);
        return;
      }
    }

    // No handler found - log warning
    console.warn(`No handler found for message: ${message.topic}`);
  }
}
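The routing rule above amounts to "the highest-priority handler that accepts the message wins". The sketch below isolates that selection step in a synchronous function so it can be checked on its own. `DemoHandler`, `selectHandler`, and the topic strings are hypothetical and chosen only for illustration.

```typescript
interface DemoMessage {
  topic: string;
}

interface DemoHandler {
  id: string;
  priority: number;
  canHandle(message: DemoMessage): boolean;
}

// Pick the first handler, in descending priority order, that accepts the message.
function selectHandler(handlers: DemoHandler[], message: DemoMessage): DemoHandler | undefined {
  // Copy before sorting so the caller's array is not mutated.
  return [...handlers]
    .sort((a, b) => b.priority - a.priority)
    .find(handler => handler.canHandle(message));
}

const demoHandlers: DemoHandler[] = [
  { id: 'catch-all', priority: 1, canHandle: () => true },
  { id: 'payment', priority: 100, canHandle: m => m.topic.includes('/payment/') },
];

console.log(selectHandler(demoHandlers, { topic: 'plan/42/payment/confirm' })?.id); // prints "payment"
console.log(selectHandler(demoHandlers, { topic: 'plan/42/telemetry' })?.id);       // prints "catch-all"
```

`Array.prototype.sort` is stable in ES2019 and later, so handlers with equal priority keep their registration order.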

5.2 Agent-Specific Handlers

// Conceptual agent handler - implementation details flexible
class PaymentAgentMessageHandler implements MessageHandler {
  id = 'payment-agent-handler';
  name = 'Payment Agent Message Handler';
  priority = 100;

  canHandle(message: GATTMessage): boolean {
    return message.topic.includes('/payment/') || 
           message.intent.business_context === 'payment';
  }

  async handle(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    // Find PaymentAgent in service plan
    const paymentAgent = servicePlan.agent_pool.find(
      agent => agent.agent_type === 'payment'
    ) as PaymentAgent;

    if (paymentAgent) {
      await paymentAgent.processEvent(message.payload);
    }
  }
}

6. Error Handling

6.1 Processing Failures

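// Conceptual error handling - implementation details flexible
// (additional methods for the EventTickProcessor from section 3.2)
class EventTickProcessor {
  private async processMessageSafely(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    try {
      // Assumes the processor exposes routeMessage, e.g. by delegating to the MessageRouter from section 5.1
      await this.routeMessage(message, servicePlan);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);

      // Increment error count (processing_errors is not listed in the
      // ServicePlanMessaging interface in section 2.1 and would need to be added there)
      servicePlan.messaging.processing_errors = 
        (servicePlan.messaging.processing_errors || 0) + 1;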

      // Optionally move to dead letter queue
      if (this.shouldMoveToDeadLetter(error)) {
        await this.moveToDeadLetter(message, error);
      }
    }
  }

  private shouldMoveToDeadLetter(error: any): boolean {
    // Move to dead letter for certain error types
    return error.code === 'VALIDATION_ERROR' || 
           error.code === 'AGENT_NOT_FOUND';
  }
}
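The default configuration lists `retry_attempts` and `retry_delay_ms`, but the error handling above does not show how they are used. The following is one possible sketch, assuming `retry_attempts` counts retries after the first attempt and that the delay is fixed. `withRetry` is a hypothetical helper, and the `sleep` parameter is injectable so tests can skip the real delay.

```typescript
// Hypothetical retry wrapper: runs task once, then retries up to retryAttempts
// more times, waiting retryDelayMs between attempts. Rethrows the last error.
async function withRetry<T>(
  task: () => Promise<T>,
  retryAttempts: number,
  retryDelayMs: number,
  sleep: (ms: number) => Promise<void> =
    ms => new Promise<void>(resolve => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Only wait if another attempt will follow.
      if (attempt < retryAttempts) {
        await sleep(retryDelayMs);
      }
    }
  }
  throw lastError;
}
```

A caller could wrap the routing call, for example `withRetry(() => router.routeMessage(message, servicePlan), 3, 1000)`, and send the message to the dead letter queue only after the wrapper gives up. Errors that will never succeed on retry, such as the validation errors named above, are better excluded from retrying.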

6.2 Queue Health Monitoring

// Conceptual health monitoring - implementation details flexible
interface QueueHealthMetrics {
  external_queue_size: number;
  internal_queue_size: number;
  external_processing_rate: number;
  internal_processing_rate: number;
  external_drop_rate: number;
  internal_drop_rate: number;
  processing_errors: number;
  last_processing_time: Date;
}

class QueueHealthMonitor {
  getQueueHealth(servicePlan: ServicePlan): QueueHealthMetrics {
    return {
      external_queue_size: servicePlan.messaging.external_queue.length,
      internal_queue_size: servicePlan.messaging.internal_queue.length,
      external_processing_rate: this.calculateProcessingRate(servicePlan, 'external'),
      internal_processing_rate: this.calculateProcessingRate(servicePlan, 'internal'),
      external_drop_rate: this.calculateDropRate(servicePlan, 'external'),
      internal_drop_rate: this.calculateDropRate(servicePlan, 'internal'),
      processing_errors: servicePlan.messaging.processing_errors || 0,
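      last_processing_time: new Date() // Placeholder: this is the snapshot time; record a wall-clock timestamp during processing to report the real value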
    };
  }
}
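`calculateDropRate` is left undefined above. One possible definition, offered as an assumption rather than the intended formula, is the share of all enqueued messages that were dropped. Every enqueued message is either still queued, processed, or dropped, so the total can be derived from the existing counters. `QueueCounters` and `dropRate` are hypothetical names.

```typescript
interface QueueCounters {
  queued: number;    // current queue length
  processed: number; // e.g. external_messages_processed
  dropped: number;   // e.g. external_messages_dropped
}

// Fraction of all messages ever enqueued that were dropped on overflow.
function dropRate(counters: QueueCounters): number {
  const enqueued = counters.queued + counters.processed + counters.dropped;
  return enqueued === 0 ? 0 : counters.dropped / enqueued;
}

// 5 dropped out of 10 + 985 + 5 = 1000 enqueued messages.
console.log(dropRate({ queued: 10, processed: 985, dropped: 5 })); // prints 0.005
```

This is a lifetime ratio. A rate over a sliding time window would need timestamped samples, which the counters in section 2.1 do not record.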

7. Performance Considerations

7.1 Memory Management

  • Queue Size Limits: Prevent memory exhaustion
  • Message Cleanup: Remove processed messages immediately
  • Object Pooling: Reuse message objects when possible
  • Garbage Collection: Monitor GC impact of queue operations

7.2 Processing Optimization

  • Batch Processing: Process multiple messages per tick if possible
  • Async Processing: Use async/await for non-blocking operations
  • Priority Queues: Consider priority-based processing for urgent messages
  • Caching: Cache frequently accessed Service_Plan data
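As an illustration of batch processing, the sketch below drains up to a fixed number of messages per tick while keeping the alternating pattern. It differs from the time-budget loop in section 3.2 in two ways that are assumptions of this sketch: the cap is a message count, and an empty queue does not use up a turn. `drainAlternating` is a hypothetical name.

```typescript
type Round = 'external' | 'internal';

// Removes up to maxMessages from the two queues, alternating between them and
// falling back to the other queue when the preferred one is empty.
function drainAlternating<T>(
  external: T[],
  internal: T[],
  maxMessages: number,
  startWith: Round = 'external',
): T[] {
  const batch: T[] = [];
  let turn: Round = startWith;
  while (batch.length < maxMessages && (external.length > 0 || internal.length > 0)) {
    const preferred = turn === 'external' ? external : internal;
    const fallback = turn === 'external' ? internal : external;
    const source = preferred.length > 0 ? preferred : fallback;
    batch.push(source.shift() as T);
    turn = turn === 'external' ? 'internal' : 'external';
  }
  return batch;
}

const externalDemo = ['e1', 'e2', 'e3'];
const internalDemo = ['i1'];
console.log(drainAlternating(externalDemo, internalDemo, 3).join(',')); // prints "e1,i1,e2"
console.log(externalDemo.join(','));                                    // prints "e3"
```

The returned batch can then be handed to the router in one pass, which keeps the dequeue step cheap and leaves the time budget for the handlers.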

7.3 Scalability

  • Horizontal Scaling: Each Service_Plan is independent
  • Load Distribution: MQTT topics naturally distribute load
  • Resource Limits: Monitor CPU and memory usage per Service_Plan

8. Monitoring and Observability

8.1 Key Metrics

// Metrics to track - implementation details flexible
interface QueueMetrics {
  // Queue Sizes
  external_queue_size: number;
  internal_queue_size: number;

  // Processing Rates
  external_messages_per_second: number;
  internal_messages_per_second: number;

  // Error Rates
  external_drop_rate: number;
  internal_drop_rate: number;
  processing_error_rate: number;

  // Latency
  average_processing_latency_ms: number;
  max_processing_latency_ms: number;
}

8.2 Health Checks

// Health check logic - implementation details flexible
class QueueHealthChecker {
  isHealthy(servicePlan: ServicePlan): boolean {
    const metrics = this.getQueueMetrics(servicePlan);

    return (
      metrics.external_queue_size < servicePlan.messaging.max_buffer_size * 0.9 &&
      metrics.internal_queue_size < servicePlan.messaging.max_buffer_size * 0.9 &&
      metrics.processing_error_rate < 0.01 && // Less than 1% error rate
      metrics.average_processing_latency_ms < 500
    );
  }
}

9. Testing Guidelines

9.1 Unit Testing

  • Queue Operations: Test enqueue/dequeue operations
  • Overflow Handling: Test buffer size limits
  • Error Scenarios: Test processing failures
  • Statistics: Test metric collection
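A minimal sketch of an overflow unit test follows. It re-declares a reduced version of the enqueue logic from section 3.1 so that it runs on its own, and it uses a small hypothetical `check` helper instead of a test framework; in practice the team's chosen framework would replace it.

```typescript
interface Msg {
  id: string;
}

// Reduced messaging state with only the fields this test touches.
interface MessagingUnderTest {
  external_queue: Msg[];
  max_buffer_size: number;
  external_messages_dropped: number;
}

// Same drop-oldest logic as pushToExternalQueue in section 3.1.
function pushToExternalQueue(messaging: MessagingUnderTest, message: Msg): void {
  if (messaging.external_queue.length >= messaging.max_buffer_size) {
    messaging.external_queue.shift();
    messaging.external_messages_dropped++;
  }
  messaging.external_queue.push(message);
}

// Hypothetical stand-in for a test framework assertion.
function check(condition: boolean, label: string): void {
  if (!condition) throw new Error(`FAILED: ${label}`);
}

function testOverflowDropsOldest(): void {
  const messaging: MessagingUnderTest = {
    external_queue: [],
    max_buffer_size: 2,
    external_messages_dropped: 0,
  };
  ['a', 'b', 'c'].forEach(id => pushToExternalQueue(messaging, { id }));

  const ids = messaging.external_queue.map(m => m.id).join(',');
  check(ids === 'b,c', 'oldest message is evicted first');
  check(messaging.external_messages_dropped === 1, 'drop counter is incremented once');
}

testOverflowDropsOldest();
console.log('overflow test passed');
```

The same pattern extends to the other bullets: push and shift in sequence to check ordering, force a handler to throw to check error counting, and compare counters before and after to check statistics.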

9.2 Integration Testing

  • MQTT Integration: Test message flow from MQTT to queues
  • Agent Integration: Test message routing to agents
  • Persistence: Test queue state persistence
  • Performance: Test under load conditions

9.3 Load Testing

  • Message Volume: Test with high message volumes
  • Concurrent Processing: Test multiple Service_Plans
  • Memory Usage: Monitor memory consumption
  • Processing Latency: Measure end-to-end latency

10. Migration Strategy

10.1 Phase 1: Basic Implementation

  • Implement queue structure in Service_Plan
  • Add basic enqueue/dequeue operations
  • Implement alternating processing

10.2 Phase 2: Integration

  • Integrate with MQTT message handler
  • Add message routing to agents
  • Implement error handling

10.3 Phase 3: Optimization

  • Add monitoring and metrics
  • Optimize performance
  • Add advanced features (priority, batching)

11. Success Criteria

11.1 Functional Requirements

  • ✅ Messages are correctly enqueued and dequeued
  • ✅ Alternating processing works correctly
  • ✅ Queue overflow is handled gracefully
  • ✅ Messages are routed to correct handlers

11.2 Performance Requirements

  • ✅ Processing latency < 500ms
  • ✅ Queue overflow rate < 1%
  • ✅ Memory usage stays within limits
  • ✅ System handles expected message volume

11.3 Operational Requirements

  • ✅ Monitoring and alerting in place
  • ✅ Error handling and recovery mechanisms
  • ✅ Configuration management
  • ✅ Documentation for operations team

Note: This document provides architectural guidance for queue implementation. Specific implementation details, code patterns, and technology choices are left to the development team based on their expertise and project requirements.