
rabbitmq-master

by @modra40 in Tools
# Install this skill:
npx skills add mOdrA40/claude-codex-skills-directory --skill "rabbitmq-master"

Install specific skill from multi-skill repository

# Description


# SKILL.md


---
name: rabbitmq-master
description: |
  Ultimate RabbitMQ expertise skill for production-grade message broker architecture, implementation, and operations. Top 0.01% knowledge covering: (1) Advanced messaging patterns - Dead Letter Exchanges, Delayed Messages, Priority Queues, Consistent Hash Exchange, Sharding, (2) High Availability - Clustering, Quorum Queues, Stream Queues, Federation, Shovel, (3) Performance Engineering - prefetch tuning, connection pooling, batch publishing, memory optimization, flow control, (4) Security - TLS/mTLS, OAuth2, LDAP, certificate rotation, (5) Monitoring - Prometheus metrics, custom health checks, anomaly detection, (6) Troubleshooting - memory alarms, network partitions, queue backlogs, consumer starvation, (7) Multi-tenancy - vhost design, resource limits, isolation patterns, (8) Event-driven architectures - CQRS, Event Sourcing, Saga patterns with RabbitMQ. Use when: building messaging systems, debugging RabbitMQ issues, optimizing performance, designing HA architectures, implementing advanced patterns, production hardening, capacity planning, migration strategies.
---


# RabbitMQ Master Skill

Expert-level RabbitMQ knowledge for building bulletproof messaging systems.

## Quick Reference

### Connection Best Practices

# WRONG - Connection per message (kills performance)
def send_bad(msg):
    conn = pika.BlockingConnection(params)  # TCP 3-way handshake + multi-step AMQP handshake every call
    ch = conn.channel()
    ch.basic_publish(...)
    conn.close()

# CORRECT - Connection pooling with heartbeat
import pika
from pika import ConnectionParameters, PlainCredentials

params = ConnectionParameters(
    host='rabbitmq.prod',
    port=5672,
    credentials=PlainCredentials('user', 'pass'),
    heartbeat=60,                    # Detect dead connections
    blocked_connection_timeout=300,  # Handle flow control
    connection_attempts=3,
    retry_delay=5,
    socket_timeout=10,
    stack_timeout=15,
    # CRITICAL: TCP keepalive for cloud/NAT environments
    tcp_options={'TCP_KEEPIDLE': 60, 'TCP_KEEPINTVL': 10, 'TCP_KEEPCNT': 3}
)

# Use connection pool - see scripts/connection_pool.py
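
For illustration, a minimal pooling sketch continuing the snippet above (not the full scripts/connection_pool.py; pool size, queue name, and the lazy reconnect are assumptions). Idle BlockingConnections do not service heartbeats, so dead connections are replaced on checkout:

import queue

class ConnectionPool:
    """Tiny fixed-size pool of long-lived BlockingConnections."""

    def __init__(self, params, size=5):
        self._params = params
        self._pool = queue.Queue(maxsize=size)
        for _ in range(size):
            self._pool.put(pika.BlockingConnection(params))

    def acquire(self):
        conn = self._pool.get()
        if conn.is_closed:                       # replace dead connections lazily
            conn = pika.BlockingConnection(self._params)
        return conn

    def release(self, conn):
        self._pool.put(conn)

pool = ConnectionPool(params)

def send_good(msg):
    conn = pool.acquire()
    try:
        ch = conn.channel()
        ch.basic_publish(exchange='', routing_key='work', body=msg)
        ch.close()
    finally:
        pool.release(conn)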

### Channel Best Practices

# Channels are NOT thread-safe - use 1 channel per thread
# Channels are cheap - create many, but not per message

# OPTIMAL: Dedicated channels per purpose
publish_channel = conn.channel()
publish_channel.confirm_delivery()  # Enable publisher confirms

consume_channel = conn.channel()
consume_channel.basic_qos(prefetch_count=50)  # Tuned prefetch
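
When processing is slow, a common companion pattern (a sketch continuing the snippet above; process_message is a placeholder) is to hand work to a thread pool and marshal the ack back onto the connection's thread with add_callback_threadsafe, since a channel must never be touched from a worker thread:

import functools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=8)

def on_message(ch, method, properties, body):
    def work():
        process_message(body)                # slow work off the I/O thread
        # basic_ack must run on the connection's thread, not this worker thread
        conn.add_callback_threadsafe(
            functools.partial(ch.basic_ack, method.delivery_tag))
    executor.submit(work)

consume_channel.basic_consume(queue='orders', on_message_callback=on_message)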

## Core Patterns

### 1. Reliable Publishing (Publisher Confirms)

# Synchronous confirms (simple, slower)
channel.confirm_delivery()
try:
    channel.basic_publish(
        exchange='orders',
        routing_key='new',
        body=json.dumps(order),
        properties=pika.BasicProperties(
            delivery_mode=2,           # Persistent
            content_type='application/json',
            message_id=str(uuid4()),   # Idempotency key
            timestamp=int(time.time()),
            headers={'retry_count': 0}
        ),
        mandatory=True  # Return if unroutable
    )
except pika.exceptions.UnroutableError:
    handle_unroutable()
except pika.exceptions.NackError:
    handle_nack()

# Asynchronous confirms (complex, 10x faster) - see scripts/async_publisher.py
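
The core of the asynchronous approach is bookkeeping of outstanding delivery tags; a sketch under the assumption of an async (SelectConnection-style) channel, where requeue_for_retry is a hypothetical retry hook:

import pika

outstanding = {}   # delivery_tag -> message payload, awaiting broker confirm

def on_delivery_confirmation(frame):
    # frame.method is Basic.Ack or Basic.Nack; 'multiple' confirms all tags <= delivery_tag
    tag = frame.method.delivery_tag
    acked = isinstance(frame.method, pika.spec.Basic.Ack)
    tags = [t for t in list(outstanding) if t <= tag] if frame.method.multiple else [tag]
    for t in tags:
        msg = outstanding.pop(t, None)
        if msg is not None and not acked:
            requeue_for_retry(msg)           # hypothetical hook for nacked messages

def on_channel_open(channel):
    # On an async channel, confirm_delivery registers the confirm callback
    channel.confirm_delivery(ack_nack_callback=on_delivery_confirmation)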

### 2. Reliable Consuming

def callback(ch, method, properties, body):
    try:
        # ALWAYS process idempotently using message_id
        if is_duplicate(properties.message_id):
            ch.basic_ack(method.delivery_tag)
            return

        process_message(body)
        mark_processed(properties.message_id)
        ch.basic_ack(method.delivery_tag)

    except RecoverableError as e:
        # Requeue with exponential backoff via DLX
        retry_count = (properties.headers or {}).get('retry_count', 0)
        if retry_count < MAX_RETRIES:
            republish_with_delay(ch, body, retry_count + 1)
            ch.basic_ack(method.delivery_tag)  # Ack original
        else:
            ch.basic_nack(method.delivery_tag, requeue=False)  # To DLQ

    except FatalError:
        # Permanent failure - dead letter immediately
        ch.basic_nack(method.delivery_tag, requeue=False)

channel.basic_qos(prefetch_count=50)  # CRITICAL - tune this!
channel.basic_consume(queue='orders', on_message_callback=callback)
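
A sketch of what republish_with_delay could look like, assuming the delayed-message exchange from pattern 4 below (exchange name and routing key are illustrative; with the TTL + DLX chain you would publish to a wait queue instead):

def republish_with_delay(ch, body, retry_count, base_delay_ms=1000):
    delay_ms = base_delay_ms * (2 ** (retry_count - 1))   # exponential backoff
    ch.basic_publish(
        exchange='delayed.exchange',
        routing_key='orders',
        body=body,
        properties=pika.BasicProperties(
            delivery_mode=2,
            headers={'retry_count': retry_count, 'x-delay': delay_ms}
        )
    )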

### 3. Dead Letter Exchange Pattern

# DLX captures: rejected, expired, queue-full messages
channel.exchange_declare('dlx.exchange', 'direct', durable=True)
channel.queue_declare('dlq.orders', durable=True)
channel.queue_bind('dlq.orders', 'dlx.exchange', 'orders')

# Main queue with DLX
channel.queue_declare(
    'orders',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx.exchange',
        'x-dead-letter-routing-key': 'orders',
        'x-message-ttl': 86400000,     # 24h max age
        'x-max-length': 1000000,       # Max 1M messages
        'x-overflow': 'reject-publish-dlx'  # DLX on overflow
    }
)
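
Dead-lettered messages arrive with an x-death header recording why and where they died; a small inspection sketch (queue name and decision logic are illustrative; scripts/dlq_processor.py is the full tool):

def on_dead_letter(ch, method, properties, body):
    deaths = (properties.headers or {}).get('x-death', [])
    if deaths:
        last = deaths[0]                       # most recent death event first
        reason = last.get('reason')            # 'rejected', 'expired', 'maxlen'
        source = last.get('queue')
        count = last.get('count', 1)
        print(f"dead-lettered from {source}: reason={reason}, count={count}")
    ch.basic_ack(method.delivery_tag)

channel.basic_consume(queue='dlq.orders', on_message_callback=on_dead_letter)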

### 4. Delayed/Scheduled Messages

# Method 1: Plugin (rabbitmq_delayed_message_exchange)
channel.exchange_declare(
    'delayed.exchange',
    'x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

channel.basic_publish(
    exchange='delayed.exchange',
    routing_key='scheduled',
    body=payload,
    properties=pika.BasicProperties(
        headers={'x-delay': 60000}  # 60 seconds delay
    )
)

# Method 2: TTL + DLX chain (no plugin needed) - see references/patterns.md
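
A minimal sketch of Method 2 (names are illustrative): a wait queue holds messages for its per-queue TTL, then dead-letters them into the real exchange. One wait queue per delay duration avoids head-of-line blocking from per-message TTLs:

channel.exchange_declare('orders.exchange', 'direct', durable=True)
channel.queue_declare(
    'orders.wait.60s',
    durable=True,
    arguments={
        'x-message-ttl': 60000,                      # the delay
        'x-dead-letter-exchange': 'orders.exchange',
        'x-dead-letter-routing-key': 'scheduled'
    }
)

# Publish to the wait queue via the default exchange; after ~60s the message
# is dead-lettered into 'orders.exchange' with routing key 'scheduled'.
channel.basic_publish(exchange='', routing_key='orders.wait.60s', body=payload)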

### 5. Priority Queues

# CAUTION: Priority queues have overhead, use sparingly
channel.queue_declare(
    'priority.orders',
    durable=True,
    arguments={
        'x-max-priority': 10,  # 1-10 priorities, keep low!
        'x-queue-type': 'classic'  # Not supported on quorum
    }
)

# Publishing with priority
channel.basic_publish(
    exchange='',
    routing_key='priority.orders',
    body=payload,
    properties=pika.BasicProperties(
        delivery_mode=2,
        priority=8  # Higher = more important
    )
)

## High Availability

### Quorum Queues

# Raft-based replication - ALWAYS use for critical queues
channel.queue_declare(
    'orders.quorum',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,  # Replicas
        'x-delivery-limit': 5,             # Auto-DLQ after 5 redeliveries
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-strategy': 'at-least-once'  # Safe DLQ
    }
)
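
Quorum queues also stamp redelivered messages with an x-delivery-count header, so consumers can short-circuit poison messages before the broker's x-delivery-limit dead-letters them (a sketch; process_message and the threshold of 3 are assumptions):

def on_quorum_message(ch, method, properties, body):
    delivery_count = (properties.headers or {}).get('x-delivery-count', 0)
    if delivery_count >= 3:
        ch.basic_nack(method.delivery_tag, requeue=False)   # give up early, go to DLX
        return
    try:
        process_message(body)
        ch.basic_ack(method.delivery_tag)
    except Exception:
        ch.basic_nack(method.delivery_tag, requeue=True)    # counts toward x-delivery-limit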

### Stream Queues (High-throughput, replay)

# Kafka-like streams in RabbitMQ 3.9+
channel.queue_declare(
    'events.stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-length-bytes': 20_000_000_000,  # 20GB retention
        'x-max-age': '7D',                      # 7 days retention
        'x-stream-max-segment-size-bytes': 500_000_000
    }
)

# Consuming from offset
channel.basic_qos(prefetch_count=100)
channel.basic_consume(
    'events.stream',
    callback,
    arguments={
        'x-stream-offset': 'first'  # first|last|next|timestamp|offset
    }
)

## Performance Tuning

### Prefetch Optimization Formula

optimal_prefetch = (avg_network_rtt_ms / avg_processing_time_ms) * consumer_count * 1.5

Buffer roughly one round trip's worth of work per consumer, plus 50% headroom: fast consumers behind high-latency links need high prefetch, slow consumers need low prefetch so unacked work stays evenly distributed (helper sketch below).

Examples:
- Same datacenter (1ms RTT), 50ms processing, 1 consumer: (1/50) * 1 * 1.5 ≈ 0 → use 1-2
- Cross-region (50ms RTT), 5ms processing, 1 consumer: (50/5) * 1 * 1.5 = 15
- Slow batch processing (500ms per message): keep prefetch at 1-2 regardless of RTT
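
A small helper for the rule of thumb above (a heuristic sketch, not an official formula; the floor and cap are assumptions to keep memory bounded):

import math

def optimal_prefetch(rtt_ms, processing_ms, consumers=1, headroom=1.5, cap=1000):
    raw = (rtt_ms / processing_ms) * consumers * headroom
    return max(1, min(cap, math.ceil(raw)))

# optimal_prefetch(1, 50)  -> 1   (slow consumer in the same datacenter)
# optimal_prefetch(50, 5)  -> 15  (fast consumer across regions)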

### Batch Publishing (10x throughput)

# Single publish with per-message confirms: ~2,000 msg/s
# Batched publishing with asynchronous confirms: ~20,000+ msg/s

def batch_publish(channel, messages, batch_size=100):
    channel.confirm_delivery()

    for i in range(0, len(messages), batch_size):
        batch = messages[i:i+batch_size]
        for msg in batch:
            channel.basic_publish(
                exchange='batch.exchange',
                routing_key=msg['key'],
                body=msg['body'],
                properties=pika.BasicProperties(delivery_mode=2)
            )
        # NOTE: pika's BlockingChannel has no wait_for_confirms(). Once
        # confirm_delivery() is enabled, each basic_publish above blocks until
        # its own confirm arrives and raises UnroutableError/NackError on
        # failure. Batch-level confirms (where the 10x gain comes from) need
        # the asynchronous publisher in scripts/async_publisher.py.

### Memory Management

# rabbitmq.conf - Production settings (the new-style config uses '#' comments)
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.75
disk_free_limit.absolute = 5GB

# Queue index tuning
queue_index_embed_msgs_below = 4096

# Flow control and lazy-queue GC tuning use Erlang terms and therefore belong
# in advanced.config, not rabbitmq.conf:
#   {rabbit, [{credit_flow_default_credit, {400, 200}},
#             {lazy_queue_explicit_gc_run_operation_threshold, 1000}]}

## Monitoring & Alerting

### Critical Metrics

# Prometheus alerts - see references/monitoring.md for full config
- alert: RabbitMQHighMemory
  expr: rabbitmq_process_resident_memory_bytes / rabbitmq_resident_memory_limit_bytes > 0.8

- alert: RabbitMQQueueBacklog
  expr: rabbitmq_queue_messages_ready > 100000

- alert: RabbitMQConsumerUtilization
  expr: rabbitmq_queue_consumer_utilisation < 0.5  # Consumers idle = problem

- alert: RabbitMQUnackedMessages
  expr: rabbitmq_queue_messages_unacknowledged > 10000

- alert: RabbitMQDiskAlarm
  expr: rabbitmq_alarms_free_disk_space_watermark == 1

### Health Check Script

# See scripts/health_check.sh for complete implementation
rabbitmqctl node_health_check   # deprecated since 3.8; prefer the rabbitmq-diagnostics checks below
rabbitmqctl cluster_status
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms

## Anti-Patterns to Avoid

| Anti-Pattern | Problem | Solution |
|---|---|---|
| Connection per message | 1000x overhead | Connection pool |
| No prefetch (unlimited) | Memory explosion | Tune prefetch_count |
| auto_ack=True | Message loss | Manual ack after processing |
| Classic queues for HA | Split-brain risk | Use quorum queues |
| Polling with basic_get | CPU waste, latency | Use basic_consume |
| Giant messages (>128KB) | Memory pressure | External storage + reference (see sketch below) |
| No message TTL | Queue bloat | Set x-message-ttl |
| Unbounded queue growth | Disk/memory full | x-max-length + overflow policy |
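
For the giant-messages row, a claim-check sketch: store the payload externally and publish only a small reference (store_blob, load_blob, and process_message are hypothetical helpers around your object store):

import json
from uuid import uuid4
import pika

def publish_large(channel, payload: bytes):
    blob_id = str(uuid4())
    store_blob(blob_id, payload)                 # hypothetical object-store write
    channel.basic_publish(
        exchange='orders',
        routing_key='new',
        body=json.dumps({'blob_id': blob_id, 'size': len(payload)}),
        properties=pika.BasicProperties(delivery_mode=2,
                                        content_type='application/json')
    )

def on_reference(ch, method, properties, body):
    ref = json.loads(body)
    payload = load_blob(ref['blob_id'])          # hypothetical object-store read
    process_message(payload)
    ch.basic_ack(method.delivery_tag)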

## File Reference

- scripts/connection_pool.py - Production-grade connection pooling
- scripts/async_publisher.py - High-throughput async publisher with confirms
- scripts/consumer_template.py - Robust consumer with retry logic
- scripts/health_check.sh - Comprehensive health check script
- scripts/queue_migrate.py - Zero-downtime queue migration tool
- scripts/dlq_processor.py - Dead letter queue reprocessing
- references/patterns.md - Advanced messaging patterns deep-dive
- references/clustering.md - HA clustering configuration
- references/security.md - Security hardening guide
- references/monitoring.md - Full monitoring setup
- references/troubleshooting.md - Problem diagnosis guide
- references/performance.md - Performance tuning deep-dive
- assets/rabbitmq.conf - Production configuration template
- assets/docker-compose.yml - Development cluster setup
- assets/k8s/ - Kubernetes deployment manifests

# Supported AI Coding Agents

This skill is compatible with the SKILL.md standard and works with all major AI coding agents.

Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.