Inhaltsverzeichnis
Dies ist ein Beispiel für normale Synchronisierung mit BSD-Sockets.
Dann rufen Sie bind() auf, um einem Socket eine Adresse zuzuweisen. Wenn ein Socket mit socket() erstellt wird, wird ihm nur das verwendete Protokoll und keine Adresse zugewiesen. Bevor Sie Verbindungen von anderen Hosts akzeptieren, müssen Sie zunächst bind() aufrufen, um dem Socket eine Adresse zuzuweisen.
grpc, thread-loader 中都有详细的应用。
  • close_cb 函数被调用在 stream 数据结束时或者出错时。
  • connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connection
  • connect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。
  • shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。
  • accepted_fd 当 accept 接收到新连接时, 存储 accept(SocketFD, NULL, NULL) 返回的 ConnectFD。
  • queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。
  • %%PRE_BLOCK_2%%

    2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。

    accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃

    3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io

    %%PRE_BLOCK_3%%

    uv__open_cloexec" >
    • 如 read_cb 函数, 在本例子中 on_new_connection > uv_read_start 函数就会真实的设置该 read_cb 为用户传入的参数 echo_read, 其被调用时机是该 stream 上设置的 io_watcher.fd 有数据写入时, 在事件循环阶段被 epoll 捕获后。
    • alloc_cb 函数的调用过程同 read_cb, alloc 类型函数一般是设置当前需要读取的内容长度, 在流数据传输时通常首先会写入本次传输数据的长度, 然后是具体的内容, 主要是为了接收方能够合理的申请内存进行存储。如 grpc, thread-loader 中都有详细的应用。
    • close_cb 函数被调用在 stream 数据结束时或者出错时。
    • connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connection
    • connect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。
    • shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。
    • accepted_fd 当 accept 接收到新连接时, 存储 accept(SocketFD, NULL, NULL) 返回的 ConnectFD。
    • queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。
    %%PRE_BLOCK_2%%

    2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。

    accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃

    3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io

    %%PRE_BLOCK_3%%

    uv__open_cloexec

    uv__stream_io
    uv_ip4_addr
    uv__tcp_bind
    new_socket
    uv__stream_open
    uv_listen
    uv_tcp_listen
    uv__server_io
    uv__emfile_trick
    on_new_connection
    uv_accept
    uv_read_start
    小结
    Heim Web-Frontend js-Tutorial Lassen Sie uns über Netzwerk und Streaming in Node.js sprechen

    Lassen Sie uns über Netzwerk und Streaming in Node.js sprechen

    Jul 22, 2021 am 10:35 AM
    node.js

    In diesem Artikel können Sie über das Netzwerk und den Ablauf in Node.js sprechen. Zu den beteiligten Wissenspunkten gehören die Implementierung des Netzwerks in ibuv, BSD-Sockets, die Verwendung des UNIX-Domänenprotokolls usw. Schauen wir uns das gemeinsam an!

    Lassen Sie uns über Netzwerk und Streaming in Node.js sprechen

    【Empfohlenes Lernen: „nodejs-Tutorial“】

    Quelle dieses Beispiels: http://docs.libuv.org/en/v1.x/guide/networking.html

    Beteiligtes Wissen Punkte: Implementierung des Netzwerks in libuv. Übergabe von „Dateideskriptoren“ zwischen Prozessen

    • Beispiel tcp-echo-server/main.c
    • libuv Asynchrone Nutzung von BSD-Sockets
    • Beispiel
    • Netzwerk in libuv und direkte Nutzung von BSD-Sockets Die Schnittstelle ist nein anders, einige Dinge sind einfacher und alle sind nicht blockierend, aber das Konzept ist dasselbe. Darüber hinaus bietet libuv auch einige nützliche Funktionen, um lästige, sich wiederholende Aufgaben auf niedriger Ebene zu abstrahieren, z. B. das Einrichten von Sockets mithilfe von BSD-Socket-Strukturen, DNS-Abfragen und das Anpassen verschiedener Socket-Parameter.
    • int main() {
          loop = uv_default_loop();
      
          uv_tcp_t server;
          uv_tcp_init(loop, &server);
      
          uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
      
          uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
          int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
          if (r) {
              fprintf(stderr, "Listen error %s\n", uv_strerror(r));
              return 1;
          }
          return uv_run(loop, UV_RUN_DEFAULT);
      }
      
      void on_new_connection(uv_stream_t *server, int status) {
          if (status < 0) {
              fprintf(stderr, "New connection error %s\n", uv_strerror(status));
              // error!
              return;
          }
      
          uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
          uv_tcp_init(loop, client);
          if (uv_accept(server, (uv_stream_t*) client) == 0) {
              uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
      }
      Beispiel für Synchronisierung

    Dies ist ein Beispiel für normale Synchronisierung mit BSD-Sockets.

    Als Referenz finden Sie die folgenden Hauptschritte:

    Rufen Sie zunächst socket() auf, um einen Endpunkt für die Kommunikation zu erstellen und einen Dateideskriptor für den Socket zurückzugeben.

    Dann rufen Sie bind() auf, um einem Socket eine Adresse zuzuweisen. Wenn ein Socket mit socket() erstellt wird, wird ihm nur das verwendete Protokoll und keine Adresse zugewiesen. Bevor Sie Verbindungen von anderen Hosts akzeptieren, müssen Sie zunächst bind() aufrufen, um dem Socket eine Adresse zuzuweisen.

    Nachdem der Socket an eine Adresse gebunden ist, beginnt der Aufruf der Funktion listen() mit der Überwachung auf mögliche Verbindungsanfragen.

    Zuletzt rufen Sie „accept“ auf, das die Anwendung durch Ereignisse (wie den Unix-Systemaufruf „select()“) benachrichtigt, wenn sie auf Verbindungen von anderen Hosts wartet, die auf Datenströme zugreifen. Die Verbindung muss mit der Funktion Accept() initialisiert werden. Accept() erstellt für jede Verbindung einen neuen Socket und entfernt die Verbindung aus der Überwachungswarteschlange.

    • int main(void)
        {
          struct sockaddr_in stSockAddr;
          int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
        
          if(-1 == SocketFD)
          {
            perror("can not create socket");
            exit(EXIT_FAILURE);
          }
        
          memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
        
          stSockAddr.sin_family = AF_INET;
          stSockAddr.sin_port = htons(1100);
          stSockAddr.sin_addr.s_addr = INADDR_ANY;
        
          if(-1 == bind(SocketFD,(const struct sockaddr *)&stSockAddr, sizeof(struct sockaddr_in)))
          {
            perror("error bind failed");
            close(SocketFD);
            exit(EXIT_FAILURE);
          }
        
          if(-1 == listen(SocketFD, 10))
          {
            perror("error listen failed");
            close(SocketFD);
            exit(EXIT_FAILURE);
          }
        
          for(;;)
          {
            int ConnectFD = accept(SocketFD, NULL, NULL);
        
            if(0 > ConnectFD)
            {
              perror("error accept failed");
              close(SocketFD);
              exit(EXIT_FAILURE);
            }
        
           /* perform read write operations ... */
        
            shutdown(ConnectFD, SHUT_RDWR);
        
            close(ConnectFD);
          }
      
          close(SocketFD);
          return 0;
        }

      uv_tcp_init
    • main > uv_tcp_init

    • 1. Die Domäne muss einer der folgenden drei Typen sein: AF_INET steht für das IPv4-Netzwerkprotokoll
    • AF_UNSPEC bedeutet anwendbare An-Adresse geeignet für die Angabe des Hostnamens und des Dienstnamens und geeignet für jede Protokollfamilie

    • 2. Rufen Sie uv__stream_init auf, um die Stream-Daten zu initialisieren Initialisierung des Streams Es gibt immer noch viele Orte, an denen Funktionen verwendet werden, und sie sind auch sehr wichtig. Die vollständige Implementierungsreferenz des folgenden I/O-Thread-Pools

      [Anmerkungen zur libuv-Quellcode-Studie] und I/O

    1 initialisiert die Rückruffunktion, die vom Stream aufgerufen wird

    • 如 read_cb 函数, 在本例子中 on_new_connection > uv_read_start 函数就会真实的设置该 read_cb 为用户传入的参数 echo_read, 其被调用时机是该 stream 上设置的 io_watcher.fd 有数据写入时, 在事件循环阶段被 epoll 捕获后。
    • alloc_cb 函数的调用过程同 read_cb, alloc 类型函数一般是设置当前需要读取的内容长度, 在流数据传输时通常首先会写入本次传输数据的长度, 然后是具体的内容, 主要是为了接收方能够合理的申请内存进行存储。如 grpc, thread-loader 中都有详细的应用。
    • close_cb 函数被调用在 stream 数据结束时或者出错时。
    • connection_cb 函数如本例子 tcp 流, 当 accept 接收到新连接时被调用。本例子中即为 on_new_connection
    • connect_req 结构主要用于 tcp 客户端相关连接回调等数据的挂载使用。
    • shutdown_req 结构主要用于流 destroy 时回调等数据的挂载使用。
    • accepted_fd 当 accept 接收到新连接时, 存储 accept(SocketFD, NULL, NULL) 返回的 ConnectFD。
    • queued_fds 用于保存等待处理的连接, 其主要用于 node cluster 集群 的实现。
    // queued_fds
    
    1. 当收到其他进程通过 ipc 写入的数据时, 调用 uv__stream_recv_cmsg 函数
    2. uv__stream_recv_cmsg 函数读取到进程传递过来的 fd 引用, 调用 uv__stream_queue_fd 函数保存。
    3. queued_fds 被消费主要在 src/stream_wrap.cc LibuvStreamWrap::OnUvRead > AcceptHandle 函数中。

    2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。

    accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃

    3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io

    void uv__stream_init(uv_loop_t* loop,
                         uv_stream_t* stream,
                         uv_handle_type type) {
      int err;
    
      uv__handle_init(loop, (uv_handle_t*)stream, type);
      stream->read_cb = NULL;
      stream->alloc_cb = NULL;
      stream->close_cb = NULL;
      stream->connection_cb = NULL;
      stream->connect_req = NULL;
      stream->shutdown_req = NULL;
      stream->accepted_fd = -1;
      stream->queued_fds = NULL;
      stream->delayed_error = 0;
      QUEUE_INIT(&stream->write_queue);
      QUEUE_INIT(&stream->write_completed_queue);
      stream->write_queue_size = 0;
    
      if (loop->emfile_fd == -1) {
        err = uv__open_cloexec("/dev/null", O_RDONLY);
        if (err < 0)
            /* In the rare case that "/dev/null" isn&#39;t mounted open "/"
             * instead.
             */
            err = uv__open_cloexec("/", O_RDONLY);
        if (err >= 0)
          loop->emfile_fd = err;
      }
    
    #if defined(__APPLE__)
      stream->select = NULL;
    #endif /* defined(__APPLE_) */
    
      uv__io_init(&stream->io_watcher, uv__stream_io, -1);
    }

    uv__open_cloexec

    main > uv_tcp_init > uv__stream_init > uv__open_cloexec

    同步调用 open 方法拿到了 fd, 也许你会问为啥不像 【libuv 源码学习笔记】线程池与i/o 中调用 uv_fs_open 异步获取 fd, 其实 libuv 中并不全部是异步的实现, 比如当前的例子启动 tcp 服务前的一些初始化, 而不是用户请求过程中发生的任务, 同步也是能接受的。

    int uv__open_cloexec(const char* path, int flags) {
    #if defined(O_CLOEXEC)
      int fd;
    
      fd = open(path, flags | O_CLOEXEC);
      if (fd == -1)
        return UV__ERR(errno);
    
      return fd;
    #else  /* O_CLOEXEC */
      int err;
      int fd;
    
      fd = open(path, flags);
      if (fd == -1)
        return UV__ERR(errno);
    
      err = uv__cloexec(fd, 1);
      if (err) {
        uv__close(fd);
        return err;
      }
    
      return fd;
    #endif  /* O_CLOEXEC */
    }

    uv__stream_io

    main > uv_tcp_init > uv__stream_init > uv__stream_io

    双工流的 i/o 观察者回调函数, 如调用的 stream->connect_req 函数, 其值是例子中 uv_listen 函数的最后一个参数 on_new_connection。

    • 当发生 POLLIN | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__read 函数

    • 当发生 POLLOUT | POLLERR | POLLHUP 事件时: 该 fd 有可读数据时调用 uv__write 函数

    static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
      uv_stream_t* stream;
    
      stream = container_of(w, uv_stream_t, io_watcher);
    
      assert(stream->type == UV_TCP ||
             stream->type == UV_NAMED_PIPE ||
             stream->type == UV_TTY);
      assert(!(stream->flags & UV_HANDLE_CLOSING));
    
      if (stream->connect_req) {
        uv__stream_connect(stream);
        return;
      }
    
      assert(uv__stream_fd(stream) >= 0);
    
      if (events & (POLLIN | POLLERR | POLLHUP))
        uv__read(stream);
    
      if (uv__stream_fd(stream) == -1)
        return;  /* read_cb closed stream. */
    
      if ((events & POLLHUP) &&
          (stream->flags & UV_HANDLE_READING) &&
          (stream->flags & UV_HANDLE_READ_PARTIAL) &&
          !(stream->flags & UV_HANDLE_READ_EOF)) {
        uv_buf_t buf = { NULL, 0 };
        uv__stream_eof(stream, &buf);
      }
    
      if (uv__stream_fd(stream) == -1)
        return;  /* read_cb closed stream. */
    
      if (events & (POLLOUT | POLLERR | POLLHUP)) {
        uv__write(stream);
        uv__write_callbacks(stream);
    
        /* Write queue drained. */
        if (QUEUE_EMPTY(&stream->write_queue))
          uv__drain(stream);
      }
    }

    uv_ip4_addr

    main > uv_ip4_addr

    uv_ip4_addr 用于将人类可读的 IP 地址、端口对转换为 BSD 套接字 API 所需的 sockaddr_in 结构。

    int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
      memset(addr, 0, sizeof(*addr));
      addr->sin_family = AF_INET;
      addr->sin_port = htons(port);
    #ifdef SIN6_LEN
      addr->sin_len = sizeof(*addr);
    #endif
      return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));
    }

    uv_tcp_bind

    main > uv_tcp_bind

    从 uv_ip4_addr 函数的实现, 其实是在 addr 的 sin_family 上面设置值为 AF_INET, 但在 uv_tcp_bind 函数里面却是从 addr 的 sa_family属性上面取的值, 这让 c 初学者的我又陷入了一阵思考 ...

    sockaddr_in 和 sockaddr 是并列的结构,指向 sockaddr_in 的结构体的指针也可以指向 sockaddr 的结构体,并代替它。也就是说,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函数初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化

    原来是这样, 这里通过强制指针类型转换 const struct sockaddr* addr 达到的目的, 函数的最后调用了 uv__tcp_bind 函数。

    int uv_tcp_bind(uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int flags) {
      unsigned int addrlen;
    
      if (handle->type != UV_TCP)
        return UV_EINVAL;
    
      if (addr->sa_family == AF_INET)
        addrlen = sizeof(struct sockaddr_in);
      else if (addr->sa_family == AF_INET6)
        addrlen = sizeof(struct sockaddr_in6);
      else
        return UV_EINVAL;
    
      return uv__tcp_bind(handle, addr, addrlen, flags);
    }

    uv__tcp_bind

    main > uv_tcp_bind > uv__tcp_bind

    • 调用 maybe_new_socket, 如果当前未设置 socketfd, 则调用 new_socket 获取

    • 调用 setsockopt 用于为指定的套接字设定一个特定的套接字选项

    • 调用 bind 为一个套接字分配地址。当使用socket()创建套接字后,只赋予其所使用的协议,并未分配地址。

    int uv__tcp_bind(uv_tcp_t* tcp,
                     const struct sockaddr* addr,
                     unsigned int addrlen,
                     unsigned int flags) {
      int err;
      int on;
    
      /* Cannot set IPv6-only mode on non-IPv6 socket. */
      if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
        return UV_EINVAL;
    
      err = maybe_new_socket(tcp, addr->sa_family, 0);
      if (err)
        return err;
    
      on = 1;
      if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
        return UV__ERR(errno);
    
    ...
    
      errno = 0;
      if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
        if (errno == EAFNOSUPPORT)
          return UV_EINVAL;
        return UV__ERR(errno);
      }
    ...
    }

    new_socket

    main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket

    • 通过 uv__socket 其本质调用 socket 获取到 sockfd

    • 调用 uv__stream_open 设置 stream i/o 观察的 fd 为步骤1 拿到的 sockfd

    static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
      struct sockaddr_storage saddr;
      socklen_t slen;
      int sockfd;
      int err;
    
      err = uv__socket(domain, SOCK_STREAM, 0);
      if (err < 0)
        return err;
      sockfd = err;
    
      err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
      
      ...
    
      return 0;
    }

    uv__stream_open

    main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open

    主要用于设置 stream->io_watcher.fd 为参数传入的 fd。

    int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
    #if defined(__APPLE__)
      int enable;
    #endif
    
      if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
        return UV_EBUSY;
    
      assert(fd >= 0);
      stream->flags |= flags;
    
      if (stream->type == UV_TCP) {
        if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
          return UV__ERR(errno);
    
        /* TODO Use delay the user passed in. */
        if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
            uv__tcp_keepalive(fd, 1, 60)) {
          return UV__ERR(errno);
        }
      }
    
    #if defined(__APPLE__)
      enable = 1;
      if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
          errno != ENOTSOCK &&
          errno != EINVAL) {
        return UV__ERR(errno);
      }
    #endif
    
      stream->io_watcher.fd = fd;
    
      return 0;
    }

    uv_listen

    main > uv_listen

    主要调用了 uv_tcp_listen 函数。

    int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
      int err;
    
      err = ERROR_INVALID_PARAMETER;
      switch (stream->type) {
        case UV_TCP:
          err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
          break;
        case UV_NAMED_PIPE:
          err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
          break;
        default:
          assert(0);
      }
    
      return uv_translate_sys_error(err);
    }

    uv_tcp_listen

    main > uv_listen > uv_tcp_listen

    • 调用 listen 开始监听可能的连接请求

    • 挂载例子中传入的回调 on_new_connection

    • 暴力改写 i/o 观察者的回调, 在上面的 uv__stream_init 函数中, 通过 uv__io_init 设置了 i/o 观察者的回调为 uv__stream_io, 作为普通的双工流是适用的, 这里 tcp 流直接通过 tcp->io_watcher.cb = uv__server_io 赋值语句设置 i/o 观察者回调为 uv__server_io

    • 调用 uv__io_start 注册 i/o 观察者, 开始监听工作。

    int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
      ...
    
      if (listen(tcp->io_watcher.fd, backlog))
        return UV__ERR(errno);
    
      tcp->connection_cb = cb;
      tcp->flags |= UV_HANDLE_BOUND;
    
      /* Start listening for connections. */
      tcp->io_watcher.cb = uv__server_io;
      uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
    
      return 0;
    }

    uv__server_io

    main > uv_listen > uv_tcp_listen > uv__server_io

    tcp 流的 i/o 观察者回调函数

    • 调用 uv__accept, 拿到该连接的 ConnectFD

    • 此时如果出现了上面 uv__stream_init 时说的 accept (EMFILE错误), 则调用 uv__emfile_trick 函数

    • 把步骤1拿到的 ConnectFD 挂载在了 stream->accepted_fd 上面

    • 调用例子中传入的回调 on_new_connection

    void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
      ...
      
      while (uv__stream_fd(stream) != -1) {
        assert(stream->accepted_fd == -1);
    
        err = uv__accept(uv__stream_fd(stream));
        if (err < 0) {
          if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
            return;  /* Not an error. */
    
          if (err == UV_ECONNABORTED)
            continue;  /* Ignore. Nothing we can do about that. */
    
          if (err == UV_EMFILE || err == UV_ENFILE) {
            err = uv__emfile_trick(loop, uv__stream_fd(stream));
            if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
              break;
          }
    
          stream->connection_cb(stream, err);
          continue;
        }
    
        UV_DEC_BACKLOG(w)
        stream->accepted_fd = err;
        stream->connection_cb(stream, 0);
    
        ...
    }

    uv__emfile_trick

    main > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick

    在上面的 uv__stream_init 函数中, 我们发现 loop 的 emfile_fd 属性上通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符。

    当出现 accept (EMFILE错误)即文件描述符用尽时的错误时

    首先将 loop->emfile_fd 文件描述符, 使其能 accept 新连接, 然后我们新连接将其关闭,以使其低于EMFILE的限制。接下来,我们接受所有等待的连接并关闭它们以向客户发出信号,告诉他们我们已经超载了--我们确实超载了,但是我们仍在继续工作。

    static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
      int err;
      int emfile_fd;
    
      if (loop->emfile_fd == -1)
        return UV_EMFILE;
    
      uv__close(loop->emfile_fd);
      loop->emfile_fd = -1;
    
      do {
        err = uv__accept(accept_fd);
        if (err >= 0)
          uv__close(err);
      } while (err >= 0 || err == UV_EINTR);
    
      emfile_fd = uv__open_cloexec("/", O_RDONLY);
      if (emfile_fd >= 0)
        loop->emfile_fd = emfile_fd;
    
      return err;
    }

    on_new_connection

    当收到一个新连接, 例子中的 on_new_connection 函数被调用

    • 通过 uv_tcp_init 初始化了一个 tcp 客户端流

    • 调用 uv_accept 函数

    void on_new_connection(uv_stream_t *server, int status) {
        if (status < 0) {
            fprintf(stderr, "New connection error %s\n", uv_strerror(status));
            // error!
            return;
        }
    
        uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
        uv_tcp_init(loop, client);
        if (uv_accept(server, (uv_stream_t*) client) == 0) {
            uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }

    uv_accept

    on_new_connection > uv_accept

    根据不同的协议调用不同的方法, 该例子 tcp 调用 uv__stream_open 方法

    uv__stream_open 设置给初始化完成的 client 流设置了 i/o 观察者的 fd。该 fd 即是 uv__server_io 中提到的 ConnectFD 。

    int uv_accept(uv_stream_t* server, uv_stream_t* client) {
      int err;
    
      assert(server->loop == client->loop);
    
      if (server->accepted_fd == -1)
        return UV_EAGAIN;
    
      switch (client->type) {
        case UV_NAMED_PIPE:
        case UV_TCP:
          err = uv__stream_open(client,
                                server->accepted_fd,
                                UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
          if (err) {
            /* TODO handle error */
            uv__close(server->accepted_fd);
            goto done;
          }
          break;
    
        case UV_UDP:
          err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
          if (err) {
            uv__close(server->accepted_fd);
            goto done;
          }
          break;
    
        default:
          return UV_EINVAL;
      }
    
      client->flags |= UV_HANDLE_BOUND;
    
    done:
      /* Process queued fds */
      if (server->queued_fds != NULL) {
        uv__stream_queued_fds_t* queued_fds;
    
        queued_fds = server->queued_fds;
    
        /* Read first */
        server->accepted_fd = queued_fds->fds[0];
    
        /* All read, free */
        assert(queued_fds->offset > 0);
        if (--queued_fds->offset == 0) {
          uv__free(queued_fds);
          server->queued_fds = NULL;
        } else {
          /* Shift rest */
          memmove(queued_fds->fds,
                  queued_fds->fds + 1,
                  queued_fds->offset * sizeof(*queued_fds->fds));
        }
      } else {
        server->accepted_fd = -1;
        if (err == 0)
          uv__io_start(server->loop, &server->io_watcher, POLLIN);
      }
      return err;
    }

    uv_read_start

    on_new_connection > uv_read_start

    开启一个流的监听工作

    • 挂载回调函数 read_cb 为例子中的 echo_read, 当流有数据写入时被调用

    • 挂载回调函数 alloc_cb 为例子中的 alloc_buffer

    • 调用 uv__io_start 函数, 这可是老朋友了, 通常用在 uv__io_init 初始化 i/o 观察者后面, 用于注册 i/o 观察者。

    uv_read_start 主要是调用了 uv__read_start 函数。开始了普通流的 i/o 过程。

    • 初始化 i/o 观察者在 uv_tcp_init > uv_tcp_init_ex > uv__stream_init > uv__io_init 设置其观察者回调函数为 uv__stream_io
    • 注册 i/o 观察者为 uv__io_start 开始监听工作。
    int uv__read_start(uv_stream_t* stream,
                       uv_alloc_cb alloc_cb,
                       uv_read_cb read_cb) {
      assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY);
    
      /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
       * expresses the desired state of the user.
       */
      stream->flags |= UV_HANDLE_READING;
    
      /* TODO: try to do the read inline? */
      /* TODO: keep track of tcp state. If we&#39;ve gotten a EOF then we should
       * not start the IO watcher.
       */
      assert(uv__stream_fd(stream) >= 0);
      assert(alloc_cb);
    
      stream->read_cb = read_cb;
      stream->alloc_cb = alloc_cb;
    
      uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
      uv__handle_start(stream);
      uv__stream_osx_interrupt_select(stream);
    
      return 0;
    }

    小结

    • uv_tcp_init 初始化 TCP Server handle, 其绑定的 fd 为 socket 返回的 socketFd。
    • uv_tcp_bind 调用 bind 为套接字分配一个地址
    • uv_listen 调用 listen 开始监听可能的连接请求
    • uv_accept 调用 accept 去接收一个新连接
    • uv_tcp_init 初始化 TCP Client handle, 其绑定的 fd 为 accept 返回的 acceptFd, 剩下的就是一个普通流的读写 i/o 观察。

    原文地址:https://juejin.cn/post/6982226661081088036

    作者:多小凯

    Weitere Kenntnisse zum Thema Programmierung finden Sie unter: Programmiervideos! !

    Das obige ist der detaillierte Inhalt vonLassen Sie uns über Netzwerk und Streaming in Node.js sprechen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

    Erklärung dieser Website
    Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn

    Heiße KI -Werkzeuge

    Undress AI Tool

    Undress AI Tool

    Ausziehbilder kostenlos

    Undresser.AI Undress

    Undresser.AI Undress

    KI-gestützte App zum Erstellen realistischer Aktfotos

    AI Clothes Remover

    AI Clothes Remover

    Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

    Clothoff.io

    Clothoff.io

    KI-Kleiderentferner

    Video Face Swap

    Video Face Swap

    Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

    Heiße Werkzeuge

    Notepad++7.3.1

    Notepad++7.3.1

    Einfach zu bedienender und kostenloser Code-Editor

    SublimeText3 chinesische Version

    SublimeText3 chinesische Version

    Chinesische Version, sehr einfach zu bedienen

    Senden Sie Studio 13.0.1

    Senden Sie Studio 13.0.1

    Leistungsstarke integrierte PHP-Entwicklungsumgebung

    Dreamweaver CS6

    Dreamweaver CS6

    Visuelle Webentwicklungstools

    SublimeText3 Mac-Version

    SublimeText3 Mac-Version

    Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

    Detaillierte grafische Erläuterung des Speichers und des GC der Node V8-Engine Detaillierte grafische Erläuterung des Speichers und des GC der Node V8-Engine Mar 29, 2023 pm 06:02 PM

    Dieser Artikel vermittelt Ihnen ein detailliertes Verständnis des Speichers und Garbage Collectors (GC) der NodeJS V8-Engine. Ich hoffe, er wird Ihnen hilfreich sein!

    Ein Artikel über Speichersteuerung in Node Ein Artikel über Speichersteuerung in Node Apr 26, 2023 pm 05:37 PM

    Der nicht blockierende und ereignisgesteuerte Knotendienst hat den Vorteil eines geringen Speicherverbrauchs und eignet sich sehr gut für die Verarbeitung massiver Netzwerkanforderungen. Unter der Voraussetzung massiver Anfragen müssen Probleme im Zusammenhang mit der „Speicherkontrolle“ berücksichtigt werden. 1. Der Garbage-Collection-Mechanismus und die Speicherbeschränkungen von V8 Js wird von der Garbage-Collection-Maschine gesteuert

    Lassen Sie uns darüber sprechen, wie Sie das beste Node.js-Docker-Image auswählen. Lassen Sie uns darüber sprechen, wie Sie das beste Node.js-Docker-Image auswählen. Dec 13, 2022 pm 08:00 PM

    Die Auswahl eines Docker-Images für Node mag trivial erscheinen, aber die Größe und potenziellen Schwachstellen des Images können erhebliche Auswirkungen auf Ihren CI/CD-Prozess und Ihre Sicherheit haben. Wie wählen wir also das beste Node.js-Docker-Image aus?

    Lassen Sie uns darüber sprechen, wie Sie mit pkg Node.js-Projekte in ausführbare Dateien packen. Lassen Sie uns darüber sprechen, wie Sie mit pkg Node.js-Projekte in ausführbare Dateien packen. Dec 02, 2022 pm 09:06 PM

    Wie packe ich die ausführbare Datei von nodejs mit pkg? Im folgenden Artikel erfahren Sie, wie Sie mit pkg ein Node-Projekt in eine ausführbare Datei packen. Ich hoffe, dass er Ihnen weiterhilft!

    Lassen Sie uns über die Ereignisschleife in Node sprechen Lassen Sie uns über die Ereignisschleife in Node sprechen Apr 11, 2023 pm 07:08 PM

    Die Ereignisschleife ist ein grundlegender Bestandteil von Node.js und ermöglicht die asynchrone Programmierung, indem sie sicherstellt, dass der Hauptthread nicht blockiert wird. Das Verständnis der Ereignisschleife ist für die Erstellung effizienter Anwendungen von entscheidender Bedeutung. Der folgende Artikel wird Ihnen ein detailliertes Verständnis der Ereignisschleife in Node vermitteln. Ich hoffe, er wird Ihnen hilfreich sein!

    Was soll ich tun, wenn der Knoten den Befehl npm nicht verwenden kann? Was soll ich tun, wenn der Knoten den Befehl npm nicht verwenden kann? Feb 08, 2023 am 10:09 AM

    Der Grund, warum der Knoten den Befehl npm nicht verwenden kann, liegt darin, dass die Umgebungsvariablen nicht richtig konfiguriert sind. Die Lösung ist: 1. Öffnen Sie „Systemeigenschaften“ 2. Suchen Sie nach „Umgebungsvariablen“ -> „Systemvariablen“ und bearbeiten Sie dann die Umgebung Variablen; 3. Suchen Sie den Speicherort des NodeJS-Ordners. 4. Klicken Sie auf „OK“.

    Erfahren Sie mehr über Puffer in Node Erfahren Sie mehr über Puffer in Node Apr 25, 2023 pm 07:49 PM

    Zu Beginn lief JS nur auf der Browserseite. Es war einfach, Unicode-codierte Zeichenfolgen zu verarbeiten, aber es war schwierig, binäre und nicht Unicode-codierte Zeichenfolgen zu verarbeiten. Und Binär ist das Datenformat der niedrigsten Ebene des Computer-, Video-/Audio-/Programm-/Netzwerkpakets

    Lassen Sie uns ausführlich über das File-Modul in Node sprechen Lassen Sie uns ausführlich über das File-Modul in Node sprechen Apr 24, 2023 pm 05:49 PM

    Das Dateimodul ist eine Kapselung der zugrunde liegenden Dateioperationen, wie z. B. Lesen/Schreiben/Öffnen/Schließen/Löschen von Dateien, Hinzufügen usw. Das größte Merkmal des Dateimoduls besteht darin, dass alle Methoden zwei Versionen von **synchronem** und **bereitstellen. asynchron**, mit Methoden mit dem Suffix sync sind alle Synchronisationsmethoden, und diejenigen ohne sind alle heterogene Methoden.

    See all articles