在C ++上的套接字协议缓冲区

我试图在Linux平台上探索协议缓冲区(PB),我的编程语言是C ++。 我在协议缓冲区在线文档中find了一些例子,但没有具体给套接字发送和接收(或者我完全错过了:))。 所以我决定在实际消息之前添加消息长度并通过套接字发送。 如果有人能够提出比我打算做的更好的解决scheme,我将不胜感激,而且还有什么东西可以在PB上做成,用来创build这样的数据包。

但是我仍然要在服务器端解决数据包问题。 说客户端发送一个10字节的数据包,其中前4个字节是数据包的长度; 但是在解码数据包之前不可能知道长度。 所以,即使我读了前4个字节,我如何推断与读取数据包使用协议缓冲区的值。

最后我可以得到它的工作。 我在这里发布代码,以便人们可以审查和评论,以及如果有人想在c ++中实现它,这段代码可以帮助。 这是一个破旧的代码,我的意图是让Protobuf长时间工作前缀的方式。 我从我不记得的一些网站的客户端服务器的代码,我已经修改它,以适应protobuf。 在这里,服务器首先窥探到套接字并获取总数据包的长度,然后实际读取套接字以读取整个数据包。 可以有万亿种方法来做到这一点,但为了快速解决这个问题,我是这样做的。 但我需要找到一个更好的方法来避免每个包2 recv,但在我的情况下,所有的消息是不同的大小,所以这是我猜测的唯一方法。

Proto文件

message log_packet { required fixed64 log_time =1; required fixed32 log_micro_sec =2; required fixed32 sequence_no =3; required fixed32 shm_app_id =4; required string packet_id =5; required string log_level=6; required string log_msg=7; } 

协议缓冲区客户代码

 #include <unistd.h> #include "message.pb.h" #include <iostream> #include <google/protobuf/message.h> #include <google/protobuf/descriptor.h> #include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h> using namespace google::protobuf::io; using namespace std; int main(int argv, char** argc){ /* Coded output stram */ log_packet payload ; payload.set_log_time(10); payload.set_log_micro_sec(10); payload.set_sequence_no(1); payload.set_shm_app_id(101); payload.set_packet_id("TST"); payload.set_log_level("DEBUG"); payload.set_log_msg("What shall we say then"); cout<<"size after serilizing is "<<payload.ByteSize()<<endl; int siz = payload.ByteSize()+4; char *pkt = new char [siz]; google::protobuf::io::ArrayOutputStream aos(pkt,siz); CodedOutputStream *coded_output = new CodedOutputStream(&aos); coded_output->WriteVarint32(payload.ByteSize()); payload.SerializeToCodedStream(coded_output); int host_port= 1101; char* host_name="127.0.0.1"; struct sockaddr_in my_addr; char buffer[1024]; int bytecount; int buffer_len=0; int hsock; int * p_int; int err; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %d\n",errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %d\n",errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = inet_addr(host_name); if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ if((err = errno) != EINPROGRESS){ fprintf(stderr, "Error connecting socket %d\n", errno); goto FINISH; } } for (int i =0;i<10000;i++){ for (int j = 0 ;j<10;j++) { if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending data %d\n", errno); goto FINISH; } printf("Sent bytes %d\n", bytecount); usleep(1); } } delete pkt; FINISH: close(hsock); } 

协议缓冲服务器代码

 #include <fcntl.h> #include <string.h> #include <stdlib.h> #include <errno.h> #include <stdio.h> #include <netinet/in.h> #include <resolv.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <pthread.h> #include "message.pb.h" #include <iostream> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl.h> using namespace std; using namespace google::protobuf::io; void* SocketHandler(void*); int main(int argv, char** argc){ int host_port= 1101; struct sockaddr_in my_addr; int hsock; int * p_int ; int err; socklen_t addr_size = 0; int* csock; sockaddr_in sadr; pthread_t thread_id=0; hsock = socket(AF_INET, SOCK_STREAM, 0); if(hsock == -1){ printf("Error initializing socket %d\n", errno); goto FINISH; } p_int = (int*)malloc(sizeof(int)); *p_int = 1; if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )|| (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){ printf("Error setting options %d\n", errno); free(p_int); goto FINISH; } free(p_int); my_addr.sin_family = AF_INET ; my_addr.sin_port = htons(host_port); memset(&(my_addr.sin_zero), 0, 8); my_addr.sin_addr.s_addr = INADDR_ANY ; if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){ fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno); goto FINISH; } if(listen( hsock, 10) == -1 ){ fprintf(stderr, "Error listening %d\n",errno); goto FINISH; } //Now lets do the server stuff addr_size = sizeof(sockaddr_in); while(true){ printf("waiting for a connection\n"); csock = (int*)malloc(sizeof(int)); if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){ printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr)); pthread_create(&thread_id,0,&SocketHandler, (void*)csock ); pthread_detach(thread_id); } else{ fprintf(stderr, "Error accepting %d\n", errno); } } FINISH: ;//oops } google::protobuf::uint32 readHdr(char *buf) { google::protobuf::uint32 size; google::protobuf::io::ArrayInputStream ais(buf,4); CodedInputStream coded_input(&ais); coded_input.ReadVarint32(&size);//Decode the HDR and get the size cout<<"size of payload is "<<size<<endl; return size; } void readBody(int csock,google::protobuf::uint32 siz) { int bytecount; log_packet payload; char buffer [siz+4];//size of the payload and hdr //Read the entire buffer including the hdr if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){ fprintf(stderr, "Error receiving data %d\n", errno); } cout<<"Second read byte count is "<<bytecount<<endl; //Assign ArrayInputStream with enough memory google::protobuf::io::ArrayInputStream ais(buffer,siz+4); CodedInputStream coded_input(&ais); //Read an unsigned integer with Varint encoding, truncating to 32 bits. coded_input.ReadVarint32(&siz); //After the message's length is read, PushLimit() is used to prevent the CodedInputStream //from reading beyond that length.Limits are used when parsing length-delimited //embedded messages google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz); //De-Serialize payload.ParseFromCodedStream(&coded_input); //Once the embedded message has been parsed, PopLimit() is called to undo the limit coded_input.PopLimit(msgLimit); //Print the message cout<<"Message is "<<payload.DebugString(); } void* SocketHandler(void* lp){ int *csock = (int*)lp; char buffer[4]; int bytecount=0; string output,pl; log_packet logp; memset(buffer, '\0', 4); while (1) { //Peek into the socket and get the packet size if((bytecount = recv(*csock, buffer, 4, MSG_PEEK))== -1){ fprintf(stderr, "Error receiving data %d\n", errno); }else if (bytecount == 0) break; cout<<"First read byte count is "<<bytecount<<endl; readBody(*csock,readHdr(buffer)); } FINISH: free(csock); return 0; } 

不幸的是,protobuf不提供“打包”(分隔)您的消息的方式:

如果要将多条消息写入单个文件或流,则由您来确定一条消息结束和下一条消息开始的位置。 协议缓冲区的连线格式不是自定义的,因此协议缓冲区分析程序不能自行确定消息的结束位置。 解决这个问题最简单的方法就是在写消息之前写下每条消息的大小。

(从他们的文档 )

所以,他们基本上推荐你到达的相同的解决方案。