RRabbitMQ Handbook

ORTA

Prefetch & Consumer Scaling

Bir queue'dan mesaj okurken iki kritik sorunun cevabını bilmeniz gerekir: her consumer'a aynı anda kaç mesaj gönderilecek (prefetch) ve kaç consumer çalışacak (scaling).

Kod örneği görünümü Bu sayfadaki eşleşen örnekleri seçilen istemciye göre gösterir.
7.1 Prefetch (QoS) Kaç mesaj aynı anda? Memory vs throughput global: true/false 7.2 Slow Consumer Neden mesaj birikir? Round-robin tuzağı Çözüm stratejileri 7.3 Scaling Patterns Competing consumers Single active Consumer priority 7.4 K8s Auto-Scale KEDA + queue depth HPA custom metrics Pod lifecycle

7.1 Prefetch (QoS) Nedir?

Prefetch Count, RabbitMQ'nun bir consumer'a ack almadan göndereceği maksimum mesaj sayısıdır. Bu bir "buffer" gibi çalışır — consumer ack gönderdikçe broker buffer'ı yeniden doldurur.

RabbitMQ Broker — Queue Kuyrukta bekleyen mesajlar In-flight (prefetch=3) ① Broker prefetch kadar mesajı push eder ② Ack gelene kadar yeni mesaj göndermez push Consumer (Your App) 1 işleniyor, 2 buffer'da ③ İşlem bitti → Ack gönder ④ Broker 1 yeni mesaj push eder ack Özet Prefetch ↑ = Throughput ↑ Prefetch ↑ = Memory ↑ Prefetch ↑ = Dengesiz dağılım ↑ Altın kural: işlem süresiyle ters orantılı tut
Kullan Kullanma
Her consumer için BasicQosAsync çağır Prefetch'i hiç ayarlama (default=0 → unlimited)
İşlem süresine göre prefetch değerini belirle Tüm queue'lara aynı prefetch ver
global: false kullan (per-consumer) global: true kullan (quorum queue desteklemez)
Monitoring ile prefetch'i tune et Bir kez ayarla, bir daha bakma

global: false vs global: true

Parametre Davranış Quorum Queue Kullanım
global: false Her consumer kendi prefetch limit'ine sahip Destekler Her zaman bunu kullan
global: true Channel'daki TÜM consumer'lar toplam limit'i paylaşır Desteklemez Legacy — kullanma

Junior Hatası: global: true ile quorum queue kullanırsanız, RabbitMQ sessizce ignore eder — hata almaz ama prefetch çalışmaz. Tüm mesajlar limitsiz push edilir ve consumer memory patlar.

Prefetch Değeri Seçim Rehberi

İşlem Tipi Önerilen Prefetch Neden Gerçek Hayat
Hızlı işlem (<10ms) 50-100 Pipeline'ı dolu tut, RTT overhead azalt Cache invalidation, log forwarding
Orta işlem (10-100ms) 10-30 Yeterli buffer, memory kontrollü DB write + notification gönderimi
Yavaş işlem (>100ms) 1-5 Az mesaj = dengeli dağılım PDF generation, video transcode
Değişken süre 5-10 Orta yol, slow consumer'ı aç bırakmaz 3rd party API calls (rate limit var)
Batch processing 50-200 Toplu commit/flush verimli olsun Elasticsearch bulk indexing

Prefetch Hesaplama Formülü

Tahmini bir tablo yerine, sisteminize özel prefetch değerini hesaplamak daha güvenilirdir. Aşağıdaki formül, consumer'ın boşta kalmamasını (throughput) VE memory'nin şişmemesini (güvenlik) dengeler:

