• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python sys.getcheckinterval函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sys.getcheckinterval函数的典型用法代码示例。如果您正苦于以下问题:Python getcheckinterval函数的具体用法?Python getcheckinterval怎么用?Python getcheckinterval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了getcheckinterval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_setcheckinterval

 def test_setcheckinterval(self):
     import sys
     raises(TypeError, sys.setcheckinterval)
     orig = sys.getcheckinterval()
     for n in 0, 100, 120, orig: # orig last to restore starting state
         sys.setcheckinterval(n)
         assert sys.getcheckinterval() == n
开发者ID:yuyichao,项目名称:pypy,代码行数:7,代码来源:test_sysmodule.py


示例2: test_usage

    def test_usage(self):
        import sys

        ticks = sys.getcheckinterval()

        with self.assertRaisesRegexp(RuntimeError, "timeout"):
            start = time.time()
            with timeout(0.01, 0.01):
                time.sleep(1)
        self.assertLess(time.time() - start, 1)
        self.assertEqual(ticks, sys.getcheckinterval())

        with self.assertRaises(KeyboardInterrupt):
            with timeout(0.01, 0.01):
                raise KeyboardInterrupt
        self.assertEqual(ticks, sys.getcheckinterval())

        with self.assertRaisesRegexp(RuntimeError, "timeout"):
            start = time.time()
            with timeout(0.01, 0.01, 10):
                while True:
                    pass
        self.assertLess(time.time() - start, 0.03)
        self.assertEqual(ticks, sys.getcheckinterval())

        # 0 for forever
        with timeout(0, 0.001):
            time.sleep(0.01)
        self.assertEqual(ticks, sys.getcheckinterval())
开发者ID:MrLYC,项目名称:ycyc,代码行数:29,代码来源:test_contextutils.py


示例3: test_setcheckinterval

 def test_setcheckinterval(self):
     with warnings.catch_warnings():
         warnings.simplefilter("ignore")
         self.assertRaises(TypeError, sys.setcheckinterval)
         orig = sys.getcheckinterval()
         for n in 0, 100, 120, orig: # orig last to restore starting state
             sys.setcheckinterval(n)
             self.assertEqual(sys.getcheckinterval(), n)
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:8,代码来源:test_sys.py


示例4: test_setcheckinterval

 def test_setcheckinterval(self):
     if test.test_support.due_to_ironpython_bug("http://tkbgitvstfat01:8080/WorkItemTracking/WorkItem.aspx?artifactMoniker=148342"):
         return
     self.assertRaises(TypeError, sys.setcheckinterval)
     orig = sys.getcheckinterval()
     for n in 0, 100, 120, orig: # orig last to restore starting state
         sys.setcheckinterval(n)
         self.assertEquals(sys.getcheckinterval(), n)
开发者ID:BillyboyD,项目名称:main,代码行数:8,代码来源:test_sys.py


示例5: TimeTest002

def TimeTest002():
    import time
    import sys
    print sys.getcheckinterval()
    for i in range (100):
         # now = time.time()
         now = time.clock()
         # b = time.__doc__()
         print i, " - ", now
    print 'STOP'
开发者ID:MakSim345,项目名称:Python,代码行数:10,代码来源:pygame01.py


示例6: testConfigureInterpreter

 def testConfigureInterpreter(self):
     oldcheckinterval = sys.getcheckinterval()
     newcheckinterval = oldcheckinterval + 1
     conf = self.load_config_text("""
                 instancehome <<INSTANCE_HOME>>
                 python-check-interval %d
                 """ % newcheckinterval)
     try:
         starter = self.get_starter(conf)
         starter.setupInterpreter()
         self.assertEqual(sys.getcheckinterval(), newcheckinterval)
     finally:
         sys.setcheckinterval(oldcheckinterval)
开发者ID:dhavlik,项目名称:Zope,代码行数:13,代码来源:test_starter.py


示例7: atomic

 def atomic(new_checkinterval=sys.getcheckinterval(),
            maxint=sys.maxint,
            getcheckinterval=sys.getcheckinterval,
            setcheckinterval=sys.setcheckinterval):
     setcheckinterval(maxint)
     try:
         reset_check_interval = True
         if ctypes.addressof(P_GIL.contents) == ctypes.addressof(GIL):
             lock = thread.allocate_lock()
             lock.acquire()
             gil = P_GIL.contents
             P_GIL.contents = get_pointer_to_lock(lock)
             try:
                 setcheckinterval(new_checkinterval)
                 reset_check_interval = False
                 yield True
             finally:
                 P_GIL.contents = gil
         else:
             setcheckinterval(new_checkinterval)
             reset_check_interval = False
             yield True
     finally:
         if reset_check_interval:
             setcheckinterval(new_checkinterval)
开发者ID:akruis,项目名称:bad_ideas,代码行数:25,代码来源:gil_hacks.py


示例8: diag_encoding

def diag_encoding(ct):

    ct.h2("System Encodings")
    
    s="    locale.getdefaultlocale(): "\
       + repr(locale.getdefaultlocale())

    s+="\n    sys.getdefaultencoding() : "+ sys.getdefaultencoding()
    s+="\n    sys.getfilesystemencoding() : " + sys.getfilesystemencoding()
    s+="\n    sys.stdout.encoding : "
    try:
        s+=str(sys.stdout.encoding)
    except AttributeError:
        s+=("(undefined)")
    s+="\n    sys.stdin.encoding : "
    try:
        s+=str(sys.stdin.encoding)
    except AttributeError:
        s+="(undefined)"
    s+="\n    sys.getcheckinterval() : %r " % sys.getcheckinterval()
    s+="\n    sys.getwindowsversion() : %r " % (sys.getwindowsversion(),)
    
    
    s+="\n"
    ct.pre(s)
开发者ID:BackupTheBerlios,项目名称:lino-svn,代码行数:25,代码来源:sysinfo.py


示例9: xdump

def xdump(path, show_scheme=True, show_data=True):
    # print "query_res " + str(xquery_res)
    xobj, scheme, ret_type = list_path(path)
    if xobj is None:
        return None
    if ret_type == "DIR":
        ret_fields = [['dir']]
        for (son_dir_name, son_dir) in xobj.items():
            ret_fields.append([add_cross_if_dir(son_dir_name, son_dir)])
        return ret_fields
    ret_fields = list()
    if show_scheme:
        ret_fields.append(list(scheme.keys()))
    if ret_type == "LOGS":
        ret_fields.extend(xobj)
        return ret_fields
    def_interval = sys.getcheckinterval()
    # TODO: maybe copy before and no need to lock?
    sys.setcheckinterval(1000000000)
    try:
        ret_fields.extend(decompose_fields(xobj,
                                           show_scheme=False,
                                           show_data=show_data))
    except Exception as e:
        raise e
    finally:
        sys.setcheckinterval(def_interval)
    return ret_fields
开发者ID:kfir-drivenets,项目名称:pyxray,代码行数:28,代码来源:xnode.py


示例10: inject_jump

    def inject_jump(self, where, dest):
        """
        Monkeypatch bytecode at ``where`` to force it to jump to ``dest``.

        Returns function which puts things back how they were.
        """
        # We're about to do dangerous things to a functions code content.
        # We can't make a lock to prevent the interpreter from using those
        # bytes, so the best we can do is to set the check interval to be high
        # and just pray that this keeps other threads at bay.
        old_check_interval = sys.getcheckinterval()
        sys.setcheckinterval(2**20)

        pb = ctypes.pointer(self.ob_sval)
        orig_bytes = [pb[where+i][0] for i in xrange(where)]

        v = struct.pack("<BH", opcode.opmap["JUMP_ABSOLUTE"], dest)

        # Overwrite code to cause it to jump to the target
        for i in xrange(3):
            pb[where+i][0] = ord(v[i])

        def tidy_up():
            """
            Put the bytecode back how it was. Good as new.
            """
            sys.setcheckinterval(old_check_interval)
            for i in xrange(3):
                pb[where+i][0] = orig_bytes[i]

        return tidy_up
开发者ID:akubera,项目名称:rootpy,代码行数:31,代码来源:magic.py


示例11: get_py_internals

def get_py_internals():
    py_internals = []
    if hasattr(sys, 'builtin_module_names'):
        py_internals.append(
            ('Built-in Modules', ', '.join(sys.builtin_module_names)))
    py_internals.append(('Byte Order', sys.byteorder + ' endian'))

    if hasattr(sys, 'getcheckinterval'):
        py_internals.append(('Check Interval', sys.getcheckinterval()))

    if hasattr(sys, 'getfilesystemencoding'):
        py_internals.append(
            ('File System Encoding', sys.getfilesystemencoding()))

    max_integer_size = str(sys.maxsize) + ' (%s)' % \
        hex(sys.maxsize).upper()
    py_internals.append(('Maximum Integer Size', max_integer_size))

    if hasattr(sys, 'getrecursionlimit'):
        py_internals.append(('Maximum Recursion Depth',
                            sys.getrecursionlimit()))

    if hasattr(sys, 'tracebacklimit'):
        traceback_limit = sys.tracebacklimit
    else:
        traceback_limit = 1000
    py_internals.append(('Maximum Traceback Limit', traceback_limit))

    py_internals.append(('Maximum Code Point', sys.maxunicode))
    return py_internals
