【问题标题】:Simple socket non-blocking I/O简单套接字非阻塞 I/O
【发布时间】:2011-07-16 00:17:38
【问题描述】:

我正在尝试在服务器和客户端之间进行非阻塞 I/O。两者连接后,我尝试使用 fork 处理 IO,但服务器端在尝试读取“传输端点未连接”时出错,并且发生两次(因为我猜是 fork?) .

服务器代码

//includes taken out

#define PORT "4950"
#define STDIN 0

struct sockaddr name;

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}


int main(int agrc, char** argv) {
    int status, sock, adrlen, new_sd;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    //store the connecting address and size
    struct sockaddr_storage their_addr;
    socklen_t their_addr_size;

    //socket infoS
    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    //allow reuse of port
    int yes=1;
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    //unlink and bind
    unlink("127.0.0.1");
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nBind error %m", errno);
        exit(1);
    }

    freeaddrinfo(servinfo);

    //listen
    if(listen(sock, 5) < 0) {
        printf("\nListen error %m", errno);
        exit(1);
    } 
    their_addr_size = sizeof(their_addr);
    //accept
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size);
    if( new_sd < 0) {
        printf("\nAccept error %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful Connection!";

    //set nonblock
    set_nonblock(new_sd);

    char* in = new char[255];
    char* out = new char[255];
    int numSent;
    int numRead;
    pid_t pid;

    fork();
    pid = getpid();

    if(pid == 0) {

        while( !(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't') ) {

            fgets(out, 255, stdin);
            numSent = send(sock, out, strlen(out), 0);

            if(numSent < 0) {
                printf("\nError sending %m", errno);
                exit(1);
            }   //end error
        }   //end while
    }   //end child

    else {
        numRead = recv(sock, in, 255, 0);
        if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }   //end error
        else {
            cout<<in;
            for(int i=0;i<255;i++)
                in[i] = '\0';

        }   //end else
    }   //end parent

    cout<<"\n\nExiting normally\n";
    return 0;
}

客户端代码

//包括取出

#define PORT "4950"

struct sockaddr name;

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, sock, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nclient connection failure %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful connection!";

    //set nonblock
    set_nonblock(sock);

    char* out = new char[255];
    char* in = new char[255];
    int numRead;
    int numSent;
    pid_t pid;

    fork();
    pid = getpid();

    if(pid == 0) {

        while( !(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't') ) {

            fgets(out, 255, stdin);
            numSent = send(sock, out, strlen(out), 0);

            if(numSent < 0) {
                printf("\nError sending %m", errno);
                exit(1);
            }   //end error
        }   //end while
    }   //end child process

    else {
        while( !(in[0] == 'q' && in[1] == 'u' && in[2] == 'i' && in[3] == 't') ) {
        numRead = recv(sock, in, 255, 0);
            cout<<in;
            for(int i=0;i<255;i++)
                in[i] = '\0';
        }
    }   //end parent process

    cout<<"\n\nExiting normally\n";
    return 0;
}

我还尝试使用线程来执行 I/O。那里的问题是,当我运行程序时,就像线程没有发生一样。该程序仅以“成功连接”运行,然后“正常退出”。我将一些 cout 语句放入 while(1) 循环中,它们确实打印了几次,但它们只是由于某种原因停止了。我不确定这是我的线程还是我的套接字的问题。代码(与上面非常相似)在这里 -

服务器

//includes taken out

#define PORT "4950"
#define STDIN 0

pthread_t readthread;
pthread_t sendthread;

char* in = new char[255];
char* out = new char[255];
int numSent;
int numRead;

struct sockaddr name;
int sock, new_sd;

void* readThread(void* threadid) {

    while(1) {

        numRead = recv(new_sd, in, 255, 0);

        if(numRead > 0) {
            cout<<"\n"<<in;
            for(int i=0;i<strlen(in);i++)
                in[i] = '\0';
        }   //end if
        else if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }

    }   //end while
}   //END READTHREAD


void* sendThread(void* threadid) {

     while(1) {

        cin.getline(out, 255);

        numSent = send(new_sd, out, 255, 0);

        if(numSent > 0) {
            for(int i=0;i<strlen(out);i++)
                out[i] = '\0';
        }   //end if
        else if(numSent < 0) {
            printf("\nError sending %m", errno);
            exit(1);
        }
    }   //end while
}   //END SENDTHREAD

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}

// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    //store the connecting address and size
    struct sockaddr_storage their_addr;
    socklen_t their_addr_size;

    //socket infoS
    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    //allow reuse of port
    int yes=1;
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    //unlink and bind
    unlink("127.0.0.1");
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nBind error %m", errno);
        exit(1);
    }

    freeaddrinfo(servinfo);

    //listen
    if(listen(sock, 5) < 0) {
        printf("\nListen error %m", errno);
        exit(1);
    }

    their_addr_size = sizeof(their_addr);
    //accept
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size);
    if( new_sd < 0) {
        printf("\nAccept error %m", errno);
        exit(1);
    }  

    cout<<"\nSuccessful Connection!";

    //set nonblock
    set_nonblock(new_sd);

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    pthread_create(&readthread, &attr, readThread, (void*)0);
    pthread_create(&sendthread, &attr, sendThread, (void*)1);

    cout<<"\n\nExiting normally\n";
    return 0;
}

客户

#define PORT "4950"

pthread_t readthread;
pthread_t sendthread;
char* in = new char[255];
char* out = new char[255];
int numSent;
int numRead;
struct sockaddr name;
int sock;