İdeal Prefetch Hesaplama (3 Adım) ADIM 1: Throughput Formülü (pipeline dolu kalsın) Prefetch(throughput) = RTT / T(process) RTT = ağ gecikme (ms) T = işlem süresi (ms) Consumer ack gönderip yeni mesaj alana kadar boş kalmasın ADIM 2: Memory Koruması (RAM şişmesin) Prefetch(memory) = MaxRAM / AvgMsgSize MaxRAM = consumer'a ayırdığın buffer Msg = ort. mesaj In-flight mesajlar toplam RAM'i aşmasın ADIM 3: Final Değer (ikisinin minimumu) Prefetch = MIN(throughput, memory) × 0.75 × 0.75 = güvenlik marjı (spike'lara karşı) Örnek: E-ticaret Sipariş İşleme Değişkenler: RTT (broker↔consumer ağ): 2ms (aynı cluster) T(process): 50ms (DB write + cache invalidate) AvgMsgSize: 4KB (JSON sipariş payload) MaxRAM (buffer): 50MB (pod'un 256MB'ından) Hesaplama: ① Throughput: 2ms / 50ms = 0.04 → MIN 1 (çok düşük!) RTT çok düşük → throughput formülü faydasız Bu durumda: concurrent processing hedefle ② Memory: 50MB / 4KB = 12,500 (çok yüksek!) ③ Pratik: 20-30 arası (CPU-bound limit) Pratik Kural (RTT düşükse formül yetersiz kalır) Prefetch = MIN( TargetConcurrency, MemoryBudget/MsgSize ) × 0.75 TargetConcurrency = aynı anda kaç mesajı paralel işleyebilirsin (CPU/IO bound'a göre)

Formülün 3 bileşeni:

Bileşen Formül Ne Korur Nasıl Ölçersin
Throughput RTT ÷ ProcessingTime Consumer boşta kalmasın ping rabbitmq-host → RTT; stopwatch ile T(process)
Memory MaxRAMBuffer ÷ AvgMessageSize Consumer OOM olmasın Pod memory limit × %20 = buffer; ortalama mesaj boyutu
Fairness QueueDepth ÷ ConsumerCount Yük dengeli dağılsın Management API: messages_ready / consumers
Final MIN(hepsi) × 0.75 Spike'lara karşı marj

Önemli: RTT çok düşükse (aynı cluster, <5ms) throughput formülü anlamsız sonuç verir (0-1 arası). Bu durumda TargetConcurrency yaklaşımını kullanın: "Bu consumer aynı anda kaç iş yapabilir?" sorusunun cevabı prefetch'iniz olur.

Pratik karar ağacı:

İşlemin CPU-bound mu, IO-bound mu?
│
├─ CPU-bound (image resize, crypto, parse):
│   └─ Prefetch = vCPU sayısı × 2
│       Örnek: 2 vCPU pod → prefetch = 4
│
├─ IO-bound (DB call, HTTP API, file write):
│   └─ Prefetch = concurrent IO kapasitesi × 0.75
│       Örnek: DB connection pool = 20 → prefetch = 15
│
└─ Mixed (DB + hesaplama):
    └─ Prefetch = MIN(vCPU×2, connPool) × 0.75
        Örnek: 4 vCPU, pool=10 → MIN(8,10) × 0.75 = 6

Memory güvenlik kontrolü (her zaman uygula):

MaxPrefetch = (Pod Memory Limit × 0.20) ÷ AvgMessageSize
Eğer hesaplanan prefetch > MaxPrefetch → MaxPrefetch kullan

Örnek:
  Pod limit: 512MB
  Buffer (%20): 102MB
  Avg mesaj: 8KB
  MaxPrefetch: 102MB ÷ 8KB = 12,750 → sorun yok, memory darboğaz değil

Ama eğer mesajlar büyükse (video metadata, 500KB):
  MaxPrefetch: 102MB ÷ 500KB = 204 → bu limit'e dikkat!
Senaryo CPU IO Pool Mesaj Hesaplama → Prefetch
Sipariş işleme 2 vCPU DB pool=10 4KB MIN(4, 10) × 0.75 3
Email gönderim 1 vCPU SMTP pool=5 2KB MIN(2, 5) × 0.75 2-4
Log forwarding 2 vCPU Elastic bulk 1KB Throughput: batch=100 100
PDF generation 4 vCPU File IO 50KB CPU-bound: 4×2 × 0.75 6
Payment (bank API) 2 vCPU HTTP pool=3 3KB MIN(4, 3) × 0.75 2
Video transcode 8 vCPU Disk IO 200KB CPU-bound: 8×1 (ağır) 4-8

Gerçek hayat senaryosu: SaaS multi-tenant notification servisi: Her tenant farklı boyutta payload gönderiyor (1KB - 200KB). Sabit prefetch=50 ile büyük payload'lu tenant'lar memory spike yapıyordu. Çözüm: prefetch = MIN(30, 50MB / avgMsgSizeFromLastHour) — her saat ortalama mesaj boyutuna göre dinamik prefetch ayarı. Monitoring'den gelen metric ile runtime'da BasicQosAsync tekrar çağrılabilir.

Anti-Pattern: "Daha yüksek prefetch = daha hızlı" düşüncesi — Prefetch=1000 ayarlayan ekipler genelde şu sorunları yaşar: (1) Consumer crash'inde 1000 mesaj requeue olur → duplicate storm, (2) Slow consumer'da 1000 mesaj stuck kalır → diğerleri aç, (3) Memory spike → OOM kill → daha fazla requeue. Hesaplamaya güvenin, hırs yapmayın.

Gerçek hayat senaryosu: Fintech ödeme sistemi: her ödeme 50-150ms (banka API). Prefetch=3 ile consumer'da en fazla 3 concurrent ödeme. Neden düşük? Çünkü banka rate-limit uyguluyor ve hata durumunda retry queue'ya gidecek mesaj sayısı kontrollü kalıyor.

.NET — Prefetch Ayarı
// ✅ DOĞRU: Per-consumer prefetch
await channel.BasicQosAsync(
    prefetchSize: 0,      // byte limit yok (genelde 0 bırakılır)
    prefetchCount: 20,    // max 20 unacked message per consumer
    global: false);       // ⚠️ HER ZAMAN false — quorum queue'lar global:true desteklemez

// Consumer başlat
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
    try
    {
        await ProcessMessageAsync(ea);
        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
    }
    catch (Exception ex)
    {
        // requeue: false → DLX'e gider (retry pattern için)
        await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false);
    }
};

await channel.BasicConsumeAsync(
    queue: "payment-processing",
    autoAck: false,       // ⚠️ autoAck: true ise prefetch anlamsız — mesaj anında ack'lenir
    consumer: consumer);
.NET — Yanlış Prefetch Kullanımları
// ❌ YANLIŞ 1: Prefetch hiç ayarlanmamış (default = 0 = unlimited)
// Broker tüm mesajları bir anda push eder → memory patlar
var consumer = new AsyncEventingBasicConsumer(channel);
await channel.BasicConsumeAsync("orders", autoAck: false, consumer: consumer);
// ↑ BasicQosAsync çağrılmadı → prefetch = 0 (unlimited)!

// ❌ YANLIŞ 2: autoAck: true ile prefetch — boşuna ayar
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);
await channel.BasicConsumeAsync("orders", autoAck: true, consumer: consumer);
// ↑ autoAck=true mesaj gelir gelmez ack gönderir, prefetch limiti hiçbir işe yaramaz

// ❌ YANLIŞ 3: global: true + quorum queue
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 50, global: true);
// ↑ Quorum queue'lar global QoS'u desteklemez, sessizce ignore edilir

7.2 Slow Consumer Problemi

RabbitMQ mesajları round-robin dağıtır: sırayla her consumer'a 1 mesaj. Ama eğer bir consumer yavaşsa ve prefetch dolmuşsa, broker o consumer'a yeni mesaj göndermez — mesaj diğerlerine yönlenir. Sorun: prefetch yüksek ayarlandığında yavaş consumer'ın buffer'ında çok mesaj birikir ve diğer consumer'lar bunlara erişemez.

❌ Yüksek Prefetch + Slow Consumer = Mesaj Birikimi Queue 3 bekliyor (queue dolu görünüyor ama consumer'larda 20 mesaj var) Consumer 1 (hızlı, 5ms) Prefetch=10: sadece 1 mesaj var, hemen bitiyor ✓ Consumer 2 (yavaş, 2000ms) Prefetch=10: 10 mesaj stuck! İşlem 2sn sürüyor Consumer 3 (hızlı) — boşta bekliyor! ✅ Çözüm: Düşük Prefetch Prefetch = 2 ile aynı senaryo: • Consumer 2'de max 2 mesaj birikir • Kalan mesajlar Consumer 1 ve 3'e gider • Toplam throughput: yavaş consumer'a bağımlı değil Kural: İşlem süresi değişkense prefetch'i EN YAVAŞ senaryoya göre ayarla veya consumer sayısını artır
Belirti Sebep Çözüm
Queue ready count düşük ama throughput kötü Mesajlar consumer buffer'larında stuck Prefetch'i düşür
Bir pod CPU %100, diğerleri idle Round-robin + yüksek prefetch = dengesiz yük Prefetch'i düşür + pod artır
consumer_utilisation metriği < %50 Consumer yeterince hızlı ack gönderemiyor Prefetch artır (hızlı işlemlerde)
unacked count sürekli yüksek Consumer crash veya çok yavaş Consumer health check + prefetch düşür

Anti-Pattern: Prefetch = 0 (unlimited) — Broker tüm mesajları consumer'a push eder. 100K mesajlık bir queue'da consumer başlatırsanız, 100K mesaj anında consumer memory'sine yüklenir. OOM kill garantili. Her zaman bir prefetch değeri belirleyin.

Gerçek hayat senaryosu: E-ticaret sipariş işleme: Siparişlerin %90'ı 20ms'de biter (stok kontrolü), %10'u 3sn sürer (3rd party kargo API timeout). Prefetch=50 ile bir consumer'a 50 "yavaş sipariş" geldiğinde, o consumer 2.5 dakika cevap veremez. Prefetch=3'e düşürüldüğünde, yavaş siparişler max 3'er tane dağılır ve diğer consumer'lar hızlı siparişleri işlemeye devam eder.


7.3 Consumer Scaling Patterns

Tek consumer yetmediğinde sistemi ölçeklendirmenin üç ana pattern'i vardır. Her birinin farklı garanti ve trade-off'ları vardır.

Competing Consumers Queue →→→ Consumer A Consumer B Consumer C Round-robin dağılım Sıra garantisi YOK ✓ Max throughput ✓ Yatay ölçekleme ✗ Ordering kaybı Single Active Consumer Queue Active ✓ Standby Standby Tek consumer aktif Düşerse sonraki devralır ✓ Sıra garantisi ✓ Auto failover ✗ Throughput sınırlı Consumer Priority Queue →→ Priority=10 ✓ Priority=5 Priority=1 Yüksek priority dolu olunca düşüğe geçer ✓ Preferred routing ✓ Graceful degradation ✗ Starvation riski
Pattern Ne Zaman Kullan Ne Zaman Kullanma Gerçek Hayat
Competing Consumers Sıra önemsiz, throughput kritik Aynı entity'nin mesajlarının sıralı işlenmesi gerektiğinde E-ticaret: sipariş email bildirimleri (sıra önemsiz)
Single Active Consumer Strict ordering, active-passive failover High throughput gereken yerlerde (tek consumer darboğaz) Fintech: hesap bakiye güncelleme (sıra kritik)
Consumer Priority Preferred datacenter/node, cost optimization Consumer sayısı az ve eşit yük dağılımı gerektiğinde Multi-region: local DC'ye öncelik, uzak DC fallback

Single Active Consumer + Quorum Queue: Bu pattern sadece quorum queue'larda güvenilir çalışır. Classic mirrored queue'larda network partition sırasında iki consumer aynı anda aktif olabilir (split-brain).

.NET — Competing Consumers (Paralel İşleme)
// Her pod'da aynı kodu çalıştırın — RabbitMQ round-robin dağıtır
await channel.BasicQosAsync(
    prefetchSize: 0,
    prefetchCount: 20,
    global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
    await ProcessMessageAsync(ea);
    await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};

await channel.BasicConsumeAsync(
    queue: "email-sending",
    autoAck: false,
    consumer: consumer);
// 5 pod deploy edin → 5 competing consumer → throughput 5x
.NET — Single Active Consumer
// Queue declare with single active consumer
var args = new Dictionary<string, object?>
{
    ["x-queue-type"] = "quorum",
    ["x-single-active-consumer"] = true  // sadece 1 consumer aktif
};

await channel.QueueDeclareAsync("sequential-orders", durable: true,
    exclusive: false, autoDelete: false, arguments: args);

// Tüm pod'lar consume eder ama sadece biri aktif olur
// Aktif consumer düşerse, sonraki consumer otomatik devralır (failover ~5-10sn)
.NET — Consumer Priority
// Yüksek priority consumer (local datacenter pod'ları)
var localArgs = new Dictionary<string, object?>
{
    ["x-priority"] = 10  // yüksek → önce bu consumer'a gider
};
await channel.BasicConsumeAsync("notifications", autoAck: false,
    arguments: localArgs, consumer: localConsumer);

// Düşük priority consumer (remote datacenter — fallback)
var remoteArgs = new Dictionary<string, object?>
{
    ["x-priority"] = 1   // düşük → local dolunca buraya gelir
};
await channel.BasicConsumeAsync("notifications", autoAck: false,
    arguments: remoteArgs, consumer: remoteConsumer);

Gerçek hayat senaryosu: Chat uygulaması notification servisi: Normal saatlerde 3 pod (Competing Consumers, prefetch=30) yeterli. Black Friday'de queue depth artınca KEDA 3→12 pod'a scale eder. Her pod aynı consumer kodu çalıştırır, RabbitMQ otomatik round-robin yapar.


7.4 Kubernetes'te Auto-Scaling

Seviye notu: Bu alt bölüm ileri seviye kapsamındadır. K8s ve Prometheus bilgisi gerektirir. Önce Clustering ve Node Discovery ile Monitoring ve Alerting konularını okumanız önerilir.

Queue derinliği arttığında otomatik pod eklenmesini (scale-out) ve azaldığında pod silinmesini (scale-in) sağlayan iki ana araç: KEDA (event-driven, sıfıra scale) ve HPA custom metrics (Prometheus adapter ile).

RabbitMQ Queue depth: 5000 messages_ready ↑ artıyor... metric KEDA ScaledObject queueLength: 100 5000/100 = 50 pods scale Deployment Pod 1 Pod 2 Pod 3 Pod 4 ✚ Pod 5 ✚ Pod 6 ✚ Scale-in Güvenliği Queue boşaldığında: 1. Pod yeni mesaj almayı durdurur 2. In-flight mesajları bitirir 3. Ack gönderir 4. Graceful shutdown ⚠️ terminationGracePeriod en yavaş mesajdan uzun olmalı
Araç Avantaj Dezavantaj Ne Zaman
KEDA Sıfıra scale, event-driven, RabbitMQ trigger built-in Cluster'a KEDA kurulumu gerekir Event-driven workload, maliyet optimizasyonu
HPA + Prometheus K8s native, ek kurulum az Sıfıra scale edemez (min=1), metric pipeline karmaşık Sürekli trafik olan servisler
Manual replica Basit, tahmin edilebilir Trafik spike'larına tepki yok Dev/staging, sabit yüklü servisler
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: order-processor-scaler
  namespace: production
spec:
  scaleTargetRef:
    name: order-processor          # Deployment adı
  minReplicaCount: 2               # Min pod (0 yapılabilir ama cold-start riski)
  maxReplicaCount: 50              # Max pod (kaynak limitlerine dikkat)
  cooldownPeriod: 60               # Scale-in bekleme süresi (saniye)
  pollingInterval: 15              # Metric kontrol sıklığı (saniye)
  triggers:
    - type: rabbitmq
      metadata:
        host: "amqp://user:[email protected]:5672/"  # Secret kullanın!
        queueName: order-processing
        mode: QueueLength          # QueueLength veya MessageRate
        value: "100"               # Her 100 mesaj için 1 pod
      authenticationRef:
        name: rabbitmq-credentials  # TriggerAuthentication resource
// Program.cs — IHostedService ile graceful shutdown
public class RabbitConsumerService : BackgroundService
{
    private IChannel? _channel;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _channel = await _connection.CreateChannelAsync();
        await _channel.BasicQosAsync(0, 20, false);

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            // stoppingToken ile graceful check
            if (stoppingToken.IsCancellationRequested)
            {
                // Yeni mesaj almayı durdur, mevcut mesajı bitir
                await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
                return;
            }

            await ProcessOrderAsync(ea, stoppingToken);
            await _channel.BasicAckAsync(ea.DeliveryTag, false);
        };

        await _channel.BasicConsumeAsync("orders", false, consumer,
            cancellationToken: stoppingToken);

        // Token iptal olana kadar bekle
        await Task.Delay(Timeout.Infinite, stoppingToken)
            .ContinueWith(_ => { }, TaskScheduler.Default);
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        // In-flight mesajların bitmesini bekle
        if (_channel is not null)
        {
            await _channel.CloseAsync();  // pending ack'ler gönderilir
        }
        await base.StopAsync(cancellationToken);
    }
}
Kubernetes — Pod Spec (terminationGracePeriod)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 120   # En yavaş mesajın işlenme süresinden UZUN olmalı
      containers:
        - name: consumer
          image: myapp/order-processor:latest
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          lifecycle:
            preStop:
              exec:
                command: ["sh", "-c", "sleep 5"]  # LB'nin pod'u listeden çıkarması için

