【问题标题】:Escape sequences appearing in socket messages出现在套接字消息中的转义序列
【发布时间】:2026-01-04 20:45:02
【问题描述】:

我正在处理两个文件,一个生成数据的生成器(用 Python 编写)和一个基于 C 的套接字服务器,它应该侦听然后处理生成器发送给它的数据。

生成器创建一批消息,其中每条消息由\n 分隔。所以一个示例批次是: x0x foo.bar.baz 45 1429694987 \nx1x foo.bar.baz 45 1429694987 \n 在我的 C 套接字中,我将 1000 个字节读入缓冲区。我想将整个消息拆分为单独的字符串,以分隔符 \n 分隔。我已经这样做了,但是当消息超过我之前提到的 1000 字节长度时,我遇到了问题。如果消息不完整,即它没有以\n 结尾,我想存储我收到的消息的任何部分,然后在接下来的 1000 个字节到达时使用它来重建完整的消息。

当批处理长度超过缓冲区的 1000 字节长度时,现在发生的情况是似乎引入了一些随机字符。示例输出(从终端复制)如下所示:

x0x foo.bar.baz 45 1429694987 
x1x foo.bar.baz 45 1429694987
x2x foo.bar.baz 45 1429694987
x3x foo.bar.baz 45 1429694987
x4x foo.bar.baz 45 1429694987
x5x foo.bar.baz 45 1429694987
x6x foo.bar.baz 45 1429694987
x7x foo.bar.baz 45 1429694987
x8x foo.bar.baz 45 1429694987
x9x foo.bar.baz 45 1429694987
x10x foo.bar.baz 45 1429694987
x11x foo.bar.baz 45 1429694987
x12x foo.bar.baz 45 1429694987
x13x foo.bar.baz 45 1429694987
x14x foo.bar.baz 45 1429694987
x15x foo.bar.baz 45 1429694987
x16x foo.bar.baz 45 1429694987
x17x foo.bar.baz 45 1429694987
x18x foo.bar.baz 45 1429694987
x19x foo.bar.baz 45 1429694987
x20x foo.bar.baz 45 1429694987
x21x foo.bar.baz 45 1429694987
x22x foo.bar.baz 45 1429694987
x23x foo.bar.baz 45 1429694987
x24x foo.bar.baz 45 1429694987
x25x foo.bar.baz 45 1429694987
x26x foo.bar.baz 45 1429694987
x27x foo.bar.baz 45 1429694987
x28x foo.bar.baz 45 1429694987
x29x foo.bar.baz 45 1429694987
x30x foo.bar.baz 45 1429694987
x31x foo.bar.baz 4▒▒▒p▒

我怀疑这个问题是由于某些字符在传输中以某种方式被切断造成的,但我无法弄清楚出了什么问题。我根本不是 C 专家,我正在努力寻找问题所在。我在下面包含了我的 C 代码:(对任何 n00b 错误表示歉意!)

#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <sys/time.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <netdb.h>

#include "../librdkafka/src/rdkafka.h"

static const int PORT = 3135;

 //delta between these two allows for prefixing of cut off data.
static const int BUFFER_SIZE = 1000;
static const int FULL_MSG_SIZE = 1200;
static const char *BROKERS = "192.168.50.11:9092";
static const char *TOPIC = "test";

/**
 * Socket error handling
 */
void error(const char *msg){
    perror(msg);
    exit(1);
}

/**
 * Kafka logger calback
 */
static void logger (const rd_kafka_t *rk, int level,
                    const char *fac, const char *buf) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
                (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
                level, fac, rd_kafka_name(rk), buf);
}

/**
 * Creates thread for each incoming connection and pushes data to Kafka.
 */
void *streamHandler(void *pnewsock){
    int number = * (int *) pnewsock;
    printf("Starting thread %d\n", number);
    /* Configuration */
    rd_kafka_topic_t *rkt;
    rd_kafka_t *rk;
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    int partition = RD_KAFKA_PARTITION_UA;
    int quiet = 0;
    char errstr[512];

    /* Socket config */
    char buffer[BUFFER_SIZE];
    bzero(buffer, BUFFER_SIZE);
    int msg;

/* initialize kafka conf variables */
conf = rd_kafka_conf_new();
topic_conf = rd_kafka_topic_conf_new();

/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                        errstr, sizeof(errstr)))) {
        fprintf(stderr,
                "%% Failed to create new producer: %s\n",
                errstr);
        exit(1);
}

/* Set logger */
rd_kafka_set_logger(rk, logger);
rd_kafka_set_log_level(rk, LOG_DEBUG);

/* Add brokers */
if (rd_kafka_brokers_add(rk, BROKERS) == 0) {
        fprintf(stderr, "%% No valid brokers specified\n");
        exit(1);
}

/* Create topic */
rkt = rd_kafka_topic_new(rk, TOPIC, topic_conf);

strcpy(buffer, "");
char *last_msg = "";
char full_msg[FULL_MSG_SIZE] ;
bzero(full_msg, FULL_MSG_SIZE);
char * pch;
char delimiter[2] = "\n";
char *final;
int last_msg_complete = 0;
int res = 0;
while(1){
    if (recv(number, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0){
        // connection has closed, so kill thread
        break;
    }

    msg = read(number, buffer, BUFFER_SIZE-1);
    if(msg < 0){
            error("ERROR reading from socket");
            break;
    }
    // prefix the last message (empty if no partial message from previous   transmit)
    printf("Concatenating now %s || %s", last_msg, buffer);
    strcat(full_msg, last_msg);
    strcat(full_msg, buffer);
    final = &full_msg[(strlen(full_msg)-1)];

    last_msg_complete = strcmp(delimiter, final); // 0 if ends in \n, other value otherwise

    // consume the received data and send to kafka.
    pch = strtok(full_msg, "\n");
    while(pch != NULL){            last_msg = '\0';
        last_msg = strdup(pch);

        pch = strtok(NULL, "\n");
        if ((pch != NULL) || (last_msg_complete == 0)){
                // only send this message if it isn't the last one
                // OR if it is the last one, only if it ends in \n
                res = rd_kafka_produce(rkt, partition,
                                RD_KAFKA_MSG_F_COPY,
                                last_msg, strlen(last_msg),
                                NULL, 0,
                                NULL);
                if (res == -1){
                        fprintf(stderr,
                         "%% Failed to produce to topic %s "
                        "partition %i:%s \n",
                        rd_kafka_topic_name(rkt), partition,
                        rd_kafka_err2str(rd_kafka_errno2err(errno)));
                }
                if (!quiet){
                    printf("full_msg %s\n", full_msg);
                    printf("buffer %s\n", buffer);
                    printf("last_msg %s\n", last_msg);
                }
                rd_kafka_poll(rk, 0);
                free(last_msg);
        }
    }
    free(pch);
    // empty the buffer and full_msg.
    bzero(buffer, BUFFER_SIZE);
    bzero(full_msg, FULL_MSG_SIZE);
}
printf("Killed thread connection %d\n", number);

/* Destroy topic */
rd_kafka_topic_destroy(rkt);

/* Destroy the handle */
rd_kafka_destroy(rk);
pthread_exit(&number);
return NULL;
}


int main(){
    int sock, newsock;
    pthread_t thread;
    int reuseaddr = 1;

struct sockaddr_in serv_addr;
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(PORT);

    // create the listening socket
    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1){
            perror("socket creation issue");
            return 1;
    }

    // enable the socket to reuse the address
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1){
            perror("socket address reuse");
            return 1;
    }

    // bind to address
    if (bind(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0){
            perror("bind error");
            return 1;
    }

    // Listen for new connections.
    listen(sock, 5);

    // When new connections arrive, create a thread with a socket object
    while(1){
            struct sockaddr_in their_addr;
            socklen_t size = sizeof(their_addr);
            puts("waiting");
            newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
            puts("done waiting");

            if(newsock == -1){
                    perror("main loop accept error");
            } else{
                    printf("Connected to %s on port %d\n", inet_ntoa(their_addr.sin_addr), their_addr.sin_port);
                    if(pthread_create(&thread, NULL, streamHandler, &newsock) != 0){
                            fprintf(stderr, "Could not create thread \n");
                    }
            }
    }

    close(sock);

    return 0;

}

【问题讨论】:

  • 您需要使用read 的返回,而不仅仅是检查它是否有错误。我也不认为将strcat 与缓冲区一起使用是一个好主意,因为read 不会引入任何\0。您可以使用strncat 并给它msg,这是read 实际读取的字节数
  • 好的,那我应该如何使用read的返回值呢?关于缺少\0 我认为公平点,但我显示的输出是缓冲区的内容,所以它不会导致我猜的奇怪字符。
  • @danielvdende:实际上这可能是奇怪字符的原因。 strcat 将继续读取字节,直到到达 \0。由于您在缓冲区的末尾没有一个,因此您将获得缓冲区的内容以及缓冲区之后的任何内容,直到内存中碰巧有一个 \0 ......所以你会得到奇怪的字符适合缓冲区的最后一部分消息的结尾。
  • @danielvdende read 返回它读取的字节数,换句话说,它在缓冲区中更新的实际数据字节数。您不应该使用所有缓冲区,就好像read 确实读取了 1000 个字节。例如,如果您的消息是"foo\n"read 可以返回从负数到 4 的任何值,不一定在一次调用中读取所有值。
  • @psmears - 我不太确定 - 我认为代码 bzero() 是缓冲区,但流程不清楚。跳过bzero() 调用肯定会更清楚,只需在检查read() 的返回错误后执行buffer[msg]='\0'; 以NUL 终止字符串。此外,当strtok() 返回NULL 和@987654347 时,last_msg 泄漏@call 似乎没有非 malloc 的内存。如果read() 调用返回零会发生什么?里面有很多问题。

标签: python c sockets


【解决方案1】:

当连接关闭时read() 返回零时会发生什么?带有MSG_PEEKrecv() 调用不会告诉您连接已关闭-您很可能会陷入read(),阻塞,并让read() 返回零当连接关闭时。

换句话说,使用MSG_PEEK 执行非阻塞recv() 来检查关闭的连接是行不通的。

【讨论】:

  • 好的,好点。我在网上搜索时发现了这个,但正如我所说,我不是 C 专家。您应该如何检查已关闭的连接?
  • 检查来自read() 的返回是否为零。如果为零,则连接关闭。 ... else if ( 0 == msg ) { break; } 在 if 语句之后检查 msg = read(...) 是否有错误。
最近更新 更多