Apache Mina Server는 네트워크 통신 응용 프레임워크, 즉 주로 TCP/IP 및 UDP/IP 프로토콜 스택 기반의 통신 프레임워크입니다(물론 Java 객체 직렬화 서비스도 제공할 수 있음). , 가상 머신 파이프라인 통신 서비스 등), 미나
Mina는 이벤트 기반 비동기(Mina의 비동기 IO는 기본적으로 기본 지원으로 Java NIO를 사용함) 작업을 위한 프로그래밍 모델을 제공하여 고성능, 확장성이 뛰어난 네트워크 통신 애플리케이션을 신속하게 개발하는 데 도움이 됩니다. 미나
크게 1.x와 2.x 두가지 브랜치가 있는데, 여기서는 최신 버전 2.0을 설명합니다. Mina 1.x를 사용하는 경우 일부 기능이 적용되지 않을 수 있습니다. 이 문서를 연구하려면 JAVA IO, JAVA NIO, JAVASocket, JAVA 스레드 및 동시성 라이브러리(java.util.concurrent.*)에 대한 지식이 필요합니다. Mina는 또한 네트워크 통신의 서버 측과 클라이언트 측에 대한 캡슐화를 제공합니다. Mina는 전체 Netcom 통신 구조에서 다음과 같은 위치에 있습니다. Mina의 API는 애플리케이션에서 실제 네트워크 통신을 분리한다는 것을 알 수 있습니다. 보내고 받고 싶은 데이터와 비즈니스 로직에만 신경 쓰면 됩니다. 마찬가지로 어느 쪽 끝이던 미나는
실행 프로세스는 다음과 같습니다.
(1.) IoService: 이 인터페이스는 스레드에서 소켓 설정을 담당하며 자체 Selector를 가지며 여부를 모니터링합니다. 연결이 설정되었습니다.
(2.) IoProcessor: 이 인터페이스는 다른 스레드에 있으며 채널에서 읽고 쓸 데이터가 있는지 확인하는 역할을 담당합니다. 이는 JAVA NIO 코딩을 사용할 때와 다른 점입니다. 일반적으로 JAVA NIO 코딩에서는 Selector를 사용합니다. 즉, IoService와 IoProcessor라는 두 가지 기능 인터페이스를 구분하지 않습니다. 또한 IoProcessor는 IoService에 등록된 필터를 호출하고 필터 체인 이후에 IoHandler를 호출하는 역할을 담당합니다.
(3.) IoFilter: 이 인터페이스는 인터셉터 세트를 정의합니다. 이러한 인터셉터는 로그 출력, 블랙리스트 필터링, 데이터 인코딩(쓰기 방향) 및 디코딩(읽기 방향)과 같은 기능을 포함할 수 있습니다. 그 중 데이터의 인코딩과 디코딩이 가장 중요하고 미나를 사용할 때 가장 주의해야 할 점입니다.
(4.) IoHandler: 이 인터페이스는 데이터가 수신되고 전송되는 비즈니스 로직 작성을 담당합니다.
1. 단순 TCPServer:
(1.) 1단계: IoService 작성
위에서 실행 과정을 먼저 작성해야 합니다. IoService 자체는 서버이자 클라이언트입니다. 여기서는 서버를 작성하므로 IoAcceptor는 프로토콜과 독립적이므로 TCPServer를 작성하려고 합니다. 우리는 IoAcceptor의 구현인 NioSocketAcceptor를 사용하며 실제로 하위 수준에서 java.nio.channels.ServerSocketChannel 클래스를 호출합니다. 물론 Apache의 APR 라이브러리를 사용하는 경우 AprSocketAcceptor를 TCPServer로 사용하도록 선택할 수 있습니다.
전설에 따르면 Apache APR 라이브러리의 성능은 JVM과 함께 제공되는 기본 라이브러리보다 훨씬 높습니다. 그런 다음 지정된 IoService에 의해 내부적으로 IoProcessor가 생성되고 호출되므로 신경 쓸 필요가 없습니다.
IoAcceptor acceptor=new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10); acceptor.bind(new InetSocketAddress(9123));
이 코드에서는 서버 측에서 TCP/IP NIO 기반 소켓을 초기화한 다음 IoSessionConfig를 호출하여 데이터를 읽기 위한 버퍼 크기와 읽기-쓰기 채널을 설정합니다. 초 유휴 상태로 들어갑니다.
(2.) 2단계: 필터 작성
여기에서는 가장 간단한 문자열 전송을 다루겠습니다. Mina는 문자열을 인코딩하고 디코딩하기 위한 TextLineCodecFactory 코덱 팩토리를 제공했습니다.
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory( <span style="white-space:pre"> </span>Charset.forName("UTF-8"), <span style="white-space:pre"> </span>LineDelimeter.WINDOWS.getValue(), <span style="white-space:pre"> </span>LineDelimiter. WINDOWS.getValue())) );
이 코드는 acceptor.bind() 메서드보다 먼저 실행되어야 합니다. 소켓을 바인딩한 후에는 이러한 준비를 완료할 수 없기 때문입니다. 여기서는 코덱이 어떻게 작동하는지 알 필요가 없습니다. 이것은 나중에 설명하겠습니다. 여기서는 우리가 전송하는 데이터가 줄바꿈 문자로 표시되어 있다는 점만 분명히 하면 됩니다. 따라서 우리는 Mina의 자체 줄바꿈 코덱 팩토리를 사용합니다.
(3.) 3단계: IoHandler 작성
这里我们只是简单的打印Client 传说过来的数据。
public class MyIoHandler extends IoHandlerAdapter { // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。 private final static Logger log = LoggerFactory .getLogger(MyIoHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); log.info("The message received is [" + str + "]"); if (str.endsWith("quit")) { session.close(true); return; } } }
然后我们把这个IoHandler 注册到IoService:
acceptor.setHandler(new MyIoHandler());
当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。
2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。
(1.) 第一步:编写IoService并注册过滤器
public class MyClient { main方法: IoConnector connector=new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue() ) ) ); connector.connect(new InetSocketAddress("localhost", 9123)); }
(2.) 第三步:编写IoHandler
public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory .getLogger(ClientHandler.class); private final String values; public ClientHandler(String values) { this.values = values; } @Override public void sessionOpened(IoSession session) { session.write(values); } }
注册IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));
然后我们运行MyClient,你会发现MyServer 输出如下语句:
The message received is [你好!]
The message received is [大家好!]
我们看到服务端是按照收到两条消息输出的,因为我们用的编解码器是以换行符判断数据是否读取完毕的。
3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Map
这个方法获取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
这个方法用于获取IoSession 的配置对象,通过IoSessionConfig 对象可以设置Socket 连接的一些选项。
(2.)IoAcceptor:
这个接口是TCPServer 的接口,主要增加了void bind()监听端口、void unbind()解除对套接字的监听等方法。这里与传统的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次调用bind()方法(或者在一个方法中传入多个SocketAddress 参数)同时监听多个端口。
3.)IoConnector:
这个接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于与Server 端建立连接,第二个参数如果不传递则使用本地的一个随机端口访问Server 端。这个方法是异步执行的,同样的,也可以同时连接多个服务端。
(4.)IoSession:
이 인터페이스는 서버 측과 클라이언트 측 간의 연결을 나타내는 데 사용됩니다. 반환될 때 IoAcceptor.accept() 인스턴스.
이 인터페이스에는 다음과 같은 일반적으로 사용되는 메소드가 있습니다.
A. WriteFuture write(객체 메시지):
이 메소드는 데이터를 쓰는 데 사용되며 작업은 비동기식입니다.
B. CloseFuture close(즉시 부울):
이 메서드는 IoSession을 닫는 데 사용됩니다. 이 작업도 비동기식입니다. 매개변수로 true를 지정하면 즉시 닫힙니다. 그렇지 않으면 모든 쓰기 작업이 플러시된 후에 닫힙니다.
C. 객체 setAttribute(객체 키, 객체 값):
이 메소드는 HttpSession의 setAttrbute() 메소드와 유사하게 세션 중에 사용할 수 있는 일부 속성을 세션에 추가하는 데 사용됩니다. IoSession은 내부적으로 동기화된 HashMap을 사용하여 추가한 사용자 정의 속성을 저장합니다.
D.SocketAddress getRemoteAddress():
이 메소드는 원격 연결의 소켓 주소를 가져옵니다.
E. void suspendWrite():
이 메서드는 쓰기 작업을 일시 중지하는 데 사용되므로 이 메서드와 쌍을 이루는 void restartWrite() 메서드가 있습니다. read() 메서드에도 동일하게 적용됩니다.
F.ReadFuture read():
이 메소드는 데이터를 읽는 데 사용되지만, 기본적으로 사용할 수는 없습니다. 이 비동기 읽기 메소드를 사용하려면 IoSessionConfig의 setUseReadOperation(true)를 호출해야 합니다. 일반적으로 우리는 이 방법을 사용하지 않을 것입니다. 왜냐하면 이 방법의 내부 구현은 데이터를 BlockingQueue에 저장하는 것이기 때문입니다. 서버 측이라면 클라이언트 측에서 보낸 많은 양의 데이터가 서버에서 이런 방식으로 읽혀지기 때문입니다. 측면에서는 메모리 누수가 발생할 수 있지만 클라이언트에서는 때때로 더 편리할 수 있습니다.
G.IoService getService():
이 메소드는 현재 세션 객체와 연관된 IoService 인스턴스를 반환합니다.
TCP 연결 닫기 정보:
클라이언트에 있든 서버에 있든 IoSession은 기본 TCP 연결을 나타내는 데 사용됩니다. 그런 다음 IoSession 중 하나의 IoSession에서 close() 메서드를 호출하면 이를 알 수 있습니다. 서버 또는 클라이언트, TCP 연결이 닫혀 있는 것으로 표시되지만 메인 스레드는 여전히 실행 중입니다. 즉, JVM이 종료되지 않은 것입니다. 이는 IoSession의 close()가 TCP 연결 채널만 닫고 서버를 닫지 않기 때문입니다. -측 및 클라이언트측 프로그램. 서버와 클라이언트를 중지하려면 IoService의 dispose() 메서드를 호출해야 합니다.
(5.)IoSessionConfig: 이 방법은 이 세션의 구성을 지정하는 데 사용됩니다. 일반적으로 사용되는 방법은 다음과 같습니다.
A. void setReadBufferSize(int size):
B. void setIdleTime(IdleStatus status, int idleTime):
이 메소드는 채널과 관련된 읽기, 쓰기 또는 읽기 및 쓰기 이벤트가 지정된 시간 내에 발생하지 않도록 설정하며 채널은 유휴 상태로 전환됩니다. . 이 메소드가 호출되면 필터와 IoHandler의 sessionIdle() 메소드는 매 유휴 시간마다 호출됩니다.
C. void setWriteTimeout(int time):
이 메소드는 쓰기 작업에 대한 시간 제한을 설정합니다.
D.void setUseReadOperation(boolean useReadOperation):
이 메서드는 IoSession의 read() 메서드를 사용할 수 있는지 여부를 설정합니다.
(6.)IoHandler:
위의 샘플 코드에서 볼 수 있듯이 기본적으로 데이터를 읽고 전송하는 인터페이스입니다. 인터페이스는 항상 완료됩니다. 이 인스턴스는 IoService에 바인딩됩니다. 인스턴스는 하나만 있습니다(IoHandler 인스턴스가 IoService에 주입되지 않으면 예외가 발생합니다). 다음과 같은 메소드가 있습니다.
A. void sessionCreated(IoSession session):
이 메소드는 Session 객체가 생성될 때 호출됩니다. TCP 연결의 경우 연결이 수락되면 호출되지만 현재는 TCP 연결이 설정되지 않았다는 점에 유의해야 합니다. 이 메서드는 문자 그대로의 의미, 즉 연결 개체 IoSession이 생성될 때만 나타납니다. 메서드가 다시 호출됩니다. UDP의 경우 UDP는 연결이 없기 때문에 데이터 패킷을 수신할 때 이 메서드가 호출됩니다.
B. void sessionOpened(IoSession session):
이 메서드는 연결이 열릴 때 호출됩니다. 항상 sessionCreated() 메서드 이후에 호출됩니다. TCP의 경우 연결이 설정된 후에 호출되어 일부 인증 작업을 수행하고 데이터를 보내는 등의 작업을 수행할 수 있습니다. UDP의 경우 이 메서드는 sessionCreated()와 다르지 않지만 바로 뒤에 실행됩니다. 가끔 데이터를 전송하면 sessionCreated() 메서드는 처음에만 호출되지만 sessionOpened() 메서드는 매번 호출됩니다.
C. void sessionClosed(IoSession 세션):
TCP의 경우 연결이 종료될 때 이 메서드가 호출됩니다. UDP의 경우 이 메서드는 IoSession의 close() 메서드가 호출될 때만 삭제됩니다.
D.void sessionIdle(IoSession session, IdleStatus status):
IoSession 채널이 유휴 상태에 들어갈 때 이 메서드가 호출됩니다. UDP 프로토콜의 경우 이 메서드는 호출되지 않습니다.
E.voidExceptionCaught(IoSession session, Throwable cause):
이 메서드는 프로그램이나 Mina 자체에서 예외가 발생할 때 호출됩니다. 일반적으로 여기서는 IoSession이 닫힙니다.
F.void messageReceived(IoSession session, Object message):
메시지를 수신할 때 호출되는 메소드, 즉 메시지를 수신하는 데 사용되는 메소드입니다. 프로토콜 코덱을 사용하면 필요한 유형으로 캐스팅할 수 있습니다. 보통 위의 예처럼 프로토콜 코덱을 사용하는데, 프로토콜 코덱이
TextLineCodecFactory이기 때문에 메시지를 강제로 String 형식으로 변환할 수 있습니다.
G.void messageSent(IoSession session, Object message):
이 메소드는 메시지가 성공적으로 전송되었을 때 호출됩니다. 메시지가 성공적으로 전송된 후에는 이 메소드를 호출할 수 없다는 의미입니다. 메시지를 보내는 데 사용됩니다.
메시지 보내기 타이밍:
sessionOpened() 및 messageReceived() 메서드에서 IoSession.write() 메서드를 호출하여 메시지 보내기를 완료해야 합니다. sessionOpened() 메서드에서는 TCP 연결이 실제로 열려 있고, 마찬가지로 messageReceived() 메서드에서도 TCP 연결도 열려 있지만 둘의 타이밍이 다르기 때문입니다. sessionOpened() 메서드는 TCP 연결이 설정된 후 데이터가 수신되기 전에 전송됩니다. messageReceived() 메서드는 데이터가 수신된 후 전송됩니다. 수신된 콘텐츠의 모양에 따라 어떤 종류의 데이터를 보낼지 결정할 수 있습니다. . 이 인터페이스에는 메서드가 너무 많기 때문에 일반적으로 관심 있는 메서드를 처리하기 위해 어댑터 모드 IoHandlerAdapter가 사용됩니다.
(7.)IoBuffer:
이 인터페이스는 JAVA NIO의 ByteBuffer를 캡슐화한 것입니다. 이는 주로 ByteBuffer가 기본 데이터 유형에 대한 읽기 및 쓰기 작업만 제공하기 때문입니다. 그리고 문자열과 같은 객체 유형에 대한 작성 방법이 더 편리합니다. 또한 ByteBuffer는 길이가 고정되어 있으므로 매우 번거롭습니다. IoBuffer의 가변 길이 구현은 StringBuffer와 유사합니다. ByteBuffer와 마찬가지로 IoBuffer는 스레드로부터 안전하지 않습니다. 이 섹션의 내용 중 일부가 명확하지 않은 경우 java.nio.ByteBuffer 인터페이스를 참조할 수 있습니다. 이 인터페이스에는 일반적으로 사용되는 메서드가 있습니다.
A. static IoBuffer 할당(int 용량, boolean useDirectBuffer):
이 메서드는 SimpleBufferAllocator를 통해 내부적으로 인스턴스를 생성하며, 두 번째 매개변수는 지정합니다. 직접 버퍼 또는 JAVA 메모리 힙 캐시를 사용하는 경우 기본값은 false입니다.
B.void free():
일부 IoBufferAllocator 구현에서 재사용할 수 있도록 버퍼를 해제합니다. 일반적으로 성능을 향상하려는 경우가 아니면 이 메서드를 호출할 필요가 없습니다. 분명한).
C.IoBuffer setAutoExpand(boolean autoExpand):
이 메소드는 위에서 언급한 가변 길이인 용량을 자동으로 확장하도록 IoBuffer를 설정합니다. 기본적으로 가변 길이 기능이 활성화되어 있지 않음을 알 수 있습니다.
D.IoBuffer setAutoShrink(boolean autoShrink):
이 메소드는 IoBuffer가 자동으로 축소되도록 설정하여 Compact() 메소드가 호출된 후 일부 사용되지 않는 공간을 잘라낼 수 있습니다. 이 메서드가 호출되지 않거나 false로 설정된 경우에는 Shrink() 메서드를 호출하여 수동으로 공간을 축소할 수도 있습니다.
E.IoBuffer order(ByteOrder bo):
이 메서드는 Big Endian인지 Little Endian인지를 설정하며, JAVA에서는 기본값이 Big Endian이고, C++ 및 기타 언어는 일반적으로 Little Endian입니다.
F.IoBuffer asReadOnlyBuffer():
이 메소드는 IoBuffer를 읽기 전용으로 설정합니다.
G.Boolean prefixedDataAvailable(int prefixLength, int maxDataLength):
이 메서드는 데이터의 처음 1, 2, 4바이트가 데이터의 길이를 나타낼 때 사용되며,
prefixLentgh는 이 데이터의 처음 몇 바이트(1, 2, 4 중 하나만 가능)는 이 데이터의 길이를 나타내며
maxDataLength는 읽을 최대 바이트 수를 나타냅니다. 반환 결과는 총 데이터(길이를 나타내는 바이트)인
remaining()-prefixLength>=maxDataLength 방정식에 따라 달라집니다. 남은 바이트 수는 읽으려는 바이트 수보다 크거나 같습니다. .
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
위 메서드가 true를 반환하면 이 메서드는 길이를 나타내는 바이트 이후의 데이터 읽기를 시작합니다. 이 두 메서드의 prefixLength를 유지하세요. . 값은 동일합니다.
후술할 PrefixedStringDecoder의 내부 구현에는 G와 H 두 가지 메소드가 사용됩니다.
IoBuffer의 나머지 메소드는 ByteBuffer와 유사하며 다음과 같은 추가 편리한 작업 메소드가 있습니다.
IoBuffer putString(문자열 값, CharsetEncoder 인코더)은 지정된 인코딩 메소드에 문자열과 InputStream을 편리하게 저장할 수 있습니다. ) 메서드는 IoBuffer에서 읽지 않은 나머지 데이터를 입력 스트림 등으로 변환합니다.
(8.)IoFuture:
Mina의 많은 작업에서 반환 값이 XXXFuture라는 것을 알 수 있습니다. IoFuture의 하위 클래스입니다. 이러한 반환 값을 보면 이 메서드가 비동기적으로 실행된다는 의미입니다. 주요 하위 클래스는 ConnectFuture, CloseFuture 및 WriteFuture입니다. 이 인터페이스의 대부분의 작업은
java.util.concurrent.Future 인터페이스(예: wait(), waitUninterruptible() 등)와 유사합니다. 일반적으로 우리는 다음 작업의 결과를 기다리기 위해 waitUninterruptible() 메서드를 사용하는 경우가 많습니다. 반환할 비동기 실행입니다. 이 인터페이스에는 일반적으로 사용되는 메소드가 있습니다.
A.IoFuture addListener(IoFutureListener>listener):
이 메소드는 비동기 실행 결과가 반환될 때 콜백 메소드인 OperationComplete를 추가하는 데 사용됩니다. 즉, waitUninterruptible() 메서드 대신 비동기 실행 결과를 기다리는 또 다른 메서드입니다. 차단을 유발하지 않는다는 장점이 있습니다.
B.IoFuture RemoveListener(IoFutureListener> 리스너):
이 메소드는 지정된 리스너를 제거하는 데 사용됩니다.
C.IoSession getSession():
이 메소드는 현재 IoSession을 반환합니다. 예를 들어 서버에 접근하기 위해 클라이언트에서 connect() 메소드를 호출할 때 이는 실제로는 비동기 실행 메소드입니다. 연결 여부
接成功。那么如果我想在连接成功之后执行一些事情(譬如:获取连接成功后的IoSession对象),该怎么办呢?按照上面的说明,你有如下两种办法:
第一种:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); // 等待是否连接成功,相当于是转异步执行为同步执行。 future.awaitUninterruptibly(); // 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session 可能会无法获取。 session = future.getSession();
第二种:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); future.addListener(new IoFutureListener<ConnectFuture>() { @Override public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*************");
为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出***。我们执行代码之后,你会发现首先输出***(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。
4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可
以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
LoggingFilter lf = new LoggingFilter(); lf.setSessionOpenedLogLevel(LogLevel.ERROR); acceptor.getFilterChain().addLast("logger", lf);
这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。
5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder
实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. List
这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter
接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:
(1.)init()在首次添加到链中的时候被调用,但你必须将这个IoFilter 用
ReferenceCountingFilter 包装起来,否则init()方法永远不会被调用。
(2.)onPreAdd()在调用添加到链中的方法时被调用,但此时还未真正的加入到链。
(3.)onPostAdd()在调用添加到链中的方法后被调,如果在这个方法中有异常抛出,则过滤器会立即被移除,同时destroy()方法也会被调用(前提是使用ReferenceCountingFilter包装)。
(4.)onPreRemove()在从链中移除之前调用。
(5.)onPostRemove()在从链中移除之后调用。
(6.)destory()在从链中移除时被调用,使用方法与init()要求相同。
无论是哪个方法,要注意必须在实现时调用参数nextFilter 的同名方法,否则,过滤器链的执行将被中断,IoHandler 中的同名方法一样也不会被执行,这就相当于Servlet 中的Filter 必须调用filterChain.doFilter(request,response)才能继续前进是一样的道理。
示例:
public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }
我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:
acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter()));
这里我们将MyIoFilter 用ReferenceCountingFilter 包装起来,这样你可以看到init()、destroy()方法调用。我们启动客户端访问,然后关闭客户端,你会看到执行顺序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filterClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的对应方法会跟在上面的对应方法之后执行,这也就是说从横向(单独的看一个过滤器中的所有方法的执行顺序)上看,每个过滤器的执行顺序是上面所示的顺序;从纵向(方法链的调用)上看,如果有filter1、filter2 两个过滤器,sessionCreated()方法的执行顺序如下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
这里你要注意init、onPreAdd、onPostAdd 三个方法并不是在Server 启动时调用的,而是IoSession 对象创建之前调用的,也就是说IoFilterChain.addXXX()方法仅仅负责初始化过滤器并注册过滤器,但并不调用任何方法,包括init()初始化方法也是在IoProcessor 开始工作的时候被调用。IoFilter 是单例的,那么init()方法是否只被执行一次呢?这个是不一定的,因为IoFilter是被IoProcessor 调用的,而每个IoService 通常是关联多个IoProcessor,所以IoFilter的init()方法是在每个IoProcessor
线程上只执行一次。关于Mina 的线程问题,我们后面会详细讨论,这里你只需要清楚,init()与destroy()的调用次数与IoProceesor 的个数有关,假如一个IoService 关联了3 个IoProcessor,有五个并发的客户端请求,那么你会看到三次init()方法被调用,以后将不再会调用。Mina中自带的过滤器:
过滤器 说明
BlacklistFilter 设置一些IP 地址为黑名单,不允许访问。
BufferedWriteFilter 设置输出时像BufferedOutputStream 一样进行缓冲。
CompressionFilter 设置在输入、输出流时启用JZlib 压缩。
ConnectionThrottleFilter 这个过滤器指定同一个IP 地址(不含端口号)上的请求在多长的毫秒值内可以有一个请求,如果小于指定的时间间隔就有连续两个请求,那么第二个请求将被忽略(IoSession.close())。正如Throttle 的名字一样,调节访问的频率这个过滤器最好放在过滤器链的前面。
FileRegionWriteFilter 如果你想使用File 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中关闭File 所关联的FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture或者在messageSent()方法中关闭File 所关联的
FileChannel 通道。NoopFilter 这个过滤器什么也不做,如果你想测试过滤器链是否起作用,可以用它来测试。
ProfilerTimerFilter 这个过滤器用于检测每个事件方法执行的时间,所以最好放在过滤器链的前面。
ProxyFilter 这个过滤器在客户端使用ProxyConnector 作为实现时,会自动加入到过滤器链中,用于完成代理功能。
RequestResponseFilter 暂不知晓。
SessionAttributeInitializingFilter 이 필터는 일반적으로 필터 앞에 배치되는 IoSession의 일부 속성(Map)을 배치하여 일부 초기화 정보를 배치합니다.
MdcInjectionFilter는 로그 출력에 대해 MDC 작업을 수행합니다. LOG4J의 MDC 및 NDC 설명서를 참조할 수 있습니다.
WriteRequestFilter 쓰기 요청에 대한 필터를 래핑하는 데 사용되는 CompressionFilter 및 RequestResponseFilter의 기본 클래스입니다.
이전 LoggingFilger 로그 필터와 같이 각 섹션에서 자세히 설명하지만 여기에 나열되지 않은 일부 필터도 있습니다.
6. 프로토콜 코덱:
앞서 말씀드린 것처럼 미나오브젝트를 사용할 때 가장 주의해야 할 부분이 프로토콜 코덱인데, 왜냐하면 네트워크를 통해 전송되는 데이터는 바이너리 데이터(바이트)이고, 프로그램에서 마주하는 것은 JAVA 객체이기 때문에 데이터를 보낼 때 JAVA 객체를 바이너리 데이터로 인코딩하고, 전송할 때 바이너리 데이터를 바이너리 데이터로 인코딩해야 하기 때문입니다. 데이터를 JAVA 객체로 디코딩합니다(이것은 JAVA 객체의 직렬화 및 역직렬화만큼 간단하지 않습니다). Mina의 프로토콜 코덱은 필터 ProtocolCodecFilter를 통해 구성됩니다. 이 필터의 구성 방법에는 앞에서 TextLineCodecFactory를 등록하는 ProtocolCodecFactory가 필요합니다.
코드를 볼 수 있습니다.
ProtocolCodecFactory에는 다음 두 가지 메소드가 있습니다.
공용 인터페이스 ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session)에서 예외가 발생합니다.
ProtocolDecoder getDecoder(IoSession session)에서 예외가 발생합니다.
}
그러므로 , ProtocolCodecFactory를 구축하려면 두 개의 ProtocolEncoder 및 ProtocolDecoder 인스턴스가 필요합니다. JAVA 객체와 바이너리 데이터 간 변환 방법을 문의하고 싶으신가요? 이는 특정 통신 프로토콜에 따라 다릅니다. 즉, 서버와 클라이언트는 네트워크를 통해 전송되는 데이터 형식에 동의해야 합니다. 예를 들어 첫 번째 바이트는 데이터 길이를 나타내고 두 번째 바이트는 데이터 유형을 나타냅니다. 다음은 실제 데이터(텍스트일 수 있고 그림 등일 수 있음)이며 첫 번째 바이트에서 지정한 길이의 데이터를 읽을 때까지 길이에 따라 세 번째 바이트부터 뒤로 읽을 수 있습니다.
간단히 말하면 HTTP 프로토콜은 브라우저와 웹 서버 간에 합의된 통신 프로토콜입니다. 양측은 지정된 프로토콜에 따라 데이터를 인코딩하고 디코딩합니다. 좀 더 직관적으로 말하자면, 지금까지 우리가 사용해 온 TextLine 코덱은 네트워크를 통해 전달되는 데이터를 읽을 때 ASCII 10 및 13 문자(/r, /n)가 저장되어 있는 바이트를 찾는 한, 이전 바이트는 문자열로 간주됩니다(기본적으로 UTF-8 인코딩이 사용됨). 위에서 언급한 것은 다양한 프로토콜이 실제로 7계층 네트워크 구조의 애플리케이션 계층 프로토콜이라는 것입니다. 이들은 네트워크 계층(IP) 및 전송 계층(TCP) 위에 위치하며, Mina의 프로토콜 코덱을 사용하면 자신만의 프로토콜 세트를 구현할 수 있습니다. 애플리케이션 계층 프로토콜 스택.
(6-1.) 간단한 코덱 예:
아래에서는 통신 프로토콜을 가정하여 통신 사업자의 SMS 프로토콜을 시뮬레이션하는 코덱 구현을 제공합니다. 아래:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
这里的第一行表示状态行,一般表示协议的名字、版本号等,第二行表示短信的发送号码,第三行表示短信接收的号码,第四行表示短信的字节数,最后的内容就是短信的内容。上面的每一行的末尾使用ASC II 的10(/n)作为换行符,因为这是纯文本数据,协议要
求双方使用UTF-8 对字符串编解码。实际上如果你熟悉HTTP 协议,上面的这个精简的短信协议和HTTP 协议的组成是非常像的,第一行是状态行,中间的是消息报头,最后面的是消息正文。在解析这个短信协议之前,你需要知晓TCP 的一个事项,那就是数据的发送没有规模性,所谓的规模性就是作为数据的接收端,不知道到底什么时候数据算是读取完毕,所以应用层协议在制定的时候,必须指定数据读取的截至点。一般来说,有如下三种方式设置数据读取的长度:
(1.)使用分隔符,譬如:TextLine 编解码器。你可以使用/r、/n、NUL 这些ASC II 中的特殊的字符来告诉数据接收端,你只要遇见分隔符,就表示数据读完了,不用在那里傻等着不知道还有没有数据没读完啊?我可不可以开始把已经读取到的字节解码为指定的数据类型了啊?
(2.)定长的字节数,这种方式是使用长度固定的数据发送,一般适用于指令发送,譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表示关闭等等。
(3.)在数据中的某个位置使用一个长度域,表示数据的长度,这种处理方式最为灵活,上面的短信协议中的那个L 就是短信文字的字节数,其实HTTP 协议的消息报头中的Content-Length 也是表示消息正文的长度,这样数据的接收端就知道我到底读到多长的
字节数就表示不用再读取数据了。相比较解码(字节转为JAVA 对象,也叫做拆包)来说,编码(JAVA 对象转为字节,也叫做打包)就很简单了,你只需要把JAVA 对象转为指定格式的字节流,write()就可以了。下面我们开始对上面的短信协议进行编解码处理。
第一步,协议对象:
public class SmsObject { private String sender;// 短信发送者 private String receiver;// 短信接受者 private String message;// 短信内容 public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
第二步,编码器:
在Mina 中编写编码器可以实现ProtocolEncoder,其中有encode()、dispose()两个方法需要实现。这里的dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我们并不关心,所以通常我们直接继承适配器ProtocolEncoderAdapter。
public class CmccSipcEncoder extends ProtocolEncoderAdapter { private final Charset charset; public CmccSipcEncoder(Charset charset) { this.charset = charset; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = sms.getSender(); String receiver = sms.getReceiver(); String smsContent = sms.getMessage(); buffer.putString(statusLine + '/n', ce); buffer.putString("S: " + sender + '/n', ce); buffer.putString("R: " + receiver + '/n', ce); buffer .putString("L: " + (smsContent.getBytes(charset).length) + "/n", ce); buffer.putString(smsContent, ce); buffer.flip(); out.write(buffer); } }
这里我们依据传入的字符集类型对message 对象进行编码,编码的方式就是按照短信协议拼装字符串到IoBuffer 缓冲区,然后调用ProtocolEncoderOutput 的write()方法输出字节流。这里要注意生成短信内容长度时的红色代码,我们使用String 类与Byte[]类型之间的转换方法获得转为字节流后的字节数。
解码器的编写有以下几个步骤:
A. 将 encode()方法中的message 对象强制转换为指定的对象类型;
B. 创建IoBuffer 缓冲区对象,并设置为自动扩展;
C. 将转换后的message 对象中的各个部分按照指定的应用层协议进行组装,并put()到IoBuffer 缓冲区;
D. 当你组装数据完毕之后,调用flip()方法,为输出做好准备,切记在write()方法之前,要调用IoBuffer 的flip()方法,否则缓冲区的position 的后面是没有数据可以用来输出的,你必须调用flip()方法将position 移至0,limit 移至刚才的position。这个flip()方法的含义请参看java.nio.ByteBuffer。
E. 最后调用ProtocolEncoderOutput 的write()方法输出IoBuffer 缓冲区实例。
第三步,解码器:
在Mina 中编写解码器,可以实现ProtocolDecoder 接口,其中有decode()、finishDecode()、dispose()三个方法。这里的finishDecode()方法可以用于处理在IoSession 关闭时剩余的未读取数据,一般这个方法并不会被使用到,除非协议中未定义任何标识数据什么时候截止的约定,譬如:Http 响应的Content-Length 未设定,那么在你认为读取完数据后,关闭TCP连接(IoSession 的关闭)后,就可以调用这个方法处理剩余的数据,当然你也可以忽略调剩余的数据。同样的,一般情况下,我们只需要继承适配器ProtocolDecoderAdapter,关注decode()方法即可。但前面说过解码器相对编码器来说,最麻烦的是数据发送过来的规模,以聊天室为例,一个TCP
连接建立之后,那么隔一段时间就会有聊天内容发送过来,也就是decode()方法会被往复调用,这样处理起来就会非常麻烦。那么Mina 中幸好提供了CumulativeProtocolDecoder类,从名字上可以看出累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去读取数据,然后累积到内部的IoBuffer 缓冲区,但是具体的拆包(把累积到缓冲区的数据解码为JAVA 对象)交由子类的doDecode()方法完成,实际上CumulativeProtocolDecoder就是在decode()反复的调用暴漏给子类实现的doDecode()方法。
具体执行过程如下所示:
A. 你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder
会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。
public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); CharsetDecoder cd = charset.newDecoder(); int matchCount = 0; String statusLine = "", sender = "", receiver = "", length = "", sms = ""; int i = 1; while (in.hasRemaining()) { byte b = in.get(); buffer.put(b); if (b == 10 && i < 5) { matchCount++; if (i == 1) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); } if (i == 2) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() -1); matchCount = 0; buffer.clear(); } if (i == 3) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() 1); matchCount = 0; buffer.clear(); } if (i == 4) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() -1); matchCount = 0; buffer.clear(); } i++; } else if (i == 5) { matchCount++; if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); i++; break; } } else { matchCount++; } } SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); return false; } }
我们的这个短信协议解码器使用/n(ASCII 的10 字符)作为分解点,一个字节一个字节的读取,那么第一次发现/n 的字节位置之前的部分,必然就是短信协议的状态行,依次类推,你就可以解析出来发送者、接受者、短信内容长度。然后我们在解析短信内容时,使用获取到的长度进行读取。全部读取完毕之后, 然后构造SmsObject 短信对象, 使用ProtocolDecoderOutput
的write()方法输出,最后返回false,也就是本次数据全部读取完毕,告知CumulativeProtocolDecoder 在本次数据读取中不需要再调用doDecode()方法了。这里需要注意的是两个状态变量i、matchCount,i 用于记录解析到了短信协议中的哪一行(/n),matchCount 记录在当前行中读取到了哪一个字节。状态变量在解码器中经常被使用,我们这里的情况比较简单,因为我们假定短信发送是在一次数据发送中完成的,所以状态变量的使用也比较简单。假如数据的发送被拆成了多次(譬如:短信协议的短信内容、消息报头被拆成了两次数据发送),那么上面的代码势必就会存在问题,因为当第二次调用doDecode()方法时,状态变量i、matchCount
势必会被重置,也就是原来的状态值并没有被保存。那么我们如何解决状态保存的问题呢?答案就是将状态变量保存在IoSession 中或者是Decoder 实例自身,但推荐使用前者,因为虽然Decoder 是单例的,其中的实例变量保存的状态在Decoder 实例销毁前始终保持,但Mina 并不保证每次调用doDecode()方法时都是同一个线程(这也就是说第一次调用doDecode()是IoProcessor-1 线程,第二次有可能就是IoProcessor-2 线程),这就会产生多线程中的实例变量的可视性(Visibility,具体请参考JAVA
的多线程知识)问题。IoSession中使用一个同步的HashMap 保存对象,所以你不需要担心多线程带来的问题。使用IoSession 保存解码器的状态变量通常的写法如下所示:
A. 在解码器中定义私有的内部类Context,然后将需要保存的状态变量定义在Context 中存储。
B. 在解码器中定义方法获取这个Context 的实例,这个方法的实现要优先从IoSession 中获取Context。
具体代码示例如下所示:
// 上下文作为保存状态的内部类的名字,意思很明显,就是让状态跟随上下文,在整个调用过程中都可以被保持。
public class XXXDecoder extends CumulativeProtocolDecoder{ private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context" ); public Context getContext(IoSession session){ Context ctx=(Context)session.getAttribute(CONTEXT); if(ctx==null){ ctx=new Context(); session.setAttribute(CONTEXT,ctx); } } private class Context { //状态变量 } }
注意这里我们使用了Mina 自带的AttributeKey 类来定义保存在IoSession 中的对象的键值,这样可以有效的防止键值重复。另外,要注意在全部处理完毕之后,状态要复位,譬如:聊天室中的一条消息读取完毕之后,状态变量要变为初始值,以便下次处理时重新使用。
第四步,编解码工厂:
public class CmccSipcCodecFactory implements ProtocolCodecFactory { private final CmccSipcEncoder encoder; private final CmccSipcDecoder decoder; public CmccSipcCodecFactory() { this(Charset.defaultCharset()); } public CmccSipcCodecFactory(Charset charSet) { this.encoder = new CmccSipcEncoder(charSet); this.decoder = new CmccSipcDecoder(charSet); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; } }
实际上这个工厂类就是包装了编码器、解码器,通过接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码处理。
第五步,运行示例:
下面我们修改最一开始的示例中的MyServer、MyClient 的代码,如下所示:
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset .forName("UTF-8")))); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CmccSipcCodecFactory( Charset.forName("UTF-8")))); 然后我们在ClientHandler 中发送一条短信: public void sessionOpened(IoSession session) { SmsObject sms = new SmsObject(); sms.setSender("15801012253"); sms.setReceiver("18869693235"); sms.setMessage("你好!Hello World!"); session.write(sms); }
最后我们在MyIoHandler 中接收这条短信息:
public void messageReceived(IoSession session, Object message) throws Exception { SmsObject sms = (SmsObject) message; log.info("The message received is [" + sms.getMessage() + "]"); }
你会看到Server 端的控制台输出如下信息:
The message received is [你好!Hello World!]
(6-2.)复杂的解码器:
下面我们讲解一下如何在解码器中保存状态变量,也就是真正的实现上面所说的Context。
我们假设这样一种情况,有两条短信:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
他们按照上面的颜色标识发送,也就是说红色部分、蓝色部分、绿色部分分别发送(调用三次IoSession.write()方法),那么如果你还用上面的CmccSipcDecoder,将无法工作,因为第一次数据流(红色部分)发送过取时,数据是不完整的,无法解析出一条短信息,当二次数据流(蓝色部分)发送过去时,已经可以解析出第一条短信息了,但是第二条短信还是不完整的,需要等待第三次数据流(绿色部分)的发送。注意:由于模拟数据发送的规模性问题很麻烦,所以这里采用了这种极端的例子说明问题,虽不具有典型性,但很能说明问题,这就足够了,所以不要追究这种发送消息是否在真实环境中存在,更不要追究其合理性。
CmccSispcDecoder 类改为如下的写法:
public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); CharsetDecoder cd = charset.newDecoder(); int matchCount = ctx.getMatchCount(); int line = ctx.getLine(); IoBuffer buffer = ctx.innerBuffer; String statusLine = ctx.getStatusLine(), sender = ctx.getSender(), receiver = ctx.getReceiver(), length = ctx.getLength(), sms = ctx.getSms(); while (in.hasRemaining()) { byte b = in.get(); matchCount++; buffer.put(b); if (line < 4 && b == 10) { if (line == 0) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); ctx.setStatusLine(statusLine); } if (line == 1) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() - 1); matchCount = 0; buffer.clear(); ctx.setSender(sender); } if (line == 2) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() - 1); matchCount = 0; buffer.clear(); ctx.setReceiver(receiver); } if (line == 3) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() - 1); matchCount = 0; buffer.clear(); ctx.setLength(length); } line++; } else if (line == 4) { if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); ctx.setSms(sms); // 由于下面的break,这里需要调用else外面的两行代码 ctx.setMatchCount(matchCount); ctx.setLine(line); break; } } ctx.setMatchCount(matchCount); ctx.setLine(line); } if (ctx.getLine() == 4 && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx .getMatchCount()) { SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); ctx.reset(); return true; } else { return false; } } private Context getContext(IoSession session) { Context context = (Context) session.getAttribute(CONTEXT); if (context == null){ context = new Context(); session.setAttribute(CONTEXT, context); } return context; } private class Context { private final IoBuffer innerBuffer; private String statusLine = ""; private String sender = ""; private String receiver = ""; private String length = ""; private String sms = ""; public Context() { innerBuffer = IoBuffer.allocate(100).setAutoExpand(true); } private int matchCount = 0; private int line = 0; public int getMatchCount() { return matchCount; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } public int getLine() { return line; } public void setLine(int line) { this.line = line; } public String getStatusLine() { return statusLine; } public void setStatusLine(String statusLine) { this.statusLine = statusLine; } public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getLength() { return length; } public void setLength(String length) { this.length = length; } public String getSms() { return sms; } public void setSms(String sms) { this.sms = sms; } public void reset() { this.innerBuffer.clear(); this.matchCount = 0; this.line = 0; this.statusLine = ""; this.sender = ""; this.receiver = ""; this.length = ""; this.sms = ""; } } }
这里我们做了如下的几步操作:
(1.) 所有记录状态的变量移到了Context 内部类中,包括记录读到短信协议的哪一行的line。每一行读取了多少个字节的matchCount,还有记录解析好的状态行、发送者、接受者、短信内容、累积数据的innerBuffer 等。这样就可以在数据不能完全解码,等待下一次doDecode()方法的调用时,还能承接上一次调用的数据。
(2.) 在 doDecode()方法中主要的变化是各种状态变量首先是从Context 中获取,然后操作之后,将最新的值setXXX()到Context 中保存。
(3.) 这里注意doDecode()方法最后的判断,当认为不够解码为一条短信息时,返回false,也就是在本次数据流解码中不要再调用doDecode()方法;当认为已经解码出一条短信息时,输出短消息,然后重置所有的状态变量,返回true,也就是如果本次数据流解码中还有没解码完的数据,继续调用doDecode()方法。下面我们对客户端稍加改造,来模拟上面的红、蓝、绿三次发送聊天短信息的情况:
MyClient:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); future.awaitUninterruptibly(); session = future.getSession(); for (int i = 0; i < 3; i++) { SmsObject sms = new SmsObject(); session.write(sms); System.out.println("****************" + i); }
这里我们为了方便演示,不在IoHandler 中发送消息,而是直接在MyClient 中发送,你要注意的是三次发送都要使用同一个IoSession,否则就不是从同一个通道发送过去的了。
CmccSipcEncoder:
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = "15801012253"; String receiver = "15866332698"; String smsContent = "你好!Hello World!"; IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); buffer.putString(statusLine + '/n', ce); buffer.putString("S: " + sender + '/n', ce); buffer.putString("R: " + receiver + '/n', ce); buffer.flip(); out.write(buffer); IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true); buffer2.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer2.putString(smsContent, ce); buffer2.putString(statusLine + '/n', ce); buffer2.flip(); out.write(buffer2); IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true); buffer3.putString("S: " + sender + '/n', ce); buffer3.putString("R: " + receiver + '/n', ce); buffer3.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer3.putString(smsContent, ce); buffer3.putString(statusLine + '/n', ce); buffer3.flip(); out.write(buffer3); }
上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短信息输出。
Mina中自带的解码器:
解码器 说明
CumulativeProtocolDecoder 累积性解码器,上面我们重点说明了这个解码器的用法。
SynchronizedProtocolDecoder 这个解码器用于将任何一个解码器包装为一个线程安全的解码器,用于解决上面说的每次执行decode()方法时可能线程不是上一次的线程的问题,但这样会在高并发时,大大降低系统的性能。
TextLineDecoder 按照文本的换行符( Windows:/r/n 、Linux:/n、Mac:/r)解码数据。
PrefixedStringDecoder 这个类继承自CumulativeProtocolDecoder类,用于读取数据最前端的1、2、4 个字节表示后面的数据长度的数据。譬如:一个段数据的前两个字节表示后面的真实数据的长度,那么你就可以用这个方法进行解码。
(6-3.)多路分离的解码器:
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,固定使用一个解码器,那么该如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包来完成这种多路分离(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:上面的短信协议进行扩展,可以依据状态行来判断使用1.0 版本的短信协议解码器还是2.0版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个int 类型的数字,还有一个char 类型的符号。
(2.) 如果符号是+,服务端就是用1 号解码器,对两个数字相加,然后把结果返回给客户端。
(3.) 如果符号是-,服务端就使用2 号解码器,将两个数字变为相反数,然后相加,把结果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义Client 端、Server 端发送、接收的数据对象。
B. 使用Demux 编写编码器是实现MessageEncoder
C. 使用Demux 编写编码器是实现MessageDecoder 接口,这个MessageDecoder 会在DemuxingProtocolDecoder 中调用。
D. 在 DemuxingProtocolCodecFactory 中调用addMessageEncoder()、addMessageDecoder()方法组装编解码器。
MessageEncoder的接口如下所示:
public interface MessageEncoder<T> { void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception; }
你注意到消息编码器接口与在ProtocolEncoder 中没什么不同,区别就是Object message被泛型具体化了类型,你不需要手动的类型转换了。
MessageDecoder的接口如下所示:
public interface MessageDecoder { static MessageDecoderResult OK = MessageDecoderResult.OK; static MessageDecoderResult NEED_DATA = MessageDecoderResult.NEED_DATA; static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK; MessageDecoderResult decodable(IoSession session, IoBuffer in); MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception; }
(1.)decodable()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示这个解码器不适合解码数据,然后检查其它解码器,如果都不满足会抛异常;
B. MessageDecoderResult.NEED_DATA:表示当前的读入的数据不够判断是否能够使用这个解码器解码,然后再次调用decodable()方法检查其它解码器,如果都是NEED_DATA,则等待下次输入;
C. MessageDecoderResult.OK: 表示这个解码器可以解码读入的数据, 然后则调用MessageDecoder 的decode()方法。这里注意decodable()方法对参数IoBuffer in 的任何操作在方法结束之后,都会复原,也就是你不必担心在调用decode()方法时,position 已经不在缓冲区的起始位置。这个方法相当于是预读取,用于判断是否是可用的解码器。
(2.)decode()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示解码失败,会抛异常;
B. MessageDecoderResult.NEED_DATA:表示数据不够,需要读到新的数据后,再次调用decode()方法。
C. MessageDecoderResult.OK:表示解码成功。
代码演示:
(1.)客户端发送的数据对象:
public class SendMessage { private int i = 0; private int j = 0; private char symbol = '+'; public char getSymbol() { return symbol; } public void setSymbol(char symbol) { this.symbol = symbol; } public int getI() { return i; } public void setI(int i) { this.i = i; } public int getJ() { return j; } public void setJ(int j) { this.j = j; } }
(2.)服务端发送的返回结果对象:
public class ResultMessage { private int result = 0; public int getResult() { return result; } public void setResult(int result) { this.result = result; } }
(3.)客户端使用的SendMessage的编码器:
public class SendMessageEncoder implements MessageEncoder<SendMessage> { @Override public void encode(IoSession session, SendMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); buffer.putChar(message.getSymbol()); buffer.putInt(message.getI()); buffer.putInt(message.getJ()); buffer.flip(); out.write(buffer); } }
这里我们的SendMessage、ResultMessage 中的字段都是用长度固定的基本数据类型,这样IoBuffer 就不需要自动扩展了,提高性能。按照一个char、两个int 计算,这里的IoBuffer只需要10 个字节的长度就可以了。
(4.)服务端使用的SendMessage的1号解码器:
public class SendMessageDecoderPositive implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '+') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(in.getInt()); sm.setJ(in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
因为客户端发送的SendMessage 的前两个字节(char)就是符号位,所以我们在decodable()方法中对此条件进行了判断,之后读到两个字节,并且这两个字节表示的字符是+时,才认为这个解码器可用。
(5.)服务端使用的SendMessage的2号解码器:
public class SendMessageDecoderNegative implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '-') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(-in.getInt()); sm.setJ(-in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(6.)服务端使用的ResultMessage的编码器:
public class ResultMessageEncoder implements MessageEncoder<ResultMessage> { @Override public void encode(IoSession session, ResultMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(4); buffer.putInt(message.getResult()); buffer.flip(); out.write(buffer); } }
(7.)客户端使用的ResultMessage的解码器:
public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 4) return MessageDecoderResult.NEED_DATA; else if (in.remaining() == 4) return MessageDecoderResult.OK; else return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { ResultMessage rm = new ResultMessage(); rm.setResult(in.getInt()); out.write(rm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(8.)组装这些编解码器的工厂:
public class MathProtocolCodecFactory extends DemuxingProtocolCodecFactory { public MathProtocolCodecFactory(boolean server) { if (server) { super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(SendMessageDecoderPositive.class); super.addMessageDecoder(SendMessageDecoderNegative.class); } else { super .addMessageEncoder(SendMessage.class, SendMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
这个工厂类我们使用了构造方法的一个布尔类型的参数,以便其可以在Server 端、Client端同时使用。我们以Server 端为例,你可以看到调用两次addMessageDecoder()方法添加了1 号、2 号解码器,其实DemuxingProtocolDecoder 内部在维护了一个MessageDecoder数组,用于保存添加的所有的消息解码器,每次decode()的时候就调用每个MessageDecoder的decodable()方法逐个检查,只要发现一个MessageDecoder
不是对应的解码器,就从数组中移除,直到找到合适的MessageDecoder,如果最后发现数组为空,就表示没找到对应的MessageDecoder,最后抛出异常。
(9.)Server端:
public class Server { public static void main(String[] args) throws Exception { IoAcceptor acceptor = new NioSocketAcceptor(); LoggingFilter lf = new LoggingFilter(); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5); acceptor.getFilterChain().addLast("logger", lf); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(true))); acceptor.setHandler(new ServerHandler()); acceptor.bind(new InetSocketAddress(9123)); } }
(10.)Server端使用的IoHandler:
public class ServerHandler extends IoHandlerAdapter { private final static Logger log = LoggerFactory .getLogger(ServerHandler.class); @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { SendMessage sm = (SendMessage) message; log.info("The message received is [ " + sm.getI() + " " + sm.getSymbol() + " " + sm.getJ() + " ]"); ResultMessage rm = new ResultMessage(); rm.setResult(sm.getI() + sm.getJ()); session.write(rm); } }
(11.)Client端:
public class Client { public static void main(String[] args) throws Throwable { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(false))); connector.setHandler(new ClientHandler()); connector.connect(new InetSocketAddress("localhost", 9123)); } }
(12.)Client端的IoHandler:
public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory .getLogger(ClientHandler.class); @Override public void sessionOpened(IoSession session) throws Exception { SendMessage sm = new SendMessage(); sm.setI(100); sm.setJ(99); sm.setSymbol('+'); session.write(sm); } @Override public void messageReceived(IoSession session, Object message) { ResultMessage rs = (ResultMessage) message; LOGGER.info(String.valueOf(rs.getResult())); } }
你尝试改变(12.)中的红色代码中的正负号,会看到服务端使用了两个不同的解码器对其进行处理。
7.线程模型配置:
Mina 中的很多执行环节都使用了多线程机制,用于提高性能。Mina 中默认在三个地方使用了线程:
(1.) IoAcceptor:
这个地方用于接受客户端的连接建立,每监听一个端口(每调用一次bind()方法),都启用一个线程,这个数字我们不能改变。这个线程监听某个端口是否有请求到来,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(2.) IoConnector:
这个地方用于与服务端建立连接,每连接一个服务端(每调用一次connect()方法),就启用一个线程,我们不能改变。同样的,这个线程监听是否有连接被建立,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(3.) IoProcessor:
这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的Selector
的原因。那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的IoProcessor 的数量并不是越多越好。
이 IoProcessor의 수는 다음과 같이 조정될 수 있습니다.
IoAcceptor acceptor=new NioSocketAcceptor(5);
IoConnector 커넥터=new NioSocketConnector(5);
이렇게 하면 IoProcessor 풀 수가 줄어듭니다. 5는 동시에 5개의 읽기 및 쓰기 작업을 처리할 수 있음을 의미합니다. Mina의 디코더는 상태 변수를 저장하기 위해 Decoder 자체 대신 IoSession을 사용한다고 이전에 말한 것을 기억하십니까? 이는 Mina가 동일한 IoProcessor가 매번 doDecode() 메서드를 실행한다고 보장하지 않기 때문입니까? 실제로 이 문제의 근본 원인은 IoProcessor가 풀이기 때문입니다. IoSession이 유휴 상태(데이터 읽기가 발생하지 않음)에 들어갈 때마다 IoProcessor는 다른 IoSession에서 사용하기 위해 풀로 재활용되므로 IoSession이 사용 중 상태에 들어갈 때 유휴 상태 상태에서 다시 IoProcessor
IoProcessor 인스턴스가 다시 할당되지만 현재로서는 마지막 사용 상태에서 여전히 IoProcessor가 될 것이라는 보장은 없습니다. 또한 IoAcceptor와 IoConnector에도 생성자가 있음을 알 수 있습니다. java.util.concurrent.Executor 클래스를 스레드 풀 개체로 지정할 수 있습니다. 그러면 이 스레드 풀 개체는 무엇에 사용됩니까? 실제로 TCP 연결이 설정되었는지 모니터링하는 데 사용되는 스레드를 (1.)과 (2.)에서 생성하는 데 사용됩니다. 기본적으로 Executor 인스턴스를 생성하는 데는 Executors.newCachedThreadPool() 메서드가 사용됩니다. 이는 무제한 스레드 풀입니다. (자세한 내용은 JAVA를 참조하세요.
동시성 라이브러리). 이 Executor 인스턴스를 변경하려고 하지 말고 내장된 인스턴스를 사용하세요. 그렇지 않으면 특정 액세스 수준에서 성능이 갑자기 떨어지는 등 설명할 수 없는 문제가 발생할 수 있습니다. 제한되지 않은 스레드 풀은 생성된 소켓만큼 많은 스레드를 할당하기 때문에 실행기에서 스레드 풀을 생성하는 다른 방법으로 변경하고 제한적인 스레드 풀을 생성하면 일부 요청이 제때에 응답하지 않아 질문이 생길 수 있습니다.
Mina의 워크플로에 대한 전체 개요를 살펴보겠습니다.
(1.) IoService 인스턴스가 생성되면 IoService와 연결된 IoProcessor 풀 및 스레드 풀도 생성됩니다.
(2.) IoService는 소켓을 설정하고(IoAcceptor의 바인딩() 또는 IoConnector의 connect() 메소드가 호출됨) IoService는 스레드 풀에서 스레드를 꺼내 소켓 포트를 수신합니다.
(3.) IoService가 연결 요청을 모니터링할 때 소켓에서는 IoSession 객체를 생성하고 IoProcessor 풀에서 IoProcessor 인스턴스를 꺼내고 이 세션 채널에서 필터와 IoHandler를 실행합니다.
(4.) 이 IoSession 채널이 들어갈 때 IoProcessor는 유휴 상태이거나 닫힐 때 재활용됩니다. 위 내용은 Mina의 기본 스레드 작업 모드이므로 여기서는 IoProcessor의 멀티 스레드 작업 모드를 구성하는 방법에 대해 설명하겠습니다. IoProcessor는 세션의 모든 필터 및 IoHandler 실행, 즉 IO 읽기 및 쓰기 작업을 담당하므로 단일 스레드 작업 방법입니다(즉, 하나씩 순서대로 실행). 이벤트 메서드(예: sessionIdle(), sessionOpened() 등)를 별도의 스레드(즉, IoProcessor가 아닌 스레드)에서 실행하려면 여기에서 ExecutorFilter 필터를 사용해야 합니다. IoProcessor 생성자의 매개변수 중 하나가 java.util.concurrent.Executor라는 것을 알 수 있습니다. 이는 IoProcessor가 호출하는 필터와 IoHandler의 일부 이벤트 메소드가 스레드 풀에 할당된 스레드에서 독립적으로 실행되고 IoProcessor에서는 실행되지 않도록 허용합니다.
스레드가 위치한 스레드입니다.
例:
acceptor.getFilterChain().addLast("exceutor", new ExecutorFilter());
我们看到是用这个功能,简单的一行代码就可以了。那么ExecutorFilter 还有许多重载的构造方法,这些重载的有参构造方法,参数主要用于指定如下信息:
(1.) 指定线程池的属性信息,譬如:核心大小、最大大小、等待队列的性质等。你特别要关注的是ExecutorFilter 内部默认使用的是OrderedThreadPoolExecutor 作为线程池的实现,从名字上可以看出是保证各个事件在多线程执行中的顺序(譬如:各个事件方
法的执行是排他的,也就是不可能出现两个事件方法被同时执行;messageReceived()总是在sessionClosed() 方法之前执行), 这是因为多线程的执行是异步的, 如果没有OrderedThreadPoolExecutor 来保证IoHandler 中的方法的调用顺序,可能会出现严重的问题。但是如果你的代码确实没有依赖于IoHandler 中的事件方法的执行顺序,那么你可以使用UnorderedThreadPoolExecutor 作为线程池的实现。因此,你也最好不要改变默认的Executor
实现,否则,事件的执行顺序就会混乱,譬如:messageReceived()、messageSent()方法被同时执行。
(2.) 哪些事件方法被关注,也就哪些事件方法用这个线程池执行。线程池可以异步执行的事件类型是位于IoEventType 中的九个枚举值中除了SESSION_CREATED 之外的其余八个,这说明Session 建立的事件只能与IoProcessor 在同一个线程上执行。
public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, }
默认情况下,没有配置关注的事件类型,有如下六个事件方法会被自动使用线程池异步执行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其实ExecutorFilter 的工作机制很简单,就是在调用下一个过滤器的事件方法时,把其交给Executor 的execute(Runnable runnable)方法来执行,其实你自己在IoHandler 或者某个过滤器的事件方法中开启一个线程,也可以完成同样的功能,只不过这样做,你就失去了程序的可配置性,线程调用的代码也会完全耦合在代码中。但要注意的是绝对不能开启线程让其执行sessionCreated()方法。如果你真的打算使用这个ExecutorFilter,那么最好想清楚它该放在过滤器链的哪个位置,针对哪些事件做异步处理机制。一般ExecutorFilter
都是要放在ProtocolCodecFilter 过滤器的后面,也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。
위 내용은 Java의 mina에 대한 자세한 소개의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!