开发者ID:lifenglifeng001,项目名称:flask-hello,代码行数:30,代码来源:pyinfo.py


示例12: __call__

    def __call__(self, *args, **kwargs):
        """
        Execute the callback.
        """
        if self.finished:
            # We're finished before we even started.  The only sane reason for
            # this is that the we were aborted, so check for for this, and if
            # it's not the case, log an error.
            if self.failed and self._exception[0] == InProgressAborted:
                # Aborted, fine.
                return

            # This shouldn't happen.  If it does, it's certainly an error
            # condition.  But as we are inside the thread now and already
            # finished, we can't really raise an exception.  So logging the
            # error will have to suffice.
            log.error('Attempting to start thread which has already finished')

        if self._callback is None:
            # Attempting to invoke multiple times?  Shouldn't happen.
            return None

        try:
            result = self._callback()
            # Kludge alert: InProgressAborted gets raised asynchronously inside
            # the thread.  Assuming it doesn't inadvertently get cleared out
            # by PyErr_Clear(), it may take up to check-interval bytecodes for
            # it to trigger.  So we do a dummy loop to chew up that many byte
            # codes (roughly) to cause any pending async InProgressAborted to
            # raise here, which we'll catch next.  The overhead added by this
            # loop is negligible.  [About 10us on my system]
            for i in xrange(sys.getcheckinterval()):
                pass
        except InProgressAborted:
            # InProgressAborted was raised inside the thread (from the InProgress
            # abort handler).  This means we're already finished, so there's no
            # need to do anything further.
            pass
        except:
            # FIXME: should we really be catching KeyboardInterrupt and SystemExit?
            MainThreadCallback(self.throw)(*sys.exc_info())
        else:
            if type(result) == types.GeneratorType or isinstance(result, InProgress):
                # Looks like the callback is yielding something, or callback is a
                # coroutine-decorated function.  Not supported (yet?).  In the
                # case of coroutines, the first entry will execute in the
                # thread, but subsequent entries (via the generator's next())
                # will be from the mainthread, which is almost certainly _not_
                # what is intended by threading a coroutine.
                log.warning('NYI: coroutines cannot (yet) be executed in threads.')

            # If we're finished, it means we were aborted, but probably caught the
            # InProgressAborted inside the threaded callback.  If so, we discard the
            # return value from the callback, as we're considered finished.  Otherwise
            # finish up in the mainthread.
            if not self.finished:
                MainThreadCallback(self.finish)(result)

        self._callback = None
开发者ID:jpmunz,项目名称:smartplayer,代码行数:59,代码来源:thread.py


示例13: short_checkinterval

def short_checkinterval(request):
    """
    Sets a small interval using sys.setcheckinterval to cause many context
    switches.
    """
    old_interval = sys.getcheckinterval()
    sys.setcheckinterval(0)
    request.addfinalizer(lambda: sys.setcheckinterval(old_interval))
开发者ID:Bluehorn,项目名称:calltrace,代码行数:8,代码来源:test_current_frames.py


示例14: _collect

 def _collect(self):
     gc.collect()
     check_interval = sys.getcheckinterval()
     sys.setcheckinterval(sys.maxint)
     try:
         return {id(object) for object in gc.get_objects() if not isinstance(object, EXCLUDE_TYPES)}
     finally:
         sys.setcheckinterval(check_interval)
开发者ID:VDOMBoxGroup,项目名称:runtime2.0,代码行数:8,代码来源:snapshots.py


示例15: run

    def run(self):
        pthread_setname_np(self.ident, "Manhole ----")

        client = self.client
        client.settimeout(None)
        pid, uid, gid = get_peercred(client)
        euid = os.geteuid()
        client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
        if uid not in (0, euid):
            raise SuspiciousClient(
                "Can't accept client with %s. "
                "It doesn't match the current EUID:%s or ROOT." % (
                    client_name, euid
            ))

        cry("Accepted connection %s from %s" % (client, client_name))
        pthread_setname_np(self.ident, "Manhole %s" % pid)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
        client.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0)
        backup = []
        try:
            client_fd = client.fileno()
            for mode, names in (
                ('w', (
                    'stderr',
                    'stdout',
                    '__stderr__',
                    '__stdout__'
                )),
                ('r', (
                    'stdin',
                    '__stdin__'
                ))
            ):
                for name in names:
                    backup.append((name, getattr(sys, name)))
                    setattr(sys, name, os.fdopen(client_fd, mode, 0))

            run_repl()
            cry("DONE.")
        finally:
            cry("Cleaning up.")
            old_interval = sys.getcheckinterval()
            sys.setcheckinterval(2147483647)
            junk = [] # keep the old file objects alive for a bit
            for name, fh in backup:
                junk.append(getattr(sys, name))
                setattr(sys, name, fh)
            del backup
            for fh in junk:
                try:
                    fh.close()
                except IOError:
                    pass
                del fh
            del junk
            self.client = None
            sys.setcheckinterval(old_interval)