Anti-Pattern: terminationGracePeriod < işlem süresi — Pod silinirken in-flight mesajlar kesilir. Mesaj requeue olur ve başka consumer'a gider ama idempotent değilse duplicate processing oluşur. Grace period'u her zaman en yavaş mesajın 2x süresine ayarlayın.

Gerçek hayat senaryosu: SaaS video transcoding servisi: Her video 30-180sn arası. KEDA queueLength: 5 ile her 5 video için 1 pod ekler. terminationGracePeriodSeconds: 300 (5dk) ile en yavaş video bile tamamlanır. Gece 0 pod'a düşer (maliyet ~%70 tasarruf). minReplicaCount: 0 + cooldownPeriod: 300 ile cold-start kabul edilebilir.


Karar Matrisi: Prefetch + Consumer + Scaling

Senaryo → Konfigürasyon Eşleştirmesi Senaryo Prefetch Consumer Pattern Scaling Örnek Hızlı + Yüksek hacim 50-100 Competing (N pod) KEDA queueLength:200 Log ingestion Yavaş + Değişken süre 3-5 Competing (N pod) KEDA queueLength:10 3rd party API call Sıralı işlem gerekli 1 Single Active Manual (min=2 HA) Bakiye güncelleme Spike trafikli + maliyet 10-30 Competing (0-N pod) KEDA min:0 + cooldown Video transcode ❌ prefetch=0 + autoAck ← Bu kombinasyonlardan KAÇININ: memory patlar, mesaj kaybolur

Karar Tablosu

Soru Cevap A → Config A Cevap B → Config B
İşlem süresi? <10ms Prefetch: 50-100 >100ms Prefetch: 1-5
Sıra önemli mi? Hayır Competing Consumers Evet Single Active Consumer
Trafik tahmini? Sabit HPA/manual Spike'lı KEDA (min:0 mümkün)
Mesaj kaybedilebilir mi? Hayır autoAck:false + prefetch Evet (log vs.) autoAck:true + yüksek prefetch
Cost-sensitive? Hayır min:3-5 her zaman çalışsın Evet KEDA min:0, cooldown:300

Gerçek hayat senaryosu: E-ticaret Black Friday hazırlığı: Normal gün 3 pod (prefetch=20, Competing). KEDA queueLength:50 → her 50 mesaj için 1 pod. Black Friday: queue depth 10K → KEDA 200 pod'a çıkarır (maxReplica ile sınırla!). Gece 02:00 queue boşalır → cooldown sonrası 3 pod'a döner. Toplam maliyet: sadece trafik olduğunda ödeme.