Paramiko挂起时执行一个大的wget命令

您好,我有问题执行一个命令,通过Ubuntu 10服务器执行一个100MB文件的wget。 更短的命令工作正常,除了这个。 下面的类包含了我如何使用paramiko和克服这个问题的不同尝试(请参阅不同的run或exec方法)。 在exec_cmd的情况下执行挂在这一行上:

out = self.in_buffer.read(nbytes, self.timeout) 

从paramiko的channel.py模块的recv方法。

同样的wget命令在使用Mac的普通ssh实用程序的shell中完美工作。

 """ Management of SSH connections """ import logging import os import paramiko import socket import time import StringIO class SSHClient(): def __init__(self): self._ssh_client = paramiko.SSHClient() self._ssh_client.load_system_host_keys() self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.time_out = 300 self.wait = 5 def connect(self, hostname, user, pkey): retry = self.time_out self.hostname = hostname logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey)) while retry > 0: try: self._ssh_client.connect(hostname, username=user, key_filename=os.path.expanduser(pkey), timeout=self.time_out) return except socket.error, (value,message): if value == 61 or value == 111: logging.warning('SSH Connection refused, will retry in 5 seconds') time.sleep(self.wait) retry -= self.wait else: raise except paramiko.BadHostKeyException: logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname) logging.warning('Edit that file to remove the entry and then try again') retry = 0 except EOFError: logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds') time.sleep(self.wait) retry -= self.wait logging.error('Could not establish SSH connection') def exists(self, path): status = self.run('[ -a %s ] || echo "FALSE"' % path) if status[1].startswith('FALSE'): return 0 return 1 def shell(self): """ Start an interactive shell session on the remote host. """ channel = self._ssh_client.invoke_shell() interactive_shell(channel) def run(self, command): """ Execute a command on the remote host. Return a tuple containing an integer status and a string containing all output from the command. """ logging.info('running:%s on %s' % (command, self.hostname)) log_fp = StringIO.StringIO() status = 0 try: t = self._ssh_client.exec_command(command) except paramiko.SSHException: logging.error("Error executing command: " + command) status = 1 log_fp.write(t[1].read()) log_fp.write(t[2].read()) t[0].close() t[1].close() t[2].close() logging.info('output: %s' % log_fp.getvalue()) return (status, log_fp.getvalue()) def run_pty(self, command): """ Execute a command on the remote host with a pseudo-terminal. Returns a string containing the output of the command. """ logging.info('running:%s on %s' % (command, self.hostname)) channel = self._ssh_client.get_transport().open_session() channel.get_pty() status = 0 try: channel.exec_command(command) except: logging.error("Error executing command: " + command) status = 1 return status, channel.recv(1024) def close(self): transport = self._ssh_client.get_transport() transport.close() def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False): logging.info('running:%s on %s' % (cmd, self.hostname)) ssh = self._ssh_client chan = ssh.get_transport().open_session() stdin = chan.makefile('wb') stdout = chan.makefile('rb') stderr = chan.makefile_stderr('rb') processed_cmd = cmd if use_sudo: processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') chan.exec_command(processed_cmd) result = { 'stdout': [], 'stderr': [], } exit_status = chan.recv_exit_status() result['exit_status'] = exit_status def print_output(): for line in stdout: result['stdout'].append(line) logging.info(line) for line in stderr: result['stderr'].append(line) logging.info(line) if verbose: print processed_cmd print_output() return exit_status,result def exec_cmd(self, cmd): import select ssh = self._ssh_client channel = ssh.get_transport().open_session() END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL" cmd += ";echo " + END logging.info('running:%s on %s' % (cmd, self.hostname)) channel.exec_command(cmd) out = "" buf = "" while END not in buf: rl, wl, xl = select.select([channel],[],[],0.0) if len(rl) > 0: # Must be stdout buf = channel.recv(1024) logging.info(buf) out += buf return 0, out 

  1. 在这种情况下,我会去列表附加,然后串联。 为什么? 那么,Python中的字符串是不可变的。 这意味着每当你使用+=你基本上都会创建两个新字符串并读取第三个字符串。 另一方面,如果您创建一个列表并追加它,则会将创建的字符串数量减半。
  2. 你真的需要打电话多次选择? 我的理解是,如果这个过程是线程阻塞,你并不在乎。 由于select或多或少是同名的C方法的包装器:

select()和pselect()允许一个程序监视多个文件描述符,等待一个或多个文件描述符为某类I / O操作(例如可能的输入)“准备就绪”。 如果可以在不阻塞的情况下执行相应的I / O操作(例如,读取(2)),则认为文件描述符已准备就绪。

  • 您没有在您的代码中侦听一个socket.timeout异常。
  • 写入标准输出/文件系统可能是昂贵的,但是你正在记录recv返回的每一行。 你能移动日志行吗?
  • 你有没有考虑过手动阅读频道? 您技术上唯一需要的代码是
    try:
    out = self.in_buffer.read(nbytes, self.timeout)
    except PipeTimeout, e:
    # do something with error
    这是不能保证的,但会削减额外的处理。
  • 我有同样的问题,当我在远程SSH客户端上运行的shell脚本时,我的python脚本挂起,在400Mb文件上做了一个wget命令。

    我发现给wget命令添加超时修复了这个问题。 本来我有:

    wget http:// blah:8888 / file.zip

    现在用这个:

    wget -q -T90 http:// blah:8888 / file.zip

    它就像一个魅力!

    希望能帮助到你。