【发布时间】:2021-03-09 16:32:14
【问题描述】:
我有一个 p2p 网络,我的套接字发送要么发送不完整的数据,要么在发送完整数据之前中断。我不确定发生了什么,或者这里是否有其他问题。下面是我的发送代码,我在其中循环并发送,直到发送整个 msg。我注意到在下面的侦听器代码中,我的 json_loads 正在引发异常,当我打印出缓冲区时,由于即时发送的字典不完整,因此发送中似乎缺少一些字节。不知道我在这里做错了什么。有人知道吗?
def unicast(self, msg, recipient_node):
#print('UNICAST HERE')
msg = json.dumps(msg)
msg_len = len(msg) # add the msg length header with the delimiter here
msg = f'{msg_len}--->{msg}'
byte_msg = bytes(msg, 'utf-8')
unicast_len = len(byte_msg)
total_sent = 0
with self.socket_lock:
try:
while total_sent < unicast_len:
#time.sleep(0.01) # necessary for GIL issues (connection reset by peer)
sent = recipient_node['socket'].send(byte_msg[total_sent:])
if sent == 0: # disconnected?
print('HERE BOIS')
break
total_sent += sent
except Exception as e:
print('HERE BOIS #2')
pass # disconnected socket or some error that will be handled by listener
这是我的侦听器代码,其中我有一个缓冲区,我相信一旦相应地接收到完整的 json 字符串,它就会解析它们。
msg_length = None
buffer = ''
while inputs:
#print(socket_to_identifier)
readable, writable, exceptional = select.select(inputs, outputs, inputs)
for s in readable:
if s is server_socket:
client_socket, client_address = server_socket.accept()
self.sockets_listening += 1
client_socket.setblocking(0)
inputs.append(client_socket)
message_queues[client_socket] = queue.Queue()
else:
data = s.recv(1024)
if data:
#print('NEW RECEIVE HERE')
message_queues[s].put(data)
if s not in outputs:
outputs.append(s)
decoded_recv = data.decode('utf-8')
buffer += decoded_recv
while True:
if msg_length is None:
if '--->' not in buffer:
break
length_str, ignored, buffer = buffer.partition('--->')
msg_length = int(length_str)
if len(buffer) < msg_length:
break
msg = buffer[:msg_length]
buffer = buffer[msg_length:]
msg_length = None
#print(msg)
msg = json.loads(msg)
if 'node_identifier' in msg: # this is the on connect msg sent
socket_to_identifier[s] = msg['node_identifier']
break
#print(msg)
#b_del_thread = Thread(target=self.basic_deliver, args=(msg,))
#b_del_thread.start()
self.basic_deliver(msg)
else:
self.handle_disconnect(socket_to_identifier.get(s, None))
print('disconnected boys')
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
del message_queues[s]
for s in exceptional:
self.handle_disconnect(socket_to_identifier.get(s, None))
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]
【问题讨论】: