subprocess的asynchronous双向IOredirect

我想弄清楚一个subprocess的asynchronous双向IOredirect的一般化方法。 基本上,我想产生一个等待input的交互式subprocess,任何输出都应该被回读。 我试图通过产生一个新的Python进程来实验python.subprocess。 一个简单的例子试图实现如下

process = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE) while True: output = process.stdout.readline() print output input = sys.stdin.readline() process.stdin.write(input) 

并执行上面的代码片断挂起没有任何输出。 我尝试使用/usr/bash/usr/bin/irb但结果完全相同。 我的猜测是,IOredirect并不能很好地缓冲IO。

所以我的问题是,读取subprocess的输出而不刷新缓冲区或退出subprocess是否可行?

下面的post提到IPC套接字,但为此我将不得不改变subprocess,这可能是不可行的。 有没有其他方法可以实现呢?

注意***我的最终目标是创build一个可以与远程Web客户端交互的服务器REPL过程。 虽然给出的例子是Python,但我的最终目标是通过一个通用的包装器来包装所有可用的REPL。


在答案中的一些build议的帮助下,我想出了以下内容

 #!/usr/bin/python import subprocess, os, select proc = subprocess.Popen(['/usr/bin/python'],shell=False,stdin=subprocess.PIPE, stdout=subprocess.PIPE,stderr=subprocess.PIPE) for i in xrange(0,5): inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() proc.terminate() print "After Terminating" for i in xrange(0,5): inputready, outputready, exceptready = select.select([proc.stdout, proc.stderr],[proc.stdout, proc.stderr],[proc.stdout, proc.stderr],0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() 

现在,虽然程序没有陷入僵局,但不幸的是没有输出。 运行上面的代码,我得到

 No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] After Terminating No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] No Data [] [] [] 

只是FYI,运行Python为

 /usr/bin/python 2>&1|tee test.out 

似乎工作得很好。

我也想出了一个'C'的代码。 但结果并没有什么不同。

 int kbhit() { struct timeval tv; fd_set fds; tv.tv_sec = tv.tv_usec = 0; FD_ZERO(&fds); FD_SET(STDIN_FILENO, &fds); select(STDIN_FILENO+1, &fds, NULL, NULL, &tv); return FD_ISSET(STDIN_FILENO, &fds); } void receive(char *str) { char ch; fprintf(stderr,"IN1\n"); if(!kbhit()) return; fprintf(stderr,"IN2\n"); fprintf(stderr,"%d\n",kbhit()); for(;kbhit() && (ch=fgetc(stdin))!=EOF;) { fprintf(stderr,"%c,%d",ch,kbhit()); } fprintf(stderr,"Done\n"); } int main(){ pid_t pid; int rv, pipeP2C[2],pipeC2P[2]; pipe(pipeP2C); pipe(pipeC2P); pid=fork(); if(pid){ dup2(pipeP2C[1],1); /* Replace stdout with out side of the pipe */ close(pipeP2C[0]); /* Close unused side of pipe (in side) */ dup2(pipeC2P[0],0); /* Replace stdin with in side of the pipe */ close(pipeC2P[1]); /* Close unused side of pipe (out side) */ setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */ sleep(2); receive("quit()\n"); wait(&rv); /* Wait for child process to end */ fprintf(stderr,"Child exited with a %d value\n",rv); } else{ dup2(pipeP2C[0],0); /* Replace stdin with the in side of the pipe */ close(pipeP2C[1]); /* Close unused side of pipe (out side) */ dup2(pipeC2P[1],1); /* Replace stdout with the out side of the pipe */ close(pipeC2P[0]); /* Close unused side of pipe (out side) */ setvbuf(stdout,(char*)NULL,_IONBF,0); /* Set non-buffered output on stdout */ close(2), dup2(1,2); /*Redirect stderr to stdout */ if(execl("/usr/bin/python","/usr/bin/python",NULL) == -1){ fprintf(stderr,"execl Error!"); exit(1); } } return 0; } 

在你发布的Python代码中,你没有使用正确的流:

 inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], # read list [proc.stdout, proc.stderr], # write list [proc.stdout, proc.stderr], # error list. 0) # time out. 

我还没有尝试修复它,但我敢打赌,阅读和写入同一组流是不正确的。


您的示例中出现了多种错误。 第一个是你作为一个子进程启动的python可执行文件没有输出。 第二个是有一个竞争条件,因为你可以在子进程产生输出之前连续调用5次select() ,在这种情况下,你将在读取任何东西之前终止进程。

我解决了上面提到的三个问题(写清单,开始一个产生输出和竞争条件的过程)。 试试这个例子,看看它是否适合你:

 #!/usr/bin/python import subprocess, os, select, time path = "/usr/bin/python" proc = subprocess.Popen([path, "foo.py"], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() proc.terminate() print "After Terminating" for i in xrange(0,5): inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) if not inputready: print "No Data", print inputready, outputready, exceptready for s in inputready: print s.fileno(),s.readline() 

我用的foo.py文件包含这个:

 #!/usr/bin/python print "Hello, world!" 

以下版本(大部分删除冗余输出以使结果更容易阅读):

 #!/usr/bin/python import subprocess, os, select, time path = "/usr/bin/python" proc = subprocess.Popen([path, "foo.py"], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) for s in inputready: line = s.readline() if line: print s.fileno(), line proc.terminate() print "After Terminating" for i in xrange(0,5): time.sleep(1) inputready, outputready, exceptready = select.select( [proc.stdout, proc.stderr], [proc.stdin,], [proc.stdout, proc.stderr, proc.stdin], 0) for s in inputready: line = s.readline() if line: print s.fileno(), line 

给出以下输出:

5你好,世界!

终止后

请注意,出于某种原因,在select.select()使用timeout参数在我的系统上不会产生预期的结果,而是使用了time.sleep()


只是FYI,运行Python为

 /usr/bin/python 2>&1|tee test.out 

似乎工作得很好。

你不能得到这个效果,因为这个例子仍然给python解释器一个控制的tty。 如果没有控制tty,python解释器不会打印Python版本,也不会显示>>>提示符。

一个接近的例子就像下面这样。 您可以用包含要发送给解释器的命令的文件替换/dev/null

 /usr/bin/python </dev/null 2>&1|tee test.out 

如果您将控制tty(键盘)以外的任何东西重定向到进程的标准输入,则不会从python解释器获得输出。 这就是为什么你的代码似乎不工作。

有不同的方式来做到这一点。 你可以,例如:

  • 使用SysV消息队列和队列上的超时轮询消息到达
  • 使用O_NONBLOCK标志为子节点创建一个pipe(),为父节点创建一个pipe(),然后在文件描述符上选择()以获得数据(如果没有数据到达,甚至可以处理超时)
  • 使用套接字()AF_UNIX或AF_INET,将其设置为非阻塞,并选择()或epoll()数据到达
  • 当数据到达时,mmap()MAP_SHARED内存段并发信号通知另一个进程,注意带有锁定机制的共享段。

