Can threading's Timer be blocked by wait() and communicate()?


Author: 糖果

This test mixes Timer with Popen to check whether calling wait() or communicate() blocks a Timer running in the main process.

Here is where Timer is defined in Python 2.7's threading.py:

# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

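Timer's main job is to run a function after a countdown. Its constructor takes four parameters (interval, function, args, kwargs); args and kwargs are forwarded to the target function when it runs.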
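To show how little machinery is involved, here is a minimal Python 3 sketch that mirrors the _Timer logic above. The class name SimpleTimer is hypothetical and not part of threading. The whole countdown is a single Event.wait(interval) call, which cancel() cuts short by setting the event.

```python
import threading


class SimpleTimer(threading.Thread):
    """Hypothetical re-implementation of the _Timer logic shown above."""

    def __init__(self, interval, function, args=(), kwargs=None):
        super().__init__()
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = threading.Event()

    def cancel(self):
        """Stop the timer if it hasn't fired yet."""
        self.finished.set()

    def run(self):
        # Sleep until cancel() sets the event or the interval elapses.
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()


results = []

fired = SimpleTimer(0.1, results.append, args=("fired",))
fired.start()
fired.join()

cancelled = SimpleTimer(0.5, results.append, args=("cancelled",))
cancelled.start()
cancelled.cancel()   # set the event before the 0.5 s interval is up
cancelled.join()

print(results)  # ['fired']
```

Unlike the 2.7 source, the sketch uses an immutable default for args and None for kwargs, which avoids sharing one mutable default list/dict across instances.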

Next, the source of communicate() shows fairly clearly how it relates to wait():


def communicate(self, input=None):
    """Interact with process: Send data to stdin.  Read data from
    stdout and stderr, until end-of-file is reached.  Wait for
    process to terminate.  The optional input argument should be a
    string to be sent to the child process, or None, if no data
    should be sent to the child.

    communicate() returns a tuple (stdout, stderr)."""

    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
        stdout = None
        stderr = None
        if self.stdin:
            if input:
                try:
                    self.stdin.write(input)
                except IOError as e:
                    if e.errno != errno.EPIPE and e.errno != errno.EINVAL:
                        raise
            self.stdin.close()
        elif self.stdout:
            stdout = _eintr_retry_call(self.stdout.read)
            self.stdout.close()
        elif self.stderr:
            stderr = _eintr_retry_call(self.stderr.read)
            self.stderr.close()
        self.wait()
        return (stdout, stderr)

    return self._communicate(input)

def _communicate(self, input):
    stdout = None # Return
    stderr = None # Return

    if self.stdout:
        stdout = []
        stdout_thread = threading.Thread(target=self._readerthread,
                                         args=(self.stdout, stdout))
        stdout_thread.setDaemon(True)
        stdout_thread.start()
    if self.stderr:
        stderr = []
        stderr_thread = threading.Thread(target=self._readerthread,
                                         args=(self.stderr, stderr))
        stderr_thread.setDaemon(True)
        stderr_thread.start()
    if self.stdin:
        if input is not None:
            try:
                self.stdin.write(input)
            except IOError as e:
                if e.errno != errno.EPIPE:
                    raise
        self.stdin.close()

    if self.stdout:
        stdout_thread.join()
    if self.stderr:
        stderr_thread.join()

    # All data exchanged.  Translate lists into strings.
    if stdout is not None:
        stdout = stdout[0]
    if stderr is not None:
        stderr = stderr[0]

    # Translate newlines, if requested.  We cannot let the file
    # object do the translation: It is based on stdio, which is
    # impossible to combine with select (unless forcing no
    # buffering).
    if self.universal_newlines and hasattr(file, 'newlines'):
        if stdout:
            stdout = self._translate_newlines(stdout)
        if stderr:
            stderr = self._translate_newlines(stderr)

    self.wait()
    return (stdout, stderr)

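Compared with a plain wait(), the only difference is that communicate() first handles stdin, stdout and stderr, and then finishes by calling wait(). (The thread-based _communicate() shown here is the Windows implementation; on POSIX, Python 2.7 uses poll() or select() instead, but that version also ends with wait().)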
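The same relationship holds in Python 3. In this minimal sketch (assuming a Python 3 interpreter, with sys.executable used as a throwaway child process), returncode is already set as soon as communicate() returns, because communicate() waited for the child.

```python
import subprocess
import sys

# Child: one line to stdout, one line to stderr, then exit with status 3.
child_code = "import sys; print('out'); sys.stderr.write('err\\n'); sys.exit(3)"

p = subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
# communicate() reads both pipes to EOF and then calls wait(),
# so the exit status is available right away.
out, err = p.communicate()
print(out, err, p.returncode)
```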

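The program below uses Timer in two ways: Orchina reschedules itself with a new Timer, and CandyLab receives its arguments through Timer's args parameter.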

from threading import Timer

def CandyLab(a, b):
    print(a, b)
    print "CandyLab"

def Orchina():
    print "Orchina"
    t = Timer(1, Orchina)   # reschedule itself every second
    t.start()

t = Timer(1, CandyLab, args=("valueA", "valueB"))
t.start()

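Once a Timer has been created with its target function, calling t.start() starts the countdown, whose length is given by the interval argument. In the code below, Orchina is called 10 seconds after start(). t.start() itself does not block: the countdown runs in the timer's own thread, and the main thread goes straight on to the statements that follow.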

from threading import Timer

def Orchina():
    print "Orchina"

t = Timer(10, Orchina)   # call Orchina 10 seconds after start()
t.start()                # returns immediately; does not block

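The countdown is cancelled only if the main program calls t.cancel() before it expires; the timer then stops and the target function is never executed.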
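Both points (start() does not block, and cancel() before expiry prevents the call) can be checked with a small Python 3 sketch. The 10-second interval and the 0.2-second pause are arbitrary values chosen for illustration.

```python
import threading
import time

calls = []

def target():
    calls.append("target ran")

t = threading.Timer(10, target)
begin = time.monotonic()
t.start()                               # starts the countdown thread
start_cost = time.monotonic() - begin   # start() returns almost at once

time.sleep(0.2)   # the main thread keeps working meanwhile
t.cancel()        # cancel well before the 10 s interval expires
t.join()          # the timer thread exits promptly after cancel()

print(calls)  # []
```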

A slightly fuller example:

import time
from threading import Timer

def CandyLab():
    print "CandyLab"

def Orchina():
    print "Orchina"

t = Timer(10, CandyLab)
t.start()       # the 10-second countdown starts; start() returns at once
Orchina()       # runs immediately
time.sleep(3)
t.cancel()      # cancelled after 3 s, so CandyLab never runs

Output:

Orchina

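CandyLab() is not executed immediately: normally it would run 10 seconds after t.start(). The program does not block for those 10 seconds; it goes straight on to the statements after t.start(), so Orchina() prints "Orchina". After 3 seconds, before the 10-second countdown is up, t.cancel() runs, and CandyLab() is never executed.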

Without the t.cancel() after the 3-second sleep, CandyLab() is executed when the 10-second countdown finishes:

from threading import Timer

def CandyLab():
    print "CandyLab"

def Orchina():
    print "Orchina"

t = Timer(10, CandyLab)
t.start()
Orchina()

Output:

Orchina
CandyLab
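The program keeps running until CandyLab fires because Timer threads are non-daemon by default, so the interpreter waits for them before exiting. A Python 3 sketch of the same ordering, using join() to wait explicitly and a shorter 0.2-second interval (both choices are for illustration only):

```python
import threading

calls = []

t = threading.Timer(0.2, calls.append, args=("CandyLab",))
t.start()
calls.append("Orchina")   # runs right away, before the timer fires
t.join()                  # wait until the timer thread has finished

print(calls)  # ['Orchina', 'CandyLab']
```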

Another pattern is a function that reschedules itself in a loop and cancels the pending timer with t.cancel() once some condition is met.


from threading import Timer

def CandyLab():
    print "CandyLab"
    t = Timer(1, CandyLab)   # schedule the next call in 1 second
    t.start()

CandyLab()

Output: it prints in an endless loop:

CandyLab
CandyLab
CandyLab
...
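The example above never stops. Below is one way to add the "cancel when a condition is met" part, sketched in Python 3. The stop condition (three runs), the 0.05-second interval, the lock and the stopped flag are all illustrative assumptions. The lock matters because a callback might otherwise reschedule a fresh timer just after cancel() was called on the old one.

```python
import threading
import time

lock = threading.Lock()
stopped = False
timer = None
runs = []

def candy_lab():
    global timer
    with lock:
        if stopped:          # a late callback after stop(): do nothing
            return
        runs.append("CandyLab")
        # Reschedule ourselves and keep a reference for cancel().
        timer = threading.Timer(0.05, candy_lab)
        timer.start()

def stop():
    global stopped
    with lock:
        stopped = True
        if timer is not None:
            timer.cancel()

candy_lab()
while len(runs) < 3:         # hypothetical stop condition: three runs
    time.sleep(0.01)
stop()
runs_at_stop = len(runs)
time.sleep(0.2)              # give any pending timer a chance to misfire
print(runs_at_stop == len(runs))  # True
```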

Below, Timer passes arguments to the called function through the args parameter:

from threading import Timer

def CandyLab(a, b):
    print(a, b)
    print "CandyLab"

def Orchina():
    print "Orchina"
    t = Timer(1, Orchina)
    t.start()

t = Timer(1, CandyLab, args=("valueA", "valueB"))
t.start()

Output:

('valueA', 'valueB')
CandyLab
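Timer also forwards keyword arguments through kwargs. A short Python 3 sketch, where the greeting parameter is a hypothetical addition for illustration:

```python
import threading

received = []

def candy_lab(a, b, greeting="hello"):
    received.append((a, b, greeting))

t = threading.Timer(0.1, candy_lab,
                    args=("valueA", "valueB"),   # positional arguments
                    kwargs={"greeting": "hi"})   # keyword arguments
t.start()
t.join()

print(received)  # [('valueA', 'valueB', 'hi')]
```

args must be a sequence, so a single argument needs a one-element tuple with a trailing comma, such as args=(p,).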

Now let's combine Timer with subprocess.
Timer lives in threading.py, not subprocess.py. The question is whether wait(), while it blocks the main thread, also blocks the Timer.

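First, the difference between os.system and os.popen. os.system runs the given command directly; its output goes to standard output, and its return value is an integer describing the exit status. os.popen instead returns a file object: the command's output is held in a pipe and read through that object, rather than being written straight to stdout.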

import os
ret = os.popen('ls')
print ret.read()

ret = os.system('ls')
print ret 
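The same comparison in Python 3, as a sketch that assumes a POSIX shell where `echo hello` prints `hello`:

```python
import os

# os.popen: the command's output goes into a pipe and is read back as text.
pipe = os.popen("echo hello")
captured = pipe.read()
status_popen = pipe.close()   # None means the command exited with status 0

# os.system: the output goes straight to our stdout; the return value is
# the command's status (on POSIX an encoded wait status, 0 on success).
status_system = os.system("echo hello")
```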

Here is the source of popen2 in os.py:

def popen2(cmd, mode="t", bufsize=-1):
    """Execute the shell command 'cmd' in a sub-process.  On UNIX, 'cmd'
    may be a sequence, in which case arguments will be passed directly to
    the program without shell intervention (as with os.spawnv()).  If 'cmd'
    is a string it will be passed to the shell (as with os.system()). If
    'bufsize' is specified, it sets the buffer size for the I/O pipes.  The
    file objects (child_stdin, child_stdout) are returned."""
    import warnings
    msg = "os.popen2 is deprecated.  Use the subprocess module."
    warnings.warn(msg, DeprecationWarning, stacklevel=2)

    import subprocess
    PIPE = subprocess.PIPE
    p = subprocess.Popen(cmd, shell=isinstance(cmd, basestring),
                         bufsize=bufsize, stdin=PIPE, stdout=PIPE,
                         close_fds=True)
    return p.stdin, p.stdout
__all__.append("popen2")

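The source shows that popen2 is only a thin wrapper around subprocess.Popen (and is itself deprecated in favor of subprocess). So in simple cases you do not always need to call subprocess directly; os.popen is also a workable option.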

Next, let's look at Popen in subprocess. The subprocess module already documents the Popen class in detail; here is the original text:

class Popen(args, bufsize=0, executable=None,
            stdin=None, stdout=None, stderr=None,
            preexec_fn=None, close_fds=False, shell=False,
            cwd=None, env=None, universal_newlines=False,
            startupinfo=None, creationflags=0):

Arguments are:

args should be a string, or a sequence of program arguments. The
program to execute is normally the first item in the args sequence or
string, but can be explicitly set by using the executable argument.

On UNIX, with shell=False (default): In this case, the Popen class
uses os.execvp() to execute the child program. args should normally
be a sequence. A string will be treated as a sequence with the string
as the only item (the program to execute).

On UNIX, with shell=True: If args is a string, it specifies the
command string to execute through the shell. If args is a sequence,
the first item specifies the command string, and any additional items
will be treated as additional shell arguments.

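In terms of call order: when a Popen object is constructed, __init__ already calls _execute_child, and on POSIX _execute_child forks and then runs the child program with os.execvp() (or os.execvpe() when env is given).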
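Stripped of pipes, fd handling and the error pipe, the core of that sequence is fork, exec in the child, and waitpid in the parent. A POSIX-only Python 3 sketch of just that skeleton; spawn_and_wait is a hypothetical helper, not part of subprocess:

```python
import os
import sys


def spawn_and_wait(argv):
    """Fork, exec argv in the child, and wait for it in the parent (POSIX only)."""
    pid = os.fork()
    if pid == 0:
        # Child: replace this process image with the target program.
        try:
            os.execvp(argv[0], argv)
        finally:
            # Only reached if exec failed; never fall back into parent code.
            os._exit(127)
    # Parent: block until the child exits, as Popen.wait() does.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else None


code = spawn_and_wait([sys.executable, "-c", "import sys; sys.exit(3)"])
print(code)  # 3
```

The real _execute_child additionally writes the child's exception through errpipe_write so that the parent can re-raise an exec failure, which this sketch reduces to exit status 127.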

Here is an excerpt of Popen's code:

class Popen(object):
    _child_created = False  # Set here since __del__ checks it

    def __init__(self, args, bufsize=0, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=False, shell=False,
                 cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0):
        """Create new Popen instance."""
        _cleanup()

        if not isinstance(bufsize, (int, long)):
            raise TypeError("bufsize must be an integer")

        if mswindows:
            if preexec_fn is not None:
                raise ValueError("preexec_fn is not supported on Windows "
                                 "platforms")
            if close_fds and (stdin is not None or stdout is not None or
                              stderr is not None):
                raise ValueError("close_fds is not supported on Windows "
                                 "platforms if you redirect stdin/stdout/stderr")
        else:
            # POSIX
            if startupinfo is not None:
                raise ValueError("startupinfo is only supported on Windows "
                                 "platforms")
            if creationflags != 0:
                raise ValueError("creationflags is only supported on Windows "
                                 "platforms")
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self.pid = None
        self.returncode = None
        self.universal_newlines = universal_newlines


        (p2cread, p2cwrite,
         c2pread, c2pwrite,
         errread, errwrite), to_close = self._get_handles(stdin, stdout, stderr)

        try:
            self._execute_child(args, executable, preexec_fn, close_fds,
                                cwd, env, universal_newlines,
                                startupinfo, creationflags, shell, to_close,
                                p2cread, p2cwrite,
                                c2pread, c2pwrite,
                                errread, errwrite)
        except Exception:
            # Preserve original exception in case os.close raises.
            exc_type, exc_value, exc_trace = sys.exc_info()

            for fd in to_close:
                try:
                    if mswindows:
                        fd.Close()
                    else:
                        os.close(fd)
                except EnvironmentError:
                    pass

            raise exc_type, exc_value, exc_trace

        if mswindows:
            if p2cwrite is not None:
                p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0)
            if c2pread is not None:
                c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0)
            if errread is not None:
                errread = msvcrt.open_osfhandle(errread.Detach(), 0)

        if p2cwrite is not None:
            self.stdin = os.fdopen(p2cwrite, 'wb', bufsize)
        if c2pread is not None:
            if universal_newlines:
                self.stdout = os.fdopen(c2pread, 'rU', bufsize)
            else:
                self.stdout = os.fdopen(c2pread, 'rb', bufsize)
        if errread is not None:
            if universal_newlines:
                self.stderr = os.fdopen(errread, 'rU', bufsize)
            else:
                self.stderr = os.fdopen(errread, 'rb', bufsize)

And here is _execute_child (POSIX version), which forks and then calls os.execvp() in the child:

def _execute_child(self, args, executable, preexec_fn, close_fds,
                   cwd, env, universal_newlines,
                   startupinfo, creationflags, shell, to_close,
                   p2cread, p2cwrite,
                   c2pread, c2pwrite,
                   errread, errwrite):
    """Execute program (POSIX version)"""

    if isinstance(args, types.StringTypes):
        args = [args]
    else:
        args = list(args)

    if shell:
        args = ["/bin/sh", "-c"] + args
        if executable:
            args[0] = executable

    if executable is None:
        executable = args[0]

    def _close_in_parent(fd):
        os.close(fd)
        to_close.remove(fd)

    # For transferring possible exec failure from child to parent
    # The first char specifies the exception type: 0 means
    # OSError, 1 means some other error.
    errpipe_read, errpipe_write = self.pipe_cloexec()

    try:
        try:
            gc_was_enabled = gc.isenabled()
            # Disable gc to avoid bug where gc -> file_dealloc ->
            # write to stderr -> hang.  http://bugs.python.org/issue1336
            gc.disable()
            try:
                self.pid = os.fork()
            except:
                if gc_was_enabled:
                    gc.enable()
                raise
            self._child_created = True
            if self.pid == 0:
                # Child
                try:
                    # Close parent's pipe ends
                    if p2cwrite is not None:
                        os.close(p2cwrite)
                    if c2pread is not None:
                        os.close(c2pread)
                    if errread is not None:
                        os.close(errread)
                    os.close(errpipe_read)

                    # When duping fds, if there arises a situation
                    # where one of the fds is either 0, 1 or 2, it
                    # is possible that it is overwritten (#12607).
                    if c2pwrite == 0:
                        c2pwrite = os.dup(c2pwrite)
                    if errwrite == 0 or errwrite == 1:
                        errwrite = os.dup(errwrite)

                    # Dup fds for child
                    def _dup2(a, b):
                        # dup2() removes the CLOEXEC flag but
                        # we must do it ourselves if dup2()
                        # would be a no-op (issue #10806).
                        if a == b:
                            self._set_cloexec_flag(a, False)
                        elif a is not None:
                            os.dup2(a, b)
                    _dup2(p2cread, 0)
                    _dup2(c2pwrite, 1)
                    _dup2(errwrite, 2)

                    # Close pipe fds.  Make sure we don't close the
                    # same fd more than once, or standard fds.
                    closed = { None }
                    for fd in [p2cread, c2pwrite, errwrite]:
                        if fd not in closed and fd > 2:
                            os.close(fd)
                            closed.add(fd)

                    if cwd is not None:
                        os.chdir(cwd)

                    if preexec_fn:
                        preexec_fn()

                    # Close all other fds, if asked for - after
                    # preexec_fn(), which may open FDs.
                    if close_fds:
                        self._close_fds(but=errpipe_write)

                    if env is None:
                        os.execvp(executable, args)
                    else:
                        os.execvpe(executable, args, env)
                except:
                    exc_type, exc_value, tb = sys.exc_info()
                    # Save the traceback and attach it to the exception object
                    exc_lines = traceback.format_exception(exc_type,
                                                           exc_value,
                                                           tb)
                    exc_value.child_traceback = ''.join(exc_lines)
                    os.write(errpipe_write, pickle.dumps(exc_value))

                # This exitcode won't be reported to applications, so it
                # really doesn't matter what we return.
                os._exit(255)

            # Parent
            if gc_was_enabled:
                gc.enable()
        finally:
            # be sure the FD is closed no matter what
            os.close(errpipe_write)

        # Wait for exec to fail or succeed; possibly raising exception
        # Exception limited to 1M
        data = _eintr_retry_call(os.read, errpipe_read, 1048576)
    finally:
        if p2cread is not None and p2cwrite is not None:
            _close_in_parent(p2cread)
        if c2pwrite is not None and c2pread is not None:
            _close_in_parent(c2pwrite)
        if errwrite is not None and errread is not None:
            _close_in_parent(errwrite)

        # be sure the FD is closed no matter what
        os.close(errpipe_read)

    if data != "":
        try:
            _eintr_retry_call(os.waitpid, self.pid, 0)
        except OSError as e:
            if e.errno != errno.ECHILD:
                raise
        child_exception = pickle.loads(data)
        raise child_exception

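Finally, let's simulate a piece of project code and test whether wait()ing on a child process, which blocks the main program, affects the countdown of a Timer in the main process: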

First, a plain subprocess.Popen call:


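import shlex
import subprocess

cmd_str = 'ps -aux'
args = shlex.split(cmd_str)    # ['ps', '-aux']; no shell needed for a list
p = subprocess.Popen(args, stdout=subprocess.PIPE)
out = p.stdout.readlines()     # everything the command wrote to stdout
p.wait()                       # reap the child once its output is consumed
print out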

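Passing stdout=subprocess.PIPE to Popen redirects all of the command's output into a pipe, which the parent then reads.
The two examples below check two things: first, whether a Timer can end the child process early; second, whether communicate() blocks the Timer.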
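The second question can also be answered by recording the order of events instead of reading printed output. A Python 3 sketch; the 1-second child (standing in for popentst.py, which sleeps 5 s) and the 0.2-second timer are arbitrary choices:

```python
import subprocess
import sys
import threading

events = []

# Child that sleeps for 1 s, standing in for popentst.py.
p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(1)"])

t = threading.Timer(0.2, events.append, args=("timer fired",))
t.start()
p.communicate()               # the main thread blocks here until the child exits
events.append("child exited")
t.cancel()                    # no effect if the timer has already fired

print(events)  # ['timer fired', 'child exited']
```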

from threading import Timer
import subprocess

def CandyLab(p):
    print "CandyLab"
    p.terminate()          # try to end the child process early
    return

p = subprocess.Popen("python popentst.py", shell=True)

t = Timer(3, CandyLab, args=(p,))   # one-element tuple: note the trailing comma
t.start()
p.communicate()            # the main thread blocks here until the child exits
t.cancel()                 # stop the timer if it has not fired yet

popentst.py

import time
time.sleep(5)
print "sleep"

test.py

from threading import Timer
import subprocess

def CandyLab(p):
    print "CandyLab"
    #p.terminate()
    return

p = subprocess.Popen("python popentst.py", shell=True)

t = Timer(1, CandyLab, args=(p,))   # fires after 1 s; the child sleeps 5 s
t.start()
p.communicate()            # the main thread blocks here until the child exits
t.cancel()

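In test.py, the Timer interval (1 s) is shorter than the child's sleep (5 s). If communicate() blocked the Timer, the child's "sleep" line would be printed before "CandyLab"; it is not.
So neither communicate() nor wait() blocks the Timer: they only block the calling thread (inside os.waitpid(), which releases the GIL while waiting), while the Timer counts down in its own thread. Using a Timer to end a child process early still needs some extra handling, though.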
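One possible form of that extra handling, sketched in Python 3 under a few assumptions. With shell=True, as in the tests above, p.terminate() signals the /bin/sh process, so whether "python popentst.py" itself stops depends on whether the shell exec'd it directly. Passing an argument list without shell=True makes p.pid the real child. The 30-second child and the 0.5-second deadline are illustrative values.

```python
import subprocess
import sys
import threading
import time

# List argv and no shell=True: p.pid is the Python child itself, not /bin/sh.
p = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

killer = threading.Timer(0.5, p.terminate)   # end the child after 0.5 s
begin = time.monotonic()
killer.start()
p.communicate()     # blocks only the main thread; the timer still fires
elapsed = time.monotonic() - begin
killer.cancel()     # harmless if the timer has already fired

print(p.returncode, elapsed < 30)  # on POSIX: -15 True
```

On Python 3.3 and later, communicate(timeout=...) is an alternative that needs no Timer: it raises subprocess.TimeoutExpired, after which the caller can kill() the child and call communicate() again.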

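Timer is built directly on Event, which is built on Condition, which in turn uses Lock. That chain is too long to analyze here, and the original goal of this test has been reached.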