【发布时间】:2017-01-20 22:27:35
【问题描述】:
我们正在尝试创建一个通过 tcp 套接字对文件进行简单修改的项目。我们使用 Asio 创建了向端口发送和接收数据的类:
//ReceiveData.hpp
#pragma once
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include <asio.hpp>
#include <iostream>
#include "SendReceiveConsts.hpp" //contains global io service object
using asio::ip::tcp;
typedef unsigned short ushort;
class ReceiveData {
private:
asio::io_service service;
tcp::acceptor acceptor;
tcp::socket socket;
public:
ReceiveData(ushort port = 8008) : acceptor(SendReceive::global_io_service, tcp::endpoint(tcp::v4(), port)),
socket(SendReceive::global_io_service) { }
// Can return any amount on the socket stream
template<size_t N>
inline std::string receive() {
std::string message;
try {
if (!this->socket.is_open()) {
this->acceptor.accept(socket);
}
SendReceive::global_io_service.run();
std::array<char, N> buf;
asio::error_code error;
size_t len = this->socket.read_some(asio::buffer(buf), error);
if(error)
throw asio::system_error(error);
std::copy(buf.begin(), buf.end(), std::back_inserter(message));
} catch(asio::error_code& e) {
std::cout << e.message() << std::endl;
return "-1";
} catch (std::exception& e) {
std::cout << e.what() << std::endl;
return "-1";
}
return message;
}
inline void stop() {
asio::error_code error;
socket.shutdown(tcp::socket::shutdown_type::shutdown_send, error);
}
};
.
//SendData.hpp
#pragma once
#ifdef __GNUC__
#define DEPRECATED(func) func __attribute__ ((deprecated))
#elif defined(_MSC_VER)
#define DEPRECATED(func) __declspec(deprecated) func
#else
#pragma message("WARNING: You need to implement DEPRECATED for this compiler")
#define DEPRECATED(func) func
#endif
#include <asio.hpp>
#include <chrono>
#include <vector>
#include <string>
#include <tuple>
#include <iostream>
#include "SendReceiveConsts.hpp"
using asio::ip::tcp;
typedef unsigned short ushort;
class SendData {
private:
tcp::resolver resolver;
tcp::resolver::query query;
tcp::socket socket;
tcp::resolver::iterator endpoint_iterator;
std::string IP;
ushort port;
inline void send_string(std::string dataToSend, const char &separator = '\0') {
if(!this->socket.is_open()) {
asio::connect(this->socket, this->endpoint_iterator);
}
SendReceive::global_io_service.run();
std::string MISTCompliant = dataToSend;
MISTCompliant.push_back(separator);
printf("Sent %lu to %s\n", asio::write(socket, asio::buffer(MISTCompliant.c_str(), MISTCompliant.length())), IP.c_str());
};
public:
SendData(std::string IP, ushort port)
: resolver(SendReceive::global_io_service),
query(IP, std::to_string(port)),
socket(SendReceive::global_io_service) {
this->IP = IP;
this->port = port;
this->endpoint_iterator = resolver.resolve(this->query);
}
~SendData() { stop(); }
DEPRECATED(void simple_send(std::string data));
inline void send(std::string data, const char &separator = '\0') {
send_string(data, separator);
}
inline void stop() {
asio::error_code error;
this->socket.shutdown(tcp::socket::shutdown_type::shutdown_receive, error);
if(error) {
printf("An error occurred when shutting down SendData socket: %s (File: %s, Line %i)\n", error.message().c_str(), __FILE__, __LINE__);
}
this->socket.close();
printf("Socket closed.\n");
}
inline std::tuple<std::string, ushort> get_raw_info() {
return std::tuple<std::string, ushort>(this->IP, this->port);
}
};
使用这些类,我们能够从主两台从机发送数据,它们能够可靠地接收数据。但是,主设备永远无法从从设备接收数据。这是在master上运行的代码:
//HashFile.cpp
#include <string>
#include <fstream>
#include <time.h>
#include <thread>
#include <chrono>
#include <networking/SendData.hpp>
#include <networking/ReceiveData.hpp>
#include <MIST.hpp>
#include <Machine.hpp>
#define FILE_SIZE 60
std::string random_salt(std::string s) {
std::string copy = "";
std::string chars = "abcdefghijklmnopqrstufwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12345678910!@#$%^&*()_+-=";
for(std::string::iterator i = s.begin(); i != s.end(); i++) {
copy.push_back(*i);
srand(time(0));
if((rand() % 100) < 10) {
srand(time(0));
copy.push_back(chars.at(rand() % chars.length()));
}
}
return copy;
}
std::string add_salt(std::string s) {
std::string copy = "";
for(std::string::iterator i = s.begin(); i != s.end(); i++) {
copy.push_back(*i);
if(*i == '6') {
copy.push_back('w');
copy.push_back('h');
copy.push_back('a');
copy.push_back('t');
} else if(*i == 'c' && *i == '9' && *i == 'D') { //c9D
//HG6v
copy.pop_back();
copy.push_back(*i);
copy.push_back('G');
copy.push_back('6');
copy.push_back('v');
} else if(tolower(*i) == 'm' && tolower(*(i + 1)) == 'i') {
copy.push_back('s');
copy.push_back('t');
}
}
return copy;
}
int main() {
std::vector<MIST::Machine> machines_used = { MIST::Machine("local"), MIST::Machine("Helper 1", "25.88.30.47", false), MIST::Machine("Helper 2", "25.88.123.114", false) }; //Hamachi IP addresses
auto mist = MIST::MIST(true, machines_used);
std::ifstream hash;
std::string data1 = "";
std::string data2 = "";
std::string mydata = "";
printf("Dangerously large file being imported into code...\n");
hash.open("testfile_smol", std::fstream::binary);
if(hash.is_open()) {
try {
char chunk;
int counter = 0;
while(hash.get(chunk)) {
if(counter < FILE_SIZE / 3) {
data1 += chunk;
counter++;
} else if(counter < FILE_SIZE * (2.0f / 3.0f)) {
data2 += chunk;
counter++;
} else {
mydata += chunk;
counter++;
}
}
} catch(std::exception& e) {
std::cerr << "Error encountered: " << e.what() << std::endl;
}
}
hash.close();
printf("data1: %s data2: %s mydata: %s", data1.substr(0, 10).c_str(), data2.substr(0, 10).c_str(), mydata.substr(0, 10).c_str());
ProtobufMIST::Task task;
task.set_task_name("hash");
std::string serialized;
task.SerializeToString(&serialized);
const char c = 185;
printf("Send all!\n");
std::string s1 = "1" + data1 + c + serialized;
std::string s2 = "2" + data2 + c + serialized;
mist.send_task(s1, "Helper 1", 1025);
printf("Updated first task!\n");
mist.send_task(s2, "Helper 2", 1025); //Just a wrapper for SendData, as described in SendData.h
printf("Updated first task!\n");
std::string mydata_salted = add_salt(random_salt(mydata)); //TODO: Add pepper
printf("Old mydata size: %zu\nNew mydata size: %zu\n", mydata.length(), mydata_salted.length());
std::string one(""), two("");
unsigned short port1 = 1026;
unsigned short port2 = 1027;
auto receive_slaves = [=](unsigned short& port, std::string& out) {
bool got = false;
printf("Looking for string on port %u\n", port);
while(!got) {
auto slave = new ReceiveData(port); //As defined in ReceiveData.hpp
std::string x = slave->receive<1>();
printf("Got chunk: %s\n", x.c_str());
if(!(x.find((char)182) != std::string::npos || x == "-1")) {
out += x;
} else {
got = true;
}
delete slave;
}
printf("Received full string!\n");
};
printf("Openning both receive channels...\n");
printf("Waiting for strings...\n");
//THIS IS WHERE IT BREAKS
receive_slaves(port2, two); //Never gets response
receive_slaves(port1, one); //Never gets response
printf("Received all parts!\n");
printf("Removing delimiters...\n");
one.erase(std::remove(one.begin(), one.end(), (char)182), one.end());
two.erase(std::remove(two.begin(), two.end(), (char)182), two.end());
std::ofstream output;
output.open("Hashed");
output << one << two << mydata_salted;
output.close();
printf("Aloha!\n");
return 0;
}
如您所见,整个文件旨在读取固定大小的文件,将第三个发送到两个从属设备,并用随机字符和其他转换“加盐”最后三分之一。但是,如上所述,程序在接收其他已完成的部分时会卡住。这是奴隶代码的样子
//HashFile.cpp (Slave)
#include <string>
#include <fstream>
#include <thread>
#include <chrono>
#include <time.h>
#include <MIST.pb.h>
#include <networking/SendData.hpp>
#include <networking/ReceiveData.hpp>
#include <MIST.hpp>
#include <Machine.hpp>
#include <stdlib.h>
std::string data;
std::string task;
std::string firstTwoChars;
ProtobufMIST::Task _task;
const char d = 182;
const char d_spc = 185;
int part;
std::string random_salt(std::string s) {
std::string copy = "";
std::string chars = "abcdefghijklmnopqrstufwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12345678910!@#$%^&*()_+-=";
for (std::string::iterator i = s.begin(); i != s.end(); i++) {
copy.push_back(*i);
srand(time(0));
if ((rand() % 100) < 10) {
srand(time(0));
copy.push_back(chars.at(rand() % chars.length()));
}
}
return copy;
}
std::string add_salt(std::string s) {
std::string copy = "";
for (std::string::iterator i = s.begin(); i != s.end(); i++) {
copy.push_back(*i);
if (*i == '6') {
copy.push_back('w');
copy.push_back('h');
copy.push_back('a');
copy.push_back('t');
}
else if (*i == 'c' && *i == '9' && *i == 'D') { //c9D
copy.pop_back();
copy.push_back(*i);
copy.push_back('G');
copy.push_back('6');
copy.push_back('v');
}
else if (tolower(*i) == 'm' && tolower(*(i + 1)) == 'i') {
copy.push_back('s');
copy.push_back('t');
}
}
return copy;
}
//Previously defined salting functions
void hash()
{
data = random_salt(data); //randomly salt
data = add_salt(data); //add random chars
}
int main()
{
MIST::Task taskThing("hash", *hash);
ReceiveData * rObj = new ReceiveData(1025);
SendData sObj("25.88.220.173", 1027);
std::cout << "Receiving first char \n";
firstTwoChars = rObj->receive<1>();
//delete rObj;
std::cout << firstTwoChars << std::endl;
std::cout << "Received first char \n";
int slavePart;
if (firstTwoChars == "1") {
slavePart = 1;
}
else if (firstTwoChars == "2") {
slavePart = 2;
}
else
std::cout << "You messed up, what part is it? \n";
std::cout << "Is part " << slavePart << std::endl;
bool dataRecieved = false;
while (!dataRecieved)
{
std::string chunk = rObj->receive<1>();
if (chunk == "-1" || chunk.find((char)182) != std::string::npos) {
std::cout << "Data recieved \n";
dataRecieved = true;
}
else
{
data += chunk;
std::cout << "Added chunk: " << chunk << std::endl;
}
chunk.clear();
}
std::cout << "All Data recieved! \n Parsing now \n";
if (data.find(d_spc) != std::string::npos)
{
size_t data_before = data.find(d_spc); //find where data ends and task begins
std::cout << "Data found at " << data_before << "bytes. \n";
std::string task = data.substr(data_before); //copy task to new string
std::cout << "Task copied: " << task << std::endl;
data.erase(data_before);//erase everything that was the task from it's original string
}
else {
std::cout << "Did not find d_spc \n";
std::abort();
}
std::cout << "Data parsed \n Data: \n";
std::cout << data << std::endl;
std::cout << "Task: " << task << std::endl;
if (_task.ParseFromString(task))
{
std::cout << "Task parsed properly \n";
}
else {
std::cout << "I messed up parsing, trying again \n";
if (_task.ParseFromString(task)) {
task.pop_back();
std::cout << "Worked the second time! \n";
}
else {
std::cout << "Still messed up \n";
std::abort();
}
}
if(_task.task_name() == "hash")
taskThing.run();
std::cout << taskThing.getID();
std::cout << "Sending... \n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::string t_str = std::to_string(slavePart) + data;
//THIS IS WHERE IT BREAKS
sObj.send(t_str, d); //error
int x;
std::cin >> x;
std::cout << "Sent! \n";
}
这是尝试重新连接到主服务器时出现的错误:
connect:连接尝试失败,因为连接方成功了 一段时间后没有正确响应,或建立连接 失败,因为主机没有响应。:connect: 一个连接 尝试失败,因为连接方没有正确响应 一段时间后,或建立的连接失败,因为 主机没有响应。
从我们所处的位置来看,这没有任何意义,因为在 master 上运行了一个监听进程。我们感觉这与我们在发送和接收类中包装 Asio 的方式有关,但我们不确定在哪里。
作为参考,这是一个跨平台项目。主服务器在 Ubuntu 16.10 上运行,两个从服务器在 Windows 10 上运行。我们禁用了防火墙并在 Hamachi 上运行项目。另请注意,每个从属设备都在自己的端口上发送完成的字符串部分。
【问题讨论】:
标签: sockets c++14 boost-asio winsock