我用双管道在C中写了一个样本:

 #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <unistd.h> #include <sys/time.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/stat.h> #include <sys/select.h> #include <fcntl.h> #include <signal.h> #define BUFLEN (6*1024) #define EXECFILE "/usr/bin/python" char *itoa(int n, char *s, int b) { static char digits[] = "0123456789abcdefghijklmnopqrstuvwxyz"; int i=0, sign; if ((sign = n) < 0) n = -n; do { s[i++] = digits[n % b]; } while ((n /= b) > 0); if (sign < 0) s[i++] = '-'; s[i] = '\0'; return s; } /* int set_nonblock(int sockfd) { // set socket to non blocking int arg,i; if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) { printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno); return -1; } // set O_NONBLOCK flag arg |= O_NONBLOCK; if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) { printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno); return -1; } return i; } int set_block(int sockfd) { // set socket to blocking int arg,i; if ((arg=fcntl(sockfd, F_GETFL, NULL)) < 0) { printf("error getting socket flag for fd %i: fcntl(..., F_GETFL): %i\n", sockfd, errno); return -1; } // clean O_NONBLOCK flag arg &= (~O_NONBLOCK); if ((i=fcntl(sockfd, F_SETFL, arg)) < 0) { printf("error setting socket flag for fd %i: fcntl(..., F_SETFL): %i\n", sockfd, errno); return -1; } return i; } */ int main() { FILE *input; char slice[BUFLEN]; int status = 0; pid_t pid; int err; int newfd; // if you want you can pass arguments to the program to execute // char *const arguments[] = {EXECFILE, "-v", NULL}; char *const arguments[] = {EXECFILE, NULL}; int father2child_pipefd[2]; int child2father_pipefd[2]; char *read_data = NULL; FILE *retclam; fd_set myset; int x=1; signal(SIGPIPE, SIG_IGN); newfd = dup(0); input = fdopen(newfd, "r"); pipe(father2child_pipefd); // Father speaking to child pipe(child2father_pipefd); // Child speaking to father pid = fork(); if (pid > 0) { // Father close(father2child_pipefd[0]); close(child2father_pipefd[1]); // Write to the pipe reading from stdin retclam = fdopen(child2father_pipefd[0], "r"); // set the two fd non blocking //set_nonblock(0); //set_nonblock(child2father_pipefd[0]); //set_nonblock(fileno(retclam)); while(x==1) { // clear the file descriptor set FD_ZERO(&myset); // add the stdin to the set FD_SET(fileno(input), &myset); // add the child pipe to the set FD_SET(fileno(retclam), &myset); // here we wait for data to arrive from stdin or from the child pipe. The last argument is a timeout, if you like err = select(fileno(retclam)+1, &myset, NULL, NULL, NULL); switch(err) { case -1: // Problem with select(). The errno variable knows why //exit(1); x=0; break; case 0: // timeout on select(). Data did not arrived in time, only valid if the last attribute of select() was specified break; default: // data is ready to be read bzero(slice, BUFLEN); if (FD_ISSET(fileno(retclam), &myset)) { // data ready on the child //set_block(fileno(retclam)); read_data = fgets(slice, BUFLEN, retclam); // read a line from the child (max BUFLEN bytes) //set_nonblock(fileno(retclam)); if (read_data == NULL) { //exit(0); x=0; break; } // write data back to stdout write (1, slice, strlen(slice)); if(feof(retclam)) { //exit(0); x=0; break; } break; } bzero(slice, BUFLEN); if (FD_ISSET(fileno(input), &myset)) { // data ready on stdin //printf("father\n"); //set_block(fileno(input)); read_data = fgets(slice, BUFLEN, input); // read a line from stdin (max BUFLEN bytes) //set_nonblock(fileno(input)); if (read_data == NULL) { //exit (0); close(father2child_pipefd[1]); waitpid(pid, &status, 0); //fclose(input); break; } // write data to the child write (father2child_pipefd[1], slice, strlen(slice)); /* if(feof(input)) { exit(0); }*/ break; } } } close(father2child_pipefd[1]); fclose(input); fsync(1); waitpid(pid, &status, 0); // child process terminated fclose (retclam); // Parse output data from child // write (1, "you can append somethind else on stdout if you like"); if (WEXITSTATUS(status) == 0) { exit (0); // child process exited successfully } } if (pid == 0) { // Child close (0); // stdin is not needed close (1); // stdout is not needed // Close the write side of this pipe close(father2child_pipefd[1]); // Close the read side of this pipe close(child2father_pipefd[0]); // Let's read on stdin, but this stdin is associated to the read pipe dup2(father2child_pipefd[0], 0); // Let's speak on stdout, but this stdout is associated to the write pipe dup2(child2father_pipefd[1], 1); // if you like you can put something back to the father before execve //write (child2father_pipefd[1], "something", 9); //fsync(child2father_pipefd[1]); err = execve(EXECFILE, arguments, NULL); // we'll never be here again after execve succeeded!! So we get here only if the execve() failed //fprintf(stderr, "Problem executing file %s: %i: %s\n", EXECFILE, err, strerror(errno)); exit (1); } if (pid < 0) { // Error exit (1); } fclose(input); return 0; } 

我在bash中使用2-way io:

 mkfifo hotleg mkfifo coldleg program <coldleg |tee hotleg & while read LINE; do case $LINE in *)call_a_function $LINE;; esac done <hotleg |tee coldleg & 

(请注意,您可以“>”而不是三通,但您可能首先看到输出)

你猜的缓冲I / O是非常可能是正确的。 你写循环的方式,读取将会阻塞,直到它填充所需的缓冲区,并且直到它返回,你将不能处理任何输入。 这很容易造成死锁。

Popen.communicate通过使一个线程与每个管道工作,并确保它具有写入到标准输入的所有数据,以便当文件对象等待缓冲区填充时的实际写入不能被延迟为了文件对象被刷新/关闭。 我认为如果需要的话,你可以制定一个涉及线程工作的解决方案,但这并不是非同步的,可能也不是最简单的解决方案。

你可以通过不使用Popen提供的文件对象来访问管道,而是使用fileno()方法来获取它们的缓冲区。 然后你可以用os.read,os.write和select.select来使用fd。 os.read和os.write函数不会执行缓冲,但会阻塞,直到至少有一个字节可以被读写。 在调用之前,您需要确保管道是可读/可写的。 最简单的方法是使用select.select()等待所有要读/写的管道,并在select()返回时对每个已准备好的管道执行一次读或写调用。 你应该能够找到选择循环的例子,如果你搜索(他们可能会使用套接字而不是管道,但原则是相同的)。 (另外,不要在没有先检查它不会阻塞的情况下进行读或写操作,否则最终会导致子进程造成死锁的情况。即使没有,也必须准备好读取数据还写了你想要的一切。)

如果你需要控制一个Python解释器会话,你可能会更好

  • 将Python嵌入到你的程序中 (如果是Python本身就是简单的eval ),或者
  • 像PyScripter一样使用rpyc等RPC工具。

顺便说一句,在后一种情况下,服务器可以在任何地方运行,PyScripter已经有一个工作服务器模块(客户端模块在帕斯卡尔,将需要翻译)。