本文整理汇总了Python中sys.getcheckinterval函数的典型用法代码示例。如果您正苦于以下问题:Python getcheckinterval函数的具体用法?Python getcheckinterval怎么用?Python getcheckinterval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了getcheckinterval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_setcheckinterval
def test_setcheckinterval(self):
import sys
raises(TypeError, sys.setcheckinterval)
orig = sys.getcheckinterval()
for n in 0, 100, 120, orig: # orig last to restore starting state
sys.setcheckinterval(n)
assert sys.getcheckinterval() == n
开发者ID:yuyichao,项目名称:pypy,代码行数:7,代码来源:test_sysmodule.py
示例2: test_usage
def test_usage(self):
import sys
ticks = sys.getcheckinterval()
with self.assertRaisesRegexp(RuntimeError, "timeout"):
start = time.time()
with timeout(0.01, 0.01):
time.sleep(1)
self.assertLess(time.time() - start, 1)
self.assertEqual(ticks, sys.getcheckinterval())
with self.assertRaises(KeyboardInterrupt):
with timeout(0.01, 0.01):
raise KeyboardInterrupt
self.assertEqual(ticks, sys.getcheckinterval())
with self.assertRaisesRegexp(RuntimeError, "timeout"):
start = time.time()
with timeout(0.01, 0.01, 10):
while True:
pass
self.assertLess(time.time() - start, 0.03)
self.assertEqual(ticks, sys.getcheckinterval())
# 0 for forever
with timeout(0, 0.001):
time.sleep(0.01)
self.assertEqual(ticks, sys.getcheckinterval())
开发者ID:MrLYC,项目名称:ycyc,代码行数:29,代码来源:test_contextutils.py
示例3: test_setcheckinterval
def test_setcheckinterval(self):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.assertRaises(TypeError, sys.setcheckinterval)
orig = sys.getcheckinterval()
for n in 0, 100, 120, orig: # orig last to restore starting state
sys.setcheckinterval(n)
self.assertEqual(sys.getcheckinterval(), n)
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:8,代码来源:test_sys.py
示例4: test_setcheckinterval
def test_setcheckinterval(self):
if test.test_support.due_to_ironpython_bug("http://tkbgitvstfat01:8080/WorkItemTracking/WorkItem.aspx?artifactMoniker=148342"):
return
self.assertRaises(TypeError, sys.setcheckinterval)
orig = sys.getcheckinterval()
for n in 0, 100, 120, orig: # orig last to restore starting state
sys.setcheckinterval(n)
self.assertEquals(sys.getcheckinterval(), n)
开发者ID:BillyboyD,项目名称:main,代码行数:8,代码来源:test_sys.py
示例5: TimeTest002
def TimeTest002():
import time
import sys
print sys.getcheckinterval()
for i in range (100):
# now = time.time()
now = time.clock()
# b = time.__doc__()
print i, " - ", now
print 'STOP'
开发者ID:MakSim345,项目名称:Python,代码行数:10,代码来源:pygame01.py
示例6: testConfigureInterpreter
def testConfigureInterpreter(self):
oldcheckinterval = sys.getcheckinterval()
newcheckinterval = oldcheckinterval + 1
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
python-check-interval %d
""" % newcheckinterval)
try:
starter = self.get_starter(conf)
starter.setupInterpreter()
self.assertEqual(sys.getcheckinterval(), newcheckinterval)
finally:
sys.setcheckinterval(oldcheckinterval)
开发者ID:dhavlik,项目名称:Zope,代码行数:13,代码来源:test_starter.py
示例7: atomic
def atomic(new_checkinterval=sys.getcheckinterval(),
maxint=sys.maxint,
getcheckinterval=sys.getcheckinterval,
setcheckinterval=sys.setcheckinterval):
setcheckinterval(maxint)
try:
reset_check_interval = True
if ctypes.addressof(P_GIL.contents) == ctypes.addressof(GIL):
lock = thread.allocate_lock()
lock.acquire()
gil = P_GIL.contents
P_GIL.contents = get_pointer_to_lock(lock)
try:
setcheckinterval(new_checkinterval)
reset_check_interval = False
yield True
finally:
P_GIL.contents = gil
else:
setcheckinterval(new_checkinterval)
reset_check_interval = False
yield True
finally:
if reset_check_interval:
setcheckinterval(new_checkinterval)
开发者ID:akruis,项目名称:bad_ideas,代码行数:25,代码来源:gil_hacks.py
示例8: diag_encoding
def diag_encoding(ct):
ct.h2("System Encodings")
s=" locale.getdefaultlocale(): "\
+ repr(locale.getdefaultlocale())
s+="\n sys.getdefaultencoding() : "+ sys.getdefaultencoding()
s+="\n sys.getfilesystemencoding() : " + sys.getfilesystemencoding()
s+="\n sys.stdout.encoding : "
try:
s+=str(sys.stdout.encoding)
except AttributeError:
s+=("(undefined)")
s+="\n sys.stdin.encoding : "
try:
s+=str(sys.stdin.encoding)
except AttributeError:
s+="(undefined)"
s+="\n sys.getcheckinterval() : %r " % sys.getcheckinterval()
s+="\n sys.getwindowsversion() : %r " % (sys.getwindowsversion(),)
s+="\n"
ct.pre(s)
开发者ID:BackupTheBerlios,项目名称:lino-svn,代码行数:25,代码来源:sysinfo.py
示例9: xdump
def xdump(path, show_scheme=True, show_data=True):
# print "query_res " + str(xquery_res)
xobj, scheme, ret_type = list_path(path)
if xobj is None:
return None
if ret_type == "DIR":
ret_fields = [['dir']]
for (son_dir_name, son_dir) in xobj.items():
ret_fields.append([add_cross_if_dir(son_dir_name, son_dir)])
return ret_fields
ret_fields = list()
if show_scheme:
ret_fields.append(list(scheme.keys()))
if ret_type == "LOGS":
ret_fields.extend(xobj)
return ret_fields
def_interval = sys.getcheckinterval()
# TODO: maybe copy before and no need to lock?
sys.setcheckinterval(1000000000)
try:
ret_fields.extend(decompose_fields(xobj,
show_scheme=False,
show_data=show_data))
except Exception as e:
raise e
finally:
sys.setcheckinterval(def_interval)
return ret_fields
开发者ID:kfir-drivenets,项目名称:pyxray,代码行数:28,代码来源:xnode.py
示例10: inject_jump
def inject_jump(self, where, dest):
"""
Monkeypatch bytecode at ``where`` to force it to jump to ``dest``.
Returns function which puts things back how they were.
"""
# We're about to do dangerous things to a functions code content.
# We can't make a lock to prevent the interpreter from using those
# bytes, so the best we can do is to set the check interval to be high
# and just pray that this keeps other threads at bay.
old_check_interval = sys.getcheckinterval()
sys.setcheckinterval(2**20)
pb = ctypes.pointer(self.ob_sval)
orig_bytes = [pb[where+i][0] for i in xrange(where)]
v = struct.pack("<BH", opcode.opmap["JUMP_ABSOLUTE"], dest)
# Overwrite code to cause it to jump to the target
for i in xrange(3):
pb[where+i][0] = ord(v[i])
def tidy_up():
"""
Put the bytecode back how it was. Good as new.
"""
sys.setcheckinterval(old_check_interval)
for i in xrange(3):
pb[where+i][0] = orig_bytes[i]
return tidy_up
开发者ID:akubera,项目名称:rootpy,代码行数:31,代码来源:magic.py
示例11: get_py_internals
def get_py_internals():
py_internals = []
if hasattr(sys, 'builtin_module_names'):
py_internals.append(
('Built-in Modules', ', '.join(sys.builtin_module_names)))
py_internals.append(('Byte Order', sys.byteorder + ' endian'))
if hasattr(sys, 'getcheckinterval'):
py_internals.append(('Check Interval', sys.getcheckinterval()))
if hasattr(sys, 'getfilesystemencoding'):
py_internals.append(
('File System Encoding', sys.getfilesystemencoding()))
max_integer_size = str(sys.maxsize) + ' (%s)' % \
hex(sys.maxsize).upper()
py_internals.append(('Maximum Integer Size', max_integer_size))
if hasattr(sys, 'getrecursionlimit'):
py_internals.append(('Maximum Recursion Depth',
sys.getrecursionlimit()))
if hasattr(sys, 'tracebacklimit'):
traceback_limit = sys.tracebacklimit
else:
traceback_limit = 1000
py_internals.append(('Maximum Traceback Limit', traceback_limit))
py_internals.append(('Maximum Code Point', sys.maxunicode))
return py_internals
开发者ID:lifenglifeng001,项目名称:flask-hello,代码行数:30,代码来源:pyinfo.py
示例12: __call__
def __call__(self, *args, **kwargs):
"""
Execute the callback.
"""
if self.finished:
# We're finished before we even started. The only sane reason for
# this is that the we were aborted, so check for for this, and if
# it's not the case, log an error.
if self.failed and self._exception[0] == InProgressAborted:
# Aborted, fine.
return
# This shouldn't happen. If it does, it's certainly an error
# condition. But as we are inside the thread now and already
# finished, we can't really raise an exception. So logging the
# error will have to suffice.
log.error('Attempting to start thread which has already finished')
if self._callback is None:
# Attempting to invoke multiple times? Shouldn't happen.
return None
try:
result = self._callback()
# Kludge alert: InProgressAborted gets raised asynchronously inside
# the thread. Assuming it doesn't inadvertently get cleared out
# by PyErr_Clear(), it may take up to check-interval bytecodes for
# it to trigger. So we do a dummy loop to chew up that many byte
# codes (roughly) to cause any pending async InProgressAborted to
# raise here, which we'll catch next. The overhead added by this
# loop is negligible. [About 10us on my system]
for i in xrange(sys.getcheckinterval()):
pass
except InProgressAborted:
# InProgressAborted was raised inside the thread (from the InProgress
# abort handler). This means we're already finished, so there's no
# need to do anything further.
pass
except:
# FIXME: should we really be catching KeyboardInterrupt and SystemExit?
MainThreadCallback(self.throw)(*sys.exc_info())
else:
if type(result) == types.GeneratorType or isinstance(result, InProgress):
# Looks like the callback is yielding something, or callback is a
# coroutine-decorated function. Not supported (yet?). In the
# case of coroutines, the first entry will execute in the
# thread, but subsequent entries (via the generator's next())
# will be from the mainthread, which is almost certainly _not_
# what is intended by threading a coroutine.
log.warning('NYI: coroutines cannot (yet) be executed in threads.')
# If we're finished, it means we were aborted, but probably caught the
# InProgressAborted inside the threaded callback. If so, we discard the
# return value from the callback, as we're considered finished. Otherwise
# finish up in the mainthread.
if not self.finished:
MainThreadCallback(self.finish)(result)
self._callback = None
开发者ID:jpmunz,项目名称:smartplayer,代码行数:59,代码来源:thread.py
示例13: short_checkinterval
def short_checkinterval(request):
"""
Sets a small interval using sys.setcheckinterval to cause many context
switches.
"""
old_interval = sys.getcheckinterval()
sys.setcheckinterval(0)
request.addfinalizer(lambda: sys.setcheckinterval(old_interval))
开发者ID:Bluehorn,项目名称:calltrace,代码行数:8,代码来源:test_current_frames.py
示例14: _collect
def _collect(self):
gc.collect()
check_interval = sys.getcheckinterval()
sys.setcheckinterval(sys.maxint)
try:
return {id(object) for object in gc.get_objects() if not isinstance(object, EXCLUDE_TYPES)}
finally:
sys.setcheckinterval(check_interval)
开发者ID:VDOMBoxGroup,项目名称:runtime2.0,代码行数:8,代码来源:snapshots.py
示例15: run
def run(self):
pthread_setname_np(self.ident, "Manhole ----")
client = self.client
client.settimeout(None)
pid, uid, gid = get_peercred(client)
euid = os.geteuid()
client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
if uid not in (0, euid):
raise SuspiciousClient(
"Can't accept client with %s. "
"It doesn't match the current EUID:%s or ROOT." % (
client_name, euid
))
cry("Accepted connection %s from %s" % (client, client_name))
pthread_setname_np(self.ident, "Manhole %s" % pid)
client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
client.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0)
backup = []
try:
client_fd = client.fileno()
for mode, names in (
('w', (
'stderr',
'stdout',
'__stderr__',
'__stdout__'
)),
('r', (
'stdin',
'__stdin__'
))
):
for name in names:
backup.append((name, getattr(sys, name)))
setattr(sys, name, os.fdopen(client_fd, mode, 0))
run_repl()
cry("DONE.")
finally:
cry("Cleaning up.")
old_interval = sys.getcheckinterval()
sys.setcheckinterval(2147483647)
junk = [] # keep the old file objects alive for a bit
for name, fh in backup:
junk.append(getattr(sys, name))
setattr(sys, name, fh)
del backup
for fh in junk:
try:
fh.close()
except IOError:
pass
del fh
del junk
self.client = None
sys.setcheckinterval(old_interval)
开发者ID:knzm,项目名称:python-manhole,代码行数:58,代码来源:manhole.py
示例16: printCheckInterval
def printCheckInterval(self):
""" Get/set python check interval """
request = self.REQUEST
interval = request.get('interval', None)
if interval:
return sys.setcheckinterval(int(interval))
else:
return sys.getcheckinterval()
开发者ID:eea,项目名称:Products.EEAPloneAdmin,代码行数:9,代码来源:setupmethods.py
示例17: setUp
def setUp(self):
# Set a very small check interval, this will make it more likely
# that the interpreter crashes when threading is done incorrectly.
if sys.version_info[:2] >= (3, 2):
self._int = sys.getswitchinterval()
sys.setswitchinterval(0.0000001)
else:
self._int = sys.getcheckinterval()
sys.setcheckinterval(1)
开发者ID:aosm,项目名称:pyobjc,代码行数:9,代码来源:test_threading.py
示例18: test_trashcan_threads
def test_trashcan_threads(self):
# Issue #13992: trashcan mechanism should be thread-safe
NESTING = 60
N_THREADS = 2
def sleeper_gen():
"""A generator that releases the GIL when closed or dealloc'ed."""
try:
yield
finally:
time.sleep(0.000001)
class C(list):
# Appending to a list is atomic, which avoids the use of a lock.
inits = []
dels = []
def __init__(self, alist):
self[:] = alist
C.inits.append(None)
def __del__(self):
# This __del__ is called by subtype_dealloc().
C.dels.append(None)
# `g` will release the GIL when garbage-collected. This
# helps assert subtype_dealloc's behaviour when threads
# switch in the middle of it.
g = sleeper_gen()
next(g)
# Now that __del__ is finished, subtype_dealloc will proceed
# to call list_dealloc, which also uses the trashcan mechanism.
def make_nested():
"""Create a sufficiently nested container object so that the
trashcan mechanism is invoked when deallocating it."""
x = C([])
for i in range(NESTING):
x = [C([x])]
del x
def run_thread():
"""Exercise make_nested() in a loop."""
while not exit:
make_nested()
old_checkinterval = sys.getcheckinterval()
sys.setcheckinterval(3)
try:
exit = []
threads = []
for i in range(N_THREADS):
t = threading.Thread(target=run_thread)
threads.append(t)
with start_threads(threads, lambda: exit.append(1)):
time.sleep(1.0)
finally:
sys.setcheckinterval(old_checkinterval)
gc.collect()
self.assertEqual(len(C.inits), len(C.dels))
开发者ID:AtomicConductor,项目名称:conductor_client,代码行数:57,代码来源:test_gc.py
示例19: __unicode__
def __unicode__(self):
"""Return a serialized version of this tree/branch."""
ci = getcheckinterval()
setcheckinterval(0)
value = ''.join(self.render('utf8')).decode('utf8')
setcheckinterval(ci)
return value
开发者ID:marrow,项目名称:contentment,代码行数:10,代码来源:__init__.py
示例20: _empty_async
def _empty_async():
# run this after changing the signal handler away from the iptyhon 0.10
# one, in order to remove any possible queued PyThreadState_SetAsyncExc
# exception
# This function can be interrupted by any exception, but probably
# by KeyboardInterrupt
# We need to use the same inner loop as for test_async below
# for the default check interval of 100, this functions
# takes 5.5 us on reuletlab4
for i in range(sys.getcheckinterval()+50):
pass
开发者ID:CamilleBo,项目名称:pyHegel,代码行数:11,代码来源:kbint_util.py
注:本文中的sys.getcheckinterval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论