Source code for testplan.common.utils.thread

"""Threading utilities."""

import time
import threading

from .timing import TimeoutException


[docs]def execute_as_thread( target, args=None, kwargs=None, daemon=False, join=True, break_join=None, join_sleep=0.01, timeout=None, ): """ Execute target callable in a separate thread. :param target: Target callable. :type target: ``callable`` :param args: Callable args. :type args: ``tuple`` :param kwargs: Callable kwargs. :type kwargs: ``kwargs`` :param daemon: Set daemon thread. :type daemon: ``bool`` :param join: Join thread before return. :type join: ``bool`` :param break_join: Condition for join early break. :type break_join: ``callable`` :param join_sleep: Join break condition check sleep time. :type join_sleep: ``int`` :param timeout: Timeout duration. :type timeout: :py:class:`~testplan.common.utils.timing.TimeoutException` """ thr = threading.Thread( target=target, args=args or tuple(), kwargs=kwargs or {} ) thr.daemon = daemon thr.start() if join is True: start_time = time.time() while True: if not thr.is_alive(): return if break_join is not None and break_join(): break if timeout and time.time() - start_time > timeout: raise TimeoutException( "Thread {} timeout after {}s".format(thr, timeout) ) time.sleep(join_sleep)
[docs]def interruptible_join(thread, timeout=None): """ Joining a thread without ignoring signal interrupts. :param thread: Thread object to wait to terminate. :type thread: ``threading.Thread`` :param timeout: If specified, TimeoutException will be raised if the thread does not terminate within the specified timeout. :type timeout: ``Optional[numbers.Number]`` """ if timeout is None: end_time = None else: end_time = time.time() + timeout while end_time is None or time.time() < end_time: time.sleep(0.1) if not thread.is_alive(): thread.join() break if thread.is_alive(): raise TimeoutException( "Thread {thr} timed out after {timeout} seconds.".format( thr=thread, timeout=timeout ) )
[docs]class Barrier: """ Implements a re-usable, two-phase barrier. Allows a fixed number of threads to wait for each other to reach a certain point. For python >= 3.2 you can just use threading.Barrier instead, this class is provided for compatibility with Python 2. :param n: Number of threads to wait for at the barrier. :type n: ``int`` """ def __init__(self, n): self.n = n self._count = 0 self._mutex = threading.Lock() self._turnstile = threading.Semaphore(0) self._turnstile2 = threading.Semaphore(0)
[docs] def wait(self): """Wait for all threads to reach the barrier before returning.""" self._phase1() self._phase2()
def _phase1(self): """ Phase 1: waits for all threads to reach this point and increment the count. """ with self._mutex: self._count += 1 if self._count == self.n: for _ in range(self.n): self._turnstile.release() self._turnstile.acquire() def _phase2(self): """Phase 2: resets the count so that the barrier can be reused.""" with self._mutex: self._count -= 1 if self._count == 0: for _ in range(self.n): self._turnstile2.release() self._turnstile2.acquire()