Maison > cadre php > Laravel > Que dois-je faire si l'extension de laravel-swoole dans laravel8 n'est pas compatible avec la file d'attente des messages ?

Que dois-je faire si l'extension de laravel-swoole dans laravel8 n'est pas compatible avec la file d'attente des messages ?

藏色散人
Libérer: 2021-05-21 08:59:05
avant
2784 Les gens l'ont consulté
Ci-dessous

Laravel La colonne tutoriel présentera la file d'attente de messages Laravel-Swoole, j'espère que cela sera utile aux amis dans le besoin !

Pendant cette période, j'ai utilisé laravel8+laravel-swoole pour réaliser des projets, et j'ai découvert que l'extension de laravel-swoole n'est pas compatible avec les files d'attente de messages ;


Après en y réfléchissant, que dois-je faire ? , que dois-je faire ? Écrivez-le vous-même ! Heureusement, l'extension thinkphp-swoole est déjà compatible, alors


Téléchargez directement la version modifiée ! idées et code ! Commençons !

La première consiste à ajouter une autre commande de démarrage ou à démarrer la file d'attente des messages pour la consommation au démarrage de swoole. Une personne paresseuse comme moi peut le résoudre avec une seule commande et n'écrira jamais deux commandes. .

Réécrivez d'abord la commande de démarrage swoole

<?php

namespace crmeb\swoole\command;


use Illuminate\Support\Arr;
use Swoole\Process;
use SwooleTW\Http\Server\Facades\Server;
use SwooleTW\Http\Server\Manager;
use crmeb\swoole\server\InteractsWithQueue;
use crmeb\swoole\server\FileWatcher;
use Swoole\Runtime;

class HttpServerCommand extends \SwooleTW\Http\Commands\HttpServerCommand
{
    use InteractsWithQueue;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = &#39;crmeb:http {action : start|stop|restart|reload|infos}&#39;;

