有没有一种方法在Python中不过滤转义序列从一个subprocess

这里是情况,我正在包装一个子窗口,我正在远程沟通。

connection_process = subprocess.Popen(r"C:\Windows\System32\cmd.exe", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) 

这是运行多个线程轮询的标准输出,stderr和写入标准input。 远程端从Linux运行,将发送包括ansi转义序列在内的所有数据。 有关这些转义序列的更多信息http://en.wikipedia.org/wiki/ANSI_escape_sequences ..问题有两个。 我首先需要一种方法来将这些转义序列翻译成任何窗口使用之前,他们被发送的subprocess,反之亦然。 有没有在Windows中做这个好方法? 我一直在看colorama,但有没有一个很好的例子,如何做到这一点? 其次,我需要以一种方式打开我的subprocess,在完整的命令提示符下给我提供一切,包括每个命令行开头的傻C:>。 由于某种原因,python的popen不会把这个给你。 它过滤出来,这是人们想要的,99%的时间…什么方式告诉popen不要过滤它? 这里的参考是我的沟通代码和如何使用它的一个例子。 input完成后,显示C:>提示符。 为了重申我的问题,我怎样才能让subprocess像命令窗口那样无懈可击?

 ''' Created on Mar 2, 2013 @author: rweber ''' import subprocess import Queue from Queue import Empty import threading class Process_Communicator(): def join(self): self.te.join() self.to.join() self.running = False self.aggregator.join() self.ti.join() def enqueue_in(self): while self.running and self.p.stdin is not None: while not self.stdin_queue.empty(): s = self.stdin_queue.get() self.p.stdin.write(str(s) + '\n\r') pass def enqueue_output(self): if not self.p.stdout or self.p.stdout.closed: return out = self.p.stdout for line in iter(out.readline, b''): self.qo.put(line) def enqueue_err(self): if not self.p.stderr or self.p.stderr.closed: return err = self.p.stderr for line in iter(err.readline, b''): self.qe.put(line) def aggregate(self): while (self.running): self.update() self.update() def update(self): line = "" try: while self.qe.not_empty: line = self.qe.get_nowait() # or q.get(timeout=.1) self.unbblocked_err += line except Empty: pass line = "" try: while self.qo.not_empty: line = self.qo.get_nowait() # or q.get(timeout=.1) self.unbblocked_out += line except Empty: pass while not self.stdin_queue.empty(): s = self.stdin_queue.get() self.p.stdin.write(str(s) + '\n\r') def get_stdout(self, clear=True): ret = self.unbblocked_out if clear: self.unbblocked_out = "" return ret def has_stdout(self): ret = self.get_stdout(False) if ret == '': return None else: return ret def get_stderr(self, clear=True): ret = self.unbblocked_out if clear: self.unbblocked_out = "" return ret def has_stderr(self): ret = self.get_stdout(False) if ret == '': return None else: return ret def __init__(self, subp): '''This is a simple class that collects and aggregates the output from a subprocess so that you can more reliably use the class without having to block for subprocess.communicate.''' self.p = subp self.unbblocked_out = "" self.unbblocked_err = "" self.running = True self.qo = Queue.Queue() self.to = threading.Thread(name="out_read", target=self.enqueue_output, args=()) self.to.daemon = True # thread dies with the program self.to.start() self.qe = Queue.Queue() self.te = threading.Thread(name="err_read", target=self.enqueue_err, args=()) self.te.daemon = True # thread dies with the program self.te.start() self.stdin_queue = Queue.Queue() self.aggregator = threading.Thread(name="aggregate", target=self.aggregate, args=()) self.aggregator.daemon = True # thread dies with the program self.aggregator.start() pass def write_stdin(p,c): while p.poll() == None: i = raw_input("send to process:") if i is not None: c.stdin_queue.put(i) p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) c = Process_Communicator(p) stdin = threading.Thread(name="write_stdin", target=write_stdin, args=(p,c)) stdin.daemon = True # thread dies with the program stdin.start() while p.poll() == None: if c.has_stdout(): print c.get_stdout() if c.has_stderr(): print c.get_stderr() c.join() print "Exit" 

更好的..这里是我到目前为止,我正在工作的SSH服务器的代码..

 #!/usr/bin/env python # Copyright (C) 2003-2007 Robey Pointer # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. import base64 from binascii import hexlify import os import socket import sys import threading import traceback import win32com import win32net import win32process import win32security import win32api import paramiko import ntsecuritycon import Queue import subprocess try: import colorama colorama.init() except: try: import tendo.ansiterm except: pass #@todo max number of connections #this is a map of user's and windows handles for global user_handles user_handles = {} def main(): # setup logging paramiko.util.log_to_file('demo_server.log') # @todo: make the server so that it will automatically generate it's own keys ''' if not os.path.isfile(os.path.join(os.getcwd() , 'test_rsa.key')): from Crypto import Random random_generator = Random.new().read key = paramiko.rsakey.RSA.generate(2048, random_generator) ''' #@todo: read key from a file first.. if file doesn't exist, make a new key and save it to the file. host_key = paramiko.RSAKey(filename='test_rsa', password="password") #host_key = paramiko.DSSKey(filename='test_dss.key') print 'Read key: ' + hexlify(host_key.get_fingerprint()) run_server(host_key) class Server (paramiko.ServerInterface): # 'data' is the output of base64.encodestring(str(key)) # (using the "user_rsa_key" files) data = 'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp' + \ 'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC' + \ 'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT' + \ 'UWT10hcuO4Ks8=' good_pub_key = paramiko.RSAKey(data=base64.decodestring(data)) def __init__(self): self.event = threading.Event() '''self.AdjustPrivilege( ntsecuritycon.SE_CHANGE_NOTIFY_NAME ) self.AdjustPrivilege( ntsecuritycon.SE_ASSIGNPRIMARYTOKEN_NAME ) self.AdjustPrivilege( ntsecuritycon.SE_TCB_NAME ) ntsecuritycon.se def adjust_windows_privilages( self, priv): flags = ntsecuritycon.TOKEN_ADJUST_PRIVILEGES | ntsecuritycon.TOKEN_QUERY htoken = win32security.OpenProcessToken(win32api.GetCurrentProcess(), flags) id = win32security.LookupPrivilegeValue(None, priv) newPrivileges = [(id, ntsecuritycon.SE_PRIVILEGE_ENABLED)] win32security.AdjustTokenPrivileges(htoken, 0, newPrivileges)''' def check_channel_request(self, kind, chanid): if kind == 'session': return paramiko.OPEN_SUCCEEDED return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED def check_auth_password(self, username, password): import re authorized = False try: domain = win32api.GetDomainName() #@todo except: import traceback traceback.print_exc() print domain if not authorized: try: hUser = win32security.LogonUser ( username, domain, password, win32security.LOGON32_LOGON_INTERACTIVE , win32security.LOGON32_PROVIDER_DEFAULT) print hUser except win32security.error: print "Failed" import traceback traceback.print_exc() authorized = False except: import traceback traceback.print_exc() authorized = False else: print "Succeeded" authorized = True if not authorized: try: hUser = win32security.LogonUser ( username, domain, password, win32security.LOGON32_LOGON_INTERACTIVE , win32security.LOGON32_PROVIDER_DEFAULT) print hUser except win32security.error: print "Failed" import traceback traceback.print_exc() authorized = False except: import traceback traceback.print_exc() authorized = False else: print "Succeeded" authorized = True if authorized and hUser is not None: user_handles[username] = hUser return paramiko.AUTH_SUCCESSFUL else: return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): print 'Auth attempt with key: ' + hexlify(key.get_fingerprint()) #todo check the list of users to make sure that there is a user with that credential on the domian ''' try: local_users = win32net.NetGroupGetUsers( "localhost", 'none', 0) domain_controler = win32net.NetGetDCName() domain_user = win32net.NetUseGetInfo(domain_controler, username, 1) users = [] users.extend(local_users) users.append(domain_user) except: import traceback traceback.print_exc() for windows_user in users: windows_user_name = str(windows_user['name']).lower() print windows_user_name if (windows_user_name == str(username).lower()) and (key == self.good_pub_key): print "Key was accepted" return paramiko.AUTH_SUCCESSFUL else: print windows_user_name + " != " + str(username).lower() return paramiko.AUTH_FAILED''' if (key == self.good_pub_key): return paramiko.AUTH_SUCCESSFUL else: return paramiko.AUTH_FAILED def get_allowed_auths(self, username): return 'password,publickey' def check_channel_shell_request(self, channel): self.event.set() return True def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes): return True def run_server(host_key): # now connect while True: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #address = socket.get sock.bind(('', 22)) except Exception, e: print '*** Bind failed: ' + str(e) traceback.print_exc() sys.exit(1) try: #@todo max number of connections sock.listen(100) print 'Listening for connection ...' # always be listening for connections connections = [] while True: client, addr = sock.accept() #@todo add this to a thread that will handle each connection. new_connection = threading.Thread(name="connection_%s" % str(len(connections)), target=handle_connection, args=(client,addr,host_key)) new_connection.setDaemon(True) # thread dies with the program new_connection.start() connections.append(new_connection) print 'Got a connection!' except Exception, e: print '*** Listen/accept failed: ' + str(e) traceback.print_exc() sys.exit(1) print 'Got a connection!' def handle_connection(client, addr, host_key): try: t = paramiko.Transport(client) try: t.load_server_moduli() username = t.get_username() except: print '(Failed to load moduli -- gex will be unsupported.)' raise t.add_server_key(host_key) server = Server() try: t.start_server(server=server) except paramiko.SSHException, x: print '*** SSH negotiation failed.' sys.exit(1) # wait for auth chan = t.accept() if chan is None: print '*** No channel.' sys.exit(1) print 'Authenticated!' #@todo: spawn a process and while the process is running communicate with it #open up a channel for stdin f = chan.makefile('rU') '''import pywintypes import win32pipe import pywintypes import win32process import win32con import win32api import win32file import string import types security_attributes = pywintypes.SECURITY_ATTRIBUTES() security_attributes.bInheritHandle = 1 Pyh_stdout_read_end, Pyh_stdout_write_end = win32pipe.CreatePipe(security_attributes, 0) Pyh_stderr_read_end, Pyh_stderr_write_end = win32pipe.CreatePipe(security_attributes, 0) Pyh_stdin_read_end, Pyh_stdin_write_end = win32pipe.CreatePipe(security_attributes, 0)''' connection_process = subprocess.Popen(r"C:\Windows\System32\cmd.exe", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) '''creationFlags = win32process.CREATE_NO_WINDOW startupInfo = win32process.STARTUPINFO() startupInfo.dwFlags = win32process.STARTF_USESTDHANDLES startupInfo.hStdInput = Pyh_stdout_read_end startupInfo.hStdOutput = Pyh_stdout_write_end startupInfo.hStdError = Pyh_stderr_write_end''' #PyHANDLE, PyHANDLE, int, int = CreateProcessAsUser(hToken, appName , #commandLine , processAttributes , threadAttributes , bInheritHandles , dwCreationFlags , newEnvironment , currentDirectory , startupinfo ) #@todo: use the correct user's handle in creating the process so that it spawns as that user. #hProcess, hThread, dwProcessId, dwThreadId = win32process.CreateProcessAsUser(user_handles[username], None, r"C:\Windows\System32\cmd.exe", None, None, 1, 0, None, #hProcess, hThread, dwProcessId, dwThreadId = win32process.CreateProcess(None, r"C:\Windows\System32\cmd.exe", None, None, 1, 0, None, # None, startupInfo) #stdout = open(Pyh_stdout_read_end) #temp = stdout.readline() #print temp communicator = Process_Communicator(connection_process) #communicator = win32_Process_Communicator(Pyh_stdout_read_end,Pyh_stderr_read_end, Pyh_stdin_write_end) communicator.stdin_queue.put("\r") #username = f.readline().strip('\r\n') while not connection_process.poll(): server.event.wait(10) if not server.event.isSet(): print '*** Client never asked for a shell.' sys.exit(1) if communicator.has_stdout() and chan.send_ready(): chan.send(communicator.get_stdout()) if communicator.has_stderr() and chan.send_ready(): #chan.send_stderr(communicator.get_stderr()) chan.send(communicator.get_stderr()) stdin_string = '' while f.channel.recv_ready(): stdin_string += f.read(1) #str = f.readline() translator = colorama.ansitowin32.AnsiToWin32() translator.write_and_convert(stdin_string) communicator.stdin_queue.put(stdin_string) chan.close() except Exception, e: print '*** Caught exception: ' + str(e.__class__) + ': ' + str(e) traceback.print_exc() try: t.close() except: pass sys.exit(1) class Process_Communicator(): def join(self): self.te.join() self.to.join() self.running = False self.aggregator.join() self.ti.join() def enqueue_in(self): while self.running and self.p.stdin is not None: while not self.stdin_queue.empty(): s = self.stdin_queue.get() self.p.stdin.write(str(s) + '\n\r') pass def enqueue_output(self): if not self.p.stdout or self.p.stdout.closed: return out = self.p.stdout for line in iter(out.readline, b''): self.qo.put(line) def enqueue_err(self): if not self.p.stderr or self.p.stderr.closed: return err = self.p.stderr for line in iter(err.readline, b''): self.qe.put(line) def aggregate(self): while (self.running): self.update() self.update() def update(self): line = "" try: while self.qe.not_empty: line = self.qe.get_nowait() # or q.get(timeout=.1) self.unbblocked_err += line except Queue.Empty: pass line = "" try: while self.qo.not_empty: line = self.qo.get_nowait() # or q.get(timeout=.1) self.unbblocked_out += line except Queue.Empty: pass while not self.stdin_queue.empty(): s = self.stdin_queue.get() self.p.stdin.write(str(s)) def get_stdout(self, clear=True): ret = self.unbblocked_out if clear: self.unbblocked_out = "" return ret def has_stdout(self): ret = self.get_stdout(False) if ret == '': return None else: return ret def get_stderr(self, clear=True): ret = self.unbblocked_out if clear: self.unbblocked_out = "" return ret def has_stderr(self): ret = self.get_stdout(False) if ret == '': return None else: return ret def __init__(self, subp): '''This is a simple class that collects and aggregates the output from a subprocess so that you can more reliably use the class without having to block for subprocess.communicate.''' self.p = subp self.unbblocked_out = "" self.unbblocked_err = "" self.running = True self.qo = Queue.Queue() self.to = threading.Thread(name="out_read", target=self.enqueue_output, args=()) self.to.daemon = True # thread dies with the program self.to.start() self.qe = Queue.Queue() self.te = threading.Thread(name="err_read", target=self.enqueue_err, args=()) self.te.daemon = True # thread dies with the program self.te.start() self.stdin_queue = Queue.Queue() self.aggregator = threading.Thread(name="aggregate", target=self.aggregate, args=()) self.aggregator.daemon = True # thread dies with the program self.aggregator.start() pass class win32_Process_Communicator(): def __init__(self,read_end_of_stdout, read_end_of_stderr, write_end_of_stdin): pass main()