void* readThread(void* threadid) {

    while(1) {

        numRead = recv(sock, in, 255, 0);

        if(numRead > 0) {
            cout<<"\n"<<in;
            for(int i=0;i<strlen(in);i++)
                in[i] = '\0';
        }   //end if
        else if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }
    }   //end while
}   //END READTHREAD

void* sendThread(void* threadid) {

    while(1) {

        cin.getline(out, 255);
        numSent = send(sock, out, 255, 0);

        if(numSent > 0) {
            for(int i=0;i<strlen(out);i++)
                out[i] = '\0';
        }   //end if
        else if(numSent < 0) {
            printf("\nError sending %m", errno);
            exit(1);
        }

    }   //end while
}   //END SENDTHREAD

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nclient connection failure %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful connection!";

    //set nonblock
    set_nonblock(sock);

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    pthread_create(&readthread, &attr, readThread, (void*)0);
    pthread_create(&sendthread, &attr, sendThread, (void*)1);

    cout<<"\n\nExiting normally\n";
    return 0;
}

我为这篇冗长的帖子道歉,但我已经在这几天了,不知道如何继续。我尝试使用 select(),但对于 1 个客户端和一个服务器 I/O 来说,这似乎有点多。如果有人能指出上面可能出现的问题或任何其他提示(或者我只是对 select 完全错误 :)),我将不胜感激。

【问题讨论】:

  • 如果您喜欢阅读源代码,请查看 node.js 项目。他们在 IO 方面做得非常好,而且一切都有很好的记录。你可以借用一些类似的概念,或者直接拿代码,它是 MIT。

标签: c++ sockets tcp io fork


【解决方案1】:

我认为你的问题在这里:

numSent = send(sock, out, strlen(out), 0);

这里:

numRead = recv(sock, in, 255, 0);

你试图在你的监听套接字上sendrecv。您需要使用new_sd,这是接受的套接字。请参阅accept(2) 的手册页。

【讨论】:

  • 谢谢,我把服务器端切换到那个了。但是,现在尝试接收“资源暂时不可用”时出现错误。什么资源不可用?
  • 那个错误(EAGAIN)表示套接字是非阻塞的,没有数据。如果你检查手册页,你会看到它在那里拼写出来。
【解决方案2】:

我已经有一段时间没有做过任何 POSIX 编程了,但是下面的代码似乎有点奇怪:

fork();
pid = getpid();
if(pid == 0) {

IIRC,你应该检查子进程的 fork: 0 的返回值,父进程中子进程的 pid。

还没有看过你的其余代码:)

【讨论】:

  • 我试过了,没有帮助。我这样做只是因为我之前看到的某个线程就是这样做的。
  • 好吧,你的 getpid() 永远不应该返回 0,检查来自 fork() 的返回是正确的方法......错误在别处:)。我建议最终查看一个更高级别的异步框架,但跟踪当前错误仍然是一个很好的练习 - 抱歉,我现在没有时间彻底查看代码。
【解决方案3】:

我查看了前两个代码服务器和客户端。不幸的是,您错误地设计了程序。服务器应该执行以下操作(在多进程的情况下):

  1. 创建套接字
  2. 绑定到端口号
  3. 更改为被动模式(通过 listen())。
  4. 转到循环以接受传入连接。这里sock 只负责监听新连接,它不负责像你一样发送/接收。
    • 接受新连接。接受()
    • 创建一个新进程
    • 新进程负责发送/接收。父进程应返回第 4 步以侦听新连接。

这是一个伪代码

while (1) {
    new_sd = accept(....);
    if (new_sd < 0) continue; // or quit
    if (fork() == 0) {
         // do sendin and receiving USING new_sd
         exit(0);  // child terminates here
    }
}

更新:由于您正在寻找非阻塞场景,我建议您使用 select() 或 poll() 系统调用。

我觉得你想创建两个进程,一个进程用于发送,另一个用于接收。如果是这样,那么您不需要将 new_sd 设置为非阻塞模式,因为这两个进程同时运行。如果您打算这样做,那么子进程将创建另一个进程,以便第一个子进程发送,第二个子进程接收。如下:

while (1) {
    new_sd = accept(....);
    if (new_sd < 0) continue; // or quit
    if (fork() == 0) {
         pid = fork();
         if (pid == 0) then do_sending(new_sd);
         else do_receiving(new_sd);
         exit(0);  // child terminates here
    }
}

【讨论】:

  • 服务器代码现在看起来像这样。在客户端,我做了同样的事情减去了接受调用(它只是 while(1) 和 io 的分叉)。但是,当客户端连接时,当我尝试读取或发送错误号为“资源暂时不可用”时,我只会收到一个无限错误。你知道这意味着什么吗?
  • 这绝对是一个更好的实现。正如我上面所说,“资源暂时不可用”是由错误 EAGAIN 导致的,这表明套接字是非阻塞的并且没有数据。这似乎有点奇怪,但并非所有错误都是平等的。如果您阅读 send(2) 和 recv(2) 的手册页,您会发现不同的错误代码意味着不同的事情,其中​​一些需要以不同于打印错误消息的方式处理。
  • 每当我将其更改为阻塞时,我都会因对等错误而重置连接并冻结我的计算机。我试过用谷歌搜索,但大多数人只是问如何创建该错误而不是如何解决它。
猜你喜欢
  • 2011-09-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-24
  • 1970-01-01
  • 2020-12-08
  • 1970-01-01
相关资源
最近更新 更多