    /**
     * Run swoole_http_server.
     */
    protected function start()
    {
        if ($this->isRunning()) {
            $this->error(&#39;Failed! swoole_http_server process is already running.&#39;);

            return;
        }

        $host             = Arr::get($this->config, &#39;server.host&#39;);
        $port             = Arr::get($this->config, &#39;server.port&#39;);
        $hotReloadEnabled = Arr::get($this->config, &#39;hot_reload.enabled&#39;);
        $queueEnabled     = Arr::get($this->config, &#39;queue.enabled&#39;);
        $accessLogEnabled = Arr::get($this->config, &#39;server.access_log&#39;);
        $coroutineEnable  = Arr::get($this->config, &#39;coroutine.enable&#39;);

        $this->info(&#39;Starting swoole http server...&#39;);
        $this->info("Swoole http server started: <http://{$host}:{$port}>");
        if ($this->isDaemon()) {
            $this->info(
                &#39;> (You can run this command to ensure the &#39; .
                &#39;swoole_http_server process is running: ps aux|grep "swoole")&#39;
            );
        }

        $manager = $this->laravel->make(Manager::class);
        $server  = $this->laravel->make(Server::class);

        if ($accessLogEnabled) {
            $this->registerAccessLog();
        }
        //热更新重写
        if ($hotReloadEnabled) {
            $manager->addProcess($this->getHotReloadProcessNow($server));
        }
        //启动消息队列进行消费
        if ($queueEnabled) {
            $this->prepareQueue($manager);
        }

        if ($coroutineEnable) {
            Runtime::enableCoroutine(true, Arr::get($this->config, &#39;coroutine.flags&#39;, SWOOLE_HOOK_ALL));
        }

        $manager->run();
    }

    /**
     * @param Server $server
     * @return Process|void
     */
    protected function getHotReloadProcessNow($server)
    {
        return new Process(function () use ($server) {
            $watcher = new FileWatcher(
                Arr::get($this->config, &#39;hot_reload.include&#39;, []),
                Arr::get($this->config, &#39;hot_reload.exclude&#39;, []),
                Arr::get($this->config, &#39;hot_reload.name&#39;, [])
            );

            $watcher->watch(function () use ($server) {
                $server->reload();
            });
        }, false, 0, true);
    }

}
Copier après la connexion

Classe InteractsWithQueue

<?php
namespace crmeb\swoole\server;


use crmeb\swoole\queue\Manager as QueueManager;
use SwooleTW\Http\Server\Manager;

/**
 * Trait InteractsWithQueue
 * @package crmeb\swoole\server
 */
trait InteractsWithQueue
{
    public function prepareQueue(Manager $manager)
    {
        /** @var QueueManager $queueManager */
        $queueManager = $this->laravel->make(QueueManager::class);

        $queueManager->attachToServer($manager, $this->output);
    }
}
Copier après la connexion

Classe Manager

<?php
namespace crmeb\swoole\queue;


use Illuminate\Contracts\Container\Container;
use Swoole\Constant;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Timer;
use Illuminate\Support\Arr;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Worker;
use crmeb\swoole\server\WithContainer;
use Illuminate\Queue\Jobs\Job;
use function Swoole\Coroutine\run;
use Illuminate\Queue\WorkerOptions;
use SwooleTW\Http\Server\Manager as ServerManager;
use Illuminate\Console\OutputStyle;

class Manager
{
    use WithContainer;

    /**
     * Container.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * @var OutputStyle
     */
    protected $output;

    /**
     * @var Closure[]
     */
    protected $workers = [];

    /**
     * Manager constructor.
     * @param Container $container
     */
    public function __construct(Container $container)
    {
        $this->container = $container;
    }

    /**
     * @param ServerManager $server
     */
    public function attachToServer(ServerManager $server, OutputStyle $output)
    {
        $this->output = $output;
        $this->listenForEvents();
        $this->createWorkers();
        foreach ($this->workers as $worker) {
            $server->addProcess(new Process($worker, false, 0, true));
        }
    }

    /**
     * 运行消息队列命令
     */
    public function run(): void
    {
        @cli_set_process_title("swoole queue: manager process");

        $this->listenForEvents();
        $this->createWorkers();

        $pool = new Pool(count($this->workers));

        $pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, int $workerId) {
            $process = $pool->getProcess($workerId);
            run($this->workers[$workerId], $process);
        });

        $pool->start();
    }

    /**
     * 创建执行任务
     */
    protected function createWorkers()
    {
        $workers = $this->getConfig(&#39;queue.workers&#39;, []);

        foreach ($workers as $queue => $options) {

            if (strpos($queue, &#39;@&#39;) !== false) {
                [$queue, $connection] = explode(&#39;@&#39;, $queue);
            } else {
                $connection = null;
            }

            $this->workers[] = function (Process $process) use ($options, $connection, $queue) {

                @cli_set_process_title("swoole queue: worker process");

                /** @var Worker $worker */
                $worker = $this->container->make(&#39;queue.worker&#39;);
                /** @var WorkerOptions $option */
                $option = $this->container->make(WorkerOptions::class);

                $option->sleep = Arr::get($options, "sleep", 3);
                $option->maxTries = Arr::get($options, "tries", 0);
                $option->timeout = Arr::get($options, "timeout", 60);

                $timer = Timer::after($option->timeout * 1000, function () use ($process) {
                    $process->exit();
                });

                $worker->runNextJob($connection, $queue, $option);

                Timer::clear($timer);
            };
        }
    }

    /**
     * 注册事件
     */
    protected function listenForEvents()
    {
        $this->container->make(&#39;events&#39;)->listen(JobFailed::class, function (JobFailed $event) {
            $this->writeOutput($event->job);

            $this->logFailedJob($event);
        });
    }

    /**
     * 记录失败任务
     * @param JobFailed $event
     */
    protected function logFailedJob(JobFailed $event)
    {
        $this->container[&#39;queue.failer&#39;]->log(
            $event->connection,
            $event->job->getQueue(),
            $event->job->getRawBody(),
            $event->exception
        );
    }

    /**
     * Write the status output for the queue worker.
     *
     * @param Job $job
     * @param     $status
     */
    protected function writeOutput(Job $job, $status)
    {
        switch ($status) {
            case &#39;starting&#39;:
                $this->writeStatus($job, &#39;Processing&#39;, &#39;comment&#39;);
                break;
            case &#39;success&#39;:
                $this->writeStatus($job, &#39;Processed&#39;, &#39;info&#39;);
                break;
            case &#39;failed&#39;:
                $this->writeStatus($job, &#39;Failed&#39;, &#39;error&#39;);
                break;
        }
    }

    /**
     * Format the status output for the queue worker.
     *
     * @param Job $job
     * @param string $status
     * @param string $type
     * @return void
     */
    protected function writeStatus(Job $job, $status, $type)
    {
        $this->output->writeln(sprintf(
            "<{$type}>[%s][%s] %s</{$type}> %s",
            date(&#39;Y-m-d H:i:s&#39;),
            $job->getJobId(),
            str_pad("{$status}:", 11), $job->getName()
        ));
    }

}
Copier après la connexion

Ajoutez la classe CrmebServiceProvider

<?php
namespace crmeb\swoole;


use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Http\Kernel;
use crmeb\swoole\command\HttpServerCommand;
use Illuminate\Queue\Worker;
use SwooleTW\Http\HttpServiceProvider;
use SwooleTW\Http\Middleware\AccessLog;
use SwooleTW\Http\Server\Manager;

/**
 * Class CrmebServiceProvider
 * @package crmeb\swoole
 */
class CrmebServiceProvider extends HttpServiceProvider
{



    /**
     * Register manager.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton(Manager::class, function ($app) {
            return new Manager($app, &#39;laravel&#39;);
        });

        $this->app->alias(Manager::class, &#39;swoole.manager&#39;);

        $this->app->singleton(&#39;queue.worker&#39;, function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            return new Worker(
                $app[&#39;queue&#39;],
                $app[&#39;events&#39;],
                $app[ExceptionHandler::class],
                $isDownForMaintenance
            );
        });
    }

    /**
     * Boot websocket routes.
     *
     * @return void
     */
    protected function bootWebsocketRoutes()
    {
        require base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/routes/laravel_routes.php&#39;;
    }

    /**
     * Register access log middleware to container.
     *
     * @return void
     */
    protected function pushAccessLogMiddleware()
    {
        $this->app->make(Kernel::class)->pushMiddleware(AccessLog::class);
    }

    /**
     * Register commands.
     */
    protected function registerCommands()
    {
        $this->commands([
            HttpServerCommand::class,
        ]);
    }

    /**
     * Merge configurations.
     */
    protected function mergeConfigs()
    {
        $this->mergeConfigFrom(base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_http.php&#39;, &#39;swoole_http&#39;);
        $this->mergeConfigFrom(base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_websocket.php&#39;, &#39;swoole_websocket&#39;);
    }

    /**
     * Publish files of this package.
     */
    protected function publishFiles()
    {
        $this->publishes([
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_http.php&#39; => base_path(&#39;config/swoole_http.php&#39;),
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/config/swoole_websocket.php&#39; => base_path(&#39;config/swoole_websocket.php&#39;),
            base_path(&#39;vendor/swooletw/laravel-swoole&#39;) . &#39;/routes/websocket.php&#39; => base_path(&#39;routes/websocket.php&#39;),
        ], &#39;laravel-swoole&#39;);
    }
}
Copier après la connexion

puis ajoutez

Mettez-le dans crmebswooleCrmebServiceProvider::class et chargez-le dans config/app.php Réécrivez la méthode de démarrage de la commande swoole 🎜>Une fois swoole démarré, la file d'attente peut être automatiquement consommée. providers

Recommandations associées : config/swoole_http.phpLes cinq derniers didacticiels vidéo Laravel


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:learnku.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal