Fix various oversights/bugs in ZMQ RPC server code

- Add some RPC commands (and touch up a couple others)
- some bounds checking
- some better pointer management
- const correctness and error handling

-- Thanks @vtnerd for type help with serialization and CMake changes
This commit is contained in:
Thomas Winget
2017-09-05 12:20:40 -04:00
parent 77986023c3
commit 0299cb77ca
15 changed files with 536 additions and 225 deletions

View File

@@ -45,10 +45,6 @@ ZmqServer::ZmqServer(RpcHandler& h) :
ZmqServer::~ZmqServer()
{
for (zmq::socket_t* socket : sockets)
{
delete socket;
}
}
void ZmqServer::serve()
@@ -58,31 +54,36 @@ void ZmqServer::serve()
{
try
{
for (zmq::socket_t* socket : sockets)
zmq::message_t message;
if (!rep_socket)
{
zmq::message_t message;
throw std::runtime_error("ZMQ RPC server reply socket is null");
}
while (rep_socket->recv(&message))
{
std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
while (socket->recv(&message))
{
std::string message_string(reinterpret_cast<const char *>(message.data()), message.size());
MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
MDEBUG(std::string("Received RPC request: \"") + message_string + "\"");
std::string response = handler.handle(message_string);
std::string response = handler.handle(message_string);
zmq::message_t reply(response.size());
memcpy((void *) reply.data(), response.c_str(), response.size());
zmq::message_t reply(response.size());
memcpy((void *) reply.data(), response.c_str(), response.size());
socket->send(reply);
MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
}
rep_socket->send(reply);
MDEBUG(std::string("Sent RPC reply: \"") + response + "\"");
}
}
catch (boost::thread_interrupted& e)
catch (const boost::thread_interrupted& e)
{
MDEBUG("ZMQ Server thread interrupted.");
}
catch (const zmq::error_t& e)
{
MERROR(std::string("ZMQ error: ") + e.what());
}
boost::this_thread::interruption_point();
}
}
@@ -95,26 +96,20 @@ bool ZmqServer::addIPCSocket(std::string address, std::string port)
bool ZmqServer::addTCPSocket(std::string address, std::string port)
{
zmq::socket_t *new_socket = nullptr;
try
{
std::string addr_prefix("tcp://");
new_socket = new zmq::socket_t(context, ZMQ_REP);
rep_socket.reset(new zmq::socket_t(context, ZMQ_REP));
new_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS);
rep_socket->setsockopt(ZMQ_RCVTIMEO, DEFAULT_RPC_RECV_TIMEOUT_MS);
std::string bind_address = addr_prefix + address + std::string(":") + port;
new_socket->bind(bind_address.c_str());
sockets.push_back(new_socket);
rep_socket->bind(bind_address.c_str());
}
catch (std::exception& e)
catch (const std::exception& e)
{
MERROR(std::string("Error creating ZMQ Socket: ") + e.what());
if (new_socket)
{
delete new_socket;
}
return false;
}
return true;