A socket is an agreed convention, or specification, for network communication between different processes.
In socket programming it is better understood as a layer of encapsulation over transport protocols such as TCP and UDP: an interface that the operating system provides for writing network communication code.
Taking the basic API provided by the Linux operating system as an example, establishing a socket communication typically goes as follows: the server calls socket(), bind(), listen() and accept(); the client calls socket() and connect(); both sides then exchange data with read() and write(), and finally call close().
In essence, a socket is a programming-level simplification and abstraction of a TCP connection (or, of course, of another transport such as UDP).
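As a rough illustration of how those steps map onto Java: `new ServerSocket(port)` normally creates, binds and starts listening in one call, but the steps can be spelled out with the no-argument constructor and `bind()`. The class name `SocketStepsDemo`, the loopback address and port 0 (let the OS pick a free port) are assumptions made for this sketch only.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class SocketStepsDemo {

    // Runs one client/server exchange over loopback and returns what the server read.
    static String roundTrip(String message) throws IOException {
        try (ServerSocket server = new ServerSocket()) {                 // socket()
            // bind() + listen(): port 0 = any free port, backlog of 50 pending connections
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 50);
            try (Socket client = new Socket()) {                           // socket()
                client.connect(server.getLocalSocketAddress());           // connect()
                try (Socket accepted = server.accept()) {                  // accept()
                    client.getOutputStream()
                          .write(message.getBytes(StandardCharsets.UTF_8)); // write()
                    client.close();                                        // close()
                    // read() until the client's close is seen as end of stream
                    byte[] data = accepted.getInputStream().readAllBytes();
                    return new String(data, StandardCharsets.UTF_8);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello"));
    }
}
```

The single-threaded order works here because the operating system completes the connection as soon as the server is listening; `accept()` then only hands over a connection that is already queued.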
We start with basic socket code that sends and receives a single message:
Server:
    package com.marklux.socket.base;

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    /**
     * A very basic socket server that listens for one single message.
     */
    public class BaseSocketServer {

        private ServerSocket server;
        private Socket socket;
        private int port;
        private InputStream inputStream;
        private static final int MAX_BUFFER_SIZE = 1024;

        public int getPort() {
            return port;
        }

        public void setPort(int port) {
            this.port = port;
        }

        public BaseSocketServer(int port) {
            this.port = port;
        }

        public void runServerSingle() throws IOException {
            this.server = new ServerSocket(this.port);
            System.out.println("base socket server started.");
            // accept() blocks here until a client connects.
            this.socket = server.accept();
            this.inputStream = this.socket.getInputStream();

            byte[] readBytes = new byte[MAX_BUFFER_SIZE];
            int msgLen;
            StringBuilder stringBuilder = new StringBuilder();
            // read() returns -1 once the client has closed its side of the connection.
            while ((msgLen = inputStream.read(readBytes)) != -1) {
                stringBuilder.append(new String(readBytes, 0, msgLen, "UTF-8"));
            }
            System.out.println("get message from client: " + stringBuilder);

            inputStream.close();
            socket.close();
            server.close();
        }

        public static void main(String[] args) {
            BaseSocketServer bs = new BaseSocketServer(9799);
            try {
                bs.runServerSingle();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
Client:
    package com.marklux.socket.base;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.UnsupportedEncodingException;
    import java.net.Socket;

    /**
     * A very basic socket client that sends one single message.
     */
    public class BaseSocketClient {

        private String serverHost;
        private int serverPort;
        private Socket socket;
        private OutputStream outputStream;

        public BaseSocketClient(String host, int port) {
            this.serverHost = host;
            this.serverPort = port;
        }

        public void connectServer() throws IOException {
            this.socket = new Socket(this.serverHost, this.serverPort);
            // The client sends data by writing to the socket's output stream.
            this.outputStream = socket.getOutputStream();
        }

        public void sendSingle(String message) throws IOException {
            try {
                this.outputStream.write(message.getBytes("UTF-8"));
            } catch (UnsupportedEncodingException e) {
                System.out.println(e.getMessage());
            }
            // Closing the stream also closes the socket, which tells the server we are done.
            this.outputStream.close();
            this.socket.close();
        }

        public static void main(String[] args) {
            BaseSocketClient bc = new BaseSocketClient("127.0.0.1", 9799);
            try {
                bc.connectServer();
                bc.sendSingle("Hi from mark.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
Run the server first, then the client; the server prints the message it received.
Note how the I/O is implemented here. We use a byte array of size MAX_BUFFER_SIZE as a buffer: bytes are read from the input stream into the buffer, then taken from the buffer and turned into a string. This is very useful when the incoming data is large. In fact, NIO, discussed later, is built on the same idea.
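One caveat with the read loop above is that it decodes each chunk separately, so a multi-byte UTF-8 character that straddles two reads can be decoded incorrectly. A minimal sketch of one way around this is to collect the raw bytes first and decode once at the end. The names `BufferedReadDemo` and `readAll`, and the tiny buffer size, are made up for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class BufferedReadDemo {

    // Reads the stream to its end through a fixed-size buffer, then decodes once.
    static String readAll(InputStream in, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        int n;
        while ((n = in.read(buffer)) != -1) {
            collected.write(buffer, 0, n); // keep only the n bytes actually read
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // \u00e9 and \u00f6 each take two bytes in UTF-8.
        byte[] data = "h\u00e9llo w\u00f6rld".getBytes(StandardCharsets.UTF_8);
        // A deliberately tiny buffer forces reads that split those characters.
        System.out.println(readAll(new ByteArrayInputStream(data), 2));
    }
}
```

The same `readAll` works unchanged on `socket.getInputStream()`, since it only depends on `InputStream`.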
The example above only implements one-way communication, which wastes the channel: a socket connection over TCP is full-duplex and supports two-way communication. In the following example, after receiving the client's message, the server sends a receipt back to the client.
We also use a convenience method, InputStream.readAllBytes() (available since Java 9), to simplify the communication. Because the messages are short, we no longer manage a buffer ourselves.
Server:
    public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();

        // readAllBytes() returns once the client has shut down its output.
        String message = new String(inputStream.readAllBytes(), "UTF-8");
        System.out.println("received message: " + message);

        // Mark our receiving side as finished; from now on this socket is only used for sending.
        this.socket.shutdownInput();

        // Write the receipt.
        this.outputStream = this.socket.getOutputStream();
        String receipt = "We received your message: " + message;
        outputStream.write(receipt.getBytes("UTF-8"));

        this.outputStream.close();
        this.socket.close();
    }
Client:
    public void sendMessage(String message) throws IOException {
        this.socket = new Socket(host, port);
        this.outputStream = socket.getOutputStream();
        this.outputStream.write(message.getBytes("UTF-8"));

        // Tell the server that all sending is finished; from now on we only receive.
        this.socket.shutdownOutput();

        this.inputStream = socket.getInputStream();
        String receipt = new String(inputStream.readAllBytes(), "UTF-8");
        System.out.println("got receipt: " + receipt);

        this.inputStream.close();
        this.socket.close();
    }
Note that after the server has received the message and after the client has sent it, we call shutdownInput() and shutdownOutput() respectively instead of closing the corresponding stream. This is because closing either stream closes the socket itself, and the receipt could then no longer be sent or received.
Note also that after shutdownInput() or shutdownOutput() has been called, the corresponding direction is shut down as well: that stream can no longer be read from or written to.
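The half-close behaviour can be checked with a small loopback experiment. This is only a sketch: the class name `HalfCloseDemo`, the helper `exchange` and the use of an OS-assigned port are assumptions for the demonstration, not part of the examples above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class HalfCloseDemo {

    // Sends one message, half-closes the client socket, and returns the server's receipt.
    static String exchange(String message) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverThread = new Thread(() -> {
                try (Socket s = server.accept()) {
                    // readAllBytes() returns only after the client has shut down its output.
                    String got = new String(s.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
                    s.getOutputStream().write(("receipt: " + got).getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            serverThread.start();

            try (Socket client = new Socket(loopback, server.getLocalPort())) {
                client.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                client.shutdownOutput(); // sending is finished, but the socket stays open
                if (client.isClosed() || !client.isOutputShutdown()) {
                    throw new IllegalStateException("expected a half-closed, still open socket");
                }
                // The receiving direction still works after shutdownOutput().
                String receipt = new String(client.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
                serverThread.join();
                return receipt;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(exchange("ping"));
    }
}
```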
In the two examples so far, each stream is used for a single write or read; once that operation completes, the stream is shut down and cannot be written to or read from again.
With this approach, sending two messages requires two sockets, which wastes resources and is inconvenient. In fact, we do not need to close the stream at all: we can simply write the messages one after another on the same stream.
But then we face another problem: how does the receiver tell where one message ends?
The simplest way is to use a special symbol to mark the end of a transmission. As soon as the server reads that symbol, it knows one message is complete and can process it.
In the following example, we use the newline character \n to mark the end of a transmission. The server prints each message as it arrives, and uses Scanner to simplify the reading:
Server:
    // Requires: import java.util.Scanner;
    public void runServer() throws IOException {
        this.server = new ServerSocket(this.port);
        System.out.println("base socket server started.");
        // accept() blocks here until a client connects.
        this.socket = server.accept();
        this.inputStream = this.socket.getInputStream();

        // Receive messages in a loop and print each one; the client sends UTF-8.
        Scanner sc = new Scanner(this.inputStream, "UTF-8");
        while (sc.hasNextLine()) {
            System.out.println("get info from client: " + sc.nextLine());
        }

        this.inputStream.close();
        socket.close();
    }
Client:
    // Requires: import java.util.Scanner;
    public void connectServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
    }

    public void send(String message) throws IOException {
        // \n marks the end of one message.
        String sendMsg = message + "\n";
        try {
            this.outputStream.write(sendMsg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
        // Neither outputStream.close() nor socket.shutdownOutput() is called here:
        // the connection must stay open for the next message.
    }

    public static void main(String[] args) {
        CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
        try {
            cc.connectServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                String line = sc.nextLine();
                cc.send(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
When this runs, every time the client enters a line of text and presses Enter, the server prints the message it read.
Going back to the original question, the reason it is hard to tell where a message ends is that we do not know the length of each message.
So we can send the length of the message first. Once the server knows the length, it knows exactly how many bytes to read to receive the whole message.
Sending a message then becomes two steps:
1. Send the length of the message
2. Send the message itself
The remaining problem is that the number of bytes used in the "send the length" step must itself be fixed, otherwise we are stuck with the same problem again.
In general, we can use a fixed number of bytes to hold the message length, for example the first 2 bytes. The drawback is that the maximum message length is then fixed as well: with 2 bytes, a message can be at most 2^16 - 1 = 65535 bytes, just under 64 KB.
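A minimal sketch of the fixed two-byte prefix, written here with `DataOutputStream` and `DataInputStream` from java.io rather than manual bit shifting. The class and method names (`LengthPrefixDemo`, `writeFrame`, `readFrame`) are made up for this illustration, and returning null at end of stream is a choice of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class LengthPrefixDemo {

    static final int MAX_LENGTH = 0xFFFF; // largest length that fits in two bytes

    // Writes a two-byte length (high byte first) followed by the UTF-8 body.
    static void writeFrame(OutputStream out, String message) throws IOException {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        if (body.length > MAX_LENGTH) {
            throw new IllegalArgumentException("message too long: " + body.length + " bytes");
        }
        DataOutputStream data = new DataOutputStream(out);
        data.writeShort(body.length);
        data.write(body);
        data.flush();
    }

    // Returns the next message, or null if the stream ended before a new frame started.
    static String readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length;
        try {
            length = data.readUnsignedShort();
        } catch (EOFException e) {
            return null;
        }
        byte[] body = new byte[length];
        data.readFully(body); // blocks until all 'length' bytes have arrived
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "first");
        writeFrame(wire, "second");
        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        String msg;
        while ((msg = readFrame(in)) != null) {
            System.out.println(msg);
        }
    }
}
```

`readFully` matters on a real socket: a plain `read(byte[])` may return fewer bytes than requested when the message arrives in several TCP segments.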
If you are familiar with character encodings such as UTF-8, you will know that we can also store the message length in a variable number of bytes, for example:
The first byte starts with 0: 0XXXXXXX. The length fits in one byte (7 bits), so the maximum is 127 bytes
The first byte starts with 110: one more byte follows, 110XXXXX 10XXXXXX (11 bits), so the maximum is 2047, just under 2K
The first byte starts with 1110: two more bytes follow, 1110XXXX 10XXXXXX 10XXXXXX (16 bits), so the maximum is 65535, just under 64K
and so on
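The variable-length scheme listed above can be sketched as follows, covering only the one-, two- and three-byte forms. `VarLengthPrefix`, `encode` and `decode` are hypothetical names for this illustration, and rejecting longer forms is a simplification of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class VarLengthPrefix {

    // Encodes a length in 1 to 3 bytes, using the UTF-8-style bit patterns.
    static byte[] encode(int length) {
        if (length < 0 || length > 0xFFFF) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        if (length < 0x80) {            // 0XXXXXXX
            return new byte[] { (byte) length };
        }
        if (length < 0x800) {           // 110XXXXX 10XXXXXX
            return new byte[] {
                (byte) (0xC0 | (length >> 6)),
                (byte) (0x80 | (length & 0x3F))
            };
        }
        return new byte[] {             // 1110XXXX 10XXXXXX 10XXXXXX
            (byte) (0xE0 | (length >> 12)),
            (byte) (0x80 | ((length >> 6) & 0x3F)),
            (byte) (0x80 | (length & 0x3F))
        };
    }

    // Reads one encoded length from the stream.
    static int decode(InputStream in) throws IOException {
        int first = in.read();
        if (first == -1) throw new EOFException("stream ended before a length prefix");
        int extra;
        int value;
        if ((first & 0x80) == 0) {
            return first;                               // single byte
        } else if ((first & 0xE0) == 0xC0) {
            extra = 1; value = first & 0x1F;            // one continuation byte
        } else if ((first & 0xF0) == 0xE0) {
            extra = 2; value = first & 0x0F;            // two continuation bytes
        } else {
            throw new IOException("unsupported length prefix");
        }
        for (int i = 0; i < extra; i++) {
            int next = in.read();
            if (next == -1) throw new EOFException("truncated length prefix");
            if ((next & 0xC0) != 0x80) throw new IOException("malformed continuation byte");
            value = (value << 6) | (next & 0x3F);
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        for (int n : new int[] { 5, 127, 128, 2047, 2048, 65535 }) {
            byte[] enc = encode(n);
            System.out.println(n + " -> " + enc.length + " byte(s) -> "
                    + decode(new ByteArrayInputStream(enc)));
        }
    }
}
```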
Of course, this is more cumbersome to implement, so the example below still uses a fixed two bytes to record the message length.
Server:
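    public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(this.port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();
        byte[] bytes;
        while (true) {
            // Read the first length byte.
            int first = inputStream.read();
            if (first == -1) {
                // -1 means the input stream has been closed, so stop listening.
                this.socket.close();
                break;
            }
            // Read the second length byte.
            int second = inputStream.read();
            if (second == -1) {
                this.socket.close();
                break;
            }
            // Combine the two bytes into the real length with a bit shift.
            int length = (first << 8) + second;
            // Allocate a byte array of exactly that size to hold the message.
            bytes = new byte[length];
            // read(bytes) may return early with fewer bytes; readNBytes (Java 9+)
            // keeps reading until the array is full or the stream ends.
            inputStream.readNBytes(bytes, 0, length);
            System.out.println("receive message: " + new String(bytes, "UTF-8"));
        }
    }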
Client:
    // Requires: import java.util.Scanner;
    public void connectServer() throws IOException {
        this.socket = new Socket(host, port);
        this.outputStream = socket.getOutputStream();
    }

    public void sendMessage(String message) throws IOException {
        // First convert the message to bytes.
        byte[] bytes = message.getBytes("UTF-8");
        int length = bytes.length;
        if (length > 0xFFFF) {
            throw new IllegalArgumentException("message does not fit a two-byte length");
        }
        // Send the two length bytes, again using a shift.
        // write(int) sends only the low 8 bits of its argument, i.e. one byte.
        this.outputStream.write(length >> 8);
        this.outputStream.write(length);
        // After the length, send the message itself.
        this.outputStream.write(bytes);
    }

    public static void main(String[] args) {
        LengthSocketClient lc = new LengthSocketClient("127.0.0.1", 9799);
        try {
            lc.connectServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                lc.sendMessage(sc.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
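Before looking at how a server handles multiple connections, let us first rework the earlier one-to-one conversation example using multiple threads.
In the earlier examples, the receiver of a message cannot send a message to the other side on its own initiative. In other words, we have not implemented a real two-way conversation, mainly because sending and receiving cannot happen at the same time in a single thread. We therefore need two threads: one listens for keyboard input and writes it to the socket, the other listens on the socket and displays the messages it receives.
For simplicity, the main thread handles keyboard input and message sending, and we start one additional thread to pull messages and display them.
Message-pulling thread, ListenThread.java: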
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;

    public class ListenThread implements Runnable {

        private Socket socket;
        private InputStream inputStream;

        public ListenThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                this.inputStream = socket.getInputStream();
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());
            }

            while (true) {
                try {
                    int first = this.inputStream.read();
                    if (first == -1) {
                        // The input stream has been closed; no need to keep reading.
                        throw new RuntimeException("disconnected.");
                    }
                    int second = this.inputStream.read();
                    if (second == -1) {
                        throw new RuntimeException("disconnected.");
                    }
                    int msgLength = (first << 8) + second;
                    byte[] readBuffer = new byte[msgLength];
                    // readNBytes (Java 9+) reads until the buffer is full or the stream ends.
                    this.inputStream.readNBytes(readBuffer, 0, msgLength);
                    System.out.println("message from [" + socket.getInetAddress() + "]: "
                            + new String(readBuffer, "UTF-8"));
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e.getMessage());
                }
            }
        }
    }
Main thread; at startup the user chooses whether to run as server or as client:
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Scanner;

    public class ChatSocket {

        private String host;
        private int port;
        private Socket socket;
        private ServerSocket serverSocket;
        private OutputStream outputStream;

        // Start as a server and create the session.
        public void runAsServer(int port) throws IOException {
            this.serverSocket = new ServerSocket(port);
            System.out.println("[log] server started at port " + port);
            // Wait for a client to join.
            this.socket = serverSocket.accept();
            System.out.println("[log] successful connected with " + socket.getInetAddress());
            // Start the listening thread.
            Thread listenThread = new Thread(new ListenThread(this.socket));
            listenThread.start();
            waitAndSend();
        }

        // Start as a client and join the session.
        public void runAsClient(String host, int port) throws IOException {
            this.socket = new Socket(host, port);
            System.out.println("[log] successful connected to server " + socket.getInetAddress());
            Thread listenThread = new Thread(new ListenThread(this.socket));
            listenThread.start();
            waitAndSend();
        }

        // Read lines from the keyboard and send each one as a message.
        public void waitAndSend() throws IOException {
            this.outputStream = this.socket.getOutputStream();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                this.sendMessage(sc.nextLine());
            }
        }

        // Two-byte length prefix followed by the UTF-8 message body.
        public void sendMessage(String message) throws IOException {
            byte[] msgBytes = message.getBytes("UTF-8");
            int length = msgBytes.length;
            outputStream.write(length >> 8);
            outputStream.write(length);
            outputStream.write(msgBytes);
        }

        public static void main(String[] args) {
            Scanner scanner = new Scanner(System.in);
            ChatSocket chatSocket = new ChatSocket();
            System.out.println("select connect type: 1 for server and 2 for client");
            int type = Integer.parseInt(scanner.nextLine().trim());
            if (type == 1) {
                System.out.print("input server port: ");
                int port = scanner.nextInt();
                try {
                    chatSocket.runAsServer(port);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else if (type == 2) {
                System.out.print("input server host: ");
                String host = scanner.nextLine();
                System.out.print("input server port: ");
                int port = scanner.nextInt();
                try {
                    chatSocket.runAsClient(host, port);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
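For a server, holding a socket connection with only one client at a time is a waste of resources, so we can let the server establish multiple sockets with multiple clients.
Handling multiple connections means dealing with concurrency (otherwise we could only write a loop that serves them one after another). We can use multiple threads for this, but creating and destroying threads costs considerable time and resources, so it is best to go straight to a thread pool.
Here is a demonstration server: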
    import java.io.InputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SocketServer {
        public static void main(String args[]) throws Exception {
            // Listen on the given port.
            int port = 55533;
            ServerSocket server = new ServerSocket(port);
            // The server keeps waiting for incoming connections.
            System.out.println("server is waiting for connections");

            // With multithreading we want a thread pool, so that a burst of connections
            // cannot create an unbounded number of threads and exhaust resources.
            ExecutorService threadPool = Executors.newFixedThreadPool(100);

            while (true) {
                Socket socket = server.accept();

                Runnable runnable = () -> {
                    try {
                        // Once connected, get the input stream and read it through a buffer.
                        InputStream inputStream = socket.getInputStream();
                        byte[] bytes = new byte[1024];
                        int len;
                        StringBuilder sb = new StringBuilder();
                        while ((len = inputStream.read(bytes)) != -1) {
                            // Sender and receiver must agree on the encoding; UTF-8 is recommended.
                            sb.append(new String(bytes, 0, len, "UTF-8"));
                        }
                        System.out.println("get message from client: " + sb);
                        inputStream.close();
                        socket.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };
                threadPool.submit(runnable);
            }
        }
    }
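You will probably have noticed a problem: once a socket connection has been established, if an exception along the way causes one side to drop the connection, the other side cannot detect it. Only when it next tries to send or receive a message will an exception be thrown and make it exit.
Simply put, the socket connection we maintain is a long-lived connection, but we do nothing to guarantee that it is still alive: it may be usable one second and gone the next.
The most common way to make sure a connection is usable at any time is to send heartbeat packets at regular intervals to check that it is healthy. For services that need high real-time performance, such as message push, this is critical.
The general scheme is as follows: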
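Both sides agree on a heartbeat packet format that can be distinguished from ordinary messages.
The client sends a heartbeat packet to the server at a fixed interval.
Whenever the server receives a heartbeat packet, it discards it.
If sending a heartbeat packet fails on the client, the client can conclude that the connection has been broken.
If real-time requirements are high, the server can also periodically check how often the client sends heartbeats; if none has arrived within a certain time, it can treat the connection as broken.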
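A rough sketch of the scheme above, under an assumed frame format: one type byte (0 for heartbeat, 1 for message), with messages followed by a two-byte length and a UTF-8 body. The type codes, the class name `HeartbeatSketch` and the explicit clock parameters are all inventions of this sketch, not an established protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class HeartbeatSketch {

    static final int TYPE_HEARTBEAT = 0; // hypothetical type codes
    static final int TYPE_MESSAGE = 1;

    // Client side: a heartbeat is a single type byte with no body.
    static void writeHeartbeat(DataOutputStream out) throws IOException {
        out.writeByte(TYPE_HEARTBEAT);
        out.flush();
    }

    // Client side: type byte, two-byte length, UTF-8 body.
    static void writeMessage(DataOutputStream out, String text) throws IOException {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        if (body.length > 0xFFFF) throw new IllegalArgumentException("message too long");
        out.writeByte(TYPE_MESSAGE);
        out.writeShort(body.length);
        out.write(body);
        out.flush();
    }

    // Server side: remembers when the peer was last heard from.
    private final long timeoutMillis;
    private long lastSeenMillis;

    HeartbeatSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    // Reads one frame. Returns the message text, or null for a heartbeat (which is discarded).
    String readFrame(DataInputStream in, long nowMillis) throws IOException {
        int type = in.readUnsignedByte();
        lastSeenMillis = nowMillis; // any frame proves the peer is still there
        if (type == TYPE_HEARTBEAT) return null;
        if (type != TYPE_MESSAGE) throw new IOException("unknown frame type " + type);
        byte[] body = new byte[in.readUnsignedShort()];
        in.readFully(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Server side: true if nothing has arrived for longer than the timeout.
    boolean isExpired(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeHeartbeat(out);
        writeMessage(out, "hello");

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        HeartbeatSketch monitor = new HeartbeatSketch(30_000, System.currentTimeMillis());
        System.out.println(monitor.readFrame(in, System.currentTimeMillis())); // heartbeat
        System.out.println(monitor.readFrame(in, System.currentTimeMillis())); // message
    }
}
```

On a real client, a `ScheduledExecutorService.scheduleAtFixedRate` task could call `writeHeartbeat` every few seconds, with an `IOException` from that call taken as the sign that the connection is gone. Since heartbeats and ordinary messages would then be written from different threads, writes to the shared output stream would need to be synchronized.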
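Heartbeat packets inevitably add bandwidth and performance overhead. For ordinary applications this scheme is not really necessary: if sending a message throws a connection exception, simply try to reconnect.
Compared with the scheme above, the message that triggers the exception is in effect playing the role of the heartbeat packet.
In short, whether to keep a connection alive, and how, has to be thought through and tailored to the specific business scenario.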
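The reconnect-on-failure idea might look roughly like this. `ReconnectingSender` is a hypothetical helper that sends newline-terminated lines and retries once on a fresh connection; the single retry and the lazy connect are choices of this sketch. One limitation to keep in mind: a write to a dead connection can still appear to succeed locally, so the message that first hits the broken connection may be lost without any exception.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ReconnectingSender {

    private final String host;
    private final int port;
    private Socket socket; // null until the first send, and after close()

    ReconnectingSender(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Sends one newline-terminated line; on failure, reconnects and retries once.
    void send(String line) throws IOException {
        byte[] data = (line + "\n").getBytes(StandardCharsets.UTF_8);
        try {
            write(data);
        } catch (IOException firstFailure) {
            close();     // drop the broken socket
            write(data); // a second failure propagates to the caller
        }
    }

    // Connects if necessary, then writes the bytes.
    private void write(byte[] data) throws IOException {
        if (socket == null) {
            socket = new Socket(host, port);
        }
        OutputStream out = socket.getOutputStream();
        out.write(data);
        out.flush();
    }

    // Closes the current connection, if any; the next send() opens a new one.
    void close() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do if closing a broken socket fails
            }
            socket = null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes a line-based server, such as the Scanner example, is listening on this port.
        ReconnectingSender sender = new ReconnectingSender("127.0.0.1", 9799);
        sender.send("hello");
        sender.close();
    }
}
```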