ORTA
Prefetch & Consumer Scaling
Bir queue'dan mesaj okurken iki kritik sorunun cevabını bilmeniz gerekir: her consumer'a aynı anda kaç mesaj gönderilecek (prefetch) ve kaç consumer çalışacak (scaling).
7.1 Prefetch (QoS) Nedir?
Prefetch Count, RabbitMQ'nun bir consumer'a ack almadan göndereceği maksimum mesaj sayısıdır. Bu bir "buffer" gibi çalışır — consumer ack gönderdikçe broker buffer'ı yeniden doldurur.
| Kullan | Kullanma |
|---|---|
Her consumer için BasicQosAsync çağır |
Prefetch'i hiç ayarlama (default=0 → unlimited) |
| İşlem süresine göre prefetch değerini belirle | Tüm queue'lara aynı prefetch ver |
global: false kullan (per-consumer) |
global: true kullan (quorum queue desteklemez) |
| Monitoring ile prefetch'i tune et | Bir kez ayarla, bir daha bakma |
global: false vs global: true
| Parametre | Davranış | Quorum Queue | Kullanım |
|---|---|---|---|
global: false |
Her consumer kendi prefetch limit'ine sahip | Destekler | Her zaman bunu kullan |
global: true |
Channel'daki TÜM consumer'lar toplam limit'i paylaşır | Desteklemez | Legacy — kullanma |
Junior Hatası:
global: trueile quorum queue kullanırsanız, RabbitMQ sessizce ignore eder — hata almaz ama prefetch çalışmaz. Tüm mesajlar limitsiz push edilir ve consumer memory patlar.
Prefetch Değeri Seçim Rehberi
| İşlem Tipi | Önerilen Prefetch | Neden | Gerçek Hayat |
|---|---|---|---|
| Hızlı işlem (<10ms) | 50-100 | Pipeline'ı dolu tut, RTT overhead azalt | Cache invalidation, log forwarding |
| Orta işlem (10-100ms) | 10-30 | Yeterli buffer, memory kontrollü | DB write + notification gönderimi |
| Yavaş işlem (>100ms) | 1-5 | Az mesaj = dengeli dağılım | PDF generation, video transcode |
| Değişken süre | 5-10 | Orta yol, slow consumer'ı aç bırakmaz | 3rd party API calls (rate limit var) |
| Batch processing | 50-200 | Toplu commit/flush verimli olsun | Elasticsearch bulk indexing |
Prefetch Hesaplama Formülü
Tahmini bir tablo yerine, sisteminize özel prefetch değerini hesaplamak daha güvenilirdir. Aşağıdaki formül, consumer'ın boşta kalmamasını (throughput) VE memory'nin şişmemesini (güvenlik) dengeler:
Formülün 3 bileşeni:
| Bileşen | Formül | Ne Korur | Nasıl Ölçersin |
|---|---|---|---|
| Throughput | RTT ÷ ProcessingTime |
Consumer boşta kalmasın | ping rabbitmq-host → RTT; stopwatch ile T(process) |
| Memory | MaxRAMBuffer ÷ AvgMessageSize |
Consumer OOM olmasın | Pod memory limit × %20 = buffer; ortalama mesaj boyutu |
| Fairness | QueueDepth ÷ ConsumerCount |
Yük dengeli dağılsın | Management API: messages_ready / consumers |
| Final | MIN(hepsi) × 0.75 |
Spike'lara karşı marj | — |
Önemli: RTT çok düşükse (aynı cluster, <5ms) throughput formülü anlamsız sonuç verir (0-1 arası). Bu durumda TargetConcurrency yaklaşımını kullanın: "Bu consumer aynı anda kaç iş yapabilir?" sorusunun cevabı prefetch'iniz olur.
Pratik karar ağacı:
İşlemin CPU-bound mu, IO-bound mu?
│
├─ CPU-bound (image resize, crypto, parse):
│ └─ Prefetch = vCPU sayısı × 2
│ Örnek: 2 vCPU pod → prefetch = 4
│
├─ IO-bound (DB call, HTTP API, file write):
│ └─ Prefetch = concurrent IO kapasitesi × 0.75
│ Örnek: DB connection pool = 20 → prefetch = 15
│
└─ Mixed (DB + hesaplama):
└─ Prefetch = MIN(vCPU×2, connPool) × 0.75
Örnek: 4 vCPU, pool=10 → MIN(8,10) × 0.75 = 6
Memory güvenlik kontrolü (her zaman uygula):
MaxPrefetch = (Pod Memory Limit × 0.20) ÷ AvgMessageSize
Eğer hesaplanan prefetch > MaxPrefetch → MaxPrefetch kullan
Örnek:
Pod limit: 512MB
Buffer (%20): 102MB
Avg mesaj: 8KB
MaxPrefetch: 102MB ÷ 8KB = 12,750 → sorun yok, memory darboğaz değil
Ama eğer mesajlar büyükse (video metadata, 500KB):
MaxPrefetch: 102MB ÷ 500KB = 204 → bu limit'e dikkat!
| Senaryo | CPU | IO Pool | Mesaj | Hesaplama | → Prefetch |
|---|---|---|---|---|---|
| Sipariş işleme | 2 vCPU | DB pool=10 | 4KB | MIN(4, 10) × 0.75 | 3 |
| Email gönderim | 1 vCPU | SMTP pool=5 | 2KB | MIN(2, 5) × 0.75 | 2-4 |
| Log forwarding | 2 vCPU | Elastic bulk | 1KB | Throughput: batch=100 | 100 |
| PDF generation | 4 vCPU | File IO | 50KB | CPU-bound: 4×2 × 0.75 | 6 |
| Payment (bank API) | 2 vCPU | HTTP pool=3 | 3KB | MIN(4, 3) × 0.75 | 2 |
| Video transcode | 8 vCPU | Disk IO | 200KB | CPU-bound: 8×1 (ağır) | 4-8 |
Gerçek hayat senaryosu: SaaS multi-tenant notification servisi: Her tenant farklı boyutta payload gönderiyor (1KB - 200KB). Sabit prefetch=50 ile büyük payload'lu tenant'lar memory spike yapıyordu. Çözüm:
prefetch = MIN(30, 50MB / avgMsgSizeFromLastHour)— her saat ortalama mesaj boyutuna göre dinamik prefetch ayarı. Monitoring'den gelen metric ile runtime'daBasicQosAsynctekrar çağrılabilir.
Anti-Pattern: "Daha yüksek prefetch = daha hızlı" düşüncesi — Prefetch=1000 ayarlayan ekipler genelde şu sorunları yaşar: (1) Consumer crash'inde 1000 mesaj requeue olur → duplicate storm, (2) Slow consumer'da 1000 mesaj stuck kalır → diğerleri aç, (3) Memory spike → OOM kill → daha fazla requeue. Hesaplamaya güvenin, hırs yapmayın.
Gerçek hayat senaryosu: Fintech ödeme sistemi: her ödeme 50-150ms (banka API). Prefetch=3 ile consumer'da en fazla 3 concurrent ödeme. Neden düşük? Çünkü banka rate-limit uyguluyor ve hata durumunda retry queue'ya gidecek mesaj sayısı kontrollü kalıyor.
.NET — Prefetch Ayarı
// ✅ DOĞRU: Per-consumer prefetch
await channel.BasicQosAsync(
prefetchSize: 0, // byte limit yok (genelde 0 bırakılır)
prefetchCount: 20, // max 20 unacked message per consumer
global: false); // ⚠️ HER ZAMAN false — quorum queue'lar global:true desteklemez
// Consumer başlat
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
try
{
await ProcessMessageAsync(ea);
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
// requeue: false → DLX'e gider (retry pattern için)
await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false);
}
};
await channel.BasicConsumeAsync(
queue: "payment-processing",
autoAck: false, // ⚠️ autoAck: true ise prefetch anlamsız — mesaj anında ack'lenir
consumer: consumer);
.NET — Yanlış Prefetch Kullanımları
// ❌ YANLIŞ 1: Prefetch hiç ayarlanmamış (default = 0 = unlimited)
// Broker tüm mesajları bir anda push eder → memory patlar
var consumer = new AsyncEventingBasicConsumer(channel);
await channel.BasicConsumeAsync("orders", autoAck: false, consumer: consumer);
// ↑ BasicQosAsync çağrılmadı → prefetch = 0 (unlimited)!
// ❌ YANLIŞ 2: autoAck: true ile prefetch — boşuna ayar
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 10, global: false);
await channel.BasicConsumeAsync("orders", autoAck: true, consumer: consumer);
// ↑ autoAck=true mesaj gelir gelmez ack gönderir, prefetch limiti hiçbir işe yaramaz
// ❌ YANLIŞ 3: global: true + quorum queue
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 50, global: true);
// ↑ Quorum queue'lar global QoS'u desteklemez, sessizce ignore edilir
7.2 Slow Consumer Problemi
RabbitMQ mesajları round-robin dağıtır: sırayla her consumer'a 1 mesaj. Ama eğer bir consumer yavaşsa ve prefetch dolmuşsa, broker o consumer'a yeni mesaj göndermez — mesaj diğerlerine yönlenir. Sorun: prefetch yüksek ayarlandığında yavaş consumer'ın buffer'ında çok mesaj birikir ve diğer consumer'lar bunlara erişemez.
| Belirti | Sebep | Çözüm |
|---|---|---|
Queue ready count düşük ama throughput kötü |
Mesajlar consumer buffer'larında stuck | Prefetch'i düşür |
| Bir pod CPU %100, diğerleri idle | Round-robin + yüksek prefetch = dengesiz yük | Prefetch'i düşür + pod artır |
consumer_utilisation metriği < %50 |
Consumer yeterince hızlı ack gönderemiyor | Prefetch artır (hızlı işlemlerde) |
unacked count sürekli yüksek |
Consumer crash veya çok yavaş | Consumer health check + prefetch düşür |
Anti-Pattern: Prefetch = 0 (unlimited) — Broker tüm mesajları consumer'a push eder. 100K mesajlık bir queue'da consumer başlatırsanız, 100K mesaj anında consumer memory'sine yüklenir. OOM kill garantili. Her zaman bir prefetch değeri belirleyin.
Gerçek hayat senaryosu: E-ticaret sipariş işleme: Siparişlerin %90'ı 20ms'de biter (stok kontrolü), %10'u 3sn sürer (3rd party kargo API timeout). Prefetch=50 ile bir consumer'a 50 "yavaş sipariş" geldiğinde, o consumer 2.5 dakika cevap veremez. Prefetch=3'e düşürüldüğünde, yavaş siparişler max 3'er tane dağılır ve diğer consumer'lar hızlı siparişleri işlemeye devam eder.
7.3 Consumer Scaling Patterns
Tek consumer yetmediğinde sistemi ölçeklendirmenin üç ana pattern'i vardır. Her birinin farklı garanti ve trade-off'ları vardır.
| Pattern | Ne Zaman Kullan | Ne Zaman Kullanma | Gerçek Hayat |
|---|---|---|---|
| Competing Consumers | Sıra önemsiz, throughput kritik | Aynı entity'nin mesajlarının sıralı işlenmesi gerektiğinde | E-ticaret: sipariş email bildirimleri (sıra önemsiz) |
| Single Active Consumer | Strict ordering, active-passive failover | High throughput gereken yerlerde (tek consumer darboğaz) | Fintech: hesap bakiye güncelleme (sıra kritik) |
| Consumer Priority | Preferred datacenter/node, cost optimization | Consumer sayısı az ve eşit yük dağılımı gerektiğinde | Multi-region: local DC'ye öncelik, uzak DC fallback |
Single Active Consumer + Quorum Queue: Bu pattern sadece quorum queue'larda güvenilir çalışır. Classic mirrored queue'larda network partition sırasında iki consumer aynı anda aktif olabilir (split-brain).
.NET — Competing Consumers (Paralel İşleme)
// Her pod'da aynı kodu çalıştırın — RabbitMQ round-robin dağıtır
await channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: 20,
global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
await ProcessMessageAsync(ea);
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync(
queue: "email-sending",
autoAck: false,
consumer: consumer);
// 5 pod deploy edin → 5 competing consumer → throughput 5x
.NET — Single Active Consumer
// Queue declare with single active consumer
var args = new Dictionary<string, object?>
{
["x-queue-type"] = "quorum",
["x-single-active-consumer"] = true // sadece 1 consumer aktif
};
await channel.QueueDeclareAsync("sequential-orders", durable: true,
exclusive: false, autoDelete: false, arguments: args);
// Tüm pod'lar consume eder ama sadece biri aktif olur
// Aktif consumer düşerse, sonraki consumer otomatik devralır (failover ~5-10sn)
.NET — Consumer Priority
// Yüksek priority consumer (local datacenter pod'ları)
var localArgs = new Dictionary<string, object?>
{
["x-priority"] = 10 // yüksek → önce bu consumer'a gider
};
await channel.BasicConsumeAsync("notifications", autoAck: false,
arguments: localArgs, consumer: localConsumer);
// Düşük priority consumer (remote datacenter — fallback)
var remoteArgs = new Dictionary<string, object?>
{
["x-priority"] = 1 // düşük → local dolunca buraya gelir
};
await channel.BasicConsumeAsync("notifications", autoAck: false,
arguments: remoteArgs, consumer: remoteConsumer);
Gerçek hayat senaryosu: Chat uygulaması notification servisi: Normal saatlerde 3 pod (Competing Consumers, prefetch=30) yeterli. Black Friday'de queue depth artınca KEDA 3→12 pod'a scale eder. Her pod aynı consumer kodu çalıştırır, RabbitMQ otomatik round-robin yapar.
7.4 Kubernetes'te Auto-Scaling
Seviye notu: Bu alt bölüm ileri seviye kapsamındadır. K8s ve Prometheus bilgisi gerektirir. Önce Clustering ve Node Discovery ile Monitoring ve Alerting konularını okumanız önerilir.
Queue derinliği arttığında otomatik pod eklenmesini (scale-out) ve azaldığında pod silinmesini (scale-in) sağlayan iki ana araç: KEDA (event-driven, sıfıra scale) ve HPA custom metrics (Prometheus adapter ile).
| Araç | Avantaj | Dezavantaj | Ne Zaman |
|---|---|---|---|
| KEDA | Sıfıra scale, event-driven, RabbitMQ trigger built-in | Cluster'a KEDA kurulumu gerekir | Event-driven workload, maliyet optimizasyonu |
| HPA + Prometheus | K8s native, ek kurulum az | Sıfıra scale edemez (min=1), metric pipeline karmaşık | Sürekli trafik olan servisler |
| Manual replica | Basit, tahmin edilebilir | Trafik spike'larına tepki yok | Dev/staging, sabit yüklü servisler |
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: order-processor-scaler
namespace: production
spec:
scaleTargetRef:
name: order-processor # Deployment adı
minReplicaCount: 2 # Min pod (0 yapılabilir ama cold-start riski)
maxReplicaCount: 50 # Max pod (kaynak limitlerine dikkat)
cooldownPeriod: 60 # Scale-in bekleme süresi (saniye)
pollingInterval: 15 # Metric kontrol sıklığı (saniye)
triggers:
- type: rabbitmq
metadata:
host: "amqp://user:[email protected]:5672/" # Secret kullanın!
queueName: order-processing
mode: QueueLength # QueueLength veya MessageRate
value: "100" # Her 100 mesaj için 1 pod
authenticationRef:
name: rabbitmq-credentials # TriggerAuthentication resource
// Program.cs — IHostedService ile graceful shutdown
public class RabbitConsumerService : BackgroundService
{
private IChannel? _channel;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_channel = await _connection.CreateChannelAsync();
await _channel.BasicQosAsync(0, 20, false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, ea) =>
{
// stoppingToken ile graceful check
if (stoppingToken.IsCancellationRequested)
{
// Yeni mesaj almayı durdur, mevcut mesajı bitir
await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
return;
}
await ProcessOrderAsync(ea, stoppingToken);
await _channel.BasicAckAsync(ea.DeliveryTag, false);
};
await _channel.BasicConsumeAsync("orders", false, consumer,
cancellationToken: stoppingToken);
// Token iptal olana kadar bekle
await Task.Delay(Timeout.Infinite, stoppingToken)
.ContinueWith(_ => { }, TaskScheduler.Default);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
// In-flight mesajların bitmesini bekle
if (_channel is not null)
{
await _channel.CloseAsync(); // pending ack'ler gönderilir
}
await base.StopAsync(cancellationToken);
}
}
Kubernetes — Pod Spec (terminationGracePeriod)
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-processor
spec:
template:
spec:
terminationGracePeriodSeconds: 120 # En yavaş mesajın işlenme süresinden UZUN olmalı
containers:
- name: consumer
image: myapp/order-processor:latest
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "1000m"
lifecycle:
preStop:
exec:
command: ["sh", "-c", "sleep 5"] # LB'nin pod'u listeden çıkarması için
Anti-Pattern: terminationGracePeriod < işlem süresi — Pod silinirken in-flight mesajlar kesilir. Mesaj requeue olur ve başka consumer'a gider ama idempotent değilse duplicate processing oluşur. Grace period'u her zaman en yavaş mesajın 2x süresine ayarlayın.
Gerçek hayat senaryosu: SaaS video transcoding servisi: Her video 30-180sn arası. KEDA
queueLength: 5ile her 5 video için 1 pod ekler.terminationGracePeriodSeconds: 300(5dk) ile en yavaş video bile tamamlanır. Gece 0 pod'a düşer (maliyet ~%70 tasarruf).minReplicaCount: 0+cooldownPeriod: 300ile cold-start kabul edilebilir.
Karar Matrisi: Prefetch + Consumer + Scaling
Karar Tablosu
| Soru | Cevap A | → Config A | Cevap B | → Config B |
|---|---|---|---|---|
| İşlem süresi? | <10ms | Prefetch: 50-100 | >100ms | Prefetch: 1-5 |
| Sıra önemli mi? | Hayır | Competing Consumers | Evet | Single Active Consumer |
| Trafik tahmini? | Sabit | HPA/manual | Spike'lı | KEDA (min:0 mümkün) |
| Mesaj kaybedilebilir mi? | Hayır | autoAck:false + prefetch | Evet (log vs.) | autoAck:true + yüksek prefetch |
| Cost-sensitive? | Hayır | min:3-5 her zaman çalışsın | Evet | KEDA min:0, cooldown:300 |
Gerçek hayat senaryosu: E-ticaret Black Friday hazırlığı: Normal gün 3 pod (prefetch=20, Competing). KEDA
queueLength:50→ her 50 mesaj için 1 pod. Black Friday: queue depth 10K → KEDA 200 pod'a çıkarır (maxReplica ile sınırla!). Gece 02:00 queue boşalır → cooldown sonrası 3 pod'a döner. Toplam maliyet: sadece trafik olduğunda ödeme.