需求场景:
智能家居网关(以下简称gateway),需要和netty服务器通讯(以下简称netty),netty和gateway之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)
碰到的问题:
netty作为服务器端如何主动的向gateway发送消息,我尝试当每个gateway连接到netty(TCP/IP)时使用一个map把该channelSocket的id和该channelSocket绑定在一起
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String uuid = ctx.channel().id().asLongText();
GatewayService.addGatewayChannel(uuid, (SocketChannel)ctx.channel());
System.out.println("a new connect come in: " + uuid);
}
GatewayService其实就是一个ConcurrentHashMap
public class GatewayService {
private static Map<String, SocketChannel> map = new ConcurrentHashMap<>();
public static void addGatewayChannel(String id, SocketChannel gateway_channel){
map.put(id, gateway_channel);
}
public static Map<String, SocketChannel> getChannels(){
return map;
}
public static SocketChannel getGatewayChannel(String id){
return map.get(id);
}
public static void removeGatewayChannel(String id){
map.remove(id);
}
}
我在服务器端尝试每间隔一段时间loop这个ConcurrentHashMap如果里面已经有绑定的channelSocket,就使用write方法向客户端发送消息
Runnable sendTask = new Runnable() {
@Override
public void run() {
sendTaskLoop:
for(;;){
System.out.println("task is beginning...");
try{
Map<String, SocketChannel> map = GatewayService.getChannels();
Iterator<String> it = map.keySet().iterator();
while (it.hasNext()) {
String key = it.next();
SocketChannel obj = map.get(key);
System.out.println("channel id is: " + key);
System.out.println("channel: " + obj.isActive());
obj.writeAndFlush("hello, it is Server test header ping");
}
}catch(Exception e){break sendTaskLoop;}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
new Thread(sendTask).start();
理论上客户端应该是可以接受到我发送的消息,但是我观察了一下源代码,发现writeAndFlush这个方法最终会被handler触发,于是我又在handler中覆写了write方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("write handler");
ctx.writeAndFlush(msg);
}
可是最终结果客户端并没有收到任何消息,请问netty如何主动向客户端发送消息?
On peut à peu près deviner que le serveur n'a pas d'encodeur, il ne peut donc pas être envoyé du tout
D'après le code publié, vous pouvez voir qu'il existe deux façons d'envoyer des messages au client :
et
Dans netty, toutes les entrées et sorties sont
ByteBuf
L'auteur doit confirmer si le serveur dispose d'un encodeur correspondant et convertir la chaîne enByteBuf
.GatewayService n'a pas besoin d'être forcé à se convertir en type SocketChannel lors de l'enregistrement du canal
.Il n'y a aucun problème pour obtenir le canal et exécuter writeAndFlush.
Afin de résoudre davantage le problème, vous pouvez procéder comme suit :
1. Surveiller le résultat de la promesse de ctx.writeAndFlush(msg);, par exemple
2. Faites attention à ce que l'encodeur et le décodeur des deux côtés du client et du serveur soient identiques
Il est recommandé d'utiliser ChannelGroup pour gérer les clients. La raison pour laquelle le serveur leader ne peut pas transmettre de données peut être due au fait qu'il n'y a pas d'encodeur. ctx.writeAndFlush ne peut pas écrire directement les types de chaîne.
Bonjour, votre code peut être modifié comme ceci :
Le code du handler est modifié comme suit :
@Override
GatewayService :
classe publique GatewayService {
}
Le code pour parcourir le client est le suivant :
la classe publique TimeTask implémente Runnable{
// obj.writeAndFlush("bonjour, c'est le ping de l'en-tête de test du serveur");
}
C'est tout. Je l'ai personnellement testé et ça marche