RRabbitMQ Handbook

ORTA

Message Reliability

Mesajın kaybolmaması için 4 katmanlı bir güvenlik zinciri kurman gerekir. Eksik olan her katman = potansiyel mesaj kaybı. Aşağıdaki şema tüm zinciri gösteriyor — sonra her adımı tek tek inceleyeceğiz.

Publisher Confirm Broker ack gönderir (replicated ise quorum ack) Persistence DeliveryMode=Persistent + Durable queue Replication Quorum queue: Raft Majority write Consumer Ack İşlem başarılı → BasicAck Başarısız → Nack + requeue ← End-to-end delivery guarantee →

5.1 Publisher Confirms — "Broker Mesajı Aldı mı?"

Mesajı publish ettiğinde broker'ın "tamam, aldım" dediğinden emin olmalısın. ConfirmSelectAsync() ile channel'ı confirm moduna al, sonra WaitForConfirmsOrDieAsync() ile onay bekle. Onay gelmezse → retry logic tetikle.

Anti-Pattern: Fire & Forget — Confirm olmadan publish etmek, mesajın broker'a ulaşıp ulaşmadığını bilmeden devam etmek demektir. Network hatası? Mesaj kayıp.

.NET — Publisher Confirms
// Channel'da confirms aktifleştir
await channel.ConfirmSelectAsync();

// Publish
await channel.BasicPublishAsync(
    exchange: "orders",
    routingKey: "order.created",
    mandatory: true,
    basicProperties: new BasicProperties
    {
        DeliveryMode = DeliveryModes.Persistent,
        MessageId = Guid.NewGuid().ToString(),
        Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds())
    },
    body: messageBody);

// Confirm bekle (batch)
await channel.WaitForConfirmsOrDieAsync(TimeSpan.FromSeconds(5));
// Eğer confirm gelmezse exception fırlatır → retry logic tetikle

Publisher Confirms & Quorum Queue: Quorum queue kullanıldığında, publisher confirm ancak mesaj majority'ye (çoğunluğa) yazıldıktan sonra gönderilir. 3 node'lu cluster'da en az 2 node'a yazılması gerekir. Bu, single-node confirm'den daha yavaş ama çok daha güvenlidir.


5.2 Persistence & Durable Queue — "Diske Yazıldı mı?"

Publisher confirm aldın — güzel. Ama broker RAM'de tutuyorsa ve crash olursa mesaj yine kaybolur. İki şeyin birlikte olması lazım:

✅ Persistent msg DeliveryMode=Persistent + ✅ Durable queue durable: true = GÜVENLİ Diske yazılır ❌ Persistent msg DeliveryMode=Persistent + ❌ Non-durable Q durable: false = KAYIP Queue silinir!
  1. Mesaj: DeliveryMode = DeliveryModes.Persistent (mesajı diske yaz)
  2. Queue: durable: true (queue tanımı restart'ta kaybolmasın)

Sadece biri yetmez! Persistent mesaj + non-durable queue = queue silinir, mesajlar kaybolur. Non-persistent mesaj + durable queue = queue kalır ama mesajlar RAM'den uçar.

.NET — Persistence Ayarları
// Mesaj: Persistent (diske yazılsın)
var props = new BasicProperties
{
    DeliveryMode = DeliveryModes.Persistent,  // ← BUNU UNUTMA
    MessageId = Guid.NewGuid().ToString()
};

// Queue: Durable (restart'ta kaybolmasın)
await channel.QueueDeclareAsync(
    queue: "orders",
    durable: true,       // ← BUNU UNUTMA
    exclusive: false,
    autoDelete: false);

Quorum Queue kullanıyorsan: Otomatik olarak durable + persistent. Ayrıca birden fazla node'a replike edilir (Raft consensus). Production'da en güvenli seçenek.


5.3 Consumer Acknowledgements — "İşlendi mi?"

Mesaj queue'da güvenle duruyor. Ama consumer aldıktan sonra crash olursa ne olur? Manual acknowledgement ile consumer "işimi bitirdim, silebilirsin" der. Bu sinyal gelene kadar mesaj queue'da kalır.

  • BasicAckAsync → "İşledim, sil"
  • BasicNackAsync(requeue: true) → "Başarısız oldum, tekrar dene" 🔄
  • BasicNackAsync(requeue: false) → "Kalıcı hata, DLX'e gönder"

Anti-Pattern: Auto-ack Production'daautoAck: true ile mesaj, consumer'a deliver edildiği anda queue'dan silinir. Consumer crash olursa mesaj kaybolur. Production'da her zaman manual ack kullanın.

.NET — Consumer Manual Ack
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (sender, ea) =>
{
    try
    {
        var body = ea.Body.ToArray();
        var order = JsonSerializer.Deserialize<OrderEvent>(body);

        // İş mantığı — idempotent olmalı
        await _orderProcessor.ProcessAsync(order!);

        // Başarılı → Ack
        await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
    }
    catch (TransientException)
    {
        // Geçici hata → requeue (tekrar denenecek)
        await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true);
    }
    catch (PermanentException)
    {
        // Kalıcı hata → DLX'e gönder (requeue: false)
        await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: false);
    }
};

await channel.BasicConsumeAsync(
    queue: "payment-processing",
    autoAck: false,  // ← MANUAL ACK
    consumer: consumer);

5.4 Idempotent Consumer — "Tekrar Gelirse Ne Olur?"

"At-least-once" delivery, aynı mesajın birden fazla kez gelebileceği anlamına gelir (network retry, requeue). Consumer'ın aynı mesajı 2 kez işleyip 2 kez ödeme çekmemesi için idempotency zorunludur.

Yöntem Nasıl Ne Zaman
MessageId DB check Her mesajın ID'sini DB'ye yaz, tekrarında skip et Basit, çoğu senaryo için yeterli
Upsert (ON CONFLICT) İşlem sonucunu upsert ile yaz (var ise güncelle) DB state'i zaten unique ise
Outbox Pattern İş mantığı + mesaj aynı DB transaction'ında Distributed transaction gerektiğinde
.NET — MessageId ile Idempotency
public class IdempotentOrderConsumer
{
    private readonly AppDbContext _db;

    public async Task HandleAsync(IChannel channel, BasicDeliverEventArgs ea)
    {
        var messageId = ea.BasicProperties.MessageId;

        // 1. Daha önce işlenmiş mi kontrol et
        if (await _db.ProcessedMessages.AnyAsync(m => m.MessageId == messageId))
        {
            // Duplicate — sessizce ack'la, tekrar işleme
            await channel.BasicAckAsync(ea.DeliveryTag, false);
            return;
        }

        try
        {
            var order = JsonSerializer.Deserialize<OrderEvent>(ea.Body.Span);

            // 2. İş mantığı + idempotency kaydı AYNI TRANSACTION'da
            await using var tx = await _db.Database.BeginTransactionAsync();

            await _orderService.ProcessAsync(order!);

            _db.ProcessedMessages.Add(new ProcessedMessage
            {
                MessageId = messageId,
                ProcessedAt = DateTime.UtcNow
            });
            await _db.SaveChangesAsync();
            await tx.CommitAsync();

            // 3. Başarılıysa ack
            await channel.BasicAckAsync(ea.DeliveryTag, false);
        }
        catch (Exception)
        {
            await channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
        }
    }
}

// Entity
public class ProcessedMessage
{
    public string MessageId { get; set; } = "";
    public DateTime ProcessedAt { get; set; }
}
// Migration: CREATE UNIQUE INDEX IX_ProcessedMessages_MessageId ON ProcessedMessages(MessageId);

Publisher tarafında MessageId zorunlu: Idempotency ancak her mesajın unique ve deterministic bir ID'si varsa çalışır. Publish ederken BasicProperties.MessageId = Guid.NewGuid().ToString() veya daha iyisi business key (order_id) kullanın.


Hangi Reliability Seviyesini Seçmeliyim?

Seviye Publisher Confirms Persistence Consumer Ack Quorum Queue Risk
Fire & Forget Auto-ack Mesaj her aşamada kaybolabilir
At-most-once Auto-ack Broker crash'te kayıp, duplicate yok
At-least-once Manual ack Duplicate olabilir, kayıp olmaz
Effectively once Manual ack + idempotent consumer Consumer idempotency ile sağlanır

Gerçek hayat senaryosu: Fintech ödeme sistemi: Publisher confirms ile mesajın broker'a ulaştığı doğrulanır. Quorum queue ile 3 node'a replike edilir. Consumer ödemeyi işler, DB'ye yazar, başarılıysa ack gönderir. Consumer crash olursa mesaj başka consumer'a redelivery edilir. Idempotency key (payment_id) ile duplicate ödeme engellenir.