• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python support.start_threads函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中test.support.start_threads函数的典型用法代码示例。如果您正苦于以下问题:Python start_threads函数的具体用法?Python start_threads怎么用?Python start_threads使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了start_threads函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_lru_cache_threaded

    def test_lru_cache_threaded(self):
        n, m = 5, 11

        class C:

            def orig(self, x, y):
                return 3 * x + y

        instance = C()

        f = per_instance_lru_cache(maxsize=n*m)(C.orig)
        hits, misses, maxsize, currsize = f.cache_info(instance)
        self.assertEqual(currsize, 0)

        start = threading.Event()
        def full(k):
            start.wait(10)
            for _ in range(m):
                self.assertEqual(f(instance, k, 0), instance.orig(k, 0))

        def clear():
            start.wait(10)
            for _ in range(2*m):
                f.cache_clear(instance)

        orig_si = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            # create n threads in order to fill cache
            threads = [threading.Thread(target=full, args=[k]) for k in range(n)]
            with support.start_threads(threads):
                start.set()

            hits, misses, maxsize, currsize = f.cache_info(instance)

            # XXX: Why can be not equal?
            self.assertLessEqual(misses, n)
            self.assertLessEqual(hits, m*n - misses)
            self.assertEqual(currsize, n)

            # create n threads in order to fill cache and 1 to clear it
            threads = [threading.Thread(target=clear)]
            threads += [threading.Thread(target=full, args=[k]) for k in range(n)]
            start.clear()
            with support.start_threads(threads):
                start.set()
        finally:
            sys.setswitchinterval(orig_si)
开发者ID:TangledWeb,项目名称:tangled,代码行数:48,代码来源:test_per_instance_lru_cache.py


示例2: check_parallel_module_init

    def check_parallel_module_init(self, mock_os):
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                try:
                    del sys.modules[modname]
                except KeyError:
                    pass
            errors = []
            done_tasks = []
            done.clear()
            t0 = time.monotonic()
            with start_threads(threading.Thread(target=task,
                                                args=(N, done, done_tasks, errors,))
                               for i in range(N)):
                pass
            completed = done.wait(10 * 60)
            dt = time.monotonic() - t0
            if verbose:
                print("%.1f ms" % (dt*1e3), flush=True, end=" ")
            dbg_info = 'done: %s/%s' % (len(done_tasks), N)
            self.assertFalse(errors, dbg_info)
            self.assertTrue(completed, dbg_info)
            if verbose:
                print("OK.")
开发者ID:M31MOTH,项目名称:cpython,代码行数:32,代码来源:test_threaded_import.py


示例3: test_lru_cache_threaded2

    def test_lru_cache_threaded2(self):
        # Simultaneous call with the same arguments
        n, m = 5, 7
        start = threading.Barrier(n+1)
        pause = threading.Barrier(n+1)
        stop = threading.Barrier(n+1)

        class C:

            @per_instance_lru_cache(maxsize=m*n)
            def f(self, x):
                pause.wait(10)
                return 3 * x

        instance = C()

        self.assertEqual(C.f.cache_info(instance), (0, 0, m*n, 0))

        def test():
            for i in range(m):
                start.wait(10)
                self.assertEqual(C.f(instance, i), 3 * i)
                stop.wait(10)

        threads = [threading.Thread(target=test) for k in range(n)]
        with support.start_threads(threads):
            for i in range(m):
                start.wait(10)
                stop.reset()
                pause.wait(10)
                start.reset()
                stop.wait(10)
                pause.reset()
                self.assertEqual(C.f.cache_info(instance), (0, (i+1)*n, m*n, i+1))
开发者ID:TangledWeb,项目名称:tangled,代码行数:34,代码来源:test_per_instance_lru_cache.py


示例4: check_parallel_module_init

    def check_parallel_module_init(self):
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                try:
                    del sys.modules[modname]
                except KeyError:
                    pass
            errors = []
            done_tasks = []
            done.clear()
            with start_threads(threading.Thread(target=task,
                                                args=(N, done, done_tasks, errors,))
                               for i in range(N)):
                pass
            self.assertTrue(done.wait(60))
            self.assertFalse(errors)
            if verbose:
                print("OK.")
开发者ID:chidea,项目名称:GoPythonDLLWrapper,代码行数:26,代码来源:test_threaded_import.py


示例5: run_threads

    def run_threads(self, n_feeders, n_consumers, q, inputs,
                    feed_func, consume_func):
        results = []
        sentinel = None
        seq = inputs + [sentinel] * n_consumers
        seq.reverse()
        rnd = random.Random(42)

        exceptions = []
        def log_exceptions(f):
            def wrapper(*args, **kwargs):
                try:
                    f(*args, **kwargs)
                except BaseException as e:
                    exceptions.append(e)
            return wrapper

        feeders = [threading.Thread(target=log_exceptions(feed_func),
                                    args=(q, seq, rnd))
                   for i in range(n_feeders)]
        consumers = [threading.Thread(target=log_exceptions(consume_func),
                                      args=(q, results, sentinel))
                     for i in range(n_consumers)]

        with support.start_threads(feeders + consumers):
            pass

        self.assertFalse(exceptions)
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        return results
开发者ID:Apoorvadabhere,项目名称:cpython,代码行数:32,代码来源:test_queue.py


示例6: test_trashcan_threads

    def test_trashcan_threads(self):
        # Issue #13992: trashcan mechanism should be thread-safe
        NESTING = 60
        N_THREADS = 2

        def sleeper_gen():
            """A generator that releases the GIL when closed or dealloc'ed."""
            try:
                yield
            finally:
                time.sleep(0.000001)

        class C(list):
            # Appending to a list is atomic, which avoids the use of a lock.
            inits = []
            dels = []
            def __init__(self, alist):
                self[:] = alist
                C.inits.append(None)
            def __del__(self):
                # This __del__ is called by subtype_dealloc().
                C.dels.append(None)
                # `g` will release the GIL when garbage-collected.  This
                # helps assert subtype_dealloc's behaviour when threads
                # switch in the middle of it.
                g = sleeper_gen()
                next(g)
                # Now that __del__ is finished, subtype_dealloc will proceed
                # to call list_dealloc, which also uses the trashcan mechanism.

        def make_nested():
            """Create a sufficiently nested container object so that the
            trashcan mechanism is invoked when deallocating it."""
            x = C([])
            for i in range(NESTING):
                x = [C([x])]
            del x

        def run_thread():
            """Exercise make_nested() in a loop."""
            while not exit:
                make_nested()

        old_switchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-5)
        try:
            exit = []
            threads = []
            for i in range(N_THREADS):
                t = threading.Thread(target=run_thread)
                threads.append(t)
            with start_threads(threads, lambda: exit.append(1)):
                time.sleep(1.0)
        finally:
            sys.setswitchinterval(old_switchinterval)
        gc.collect()
        self.assertEqual(len(C.inits), len(C.dels))
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:57,代码来源:test_gc.py


示例7: testThreading

 def testThreading(self):
     # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
     data = b"1" * 2**20
     nthreads = 10
     with BZ2File(self.filename, 'wb') as f:
         def comp():
             for i in range(5):
                 f.write(data)
         threads = [threading.Thread(target=comp) for i in range(nthreads)]
         with support.start_threads(threads):
             pass
开发者ID:10sr,项目名称:cpython,代码行数:11,代码来源:test_bz2.py


示例8: test_main

    def test_main(self):
        threads = [TempFileGreedy() for i in range(NUM_THREADS)]
        with start_threads(threads, startEvent.set):
            pass
        ok = sum(t.ok_count for t in threads)
        errors = [str(t.name) + str(t.errors.getvalue())
                  for t in threads if t.error_count]

        msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
            '\n'.join(errors))
        self.assertEqual(errors, [], msg)
        self.assertEqual(ok, NUM_THREADS * FILES_PER_THREAD)
开发者ID:10sr,项目名称:cpython,代码行数:12,代码来源:test_threadedtempfile.py


示例9: test_lru_cache_threaded3

    def test_lru_cache_threaded3(self):
        class C:

            @per_instance_lru_cache(maxsize=2)
            def f(self, x):
                time.sleep(.01)
                return 3 * x

        instance = C()

        def test(i, x):
            with self.subTest(thread=i):
                self.assertEqual(instance.f(x), 3 * x, i)
        threads = [threading.Thread(target=test, args=(i, v))
                   for i, v in enumerate([1, 2, 2, 3, 2])]
        with support.start_threads(threads):
            pass
开发者ID:TangledWeb,项目名称:tangled,代码行数:17,代码来源:test_per_instance_lru_cache.py


示例10: test_pendingcalls_threaded

    def test_pendingcalls_threaded(self):

        #do every callback on a separate thread
        n = 32 #total callbacks
        threads = []
        class foo(object):pass
        context = foo()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        threads = [threading.Thread(target=self.pendingcalls_thread,
                                    args=(context,))
                   for i in range(context.nThreads)]
        with support.start_threads(threads):
            self.pendingcalls_wait(context.l, n, context)
开发者ID:lupulin,项目名称:cpython,代码行数:19,代码来源:test_capi.py


示例11: test_derived

    def test_derived(self):
        # Issue 3088: if there is a threads switch inside the __init__
        # of a threading.local derived class, the per-thread dictionary
        # is created but not correctly set on the object.
        # The first member set may be bogus.
        import time
        class Local(self._local):
            def __init__(self):
                time.sleep(0.01)
        local = Local()

        def f(i):
            local.x = i
            # Simply check that the variable is correctly set
            self.assertEqual(local.x, i)

        with support.start_threads(threading.Thread(target=f, args=(i,))
                                   for i in range(10)):
            pass
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:19,代码来源:test_threading_local.py



注:本文中的test.support.start_threads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python support.strip_python_stderr函数代码示例发布时间:2022-05-27
下一篇:
Python support.run_unittest函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap