Heim > Backend-Entwicklung > Python-Tutorial > Messaging in verteilten Systemen mit ZeroMQ

Messaging in verteilten Systemen mit ZeroMQ

Barbara Streisand
Freigeben: 2024-11-21 07:33:11
Original
488 Leute haben es durchsucht

Messaging in verteilten Systemen mit ZeroMQ

Lassen Sie uns Python verwenden, um die verschiedenen Nachrichtenmuster zu entwickeln.

Sie müssen sich das folgende Video ansehen, um den Schritt-für-Schritt-Befehlen zu folgen.

Nehmen Sie sich Zeit; Stellen Sie sicher, dass Sie die Befehle noch einmal überprüfen, bevor Sie sie ausführen.

  • Das folgende Video demonstriert die in diesem Tutorial verwendeten Befehle.

Messaging in distributed systems using ZeroMQ

Ich führe dieses Tutorial auf meiner GCP-VM aus, kann es aber auch gerne lokal ausführen ✅

Dieses Tutorial stellt die Konzepte von Sockets in Python3 mit ZeroMQ vor. ZeroMQ ist eine einfache Möglichkeit, Sockets zu entwickeln, damit verteilte Prozesse durch das Senden von Nachrichten miteinander kommunizieren können.

  • In seiner einfachsten Form „lauscht“ ein Socket (Knoten) auf einem bestimmten IP-Port, während ein anderer Socket versucht, eine Verbindung herzustellen. Mithilfe von Sockets können wir On-to-One-, One-to-Many- und Many-to-Many-Verbindungsmuster erstellen.

Die Nachrichtenmuster, die wir heute untersuchen werden, sind die folgenden:

  • Paar: Exklusive Eins-zu-Eins-Kommunikation, bei der zwei Kollegen miteinander kommunizieren. Die Kommunikation erfolgt bidirektional und es wird kein bestimmter Status im Socket gespeichert. Der Server lauscht an einem bestimmten Port und der Client stellt eine Verbindung zu diesem her.

Messaging in distributed systems using ZeroMQ

  • Client – ​​Server: Ein Client verbindet sich mit einem oder mehreren Servern. Dieses Muster ermöglicht den REQUEST-RESPONSE-Modus. Ein Client sendet eine Anfrage „zmq.REQ“ und erhält eine Antwort.

Messaging in distributed systems using ZeroMQ

  • Veröffentlichen/Abonnieren: Ein traditionelles Kommunikationsmuster, bei dem Absender von Nachrichten, sogenannte Herausgeber, Nachrichten an bestimmte Empfänger, sogenannte Abonnenten, senden. Nachrichten werden veröffentlicht, ohne zu wissen, was oder ob ein Abonnent dieses Wissens existiert. Mehrere Abonnenten abonnieren Nachrichten/Themen, die von einem Herausgeber veröffentlicht werden, oder ein Abonnent kann eine Verbindung zu mehreren Herausgebern herstellen.

Messaging in distributed systems using ZeroMQ

  • Push- und Pull-Sockets (auch bekannt als Pipelines): Ermöglichen die Verteilung von Nachrichten an mehrere Mitarbeiter, angeordnet in einer Pipeline. Ein Push-Socket verteilt gesendete Nachrichten gleichmäßig an seine Pull-Clients. Dies entspricht dem Producer/Consumer-Modell, aber die vom Consumer berechneten Ergebnisse werden nicht stromaufwärts, sondern stromabwärts an einen anderen Pull-/Consumer-Socket gesendet.

Messaging in distributed systems using ZeroMQ

? Hinweis: Das Arbeiten mit Sockets kann schwierig sein. Das wiederholte Ausführen desselben Codes unter Verwendung derselben Portnummer/dieselben Sockets kann dazu führen, dass die Verbindung „hängt“ (der Server scheint zu laufen, aber es kann keine Verbindungen akzeptieren). Dies liegt daran, dass wir die vorherigen Verbindungen nicht korrekt geschlossen und zerstört haben.

Der beste Weg, dies zu beheben, besteht darin, den Socket zu schließen und den ZeroMQ-Kontext zu zerstören. Weitere Einzelheiten finden Sie unter Try – Catch-Blöcke von Phase 2 und Phase 3.

In diesem Tutorial können solche Probleme auftreten, z. B. wenn derselbe Server mehrmals am selben Port ausgeführt wird. Wenn Sie auf hängende Probleme stoßen, wird empfohlen, den Python-Prozess abzubrechen, die TCP-Portnummer zu bereinigen und den Server erneut auszuführen (siehe Schritt 11).

Phase 1: Koppeln eines Servers mit einem Client

Beginnen wir mit der Erstellung einer neuen VM, dann installieren wir Python3.

  • Behalten Sie eine Kopie der internen IP der VM. Für dieses Tutorial verwenden wir die interne IP-Adresse.
    1. Öffnen Sie eine neue Terminalverbindung und führen Sie die folgenden Befehle (nacheinander) aus. Der letzte Befehl installiert ZeroMQ.
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Geben Sie „Y“ ein, wenn Sie dazu aufgefordert werden.

Viele Anwendungen bestehen heutzutage aus Komponenten, die sich über Netzwerke erstrecken, daher ist die Nachrichtenübermittlung unerlässlich. Heute werden wir TCP für die Nachrichtenübertragung verwenden.

Sie können über VSC auf Ihre VM zugreifen oder die Befehle über SSH ausführen und Dateien mit Pico bearbeiten. In meinem Fall verwende ich SSH.

? Stellen Sie sicher, dass Sie den Code sorgfältig kopieren.

Wir müssen unseren ersten ZeroMQ-Server erstellen. Der Server ermöglicht die Bindung mit jeweils nur einem Client.

  • Erstellen Sie eine neue Datei mit dem Namen „pair-server.py“ und geben Sie dann den folgenden Code ein.

  • Der Code erstellt einen neuen Socket mithilfe des Musters zmq.PAIR und bindet dann den Server an einen bestimmten IP-Port (den wir bereits in GCP geöffnet haben). Beachten Sie, dass der Server nicht aufhört zu laufen, bis wir ihn stoppen.

  • Schauen Sie sich die Kommentare an, um zu verstehen, wie das funktioniert.

  • Stellen Sie sicher, dass Sie das ; ändern. das ist die interne IP-Adresse der GCP-VM; Der Client-Port sollte mit dem Server identisch sein.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Führen Sie den Server noch nicht aus, lassen Sie uns zuerst den Client erstellen.

Erstellen Sie den Kunden und nehmen Sie sich eine Minute Zeit, um die Kommentare zu prüfen. Ich werde es „pair-client.py“ nennen.

Stellen Sie sicher, dass Sie das ; ändern. Der Port sollte derselbe sein wie im Server.

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Wir benötigen zwei Terminalfenster, um das PAIR-Beispiel auszuführen. Wir werden den Server in einem Fenster und den Client im anderen ausführen. Führen Sie es nun wie folgt aus.

  • Führen Sie den Server aus
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Führen Sie den Client aus
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Untersuchen Sie die Ausgabe. Wir haben gerade einen neuen PAIR-Socket erstellt.

  • Das Skript wird beendet, wenn der Client seine Verbindung herstellt. Stoppen Sie dann den Server (Strg-C) und beenden Sie ihn.

Wir müssen die TCP-Verbindung trennen, bevor wir sie erneut ausführen. Verwenden Sie dazu den folgenden Befehl.

$ python3 pair-server.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

? Hinweise:

  • Wir können nur ein PAAR gleichzeitig ausführen, das bedeutet, dass wir nicht mehrere Clients haben können. Denken Sie daran, dass es sich um ein PAAR handelt. Der erste Client sperrt den Socket .

  • Wenn wir den Server einmal und den Client zweimal ausführen, bleibt der zweite Client „hängen“, was bedeutet, dass der zweite Client darauf wartet, dass sich ein neuer Server verbindet.

  • Wenn wir das Paar mehr als einmal ausführen möchten, müssen wir den Server beenden und die TCP-Verbindung löschen.

  • PAARE sind ideal, wenn ein Client exklusiven Zugriff auf einen Server benötigt.

  • Wir können mehrere Server für mehrere Clients als PAARE haben, aber wir müssen unterschiedliche PORT-Nummern für die Verbindungen verwenden.

Jede Phase ist unabhängig voneinander. Stoppen Sie daher den Server, löschen Sie die TCP-Ports und fahren Sie mit der nächsten Phase fort.

Phase 2: Koppeln eines Servers mit mehreren Clients

Lassen Sie uns eine Client-Server-Verbindung erstellen, bei der mehrere Clients eine Verbindung zu einem einzelnen Server herstellen. Dies ist das beliebteste Nachrichtenmuster.

  • Lassen Sie uns einen Server im Kontext des Musters REP-REQ (Antwort auf eine Anfrage) erstellen.
  • Wir rufen den Server rep-server.py über Port 5555 auf.
$ python3 pair-client.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Jetzt werden wir zwei Clients entwickeln, die hinsichtlich ihrer Funktionalität identisch sein werden.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns eine Kopie dieses Clients erstellen und diese entsprechend bearbeiten. Führen Sie den folgenden Befehl aus, um eine neue Kopie zu erstellen.

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Bearbeiten Sie dann req-client2.py und ändern Sie Client 1 in Client 2.

Bearbeiten wir die Druck- und Socket-Nachrichten (Zeilen 8 und 9)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Nach dem Login kopieren
Nach dem Login kopieren

Um dieses Beispiel auszuführen, benötigen wir drei Terminalfenster, eines für den Server und zwei für die Clients. Führen Sie im ersten Terminal Folgendes aus:

  • Lass uns den Server starten
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Lassen Sie uns den ersten Kunden starten
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Lassen Sie uns den zweiten Client starten
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Überprüfen Sie die Ausgabe der Fenster. Wir haben gerade zwei Clients erstellt, die mit einem Server kommunizieren. Sie können so viele Clients haben, wie Sie möchten. Sie müssen Clients erstellen, auch mit unterschiedlichen Funktionalitäten, die eine Verbindung zu einem Server herstellen.

? Hinweise:

  • Client – ​​Server ist das am weitesten verbreitete Muster. Wir haben es bereits in Klasse 1 verwendet, als wir den Apache HTTP-Server installiert und ausgeführt haben.

  • Stoppen Sie den Server und bereinigen Sie TCP-Port 5555

    • Töte den Server:


bash
$ sudo Fuser -k 5555/tcp

Phase 3: Koppeln eines Servers mit einem Client

Das Publish-Subscribe-Muster ist eine sehr verbreitete Methode, um die Übertragung von Daten an viele Clients zu steuern, die einen Kontext abonniert haben, und zwar so, dass Server Daten an einen oder mehrere Clients senden.

$ python3 pair-server.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns zunächst ein einfaches Beispiel erstellen.

$ python3 pair-client.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns eine neue Datei erstellen und sie pub_server.py nennen.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Dieser Befehl weist Python an, einen Server in bestimmten und auszuführen
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Erstellen Sie eine neue Datei pub_client.py.
* Das Skript akzeptiert drei Argumente von der Befehlszeile (das sind die IP und die beiden Ports).

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Nach dem Login kopieren
Nach dem Login kopieren

Wir sind bereit, unsere Pub-Sub-Anwendung auszuführen! Wir benötigen drei Terminalfenster. Im ersten Terminal ausführen:

$ cp req-client1.py req-client2.py
Nach dem Login kopieren
  • Im zweiten Terminallauf:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Nach dem Login kopieren
  • Jeder Server generiert Wetterdaten. Zum Beispiel:
    • Die Postleitzahl, z. B.: 10001
    • Das gemäßigte Klima, z. B.: -68

Lassen Sie uns den Client ausführen, um eine Verbindung herzustellen und Daten nach Postleitzahl zu abonnieren, z. B. 10001 (NYC). Denken Sie daran, dass das Client-Skript beide Serverinstanzen abonniert. Führen Sie den nächsten Befehl aus:

$ python3 rep-server.py
Nach dem Login kopieren
  • Wenn Sie fertig sind, beenden Sie die Server (Strg-Z) und löschen Sie die TCP-Ports, indem Sie die nächsten Befehle ausführen:
$ python3 req-client1.py
Nach dem Login kopieren
$ python3 req-client2.py
Nach dem Login kopieren
Phase 4: Push/Pull: Verwendung eines Pipeline-Musters**

Mit Push/Pull-Sockets können Sie Nachrichten an mehrere Mitarbeiter verteilen, die in einer Pipeline angeordnet sind. Dies ist sehr nützlich, um Code parallel auszuführen. Ein Push-Socket verteilt Nachrichten gleichmäßig an seine Pull-Clients und die Clients senden eine Antwort an einen anderen Server, den sogenannten Collector.

Messaging in distributed systems using ZeroMQ

  • Dies entspricht dem Producer/Consumer-Modell, aber die vom Consumer berechneten Ergebnisse werden nicht stromaufwärts, sondern stromabwärts an einen anderen Pull-/Consumer-Socket gesendet.

  • Wir werden die folgende Funktionalität implementieren.

  • Der Produzent sendet Zufallszahlen von 0 bis 10 an die Verbraucher.

  • Zwei Instanzen desselben Verbrauchers ziehen die Zahlen und führen eine schwere Aufgabe aus.

  • Die Aufgabe könnte jede schwere Berechnung sein, z. B. eine Matrixmultiplikation.

  • Der Einfachheit halber gibt unsere „schwere Aufgabe“ einfach dieselbe Zahl zurück.

  • Die Verbraucher werden die einzelnen Ergebnisse (Berechnungen schwerer Aufgaben) an einen Ergebnissammler senden, der die Ergebnisse aggregiert.

  • Der Einfachheit halber zieht eine Instanz des Ergebnissammlers die Ergebnisse und berechnet die Teilsumme jedes Verbrauchers. Bei Bedarf können wir die beiden Teilsummen problemlos summieren.

  • Sehen wir uns ein einfaches Beispiel an.

    • Produzent generiert [1,2,3,4,5].
    • Verbraucher 1 erhält [2,4], berechnet dann eine schwere Aufgabe und leitet die Ergebnisse an den Ergebnissammler weiter.
    • Verbraucher 2 erhält [1,3,5], berechnet dann eine schwere Aufgabe und leitet die Ergebnisse an den Ergebnissammler weiter.
    • Der Ergebnissammler berechnet die Zählungen und Teilsummen, z. B.:
    • Verbraucher1[2,4], das bedeutet 2 Zahlen, die von Verbraucher1 empfangen wurden und deren Summe 6 ist.
    • Consumer2[1,3,5], das bedeutet 3 Nummern, die von diesem Consumer2 empfangen wurden und deren Summe 9 ist.
  • Dieses Beispiel zeigt das Potenzial der verteilten Verarbeitung für die Parallelverarbeitung.

Lassen Sie uns zunächst den Produzenten „produzent.py“ erstellen, der auf Port 5555 läuft. Stellen Sie sicher, dass Sie Ihre .
anpassen

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Dann erstellen Sie die Datei „consumer.py“ wie folgt. Vergessen Sie nicht, die beiden s im Code zu ändern.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Lassen Sie uns zum Schluss noch die Collector.py entwickeln und die .
erneut ändern

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Stellen Sie sicher, dass kein Einrückungsfehler vorliegt!

$ python3 pair-server.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Zuerst müssen wir Collector.py ausführen. Der Collector wartet auf die Datenerfassung, bis wir den Producer starten.

$ python3 pair-client.py
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Dann starten wir die Konsumenten einzeln und führen jeden Befehl in einem anderen Terminalfenster aus.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Führen Sie denselben Befehl in einem anderen Terminal aus.
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren
  • Schließlich starten wir unseren Produzenten, der Daten an unsere Pipeline sendet.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Nach dem Login kopieren
Nach dem Login kopieren
Nach dem Login kopieren

Gut gemacht! ? Sie haben ZeroMQ verwendet, um Nachrichtenmuster zu entwickeln!

Das obige ist der detaillierte Inhalt vonMessaging in verteilten Systemen mit ZeroMQ. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Neueste Artikel des Autors
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage