Preface
Most readers will already be familiar with RabbitMQ. This article shares how I wrapped RabbitMQ.Client after studying RabbitMQ. The wrapped component and a demo are linked at the end of the article.
How RabbitMQ works
As the figure below shows, the publisher (Publisher) first sends a message to an exchange (Exchange), which then routes it to the specified queue (Queue). The binding between the exchange and the queue must be declared beforehand. Finally, the consumer (Consumer) either subscribes to the queue or actively fetches messages from it.
The subscription and active retrieval just mentioned correspond to push (passive) and pull (active).
Push: as soon as a message is added to the queue, an idle consumer is notified and consumes it. (I don't go to you; I wait for you to come to me, as in the observer pattern.)
Pull: the consumer is not notified; instead it fetches messages from the queue itself, either by polling or on a timer. (I only go to you when I need something.)
Here is a usage scenario. Suppose there are two systems, an order system and a shipping system. The order system issues shipping instructions as messages. To ship goods promptly, the shipping system subscribes to the queue and processes each instruction as soon as it arrives.
However, the program occasionally hits exceptions such as a network or DB timeout, and the message is moved to a failure queue. A resend mechanism is then needed. I don't want to retry in a tight loop such as while(IsPostSuccess == True), because once an exception occurs it usually keeps occurring for some time, so immediate retries are pointless.
At that point the message no longer needs to be processed immediately. A JOB fetches messages from the failure queue on a schedule, or after a delay of (number of failures * interval minutes), and resends them.
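The delay rule above (number of failures * interval minutes) can be sketched as a small helper. This is only an illustration: the class name RetrySchedule and its method names are hypothetical and are not part of the wrapped component.

```csharp
using System;

// Hypothetical helper: decides when a failed message should be resent.
public static class RetrySchedule
{
    // Delay before the next resend = number of failures so far * base interval.
    public static TimeSpan NextDelay(int failureCount, TimeSpan interval)
    {
        if (failureCount < 1)
            throw new ArgumentOutOfRangeException(nameof(failureCount));
        return TimeSpan.FromTicks(interval.Ticks * failureCount);
    }

    // True when enough time has passed since the last failure for another attempt.
    public static bool IsDue(DateTime lastFailedAtUtc, int failureCount, TimeSpan interval, DateTime nowUtc)
    {
        return nowUtc >= lastFailedAtUtc + NextDelay(failureCount, interval);
    }
}
```

With a 5-minute interval, a message that has failed three times would wait 15 minutes before the JOB resends it, so repeated failures back off instead of hammering a dependency that is still down.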
Publish wrapper
Steps: initialize the connection -> declare the exchange -> declare the queue -> bind the exchange to the queue -> publish the message. Note that I cache the Model in a ConcurrentDictionary because declaring and binding are time-consuming, and publishing again to the same queue does not require re-initialization.
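The caching idea can be shown without any RabbitMQ dependency. The sketch below is an assumption-laden illustration, not the component's actual code: KeyedCache and GetOrCreate are hypothetical names, and the factory delegate stands in for "create the channel, declare the exchange and queue, bind them". It wraps each entry in Lazy<T> because ConcurrentDictionary.GetOrAdd may call its value factory more than once when several threads race on the same key.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical cache: one expensive-to-create object per key
// (for example, one declared-and-bound channel per queue name).
public sealed class KeyedCache<TValue>
{
    private readonly ConcurrentDictionary<string, Lazy<TValue>> _items =
        new ConcurrentDictionary<string, Lazy<TValue>>();

    public TValue GetOrCreate(string key, Func<string, TValue> factory)
    {
        // GetOrAdd may build several Lazy wrappers under contention, but only one
        // is stored, so the expensive factory itself runs once per key.
        var lazy = _items.GetOrAdd(key, k => new Lazy<TValue>(() => factory(k)));
        return lazy.Value;
    }
}
```

A call such as cache.GetOrCreate("orders", name => CreateAndBindChannel(name)) would pay the declaration cost only on the first publish to that queue; CreateAndBindChannel is likewise a placeholder.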
Below is a screenshot of the publishing speed measured on my local machine:
The speed is stable at about 42,000 messages per second (4.2W/S); without the JSON serialization step (ToJson) it is slightly faster.
Subscribe wrapper
When publishing, the exchange and queue are declared and bound; when subscribing, only the queue needs to be declared. As the code below shows, when an exception is caught the message is sent to a custom "dead letter queue" and resent periodically by another JOB. That is why the finally block always acknowledges the message.
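// One channel (IModel) per queue, cached so the queue is declared only once.
private static IModel GetModel(string queue, bool isProperties = false)
{
    return ModelDic.GetOrAdd(queue, key =>
    {
        var model = _conn.CreateModel();
        QueueDeclare(model, queue, isProperties);
        // Deliver at most one unacknowledged message to the consumer at a time.
        model.BasicQos(0, 1, false);
        // GetOrAdd stores the returned value, so no explicit ModelDic[queue] assignment is needed.
        return model;
    });
}

public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
{
    var channel = GetModel(queue, isProperties);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var msgStr = ea.Body.DeserializeUtf8();
        try
        {
            // Deserialize inside the try so a malformed message is also dead-lettered and acknowledged.
            var msg = msgStr.FromJson<T>();
            handler(msg);
        }
        catch (Exception ex)
        {
            ex.GetInnestException().WriteToFile("Queue message received", "RabbitMq");
            // Messages that already come from the dead letter queue are not re-published to it.
            if (!isDeadLetter)
                PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
        }
        finally
        {
            // Always acknowledge: failed messages are handled by the dead-letter resend JOB.
            channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    channel.BasicConsume(queue, false, consumer);
}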
Below is a screenshot of the speed from the test on my local machine:
It peaks at about 1.9K messages/s and drops to about 1.7K messages/s at its slowest.
Pull wrapper
Go directly to the code:
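private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
{
    var channel = GetModel(exchange, queue, routingKey);
    // BasicGet fetches a single message and returns null when the queue is empty.
    var result = channel.BasicGet(queue, false);
    if (result.IsNull())
        return;
    try
    {
        // Deserialize inside the try so a malformed message is still logged and acknowledged.
        var msg = result.Body.DeserializeUtf8().FromJson<T>();
        handler(msg);
    }
    catch (Exception ex)
    {
        ex.GetInnestException().WriteToFile("Queue message received", "RabbitMq");
    }
    finally
    {
        // Acknowledge whether or not the handler succeeded.
        channel.BasicAck(result.DeliveryTag, false);
    }
}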
It peaks at about 1.8K messages/s and settles at about 1.5K messages/s.
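RPC (remote call) wrapper
First, a note: RabbitMQ merely provides this RPC feature, but it is not true RPC. Here is why:
1. Traditional RPC hides the call details: you pass arguments and receive thrown exceptions as if you were calling a local method.
2. RabbitMQ's RPC is message-based: after the consumer processes the message, it returns the response through a new queue.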
public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
{
    var channel = GetModel(exchange, queue, routingKey, isProperties);
    var consumer = new QueueingBasicConsumer(channel);
    channel.BasicConsume(queue, true, consumer);
    try
    {
        var correlationId = Guid.NewGuid().ToString();
        var basicProperties = channel.CreateBasicProperties();
        basicProperties.ReplyTo = queue;
        basicProperties.CorrelationId = correlationId;
        channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());
        var sw = Stopwatch.StartNew();
        while (true)
        {
            // Dequeue with a timeout: the parameterless Dequeue() blocks until a message
            // arrives, so the 30-second limit would never trigger if no reply came.
            var remaining = 30000 - (int)sw.ElapsedMilliseconds;
            BasicDeliverEventArgs ea;
            if (remaining <= 0 || !consumer.Queue.Dequeue(remaining, out ea))
                throw new Exception("Timed out waiting for the response");
            // Ignore replies that belong to other requests.
            if (ea.BasicProperties.CorrelationId == correlationId)
                return ea.Body.DeserializeUtf8();
        }
    }
    catch (Exception ex)
    {
        throw ex.GetInnestException();
    }
}

public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
{
    var channel = GetModel(queue, isProperties);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var msgStr = body.DeserializeUtf8();
        var msg = msgStr.FromJson<T>();
        var props = ea.BasicProperties;
        var replyProps = channel.CreateBasicProperties();
        // Echo the correlation id so the client can match the reply to its request.
        replyProps.CorrelationId = props.CorrelationId;
        try
        {
            msg = handler(msg);
        }
        catch (Exception ex)
        {
            ex.GetInnestException().WriteToFile("Queue message received", "RabbitMq");
        }
        finally
        {
            // Reply to the queue named by the client. If the handler threw,
            // the original request is sent back unchanged.
            channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
            channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    channel.BasicConsume(queue, false, consumer);
}
It works, but I don't recommend it. Consider other RPC frameworks such as grpc or thrift.
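If the message-based request/reply pattern is used anyway, the step that matches a reply to its request by correlation id can be kept separate from the transport. The following is a minimal sketch under stated assumptions: PendingReplies and its methods are hypothetical names, nothing here calls RabbitMQ.Client, and the caller is expected to put the returned id into the message properties and to call Complete from its reply consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks requests that are still waiting for a reply.
public sealed class PendingReplies
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Call before publishing the request; returns the correlation id to send with it.
    public string Register(out Task<string> reply)
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>();
        _pending[id] = tcs;
        reply = tcs.Task;
        return id;
    }

    // Call when a reply arrives; replies with an unknown id are ignored (returns false).
    public bool Complete(string correlationId, string body)
    {
        TaskCompletionSource<string> tcs;
        return _pending.TryRemove(correlationId, out tcs) && tcs.TrySetResult(body);
    }

    // Blocks until the reply arrives; throws TimeoutException if none arrives in time.
    public string Wait(string correlationId, Task<string> reply, TimeSpan timeout)
    {
        if (reply.Wait(timeout))
            return reply.Result;
        TaskCompletionSource<string> ignored;
        _pending.TryRemove(correlationId, out ignored);
        throw new TimeoutException("No reply for " + correlationId);
    }
}
```

Because the wait is bounded by the timeout itself rather than by the arrival of the next message, a request whose reply never comes still fails after the chosen interval.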
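Conclusion
This article does not go into much RabbitMQ background, because the blog community already has plenty of study notes on it. My code is available at //m.sbmmt.com/ . If you find anything wrong, please point it out in the comments and I will fix it promptly so that others are not misled.
If this article helped you, please click Recommend. Thank you for reading.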