RRedis Handbook

ORTA

Pub/Sub

Fire-and-forget mesajlaşma. Subscriber yoksa mesaj kaybolur.

Kod örneği görünümü Bu sayfadaki eşleşen örnekleri seçilen istemciye göre gösterir.
Pub/Sub (at-most-once) Publisher Sub A ✓ Sub B ✓ Sub C ✗ mesaj kayıp! Streams (persistent) Producer Stream Log persist + replay Worker A Worker B Subscriber yoksa mesaj KAYBOLUR. Replay yok. Mesajlar persist. Consumer Group + ACK + replay.

Ne Zaman Pub/Sub Kullan / Kullanma

Kullan Kullanma Gerçek Hayat
Anlık bildirim (mesaj kaybı tolere edilebilir) Mesaj kaybı kabul edilemez (ödeme, sipariş) Chat app: Online kullanıcılara anlık mesaj — offline'daysa zaten göremez
Cache invalidation broadcast (tüm instance'lara) Guaranteed delivery gerekli Microservice: 8 pod'a "ürün güncellendi, cache'i temizle" sinyali
Realtime dashboard, live feed Consumer geçici olarak kapanabiliyorsa Monitoring: CPU/memory metrik'lerini dashboard'a push
SignalR backplane (multi-instance) Queue semantiği (FIFO, retry, DLQ) E-ticaret: Stok değişiklik bildirimi — kayıpsa sorun yok, 2s sonra refresh gelir

KRİTİK: Pub/Sub at-most-once garantisi verir. Subscriber bağlı değilse mesaj kalıcı olarak kaybolur. Ödeme tamamlandı eventi, sipariş oluşturuldu — bunlar için asla Pub/Sub kullanma, Redis Streams veya RabbitMQ/Kafka tercih et.

Gerçek hayat senaryosu — Canlı bildirim sistemi: Kullanıcı online → Pub/Sub ile anlık push. Kullanıcı offline → mesajı DB'ye yaz + Streams'e ekle. Online'a döndüğünde Streams'ten unread mesajları çek. Hibrit mimari: Pub/Sub (hız) + Streams (güvenilirlik).

# Terminal 1 (subscriber):
SUBSCRIBE notifications:user:1001
PSUBSCRIBE notifications:*          # pattern

# Terminal 2 (publisher):
PUBLISH notifications:user:1001 '{"type":"order_shipped","orderId":5432}'
public class NotificationService
{
    private readonly ISubscriber _subscriber;
    private readonly IDatabase _redis;

    public NotificationService(IConnectionMultiplexer mux)
    {
        _subscriber = mux.GetSubscriber();
        _redis = mux.GetDatabase();
    }

    // Publish
    public async Task PublishAsync(string userId, object notification)
    {
        var channel = RedisChannel.Literal($"notifications:{userId}");
        var message = JsonSerializer.Serialize(notification);
        await _subscriber.PublishAsync(channel, message);
    }

    // Subscribe (genellikle BackgroundService'de)
    public async Task SubscribeAsync(string userId, Action<string> handler)
    {
        var channel = RedisChannel.Literal($"notifications:{userId}");
        await _subscriber.SubscribeAsync(channel, (ch, msg) =>
        {
            if (msg.HasValue)
                handler(msg.ToString());
        });
    }

    // Pattern subscribe
    public async Task SubscribePatternAsync(Action<string, string> handler)
    {
        var pattern = RedisChannel.Pattern("notifications:*");
        await _subscriber.SubscribeAsync(pattern, (ch, msg) =>
        {
            handler(ch.ToString(), msg.ToString());
        });
    }

    public async Task UnsubscribeAsync(string userId)
    {
        var channel = RedisChannel.Literal($"notifications:{userId}");
        await _subscriber.UnsubscribeAsync(channel);
    }
}

// BackgroundService ile consumer
public class NotificationConsumer : BackgroundService
{
    private readonly IConnectionMultiplexer _mux;
    private readonly ILogger<NotificationConsumer> _logger;

    public NotificationConsumer(IConnectionMultiplexer mux, ILogger<NotificationConsumer> logger)
    {
        _mux = mux;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var sub = _mux.GetSubscriber();
        await sub.SubscribeAsync(RedisChannel.Pattern("events:*"), (channel, message) =>
        {
            _logger.LogInformation("Received {Channel}: {Message}", channel, message);
            // Process message...
        });

        await Task.Delay(Timeout.Infinite, ct);
    }
}

Pub/Sub at-most-once. Mesaj kaybı kabul edilemezse → Redis Streams kullan.