スケジュールされたタスクを通じて 2 つ (複数) のシステム間で特定のデータを同期する必要が生じたことがありますか?異種システムにおける異なるプロセス間の相互呼び出しや通信の問題に悩んでいませんか? 「はい」の場合は、おめでとうございます。メッセージング サービスを使用すると、これらの問題を簡単に解決できます。メッセージ サービスは、複数のシステムや異種システム間のデータ交換 (メッセージ通知/通信) 問題の解決に優れています。また、システム間のサービスの相互呼び出し (RPC) にも使用できます。今回紹介する RabbitMQ は、現在最も主流のメッセージミドルウェアの 1 つです。
AMQP (Advanced Message Queuing Protocol) は、アプリケーション層プロトコルのオープン標準であり、メッセージ指向の中間部分の設計です。メッセージ ミドルウェアは、主にコンポーネント間の分離に使用されます。メッセージの送信者はメッセージ コンシューマーの存在を知る必要はありません。また、その逆も同様です。
AMQP の主な機能は、メッセージ指向、キュー、ルーティング (ポイントツーポイントおよびパブリッシュ/サブスクライブを含む)、信頼性、およびセキュリティです。
RabbitMQ は、オープン ソースの AMQP 実装であり、サーバー側は Erlang 言語で記述され、Python、Ruby、.NET、Java、JMS、C などのさまざまなクライアントをサポートします。 、PHP、ActionScript、XMPP、STOMP などが AJAX をサポートしています。分散システムでメッセージを保存および転送するために使用され、使いやすさ、拡張性、高可用性の点で優れたパフォーマンスを発揮します。 Web サイトは次のとおりです: http://www.rabbitmq.com/ さまざまな言語チュートリアルとサンプル コードがあります
これらの概念を理解すると、AMPQ プロトコルはさまざまなメッセージ キューのニーズを満たすために概念的に複雑になります。 , RabbitMQを上手に使うための基礎となります。
仮想ホスト (仮想ホスト): 仮想ホストは、一連のスイッチ、キュー、バインディングを保持します。なぜ複数の仮想ホストが必要なのでしょうか? RabbitMQ では、ユーザーは仮想ホストの粒度でのみアクセス許可を制御できます。したがって、グループ A がグループ B のスイッチ/キュー/バインディングにアクセスできないようにする必要がある場合は、A と B の両方に仮想ホストを作成する必要があります。すべての RabbitMQ サーバーにはデフォルトの仮想ホスト「/」があります。
RabbitMQ サーバーは複数の仮想ホストを持つことができ、ユーザーと権限の設定は仮想ホストに依存します。一般的な PHP アプリケーションの場合、ユーザー権限の設定は必要ありません。デフォルトで存在する「/」を使用するだけで済みます。簡単な設定例:
$conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/');
接続は、クライアントとサーバーの間に複数の接続を確立できることを指します。チャネルは論理的な接続として理解できます。一般的なアプリケーションでは、1 つのチャネルで十分であり、さらにチャネルを作成する必要はありません。サンプルコード:
//创建连接和 channel$conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn);
さまざまな種類のメッセージを区別するために、Exchange スイッチと Route ルーティングの 2 つの概念が設定されています。たとえば、タイプ A のメッセージを「C1」という名前のスイッチに送信し、タイプ B のメッセージを「C2」という名前のスイッチに送信します。クライアントが C1 に接続してキュー メッセージを処理する場合、タイプ A のメッセージのみを取得します。さらに、タイプ A のメッセージが多数ある場合、区別をさらに改良する必要があります。たとえば、クライアントが K 人のユーザーに対するタイプ A のメッセージのみを処理する場合、この目的にルーティングキーが使用されます。
$e_name = 'e_linvo'; //交换机名 $k_route = array(0=> 'key_1', 1=> 'key_2'); //路由key //创建交换机 $ex = new AMQPExchange($channel);$ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n";for($i=0; $ipublish($message . date('H:i:s'), $k_route[i%2])."\n"; }
上記のコードからわかるように、メッセージを送信するときは「スイッチ」だけで十分です。スイッチの背後に対応する処理キューがあるかどうかについては、送信側は気にする必要はありません。 routingkey は空の文字列にすることができます。この例では、以下のルーティング キーの役割を理解しやすくするために、2 つのキーを使用してメッセージを交互に送信します。
スイッチには、2 つの重要な概念があります。
スイッチ (交換): ルーティング テーブルを備えたルーティング プログラムとして理解できます。各メッセージには、単純な文字列であるルーティング キーがあります。スイッチには一連のバインディング、つまりルーティング ルール (ルート) があります。複数のスイッチが存在する可能性があります。複数のキューを同じスイッチにバインドでき、複数のスイッチを同じキューにバインドすることもできます。 (多対多の関係)
A、と入力します。次の 3 つのタイプがあります:
1. ファンアウト交換 (ルーティング キーを処理しません): 交換に送信されたメッセージは、交換にバインドされているすべてのキューに転送されます。ファンアウト スイッチはメッセージを最も速く送信します。
2. 直接交換 (ルーティング キーの処理): キューが交換にバインドされており、現在の要件がルーティング キーが X である場合、ルーティング キーが X であるメッセージのみがこのキューによって転送されます。 。
3. トピック交換 (ルーティング キーを特定のパターンに一致させる、ファジー処理として理解できます): ルーティング キーの単語は「.」で区切られ、記号「#」は一致を意味します。 0 個以上の単語の場合、記号「*」は 1 単語以上または以下の単語と一致することを意味します。
类型总结:Fanout类型最简单,这种模型忽略routingkey;Direct类型是使用最多的,使用确定的routingkey。这种模型下,接收消息时绑定’key_1′则只接收key_1的消息;最后一种是Topic,这种模式与Direct类似,但是支持通配符进行匹配,比如: ‘key_*’,就会接受key_1和key_2。Topic貌似美好,但是有可能导致不严谨,所以还是推荐使用Direct。
B,持久化。指定了持久化的交换机,在重新启动时才能重建,否则需要客户端重新声明生成才行。
需要特别明确的概念:交换机的持久化,并不等于消息的持久化。只有在持久化队列中的消息,才能持久化;如果没有队列,消息是没有地方存储的;消息本身在投递时也有一个持久化标志的,PHP中默认投递到持久化交换机就是持久的消息,不用特别指定。
讲了这么多,才讲到队列呀。事实上,队列仅是针对接收方(consumer)的,由接收方根据需求创建的。只有队列创建了,交换机才会将新接受到的消息送到队列中,交换机是不会在队列创建之前的消息放进来的。换句话说,在建立队列之前,发出的所有消息都被丢弃了。下面这个图比RabbitMQ官方的图更清楚——Queue是属于ReceiveMessage的一部分。
接下来看一下创建队列及接收消息的示例:
$e_name = 'e_linvo'; //交换机名 $q_name = 'q_linvo'; //队列名 $k_route = ''; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name);$q->setFlags(AMQP_DURABLE); //持久化 //绑定交换机与队列,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n";$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { var_dump($envelope->getRoutingKey); $msg = $envelope->getBody(); echo $msg."\n"; //处理消息}
从上述示例中可以看到,交换机既可以由消息发送端创建,也可以由消息消费者创建。
创建一个队列(line:20)后,需要将队列绑定到交换机上(line:25)队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。
消息的处理,是有两种方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。
关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,’myfunc’) ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。
在上述示例中,使用的$routingkey = ”, 意味着接收全部的消息。我们可以将其改为 $routingkey = ‘key_1′,可以看到结果中仅有设置routingkey为key_1的内容了。
注意: routingkey = ‘key_1′ 与 routingkey = ‘key_2′ 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = ” 是另类,client_all绑定到 ” 上,将消息全都处理后,client1和client2上也就没有消息了。
在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。如何使用PHP进行多线程的消息处理,将在下一节中讲述。
#操作系统:CentOS release 6.2yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel java-devel unixODBC-devel;
访问 官网下载页
wget http://www.erlang.org/download/otp_src_R16B03.tar.gz;tar -zxvf otp_src_R16B03.tar.gz;cd otp_src_R16B03;./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac;#不用java编译,故去掉java避免错误make && make install;
#vi /etc/profile在文件最后加入:PATH=$PATH:/usr/local/erlang/binexport PATH#source /etc/profile
访问 官网YUM安装教程
#自动安装erlang的YUM源 wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpmrpm -Uvh erlang-solutions-1.0-1.noarch.rpm #或手动安装YUM源 rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.ascAdd the following lines to some file in /etc/yum.repos.d/:[erlang-solutions]name=Centos $releasever - $basearch - Erlang Solutionsbaseurl=http://packages.erlang-solutions.com/rpm/centos/$releasever/$basearchgpgcheck=1gpgkey=http://packages.erlang-solutions.com/rpm/erlang_solutions.ascenabled=1
yum -y install erlang;
安装完后输入“erl”以下提示即为安装成功:
[root@localhost ~]# erlErlang/OTP 18 [erts-7.2] [source-e6dd627] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]Eshell V7.2 (abort with ^G)