Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu

WBOY
Freigeben: 2024-07-22 09:40:11
Original
1002 人浏览过

Automatically reload Celery workers with a custom Django command

Celery hatte zuvor ein --autoreload-Flag, das inzwischen entfernt wurde. Django verfügt jedoch über ein integriertes automatisches Neuladen in seinen runserver-Befehl manage.py. Das Fehlen eines automatischen Neuladens in Celery-Workern führt zu einer verwirrenden Entwicklungserfahrung: Durch die Aktualisierung des Python-Codes wird der Django-Server mit dem aktuellen Code neu geladen, aber alle Aufgaben, die der Server auslöst, führen veralteten Code im Celery-Worker aus.

In diesem Beitrag erfahren Sie, wie Sie einen benutzerdefinierten manage.py-Runworker-Befehl erstellen, der Celery-Worker während der Entwicklung automatisch neu lädt. Der Befehl wird dem Runserver nachempfunden sein, und wir werden einen Blick darauf werfen, wie Djangos automatisches Neuladen unter der Haube funktioniert.

Bevor wir anfangen

In diesem Beitrag wird davon ausgegangen, dass Sie eine Django-App haben, auf der Celery bereits installiert ist (Anleitung). Es setzt außerdem voraus, dass Sie die Unterschiede zwischen Projekten und Anwendungen in Django verstehen.

Alle Links zum Quellcode und zur Dokumentation gelten für die aktuellen Versionen von Django und Celery zum Zeitpunkt der Veröffentlichung (Juli 2024). Wenn Sie dies in ferner Zukunft lesen, haben sich die Dinge möglicherweise geändert.

Schließlich wird das Hauptprojektverzeichnis in den Beispielen des Beitrags my_project genannt.

Lösung: ein benutzerdefinierter Befehl

Wir werden einen benutzerdefinierten manage.py-Befehl namens runworker erstellen. Da Django über seinen Runsever-Befehl ein automatisches Neuladen ermöglicht, verwenden wir den Quellcode von Runserver als Grundlage für unseren benutzerdefinierten Befehl.

Sie können einen Befehl in Django erstellen, indem Sie in einer beliebigen Anwendung Ihres Projekts ein Verzeichnis „management/commands/“ erstellen. Sobald die Verzeichnisse erstellt wurden, können Sie eine Python-Datei mit dem Namen des Befehls, den Sie erstellen möchten, in diesem Verzeichnis ablegen (docs).

Angenommen, Ihr Projekt verfügt über eine Anwendung namens polls, erstellen wir eine Datei unter polls/management/commands/runworker.py und fügen den folgenden Code hinzu:

# polls/management/commands/runworker.py

import sys
from datetime import datetime

from celery.signals import worker_init

from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import autoreload

from my_project.celery import app as celery_app


class Command(BaseCommand):
    help = "Starts a Celery worker instance with auto-reloading for development."

    # Validation is called explicitly each time the worker instance is reloaded.
    requires_system_checks = []
    suppressed_base_arguments = {"--verbosity", "--traceback"}

    def add_arguments(self, parser):
        parser.add_argument(
            "--skip-checks",
            action="store_true",
            help="Skip system checks.",
        )
        parser.add_argument(
            "--loglevel",
            choices=("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "FATAL"),
            type=str.upper,  # Transforms user input to uppercase.
            default="INFO",
        )

    def handle(self, *args, **options):
        autoreload.run_with_reloader(self.run_worker, **options)

    def run_worker(self, **options):
        # If an exception was silenced in ManagementUtility.execute in order
        # to be raised in the child process, raise it now.
        autoreload.raise_last_exception()

        if not options["skip_checks"]:
            self.stdout.write("Performing system checks...\n\n")
            self.check(display_num_errors=True)

        # Need to check migrations here, so can't use the
        # requires_migrations_check attribute.
        self.check_migrations()

        # Print Django info to console when the worker initializes.
        worker_init.connect(self.on_worker_init)

        # Start the Celery worker.
        celery_app.worker_main(
            [
                "--app",
                "my_project",
                "--skip-checks",
                "worker",
                "--loglevel",
                options["loglevel"],
            ]
        )

    def on_worker_init(self, sender, **kwargs):
        quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C"

        now = datetime.now().strftime("%B %d, %Y - %X")
        version = self.get_version()
        print(
            f"{now}\n"
            f"Django version {version}, using settings {settings.SETTINGS_MODULE!r}\n"
            f"Quit the worker instance with {quit_command}.",
            file=self.stdout,
        )
Nach dem Login kopieren

WICHTIG: Stellen Sie sicher, dass Sie alle Instanzen von my_project durch den Namen Ihres Django-Projekts ersetzen.

Wenn Sie diesen Code kopieren und einfügen und mit der Programmierung fortfahren möchten, können Sie hier getrost aufhören, ohne den Rest dieses Beitrags zu lesen. Dies ist eine elegante Lösung, die Ihnen bei der Entwicklung Ihres Django & Celery-Projekts gute Dienste leisten wird. Wenn Sie jedoch mehr darüber erfahren möchten, wie es funktioniert, lesen Sie weiter.

Wie es funktioniert (optional)

Anstatt diesen Code Zeile für Zeile durchzugehen, werde ich die interessantesten Teile thematisch besprechen. Wenn Sie mit den benutzerdefinierten Django-Befehlen noch nicht vertraut sind, sollten Sie die Dokumentation lesen, bevor Sie fortfahren.

Automatisches Nachladen

Dieser Teil fühlt sich am magischsten an. Im Hauptteil der handle()-Methode des Befehls gibt es einen Aufruf von Djangos internem autoreload.run_with_reloader(). Es akzeptiert eine Rückruffunktion, die jedes Mal ausgeführt wird, wenn eine Python-Datei im Projekt geändert wird. Wie funktioniert das eigentlich?

Werfen wir einen Blick auf eine vereinfachte Version des Quellcodes der Funktion autoreload.run_with_reloader(). Die vereinfachte Funktion schreibt Code neu, fügt ihn ein und löscht ihn, um Klarheit über seine Funktionsweise zu schaffen.

# NOTE: This has been dramatically pared down for clarity.

def run_with_reloader(callback_func, *args, **kwargs):
    # NOTE: This will evaluate to False the first time it is run.
    is_inside_subprocess = os.getenv("RUN_MAIN") == "true"

    if is_inside_subprocess:
        # The reloader watches for Python file changes.
        reloader = get_reloader()

        django_main_thread = threading.Thread(
            target=callback_func, args=args, kwargs=kwargs
        )
        django_main_thread.daemon = True
        django_main_thread.start()

        # When the code changes, the reloader exits with return code 3.
        reloader.run(django_main_thread)

    else:
        # Returns Python path and the arguments passed to the command.
        # Example output: ['/path/to/python', './manage.py', 'runworker']
        args = get_child_arguments()

        subprocess_env = {**os.environ, "RUN_MAIN": "true"}
        while True:
            # Rerun the manage.py command in a subprocess.
            p = subprocess.run(args, env=subprocess_env, close_fds=False)
            if p.returncode != 3:
                sys.exit(p.returncode)
Nach dem Login kopieren

Wenn manage.py runworker in der Befehlszeile ausgeführt wird, ruft es zuerst die Methode handle() auf, die run_with_reloader() aufruft.

In run_with_reloader() wird geprüft, ob eine Umgebungsvariable namens RUN_MAIN den Wert „true“ hat. Beim ersten Aufruf der Funktion sollte RUN_MAIN keinen Wert haben.

Wenn RUN_MAIN nicht auf „true“ gesetzt ist, tritt run_with_reloader() in eine Schleife ein. Innerhalb der Schleife wird ein Unterprozess gestartet, der die übergebene Datei manage.py [Befehlsname] erneut ausführt, und dann darauf warten, dass dieser Unterprozess beendet wird. Wenn der Unterprozess mit Rückkehrcode 3 beendet wird, startet die nächste Iteration der Schleife einen neuen Unterprozess und wartet. Die Schleife wird ausgeführt, bis ein Unterprozess einen Exit-Code zurückgibt, der nicht 3 ist (oder bis der Benutzer mit Strg + C beendet wird). Sobald es einen Rückkehrcode ungleich 3 erhält, wird das Programm vollständig beendet.

Der erzeugte Unterprozess führt den Befehl manage.py erneut aus (in unserem Fall manage.py runworker), und der Befehl ruft erneut run_with_reloader() auf. Dieses Mal wird RUN_MAIN auf „true“ gesetzt, da der Befehl in einem Unterprozess ausgeführt wird.

Da run_with_reloader() nun weiß, dass es sich in einem Unterprozess befindet, ruft es einen Reloader auf, der auf Dateiänderungen wartet, fügt die bereitgestellte Rückruffunktion in einen Thread ein und übergibt sie an den Reloader, der mit der Suche nach Änderungen beginnt.

Wenn ein Reloader eine Dateiänderung erkennt, führt er sys.exit(3) aus. Dadurch wird der Unterprozess beendet, was die nächste Iteration der Schleife aus dem Code auslöst, der den Unterprozess erzeugt hat. Im Gegenzug wird ein neuer Unterprozess gestartet, der eine aktualisierte Version des Codes verwendet.

Pemeriksaan & migrasi sistem

Secara lalai, arahan Django melakukan semakan sistem sebelum ia menjalankan kaedah handle() mereka. Walau bagaimanapun, dalam kes runserver & perintah runworker tersuai kami, kami akan menangguhkan menjalankannya sehingga kami berada di dalam panggilan balik yang kami sediakan untuk run_with_reloader(). Dalam kes kami, ini ialah kaedah run_worker() kami. Ini membolehkan kami menjalankan arahan dengan muat semula automatik sambil membetulkan semakan sistem yang rosak.

Untuk menangguhkan menjalankan semakan sistem, nilai atribut require_system_checks ditetapkan kepada senarai kosong dan semakan dilakukan dengan memanggil self.check() dalam badan run_worker(). Seperti runserver, perintah runworker tersuai kami juga menyemak untuk melihat sama ada semua migrasi telah dijalankan dan ia memaparkan amaran jika terdapat migrasi yang belum selesai.

Oleh kerana kami sudah melakukan semakan sistem Django dalam kaedah run_worker(), kami melumpuhkan semakan sistem dalam Celery dengan menghantarnya bendera --skip-checks untuk mengelakkan kerja pendua.

Semua kod yang berkaitan dengan semakan dan pemindahan sistem telah ditarik balik terus daripada kod sumber arahan runserver.

celery_app.worker_main()

Pelaksanaan kami melancarkan pekerja Saderi terus daripada Python menggunakan celery_app.worker_main() dan bukannya menggunakan Saderi.

on_worker_init()

Kod ini dilaksanakan apabila pekerja dimulakan, memaparkan tarikh & masa, versi Django dan arahan untuk berhenti. Ia dimodelkan selepas maklumat yang dipaparkan apabila runserver but.

Pelat dandang runserver lain

Barisan berikut juga telah diangkat dari sumber runserver:

  • suppressed_base_arguments = {"--verbosity", "--traceback"}
  • autoreload.raise_last_exception()

Tahap log

Arahan tersuai kami mempunyai tahap log boleh dikonfigurasikan sekiranya pembangun ingin melaraskan tetapan daripada CLI tanpa mengubah suai kod.

Melangkah lebih jauh

Saya mencucuk-dan-menucuk kod sumber untuk Django & Celery untuk membina pelaksanaan ini, dan terdapat banyak peluang untuk melanjutkannya. Anda boleh mengkonfigurasi arahan untuk menerima lebih banyak hujah pekerja Celery. Sebagai alternatif, anda boleh mencipta perintah manage.py tersuai yang memuatkan semula sebarang perintah shell secara automatik seperti yang David Browne lakukan dalam Gist ini.

Jika anda mendapati ini berguna, sila tinggalkan suka atau komen. Terima kasih kerana membaca.

以上是Laden Sie Celery-Worker automatisch mit einem benutzerdefinierten Django-Befehl neu的详细内容。更多信息请关注PHP中文网其他相关文章!

Quelle:dev.to
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage
Über uns Haftungsausschluss Sitemap
Chinesische PHP-Website:Online-PHP-Schulung für das Gemeinwohl,Helfen Sie PHP-Lernenden, sich schnell weiterzuentwickeln!