RabbitMQ 3.8 i Quorum Queues

Quorum Queues (“kolejki kworum”) to nowy typ kolejek oparty o algorytm Raft, który służy do osiągania konsensusu w systemie rozproszonym. Nowy typ kolejek najprościej porównać do klasycznych kolejek replikowanych z włączoną persystencją (durable Mirrored Queues aka. HA Queues) i zapisywaniem wiadomości na dysk (persistent messages).

Po co zatem nowa alternatywa?

Po pierwsze gwarancje jakie dają kolejki HA nie są łatwe do zrozumienia. Sama replikacja danych ma kilka trybów: możemy mieć kopię na każdym węźle (ha-mode: all), stałą liczbę kopii (ha-mode: exactly) lub na konkretnych węzłach (ha-mode: nodes). Dodawanie kolejnych replik może odbywać się z automatyczną synchronizacją (ha-sync-mode: automatic), która promuje bezpieczeństwo danych lub manualną (ha-sync-mode: manual), żeby nie blokować kolejki na czas synchronizacji (relaksując gwarancje bezpieczeństwa danych).

Podobnie rzecz ma się z semantyką działania kolejek w przypadku awarii, która może być skonfigurowana na kilka sposobów. Jeśli tracimy węzeł główny danej kolejki (master node), następuje migracja i zmienia się jej master node. W związku z tym, że może dojść do sytuacji, w której do dyspozycji mamy tylko niezsynchronizowaną replikę (slave node) stajemy przed wyborem pomiędzy dostępnością kolejki, a utratą wiadomości; podobnie w przypadku wyłączania węzła będącego master node dla kolejki (parametry ha-promote-on-failure oraz ha-promote-on-shutdown).

Ponadto, w pewnych skrajnych przypadkach może dojść do utraty wiadomości, ponieważ potwierdzenie do producenta (publisher confirm) jest wysyłane za wcześnie.

Quorum Queues kładą nacisk na bezpieczeństwo danych i uproszczenie replikacji oraz obsługi sytuacji awaryjnych (w sensie łatwości zrozumienia przez użytkownika końcowego). Nadają się szczególnie jako długożyjące kolejki pełniące jakąś krytyczną funkcję w danym systemie, w którym bezpieczeństwo danych i odporność na błędy są ważniejsze niż niska latencja czy zaawansowane opcje kolejek (patrz Ograniczenia i kiedy nie używać).

Jakie gwarancje dają Quorum Queues?

Quorum Queues gwarantują bezpieczeństwo wiadomości na wypadek partycji sieciowych oraz awarii węzłów. Opublikowane i potwierdzone wiadomości (publisher confirms) są bezpieczne, o ile większość węzłów (czyli kworum, tj: N/2+1), na których znajduje się replika danej kolejki, nie zostanie permanentnie usunięta. Jednak żeby te gwarancje mogły być spełnione, klaster musi się składać co najmniej z 3 węzłów.

Dostępność kolejki (możliwość publikowania / konsumowania) jest zależna jedynie od tego, czy kworum węzłów jest dostępne. W przypadku 3 węzłowego klastra, system toleruje awarię co najwyżej jednego węzła – awaria kolejnego skutkuje niedostępnością kolejki celem zapewnienia spójności danych. W przypadku 5-cio węzłowego klastra, możemy utracić 2 węzły.

Jeśli tracimy węzeł będący liderem (terminologia Rafta), wybierany jest inny spośród dostępnych followersów (terminologia Rafta; nie mogłem znaleźć dobrego polskiego odpowiednika 🙂 ) – o ile spełniony jest warunek dostępności kworum węzłów. Kontrastując to z utratą master node’a dla kolejki HA, nie ma potencjalnego procesu synchronizacji, a co za tym idzie, nawet kolejka z dużym backlogiem zostanie szybko przywrócona.

Przykład

Zilustrujmy to przykładem 3 węzłowego klastra, który łatwo możemy zestawić przy użyciu Dockera i Docker Compose:

$> # pobieramy repo, które pozwoli zestawić klaster RabbitMQ
$> git clone git@github.com:pardahlman/docker-rabbitmq-cluster.git
$> cd docker-rabbitmq-cluster.git
$> # uruchamiamy w tle 3 kontenery z RabbitMQ i jeden z HA proxy
$> docker-compose up -d

Tak to wygląda u mnie:

Log docker-compose bo wykonaniu komendy up.
Log docker-compose bo wykonaniu komendy up.

Następnie w trzech osobnych terminalach możemy podpiąć się do logów poszczególnych węzłów, żeby mieć lepszy obraz sytuacji:

$> docker-compose logs -f rabbitmq1 # w innych terminalach rabbitmq2/rabbitmq3
Log kontenera rabbitmq2.
Log kontenera rabbitmq2.

Następnie zweryfikujmy, że klaster poprawnie się sformował:

$> docker-compose exec rabbitmq1 rabbitmqctl cluster_status
Status klastra RabbitMQ.
Status klastra RabbitMQ.

Na koniec konfiguracji środowiska zainstalujemy bibliotekę Pika, która pozwoli nam zintegrować się z RabbitMQ przy użyciu Pythona.

Ja instaluję ją w ramach virtualenva, ale można ją też zainstalować globalnie wg instrukcji z RabbitMQ Tutorials dla Pythona. Wtedy pomijamy pipenv w poniższych przykładach.

$> pipenv install pika

Kiedy mamy wszystko ustawione, stwórzmy kolejkę typu Quorum, opublikujmy wiadomość, a następnie zatrzymajmy dwa węzły. Mając tylko jeden węzeł, sprawdźmy jak skończy się próba skonsumowania lub opublikowania wiadomości.

Zacznijmy od stworzenia kolejki i wysłania wiadomości, przy użyciu prostego producenta w Pythonie:

# send.py
import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.confirm_delivery()

channel.queue_declare(queue='qq', durable=True, arguments={
    'x-queue-type': 'quorum'})

message = sys.argv[1]
channel.basic_publish(exchange='', routing_key='qq', body=message)
print(" [x] Sent '%s'" % message)
connection.close()
$> pipenv run python send.py message1
[x] Sent 'message1'

Możemy sprawdzić wynik naszych działań poprzez stronę naszej kolejki w Management UI, która powinna wyglądać podobnie jak na poniższym zrzucie ekranu:

ManagementUI: strona kolejki qq po wysłaniu wiadomości 'message1'
ManagementUI: strona kolejki qq po wysłaniu wiadomości ‘message1’.

Widzimy, że wiadomość została z sukcesem zapisana w kolejce (wartość Ready) . Na koniec uruchommy prostego konsumenta, wiadomość powinna do nas trafić:

# consume.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(
    queue='qq', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
$> pipenv run python consume.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'message1'

Wprowadźmy teraz trochę zamieszania zatrzymując węzły rabbitmq2 oraz rabbitmq3 pozostawiając przy życiu tylko lidera. Jednak zanim to zrobimy, “odepnijmy” konsumenta i jeszcze raz wyślijmy jakąś wiadomość (np. message2), żeby coś było w naszej kolejce:

ManagementUI: strona kolejki qq po wysłaniu wiadomości 'message2'.
ManagementUI: strona kolejki qq po wysłaniu wiadomości ‘message2’.

Żeby zatrzymać node’y 2 i 3 możemy użyć poniższej komendy:

$> docker-compose stop rabbitmq2 rabbitmq3

Na stronie naszej kolejki zobaczymy, że tylko lider kolejki jest dostępny (Online vs Members):

ManagementUI: strona kolejki qq po wyłączeniu 'rabbitmq2' i 'rabbitmq3'.
ManagementUI: strona kolejki qq po wyłączeniu ‘rabbitmq2’ i ‘rabbitmq3’.

Spróbujmy skonsumować naszą wiadomość posługując się wcześniej utworzonym konsumentem:

$> pipenv run python consume.py

Żadna wiadomość nie nadchodzi, a ostatecznie broker zamyka połączenie powiadamiając o błędzie:

Błąd zwrócony przez skrypt consume.py
Błąd zwrócony przez skrypt consume.py

Spróbujmy jeszcze coś wysłać:

$> pipenv run python send.py message3

Komenda zawiesza się, co oznacza że ACK (publish confirm) nie nadchodzi. Management UI potwierdza że qq pozostaje “obojętna” na nową wiadomość:

ManagementUI: strona kolejki qq po wyłączeniu 'rabbitmq2' i 'rabbitmq3' i wysłaniu 'message3'.
ManagementUI: strona kolejki qq po wyłączeniu ‘rabbitmq2’ i ‘rabbitmq3’ i wysłaniu ‘message3’.

Przywróćmy teraz do życia node rabbitmq2:

$> docker-compose start rabbitmq2

Nasz producent powinien się “odwiesić”, a wiadomość znaleźć w kolejce:

ManagementUI: strona kolejki qq po włączeniu 'rabbitmq2'.
ManagementUI: strona kolejki qq po włączeniu ‘rabbitmq2’.

Uruchamiając teraz naszego konsumenta powinniśmy dostać dwie wiadomości (message2 i message3 jeśli podążałeś za instrukcjami).

Tym krótkim przykładem zobrazowaliśmy jak zachowuje się kolejka typu Quorum w sytuacji, w której traci węzły z replikami:

  • utrata kworum węzłów (w naszym przypadku N/2+1 = 2) skutkuje niedostępnością kolejki – nie mogliśmy nic wysłać ani skonsumować,
  • przywrócenie jednego węzła, czyli zapewnienie kworum, odblokowało kolejkę,
  • wiadomość wysłana kiedy kolejka była niedostępna, została zbuforowana przez serwer i ostatecznie trafiła do kolejki, a producent dostał ACK (można to podejrzeć używając Wireshark i filtra AMQP)

Można mieć przynajmniej 2 uwagi do tego przykładu:

  1. Wyłączaliśmy węzły, a nie “odcinaliśmy” połączenia między nimi (netsplit) – można to zasymulować wykorzystując docker-compose pause.

  2. Wyłączyliśmy followersów, a nie lidera – sprawdzenie co się stanie w tym przypadku pozostawiam Tobie 🙂

W stosunku do Mirrored Queues widać, że Quorum Queues wydają się podlegać prostszym zasadom zachowania w przypadku awarii węzłów i bardziej przejrzystym gwarancjom bezpieczeństwa wiadomości.

Chcąc permanentnie usunąć kontenery i klaster należy wykonać komendę: docker-compose down

W jaki sposób RabbitMQ zapewnia gwarancje Quorum Queues?

Kolejki typu “kworum” bazują na algorytmie osiągania konsensusu w systemie rozproszonym – RAFT. Jego znajomość nie jest konieczna do posługiwania się tymi kolejkami, natomiast poznanie jego podstawowych zasad pozwala na lepsze zrozumienie ich działania. W tej sekcji przedstawiam wybrane zagadnienia z działania algorytmu istotne z punktu widzenia Quorum Queues.

Zapisy i odczyty

Zacznijmy od tego, że węzły w klastrze implementującym Rafta, dzielą się na lidera (jeden w klastrze) i followersów. Wszystkie zapisy i odczyty w takim systemie są realizowane przez lidera, mimo że klienci mogą połączyć się z dowolnym węzłem w klastrze. W przypadku trafienia na followersa są po prostu odsyłani do lidera.

Rozpatrzmy teraz przykładowy zapis zakładając, że klient połączył się z liderem. Lider po otrzymaniu danych od klienta robi commit do swojego lokalnego logu trwale zapisując dane. Następnie dane od klienta zostają zakolejkowane do wysłania do pozostałych węzłów w klastrze wraz z kolejnym żądaniem AppendEntries RPC. Klient dostanie od serwera potwierdzenie zapisu kiedy ten otrzyma potwierdzenie od kworum (większości) węzłów, że z sukcesem trwale zapisały otrzymany dane u siebie. Pozostałe węzły (mniejszość) mogą potwierdzić zapis już po wysłaniu ACKa do klienta inicjalizującego żądanie.

Lider wysyła potwierdzenie zapisu do klienta kiedy większość węzłów w klastrze (3 węzły: lider i dwóch followersów) trwale zapisała dane.
Lider wysyła potwierdzenie zapisu do klienta kiedy większość węzłów w klastrze (3 węzły: lider i dwóch followersów) trwale zapisała dane.

Oczywiście może się zdarzyć, że lider ulegnie awarii po wysłaniu ACKa do klienta, ale zanim mniejszość potwierdzi trwałe zapisanie danych w swoim logu. Gdyby w takiej sytuacji został wybrany lider bez tego ostatniego zapisu (z mniejszości, która nie zdążyła potwierdzić zapisu), mielibyśmy do czynienia z potencjalną utratą danych (bo nie wiadomo czy follower, który nie potwierdził zapisu przed awarią lidera po prostu nie zdążył odesłać ACKa, czy nie udało mu się danych zapisać). Jednak Raft ma na tę (i wiele innych) okoliczności “sposób”, opisany w sytuacjach awaryjnych poniżej.

Elekcja lidera

Lider utrzymuje swoją pozycję dopóki followersi otrzymują od niego periodyczny heartbeat. Technicznie heartbeat to AppendEntries RPC – w tym także “pusty” (czyli bez nowych danych), jeśli nie ma żadnych zapisów do zreplikowania.

Jeśli w czasie election timeout follower nie otrzyma żadnej komunikacji od lidera, zmienia swój status na kandydata na lidera i próbuje nim zostać. W tym celu wysyła RequestVote RPC do pozostałych węzłów. Jeśli dostanie głos od większości, zostaje nowym liderem (sam głosuje na siebie).

Raft implementuje 2 mechanizmy zabezpieczające przed wybraniem dwóch liderów jednocześnie:

  1. Kandydat zostaje wybrany na lidera jeśli dostanie głos od większości węzłów w klastrze (a tylko jeden węzeł może dostać większość biorąc pod uwagę, że każdy węzeł ma jeden głos).
  2. Każdy węzeł pamięta tzw. epokę (z ang. “epoch”, w terminologii Rafta “term”). Za każdym razem, kiedy wybierany jest nowy lider, liczba całkowita reprezentująca epokę jest inkrementowana – kandydat przed wysłaniem RequestVote RPC inkrementuje epokę i dołącza ją do żądania. Jeśli węzeł otrzyma żądanie z epoką starszą niż ta, którą uważa za aktualną to odrzuca żądanie.

Sytuacje awaryjne

Jeśli lider ulegnie awarii lub zostanie odcięty od części węzłów na skutek partycji sieci, followersi (w 1-szym przypadku wszyscy, w 2-gim kilku) przestaną otrzymywać hertbeaty i rozpocznie się elekcja nowego lidera.

W przypadku gdy lider ulega awarii, followersi przejdą w stan kandydowania na liderów po upływie election timeout (który jest randomizowany dla każdego węzła, żeby zminimalizować ryzyko głosowania na wielu kandydatów jednocześnie).

W 2-gim przypadku dojdzie do elekcji tylko po tej stronie sieci, gdzie znajdzie się większość węzłów klastra – bo głos większości jest wymagany do zostania liderem (pod warunkiem, że któraś część partycji zawiera większość – w klastrach o parzystej liczbie węzłów może dojść do równego podziału węzłów). Druga część klastra, w której po awarii sieci znajdzie się mniejszość węzłów, pozostanie bez lidera, a tym samym nie będzie akceptować żądań od klientów. Dzięki temu mechanizmowi nie dojdzie rozsynchronizowania węzłów w klastrze.

Klaster zostaje podzielony na dwie partycje na skutek awarii sieci. Tylko jedna z nich, ta w której da się uzyskać większość głosów ogółu węzłów w klastrze, będzie miała lidera (być może nowego). Druga część partycji pozostanie bez lidera i nie będzie obsługiwać klientów.
Klaster zostaje podzielony na dwie partycje na skutek awarii sieci. Tylko jedna z nich, ta w której da się uzyskać większość głosów ogółu węzłów w klastrze, będzie miała lidera (być może nowego). Druga część partycji pozostanie bez lidera i nie będzie obsługiwać klientów.

Rozpatrzmy też ciekawy przypadek, w którym lider ulega awarii zaraz po tym jak potwierdził zapis klienta (czyli po otrzymania ACKa od większości węzłów), ale zanim potwierdziły pozostałe węzły (mniejszość). W przypadku klastra 5-cio węzłowego miałoby to miejsce po otrzymania ACKa od 2 serwerów (bo razem z liderem 3 węzły trwale zapisały dane na tym etapie), ale przed uzyskaniem potwierdzenia od dwóch pozostałych.

Mamy więc sytuację, w której potwierdziliśmy klientowi zapis, dane są trwale zapisane na trzech serwerach, nie wiadomo natomiast czy dwa ostatnie serwery zapisały dane z sukcesem. Gdyby któryś z nich nie zapisał trwale danych i zostałby liderem, mogłoby dojść do utraty danych. Tutaj wchodzi do gry kolejny mechanizm Rafta, który chroni klaster przed taką sytuacją: indeks ostatniego zapisu do loga (last log entry index) trafia do RequestVote RPC. Jeśli jakiś węzeł otrzyma RequestVote RPC od kandydata, którego last log entry index jest mniejszy niż jego własny, nie zagłosuje na niego. Teraz biorąc pod uwagę nasz przypadek, kandydatura węzła bez utrwalonych ostatnich danych zostałaby odrzucona, bo większość te dane zapisała, a tym samym ich last log entry index będzie większy niż owego kandydata.

To nie koniec Rafta…

Chciałem tylko przybliżyć pewne ramy tego algorytmu, żeby pogłębić Twoje zrozumienie Quorum Queues, a przy okazji przemycić coś ciekawego z teoretycznej strony systemów rozproszonych. Jeśli udało mi się Cię zaciekawić odsyłam do pracy opisującej algorytm.

Ograniczenia i kiedy nie używać

Wracając jeszcze do samych kolejek typu “kworum”, warto spojrzeć na ich ograniczenia w stosunku do klasycznych kolejek i wynikające z nich konsekwencje co do ich użyteczności w pewnych sytuacjach.

Zacznijmy od tego, że nie są to kolejki, które miałby mieć tymczasową naturę: nie wspierają opcji exclusive (kolejka, która “ginie” wraz z połączeniem, które ją utworzyło) oraz TTL (kolejka “ginie” jeśli nie ma konsumentów przez określony czas; ustawiane przez argument x-expires lub politykę expires). Ponadto, są to kolejki, które zawsze są durable i wszystkie wiadomości są zawsze zapisywane na dysk (poza tym są trzymane również w pamięci – lazy mode nie ma zastosowania).

Kolejki nie będą się też nadawać w sytuacjach kiedy wymagana jest niska latencja. Wynika to z ograniczeń zastosowanego algorytmu konsensusu Raft (zapisy na dysk, synchronizacja węzłów itd.) oraz korzystania z publisher confirms oraz consumer acknowledgments, które są konieczne do zapewnienia bezpieczeństwa danych oferowanego przez Quorum Queues.

Kolejki “kworum” są też zbyteczne jeśli bezpieczeństwo danych nie jest istotne – aplikacje klienckie nie mają potrzeby korzystania z ACK po stronie producentów i konsumentów. W takiej sytuacji kolejki “kworum” dodają niepotrzebny narzut.

Wreszcie ze względu na to, że wszystkie wiadomości są trzymane nie tylko na dysku, ale też w pamięci RAM, kolejki nie będą mieć zastosowania kiedy spodziewamy się bardzo długiego backlogu wiadomości (maksymalna długość będzie zależeć od dostępnych zasobów i rozmiarów wiadomości). Dodatkowo, kolejki nie reagują na alarmy spowodowane zbyt wysokim zużyciem pamięci, które są mechanizmem samoobrony Rabbita przed “zabiciem” go przez system operacyjny z powodu zaalokowania zbyt dużej ilości pamięci (zob. OOM Killer).

To wybrane ograniczenia Quorum Queues, które uznałem za najistotniejsze. Pełny opis znajdziecie w oficjalnej dokumentacji.

Podsumowanie

Quorum Queues to krok w kierunku łatwiejszej budowy systemów wysokiej dostępności (High Availability) i solidnej odporności na błędy (Fault Tolerance) opartych o RabbitMQ. Mimo że są to kolejki przeznaczone do konkretnych zastosowań i mają nieco ograniczeń, z pewnością wnoszą nowe możliwości dla całego community RabbitMQ.

15 Błędów Przy Pracy z RabbitMQ

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *