Quorum Queues (“kolejki kworum”) to nowy typ kolejek oparty o algorytm Raft, który służy do osiągania konsensusu w systemie rozproszonym. Nowy typ kolejek najprościej porównać do klasycznych kolejek replikowanych z włączoną persystencją (durable Mirrored Queues aka. HA Queues) i zapisywaniem wiadomości na dysk (persistent messages).
Po co zatem nowa alternatywa?
Po pierwsze gwarancje jakie dają kolejki HA nie są łatwe do zrozumienia. Sama replikacja danych ma kilka trybów: możemy mieć kopię na każdym węźle (ha-mode: all
), stałą liczbę kopii (ha-mode: exactly
) lub na konkretnych węzłach (ha-mode: nodes
). Dodawanie kolejnych replik może odbywać się z automatyczną synchronizacją (ha-sync-mode: automatic
), która promuje bezpieczeństwo danych lub manualną (ha-sync-mode: manual
), żeby nie blokować kolejki na czas synchronizacji (relaksując gwarancje bezpieczeństwa danych).
Podobnie rzecz ma się z semantyką działania kolejek w przypadku awarii, która może być skonfigurowana na kilka sposobów. Jeśli tracimy węzeł główny danej kolejki (master node), następuje migracja i zmienia się jej master node. W związku z tym, że może dojść do sytuacji, w której do dyspozycji mamy tylko niezsynchronizowaną replikę (slave node) stajemy przed wyborem pomiędzy dostępnością kolejki, a utratą wiadomości; podobnie w przypadku wyłączania węzła będącego master node dla kolejki (parametry ha-promote-on-failure
oraz ha-promote-on-shutdown
).
Ponadto, w pewnych skrajnych przypadkach może dojść do utraty wiadomości, ponieważ potwierdzenie do producenta (publisher confirm) jest wysyłane za wcześnie.
Quorum Queues kładą nacisk na bezpieczeństwo danych i uproszczenie replikacji oraz obsługi sytuacji awaryjnych (w sensie łatwości zrozumienia przez użytkownika końcowego). Nadają się szczególnie jako długożyjące kolejki pełniące jakąś krytyczną funkcję w danym systemie, w którym bezpieczeństwo danych i odporność na błędy są ważniejsze niż niska latencja czy zaawansowane opcje kolejek (patrz Ograniczenia i kiedy nie używać).
Jakie gwarancje dają Quorum Queues?
Quorum Queues gwarantują bezpieczeństwo wiadomości na wypadek partycji sieciowych oraz awarii węzłów. Opublikowane i potwierdzone wiadomości (publisher confirms) są bezpieczne, o ile większość węzłów (czyli kworum, tj: N/2+1
), na których znajduje się replika danej kolejki, nie zostanie permanentnie usunięta. Jednak żeby te gwarancje mogły być spełnione, klaster musi się składać co najmniej z 3 węzłów.
Dostępność kolejki (możliwość publikowania / konsumowania) jest zależna jedynie od tego, czy kworum węzłów jest dostępne. W przypadku 3 węzłowego klastra, system toleruje awarię co najwyżej jednego węzła – awaria kolejnego skutkuje niedostępnością kolejki celem zapewnienia spójności danych. W przypadku 5-cio węzłowego klastra, możemy utracić 2 węzły.
Jeśli tracimy węzeł będący liderem (terminologia Rafta), wybierany jest inny spośród dostępnych followersów (terminologia Rafta; nie mogłem znaleźć dobrego polskiego odpowiednika 🙂 ) – o ile spełniony jest warunek dostępności kworum węzłów. Kontrastując to z utratą master node’a dla kolejki HA, nie ma potencjalnego procesu synchronizacji, a co za tym idzie, nawet kolejka z dużym backlogiem zostanie szybko przywrócona.
Przykład
Zilustrujmy to przykładem 3 węzłowego klastra, który łatwo możemy zestawić przy użyciu Dockera i Docker Compose:
$> # pobieramy repo, które pozwoli zestawić klaster RabbitMQ
$> git clone git@github.com:pardahlman/docker-rabbitmq-cluster.git
$> cd docker-rabbitmq-cluster.git
$> # uruchamiamy w tle 3 kontenery z RabbitMQ i jeden z HA proxy
$> docker-compose up -d
Tak to wygląda u mnie:

Następnie w trzech osobnych terminalach możemy podpiąć się do logów poszczególnych węzłów, żeby mieć lepszy obraz sytuacji:
$> docker-compose logs -f rabbitmq1 # w innych terminalach rabbitmq2/rabbitmq3

Następnie zweryfikujmy, że klaster poprawnie się sformował:
$> docker-compose exec rabbitmq1 rabbitmqctl cluster_status

Na koniec konfiguracji środowiska zainstalujemy bibliotekę Pika, która pozwoli nam zintegrować się z RabbitMQ przy użyciu Pythona.
Ja instaluję ją w ramach virtualenva, ale można ją też zainstalować globalnie wg instrukcji z RabbitMQ Tutorials dla Pythona. Wtedy pomijamy
pipenv
w poniższych przykładach.
$> pipenv install pika
Kiedy mamy wszystko ustawione, stwórzmy kolejkę typu Quorum, opublikujmy wiadomość, a następnie zatrzymajmy dwa węzły. Mając tylko jeden węzeł, sprawdźmy jak skończy się próba skonsumowania lub opublikowania wiadomości.
Zacznijmy od stworzenia kolejki i wysłania wiadomości, przy użyciu prostego producenta w Pythonie:
# send.py
import sys
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='qq', durable=True, arguments={
'x-queue-type': 'quorum'})
message = sys.argv[1]
channel.basic_publish(exchange='', routing_key='qq', body=message)
print(" [x] Sent '%s'" % message)
connection.close()
$> pipenv run python send.py message1
[x] Sent 'message1'
Możemy sprawdzić wynik naszych działań poprzez stronę naszej kolejki w Management UI, która powinna wyglądać podobnie jak na poniższym zrzucie ekranu:

Widzimy, że wiadomość została z sukcesem zapisana w kolejce (wartość Ready
) . Na koniec uruchommy prostego konsumenta, wiadomość powinna do nas trafić:
# consume.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(
queue='qq', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
$> pipenv run python consume.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'message1'
Wprowadźmy teraz trochę zamieszania zatrzymując węzły rabbitmq2
oraz rabbitmq3
pozostawiając przy życiu tylko lidera. Jednak zanim to zrobimy, “odepnijmy” konsumenta i jeszcze raz wyślijmy jakąś wiadomość (np. message2
), żeby coś było w naszej kolejce:

Żeby zatrzymać node’y 2 i 3 możemy użyć poniższej komendy:
$> docker-compose stop rabbitmq2 rabbitmq3
Na stronie naszej kolejki zobaczymy, że tylko lider kolejki jest dostępny (Online
vs Members
):

Spróbujmy skonsumować naszą wiadomość posługując się wcześniej utworzonym konsumentem:
$> pipenv run python consume.py
Żadna wiadomość nie nadchodzi, a ostatecznie broker zamyka połączenie powiadamiając o błędzie:

Spróbujmy jeszcze coś wysłać:
$> pipenv run python send.py message3
Komenda zawiesza się, co oznacza że ACK (publish confirm) nie nadchodzi. Management UI potwierdza że qq
pozostaje “obojętna” na nową wiadomość:

Przywróćmy teraz do życia node rabbitmq2
:
$> docker-compose start rabbitmq2
Nasz producent powinien się “odwiesić”, a wiadomość znaleźć w kolejce:

Uruchamiając teraz naszego konsumenta powinniśmy dostać dwie wiadomości (message2
i message3
jeśli podążałeś za instrukcjami).
Tym krótkim przykładem zobrazowaliśmy jak zachowuje się kolejka typu Quorum w sytuacji, w której traci węzły z replikami:
- utrata kworum węzłów (w naszym przypadku
N/2+1 = 2
) skutkuje niedostępnością kolejki – nie mogliśmy nic wysłać ani skonsumować, - przywrócenie jednego węzła, czyli zapewnienie kworum, odblokowało kolejkę,
- wiadomość wysłana kiedy kolejka była niedostępna, została zbuforowana przez serwer i ostatecznie trafiła do kolejki, a producent dostał ACK (można to podejrzeć używając Wireshark i filtra AMQP)
Można mieć przynajmniej 2 uwagi do tego przykładu:
- Wyłączaliśmy węzły, a nie “odcinaliśmy” połączenia między nimi (netsplit) – można to zasymulować wykorzystując
docker-compose pause
. -
Wyłączyliśmy followersów, a nie lidera – sprawdzenie co się stanie w tym przypadku pozostawiam Tobie 🙂
W stosunku do Mirrored Queues widać, że Quorum Queues wydają się podlegać prostszym zasadom zachowania w przypadku awarii węzłów i bardziej przejrzystym gwarancjom bezpieczeństwa wiadomości.
Chcąc permanentnie usunąć kontenery i klaster należy wykonać komendę:
docker-compose down
W jaki sposób RabbitMQ zapewnia gwarancje Quorum Queues?
Kolejki typu “kworum” bazują na algorytmie osiągania konsensusu w systemie rozproszonym – RAFT. Jego znajomość nie jest konieczna do posługiwania się tymi kolejkami, natomiast poznanie jego podstawowych zasad pozwala na lepsze zrozumienie ich działania. W tej sekcji przedstawiam wybrane zagadnienia z działania algorytmu istotne z punktu widzenia Quorum Queues.
Zapisy i odczyty
Zacznijmy od tego, że węzły w klastrze implementującym Rafta, dzielą się na lidera (jeden w klastrze) i followersów. Wszystkie zapisy i odczyty w takim systemie są realizowane przez lidera, mimo że klienci mogą połączyć się z dowolnym węzłem w klastrze. W przypadku trafienia na followersa są po prostu odsyłani do lidera.
Rozpatrzmy teraz przykładowy zapis zakładając, że klient połączył się z liderem. Lider po otrzymaniu danych od klienta robi commit do swojego lokalnego logu trwale zapisując dane. Następnie dane od klienta zostają zakolejkowane do wysłania do pozostałych węzłów w klastrze wraz z kolejnym żądaniem AppendEntries RPC
. Klient dostanie od serwera potwierdzenie zapisu kiedy ten otrzyma potwierdzenie od kworum (większości) węzłów, że z sukcesem trwale zapisały otrzymany dane u siebie. Pozostałe węzły (mniejszość) mogą potwierdzić zapis już po wysłaniu ACKa do klienta inicjalizującego żądanie.

Oczywiście może się zdarzyć, że lider ulegnie awarii po wysłaniu ACKa do klienta, ale zanim mniejszość potwierdzi trwałe zapisanie danych w swoim logu. Gdyby w takiej sytuacji został wybrany lider bez tego ostatniego zapisu (z mniejszości, która nie zdążyła potwierdzić zapisu), mielibyśmy do czynienia z potencjalną utratą danych (bo nie wiadomo czy follower, który nie potwierdził zapisu przed awarią lidera po prostu nie zdążył odesłać ACKa, czy nie udało mu się danych zapisać). Jednak Raft ma na tę (i wiele innych) okoliczności “sposób”, opisany w sytuacjach awaryjnych poniżej.
Elekcja lidera
Lider utrzymuje swoją pozycję dopóki followersi otrzymują od niego periodyczny heartbeat. Technicznie heartbeat to AppendEntries RPC
– w tym także “pusty” (czyli bez nowych danych), jeśli nie ma żadnych zapisów do zreplikowania.
Jeśli w czasie election timeout
follower nie otrzyma żadnej komunikacji od lidera, zmienia swój status na kandydata na lidera i próbuje nim zostać. W tym celu wysyła RequestVote RPC
do pozostałych węzłów. Jeśli dostanie głos od większości, zostaje nowym liderem (sam głosuje na siebie).
Raft implementuje 2 mechanizmy zabezpieczające przed wybraniem dwóch liderów jednocześnie:
- Kandydat zostaje wybrany na lidera jeśli dostanie głos od większości węzłów w klastrze (a tylko jeden węzeł może dostać większość biorąc pod uwagę, że każdy węzeł ma jeden głos).
- Każdy węzeł pamięta tzw. epokę (z ang. “epoch”, w terminologii Rafta “term”). Za każdym razem, kiedy wybierany jest nowy lider, liczba całkowita reprezentująca epokę jest inkrementowana – kandydat przed wysłaniem
RequestVote RPC
inkrementuje epokę i dołącza ją do żądania. Jeśli węzeł otrzyma żądanie z epoką starszą niż ta, którą uważa za aktualną to odrzuca żądanie.
Sytuacje awaryjne
Jeśli lider ulegnie awarii lub zostanie odcięty od części węzłów na skutek partycji sieci, followersi (w 1-szym przypadku wszyscy, w 2-gim kilku) przestaną otrzymywać hertbeaty i rozpocznie się elekcja nowego lidera.
W przypadku gdy lider ulega awarii, followersi przejdą w stan kandydowania na liderów po upływie election timeout
(który jest randomizowany dla każdego węzła, żeby zminimalizować ryzyko głosowania na wielu kandydatów jednocześnie).
W 2-gim przypadku dojdzie do elekcji tylko po tej stronie sieci, gdzie znajdzie się większość węzłów klastra – bo głos większości jest wymagany do zostania liderem (pod warunkiem, że któraś część partycji zawiera większość – w klastrach o parzystej liczbie węzłów może dojść do równego podziału węzłów). Druga część klastra, w której po awarii sieci znajdzie się mniejszość węzłów, pozostanie bez lidera, a tym samym nie będzie akceptować żądań od klientów. Dzięki temu mechanizmowi nie dojdzie rozsynchronizowania węzłów w klastrze.

Rozpatrzmy też ciekawy przypadek, w którym lider ulega awarii zaraz po tym jak potwierdził zapis klienta (czyli po otrzymania ACKa od większości węzłów), ale zanim potwierdziły pozostałe węzły (mniejszość). W przypadku klastra 5-cio węzłowego miałoby to miejsce po otrzymania ACKa od 2 serwerów (bo razem z liderem 3 węzły trwale zapisały dane na tym etapie), ale przed uzyskaniem potwierdzenia od dwóch pozostałych.
Mamy więc sytuację, w której potwierdziliśmy klientowi zapis, dane są trwale zapisane na trzech serwerach, nie wiadomo natomiast czy dwa ostatnie serwery zapisały dane z sukcesem. Gdyby któryś z nich nie zapisał trwale danych i zostałby liderem, mogłoby dojść do utraty danych. Tutaj wchodzi do gry kolejny mechanizm Rafta, który chroni klaster przed taką sytuacją: indeks ostatniego zapisu do loga (last log entry index
) trafia do RequestVote RPC
. Jeśli jakiś węzeł otrzyma RequestVote RPC
od kandydata, którego last log entry index
jest mniejszy niż jego własny, nie zagłosuje na niego. Teraz biorąc pod uwagę nasz przypadek, kandydatura węzła bez utrwalonych ostatnich danych zostałaby odrzucona, bo większość te dane zapisała, a tym samym ich last log entry index
będzie większy niż owego kandydata.
To nie koniec Rafta…
Chciałem tylko przybliżyć pewne ramy tego algorytmu, żeby pogłębić Twoje zrozumienie Quorum Queues, a przy okazji przemycić coś ciekawego z teoretycznej strony systemów rozproszonych. Jeśli udało mi się Cię zaciekawić odsyłam do pracy opisującej algorytm.
Ograniczenia i kiedy nie używać
Wracając jeszcze do samych kolejek typu “kworum”, warto spojrzeć na ich ograniczenia w stosunku do klasycznych kolejek i wynikające z nich konsekwencje co do ich użyteczności w pewnych sytuacjach.
Zacznijmy od tego, że nie są to kolejki, które miałby mieć tymczasową naturę: nie wspierają opcji exclusive
(kolejka, która “ginie” wraz z połączeniem, które ją utworzyło) oraz TTL
(kolejka “ginie” jeśli nie ma konsumentów przez określony czas; ustawiane przez argument x-expires
lub politykę expires
). Ponadto, są to kolejki, które zawsze są durable
i wszystkie wiadomości są zawsze zapisywane na dysk (poza tym są trzymane również w pamięci – lazy mode
nie ma zastosowania).
Kolejki nie będą się też nadawać w sytuacjach kiedy wymagana jest niska latencja. Wynika to z ograniczeń zastosowanego algorytmu konsensusu Raft (zapisy na dysk, synchronizacja węzłów itd.) oraz korzystania z publisher confirms
oraz consumer acknowledgments
, które są konieczne do zapewnienia bezpieczeństwa danych oferowanego przez Quorum Queues.
Kolejki “kworum” są też zbyteczne jeśli bezpieczeństwo danych nie jest istotne – aplikacje klienckie nie mają potrzeby korzystania z ACK po stronie producentów i konsumentów. W takiej sytuacji kolejki “kworum” dodają niepotrzebny narzut.
Wreszcie ze względu na to, że wszystkie wiadomości są trzymane nie tylko na dysku, ale też w pamięci RAM, kolejki nie będą mieć zastosowania kiedy spodziewamy się bardzo długiego backlogu wiadomości (maksymalna długość będzie zależeć od dostępnych zasobów i rozmiarów wiadomości). Dodatkowo, kolejki nie reagują na alarmy spowodowane zbyt wysokim zużyciem pamięci, które są mechanizmem samoobrony Rabbita przed “zabiciem” go przez system operacyjny z powodu zaalokowania zbyt dużej ilości pamięci (zob. OOM Killer).
To wybrane ograniczenia Quorum Queues, które uznałem za najistotniejsze. Pełny opis znajdziecie w oficjalnej dokumentacji.
Podsumowanie
Quorum Queues to krok w kierunku łatwiejszej budowy systemów wysokiej dostępności (High Availability) i solidnej odporności na błędy (Fault Tolerance) opartych o RabbitMQ. Mimo że są to kolejki przeznaczone do konkretnych zastosowań i mają nieco ograniczeń, z pewnością wnoszą nowe możliwości dla całego community RabbitMQ.
One Reply to “RabbitMQ 3.8 i Quorum Queues”