如何正确使用boost :: asio在multithreading程序中处理fork()?

我有一些困难,如何正确处理从使用multithreading方式使用Boost Asio的multithreading程序创buildsubprocess。

如果我理解正确,在Unix世界中启动subprocess的方法是调用fork()然后调用exec*() 。 另外,如果我理解正确,调用fork()将复制所有文件描述符等等,这些需要在subprocess中closures除非标记为FD_CLOEXEC (并因此在调用exec*()时被primefacesclosures)。

当调用fork()以通过调用notify_fork()正确操作时,Boost Asio需要被通知。 但是,在一个multithreading的程序中,这会产生几个问题:

  1. 如果我理解正确,套接字默认由subprocessinheritance。 可以将它们设置为SOCK_CLOEXEC但不直接在创build *,因此如果从另一个线程创buildsubprocess,则会导致计时窗口。

  2. notify_fork()要求没有其他线程调用任何其他io_service函数 ,也没有任何与io_service关联的其他I / O对象上的任何函数 。 这似乎并不可行,毕竟这个程序是multithreading的。

  3. 如果我理解正确, fork()exec*()之间的任何函数调用都需要是asynchronous信号安全的(请参阅fork()文档 )。 没有关于asynchronous信号安全的notify_fork()调用的文档。 实际上,如果我查看Boost Asio的源代码(至less在版本1.54中),可能会有对pthread_mutex_lock的调用,如果我理解正确的话,这不是asynchronous信号安全的(请参阅Signal Concepts ,还有其他调用不在白名单上)。

问题#1我大概可以通过分离创buildsubprocess和套接字+文件来解决问题,这样我就可以确保在创build的套接字和设置SOCK_CLOEXEC之间的窗口中没有创buildsubprocess。 问题#2是棘手的,我可能需要确保所有的 asio处理程序线程都停止,做叉子,然后再次重新创build它们,这是最好的,真的很糟糕(我的待定计时器呢? )。 问题#3似乎使完全不可能使用这个正确的。

如何正确使用multithreading程序中的Boost Asio和fork() + exec*() …还是我“分叉”?

请让我知道,如果我误解了任何基本概念(我在Windows编程提出,而不是* nix …)。

编辑:* – 实际上可以创build套接字SOCK_CLOEXEC直接在Linux上设置,自2.6.27以来可用(请参阅socket()文档 )。 在Windows上,相应的标志WSA_FLAG_NO_HANDLE_INHERIT在Windows 7 SP 1 / Windows Server 2008 R2 SP 1之后可用(请参阅WSASocket()文档 )。 虽然OS X似乎不支持这一点。

在多线程程序中, io_service::notify_fork()在子中调用不安全。 然而,Boost.Asio期望基于fork()支持来调用它,因为这是当孩子关闭父母以前的内部文件描述符并创建新的时候。 虽然Boost.Asio明确列出了调用io_service::notify_fork()的前提条件,在fork()过程中保证其内部组件的状态,但是简单的看一下这个实现就表明std::vector::push_back()可能从免费商店分配内存,并且分配不保证是异步信号安全的。

有了这个说法,可能值得考虑的一个解决方案是fork()这个过程,它仍然是单线程的。 子进程将保持单线程,并在父进程通过进程间通信通知时执行fork()exec() 。 这种分离通过在执行fork()exec()不需要管理多个线程的状态来简化问题。


下面是一个演示这种方法的完整示例,其中多线程服务器将通过UDP接收文件名,子进程将执行fork()exec()来运行文件名中的/usr/bin/touch 。 为了使这个例子更具可读性,我选择了使用堆栈协程 。

 #include <unistd.h> // execl, fork #include <iostream> #include <string> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <boost/asio/spawn.hpp> #include <boost/make_shared.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> /// @brief launcher receives a command from inter-process communication, /// and will then fork, allowing the child process to return to /// the caller. class launcher { public: launcher(boost::asio::io_service& io_service, boost::asio::local::datagram_protocol::socket& socket, std::string& command) : io_service_(io_service), socket_(socket), command_(command) {} void operator()(boost::asio::yield_context yield) { std::vector<char> buffer; while (command_.empty()) { // Wait for server to write data. std::cout << "launcher is waiting for data" << std::endl; socket_.async_receive(boost::asio::null_buffers(), yield); // Resize buffer and read all data. buffer.resize(socket_.available()); socket_.receive(boost::asio::buffer(buffer)); io_service_.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) // child { io_service_.notify_fork(boost::asio::io_service::fork_child); command_.assign(buffer.begin(), buffer.end()); } else // parent { io_service_.notify_fork(boost::asio::io_service::fork_parent); } } } private: boost::asio::io_service& io_service_; boost::asio::local::datagram_protocol::socket& socket_; std::string& command_; }; using boost::asio::ip::udp; /// @brief server reads filenames from UDP and then uses /// inter-process communication to delegate forking and exec /// to the child launcher process. class server { public: server(boost::asio::io_service& io_service, boost::asio::local::datagram_protocol::socket& socket, short port) : io_service_(io_service), launcher_socket_(socket), socket_(boost::make_shared<udp::socket>( boost::ref(io_service), udp::endpoint(udp::v4(), port))) {} void operator()(boost::asio::yield_context yield) { udp::endpoint sender_endpoint; std::vector<char> buffer; for (;;) { std::cout << "server is waiting for data" << std::endl; // Wait for data to become available. socket_->async_receive_from(boost::asio::null_buffers(), sender_endpoint, yield); // Resize buffer and read all data. buffer.resize(socket_->available()); socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint); std::cout << "server got data: "; std::cout.write(&buffer[0], buffer.size()); std::cout << std::endl; // Write filename to launcher. launcher_socket_.async_send(boost::asio::buffer(buffer), yield); } } private: boost::asio::io_service& io_service_; boost::asio::local::datagram_protocol::socket& launcher_socket_; // To be used as a coroutine, server must be copyable, so make socket_ // copyable. boost::shared_ptr<udp::socket> socket_; }; int main(int argc, char* argv[]) { std::string filename; // Try/catch provides exception handling, but also allows for the lifetime // of the io_service and its IO objects to be controlled. try { if (argc != 2) { std::cerr << "Usage: <port>\n"; return 1; } boost::thread_group threads; boost::asio::io_service io_service; // Create two connected sockets for inter-process communication. boost::asio::local::datagram_protocol::socket parent_socket(io_service); boost::asio::local::datagram_protocol::socket child_socket(io_service); boost::asio::local::connect_pair(parent_socket, child_socket); io_service.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) // child { io_service.notify_fork(boost::asio::io_service::fork_child); parent_socket.close(); boost::asio::spawn(io_service, launcher(io_service, child_socket, filename)); } else // parent { io_service.notify_fork(boost::asio::io_service::fork_parent); child_socket.close(); boost::asio::spawn(io_service, server(io_service, parent_socket, std::atoi(argv[1]))); // Spawn additional threads. for (std::size_t i = 0; i < 3; ++i) { threads.create_thread( boost::bind(&boost::asio::io_service::run, &io_service)); } } io_service.run(); threads.join_all(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } // Now that the io_service and IO objects have been destroyed, all internal // Boost.Asio file descriptors have been closed, so the execl should be // in a clean state. If the filename has been set, then exec touch. if (!filename.empty()) { std::cout << "creating file: " << filename << std::endl; execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0)); } } 

1号航站楼:

  $ ls
 a.out example.cpp
 $ ./a.out 12345
服务器正在等待数据
发射器正在等待数据
服务器获取数据:a
服务器正在等待数据
发射器正在等待数据
创建文件:a
服务器获取数据:b
服务器正在等待数据
发射器正在等待数据
创建文件:b
服务器获得数据:c
服务器正在等待数据
发射器正在等待数据
创建文件:c
 ctrl + c
 $ ls
 a.out bc example.cpp 

2号航站楼:

  $ nc -u 127.0.0.1 12345
 ctrl + d b ctrl + d c ctrl + d 

考虑以下:

  • fork()在子进程中只创建一个线程。 您将需要重新创建其他线程。
  • 由父进程中的其他线程持有的互斥锁在子进程中永远保持永久锁定状态,因为拥有的线程无法在fork()生存。 使用pthread_atfork()注册的回调函数可以释放互斥锁,但大多数的函数库永远不会使用pthread_atfork() 。 换句话说,当你调用malloc()new时,你的子进程可能永远挂起,因为标准堆分配器确实使用了互斥体。

鉴于上述情况,多线程进程中唯一健壮的选项是调用fork() ,然后调用exec()

请注意,只要不使用pthread_atfork()处理函数,您的父进程不受fork()影响。


关于分叉和boost::asio ,有一个io_service::notify_fork()函数需要在父代和父代分叉之前调用。 它最终取决于所使用的反应堆。 对于Linux / UNIX反应器, epoll_reactorepoll_reactordev_poll_reactorkqueue_reactor这个函数在fork之前对父kqueue_reactor不做任何事情,但是在子dev_poll_reactor ,它重新创建reactor状态并重新注册文件描述符。 不过,我不确定它在Windows上的功能。

其用法的一个例子可以在process_per_connection.cpp中找到,你可以复制它:

 void handle_accept(const boost::system::error_code& ec) { if (!ec) { // Inform the io_service that we are about to fork. The io_service cleans // up any internal resources, such as threads, that may interfere with // forking. io_service_.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) { // Inform the io_service that the fork is finished and that this is the // child process. The io_service uses this opportunity to create any // internal file descriptors that must be private to the new process. io_service_.notify_fork(boost::asio::io_service::fork_child); // The child won't be accepting new connections, so we can close the // acceptor. It remains open in the parent. acceptor_.close(); // The child process is not interested in processing the SIGCHLD signal. signal_.cancel(); start_read(); } else { // Inform the io_service that the fork is finished (or failed) and that // this is the parent process. The io_service uses this opportunity to // recreate any internal resources that were cleaned up during // preparation for the fork. io_service_.notify_fork(boost::asio::io_service::fork_parent); socket_.close(); start_accept(); } } else { std::cerr << "Accept error: " << ec.message() << std::endl; start_accept(); } }