开发者ID:knzm,项目名称:python-manhole,代码行数:58,代码来源:manhole.py


示例16: printCheckInterval

def printCheckInterval(self):
    """ Get/set python check interval """
    request = self.REQUEST
    interval = request.get('interval', None)

    if interval:
        return sys.setcheckinterval(int(interval))
    else:
        return sys.getcheckinterval()
开发者ID:eea,项目名称:Products.EEAPloneAdmin,代码行数:9,代码来源:setupmethods.py


示例17: setUp

 def setUp(self):
     # Set a very small check interval, this will make it more likely
     # that the interpreter crashes when threading is done incorrectly.
     if sys.version_info[:2] >= (3, 2):
         self._int = sys.getswitchinterval()
         sys.setswitchinterval(0.0000001)
     else:
         self._int = sys.getcheckinterval()
         sys.setcheckinterval(1)
开发者ID:aosm,项目名称:pyobjc,代码行数:9,代码来源:test_threading.py


示例18: test_trashcan_threads

    def test_trashcan_threads(self):
        # Issue #13992: trashcan mechanism should be thread-safe
        NESTING = 60
        N_THREADS = 2

        def sleeper_gen():
            """A generator that releases the GIL when closed or dealloc'ed."""
            try:
                yield
            finally:
                time.sleep(0.000001)

        class C(list):
            # Appending to a list is atomic, which avoids the use of a lock.
            inits = []
            dels = []
            def __init__(self, alist):
                self[:] = alist
                C.inits.append(None)
            def __del__(self):
                # This __del__ is called by subtype_dealloc().
                C.dels.append(None)
                # `g` will release the GIL when garbage-collected.  This
                # helps assert subtype_dealloc's behaviour when threads
                # switch in the middle of it.
                g = sleeper_gen()
                next(g)
                # Now that __del__ is finished, subtype_dealloc will proceed
                # to call list_dealloc, which also uses the trashcan mechanism.

        def make_nested():
            """Create a sufficiently nested container object so that the
            trashcan mechanism is invoked when deallocating it."""
            x = C([])
            for i in range(NESTING):
                x = [C([x])]
            del x

        def run_thread():
            """Exercise make_nested() in a loop."""
            while not exit:
                make_nested()

        old_checkinterval = sys.getcheckinterval()
        sys.setcheckinterval(3)
        try:
            exit = []
            threads = []
            for i in range(N_THREADS):
                t = threading.Thread(target=run_thread)
                threads.append(t)
            with start_threads(threads, lambda: exit.append(1)):
                time.sleep(1.0)
        finally:
            sys.setcheckinterval(old_checkinterval)
        gc.collect()
        self.assertEqual(len(C.inits), len(C.dels))
开发者ID:AtomicConductor,项目名称:conductor_client,代码行数:57,代码来源:test_gc.py


示例19: __unicode__

 def __unicode__(self):
     """Return a serialized version of this tree/branch."""
     
     ci = getcheckinterval()
     setcheckinterval(0)
     
     value = ''.join(self.render('utf8')).decode('utf8')
     
     setcheckinterval(ci)
     return value
开发者ID:marrow,项目名称:contentment,代码行数:10,代码来源:__init__.py


示例20: _empty_async

def _empty_async():
    # run this after changing the signal handler away from the iptyhon 0.10
    # one, in order to remove any possible queued PyThreadState_SetAsyncExc
    # exception
    # This function can be interrupted by any exception, but probably
    # by KeyboardInterrupt
    # We need to use the same inner loop as for test_async below
    # for the default check interval of 100, this functions
    #  takes 5.5 us on reuletlab4
    for i in range(sys.getcheckinterval()+50):
        pass
开发者ID:CamilleBo,项目名称:pyHegel,代码行数:11,代码来源:kbint_util.py



注:本文中的sys.getcheckinterval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sys.getdefaultencoding函数代码示例发布时间:2022-05-27
下一篇:
Python sys.getPath函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap