Redis
Redis Streams ist ein neues Feature, das Log-ähnliche Datenstrukturen auf abstrakte Weise modelliert und mit Redis 5.0 eingeführt wurde. Einfach gesagt, ist ein Stream in Redis eine Liste, in der Einträge angehängt werden. Jeder Eintrag hat eine eindeutige ID und besteht aus Schlüssel-Werte-Paaren. So können Nachrichten tatsächlich komplexe Strukturen haben, anstatt stringifizierte Versionen von JSON-Objekten zu sein. Im Gegensatz zu Pub/Sub-Nachrichten, die nach dem fire-and-forget Prinzip funktionieren, bewahren Redis-Streams Nachrichten auf Dauer. Das macht Redis Streams ideal um z.B. Chat-Systeme, message broker oder message queues zu implementieren.

Stream Grundlagen

Die grundlegenden Stream-Operationen sind natürlich Lesen und Schreiben. Um Nachrichten in einen Stream zu schreiben, gibt es den XADD-Befehl:

XADD mystream * key1 value1 key2 value2

Dieser Befehl fügt eine Struktur wie die folgende in einen Stream namens “mystream” hinzu:

{
"key1": "value1“,
"key2": "value2"
}

Wie eingangs erwähnt, hat jede hinzufügte Nachricht eine eindeutige ID, die das zweite Argument der XADD-Operation ist. In unserem Fall haben wir * übergeben, damit Redis die ID automatisch generiert. In den meisten Anwendungsfällen reicht das völlig aus. Man könnte die ID aber auch selbst angeben.

Daten aus Streams abrufen

Nachdem wir mit XADD einen Stream erstellt und Nachrichten hinzufügt haben, können wir jetzt Daten aus dem Stream abrufen. Dabei gibt es verschiedene Zugriffsmethoden. Im ersten Beispiel abonnieren wir mit dem XREAD-Befehl einen Stream und warten auf neue Nachrichten:

XREAD BLOCK 0 STREAMS mystream $

Auf den ersten Blick mag das wenig einleuchtend sein, also zerlegen wir den Befehl kurz in seine Komponenten:

  • XREAD ist der Befehl zum Abrufen von Einträgen.
  • BLOCK 0 bedeutet, dass der Client endlos blockiert, wenn keine Einträge vorhanden sind. Geben wir hier anstatt 0 beispielsweise 1000 an, tritt nach 1000 Millisekunden ein Timeout auf, wenn nichts eingeht.
  • STREAMS ist eine Direktive, mit der wir eine Liste von Streams gefolgt von einer Liste von IDs angeben, von denen wir lesen wollen. In unserem Beispiel lesen wir von einem Stream mit der Pseudo ID $, die weiter unten erklärt wird.
  • mystream ist der Name des Streams, von dem wir lesen wollen.
  • $ ist eine Pseudo-ID, die jede neue Nachricht liefert, die ankommt nachdem der Befehl abgeschickt wurde. Das bedeutet, wir möchten alle vorherigen Einträge im Stream ignorieren und konzentrieren uns nur auf Nachrichten, die von nun an eintreffen.

Da wir im Moment keine neuen Nachrichten einliefern, blockiert der Befehl endlos, wenn wir ihn jetzt abschicken.

Wir ändern obigen Befehl nun folgendermaßen ab, um alle Nachrichten aus unserem Stream zu lesen:

XREAD STREAMS mystream 0

Wie wir sehen, müssen wir beim XREAD-Befehl nur die STREAMS Direktive angeben, um Ergebnisse zu erhalten. 0 ist wieder eine Pseudo-ID, die sozusagen den Beginn eines Streams angibt.

Wenn wir nur eine bestimmte maximale Anzahl von Nachrichten lesen wollen, erweitern wir obigen Befehl um die COUNT Option:

XREAD COUNT 2 STREAMS mystream 0

Das sind auch schon die Grundlagen um mit XADD Nachrichten in einen Stream zu schreiben und mit XREAD Nachrichten aus Streams zu lesen.

In einem folgenden Blogpost werden wir uns anschauen, wie man Clients schreibt, die alle Nachrichten lesen können. Auch wenn diese nicht verbunden waren, als die Nachrichten im Stream ankamen. Und weitere Features wie den XRANGE-Befehl und Consumer Groups.

Eric Lippmann
Eric Lippmann
Lead Senior Developer

Eric kam während seines ersten Lehrjahres zu NETWAYS und hat seine Ausbildung bereits 2011 sehr erfolgreich abgeschlossen. Seit Beginn arbeitet er in der Softwareentwicklung und dort an den unterschiedlichen NETWAYS Open Source Lösungen, insbesondere inGraph und im Icinga Team an Icinga Web. Darüber hinaus zeichnet er für viele Kundenentwicklungen in der Finanz- und Automobilbranche verantwortlich.