本文整理汇总了Python中sys.setcheckinterval函数的典型用法代码示例。如果您正苦于以下问题：Python setcheckinterval函数的具体用法？Python setcheckinterval怎么用？Python setcheckinterval使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了setcheckinterval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: evaluate
def evaluate(self,pop,force = 0):
#import tree
#print '1',tree.ref()
#only send the individuals out that need evaluation
if force:
_eval_list = pop.data
else:
_eval_list = filter(lambda x: not x.evaluated,pop)
#print '2',tree.ref()
eval_list = pop.clone()
#print '3',tree.ref()
eval_list.data = _eval_list
if len(eval_list):
Nserv = len(pop.server_list)
groups = divide_list(eval_list,Nserv)
#print '4',tree.ref()
sys.setcheckinterval(10)
finished = sync.event()
bar = sync.barrier(Nserv)
#print "EVAL LENGTH!!!", plen(pop.evaluator)
gr = groups[0]
print "GROUP LENGTH!!!", plen(groups[0]), len(gr),
#print "IND!!!", plen(gr[0]),plen(gr[0].root)
#print '4.5',tree.ref()
for i in range(len(groups)):
inputs = {'sub_pop':groups[i], 'evaluator':pop.evaluator, 'force':force}
returns = ('sub_pop',)
code = 'evaluator.evaluate(sub_pop,force)'
data_pack = (inputs,returns,code)
server = pop.server_list[i]
thread.start_new_thread(remote_thread_eval,(bar,finished,server,data_pack))
#print '7',tree.ref()
finished.wait()
sys.setcheckinterval(10)
开发者ID:mbentz80,项目名称:jzigbeercp,代码行数:34,代码来源:parallel_pop.py
示例2: test_string_slicing
def test_string_slicing(self):
    """Benchmark substring slicing through the executor and natively.

    ``f`` sums the lengths of every ``chars``-wide slice of a repeated
    string.  It is first run twice through ``self.evaluateWithExecutor``
    outside of any timing, then timed under ``PerformanceTestReporter``
    both through the executor (``*.pyfora`` names) and as plain Python
    (``*.native`` names).
    """
    def f(ct, passCt, chars):
        x = "asdfasdf" * (ct / 8)
        res = 0
        for _ in xrange(passCt):
            for ix in xrange(len(x)):
                res = res + len(x[ix:ix+chars])
        return res

    # Untimed runs before the measured ones.
    self.evaluateWithExecutor(f, 1000000, 1, 2)
    self.evaluateWithExecutor(f, 10000, 1, 2)

    def runTest(func, name):
        PerformanceTestReporter.PerfTest(name)(func)()

    runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 2), "pyfora.string_slicing_10mm.2_char_large_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 200), "pyfora.string_slicing_10mm.200_char_large_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 2), "pyfora.string_slicing_10mm.2_char_small_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 200), "pyfora.string_slicing_10mm.200_char_small_string.pyfora")

    # Raise the check interval for the native timings.  The original reset it
    # to a hard-coded 100 and skipped the reset entirely when a benchmark
    # raised; restore whatever value was active, unconditionally.
    previous_interval = sys.getcheckinterval()
    sys.setcheckinterval(100000)
    try:
        runTest(lambda: f(1000000, 10, 2), "pyfora.string_slicing_10mm.2_char_large_string.native")
        runTest(lambda: f(1000000, 10, 200), "pyfora.string_slicing_10mm.200_char_large_string.native")
        runTest(lambda: f(10000, 1000, 2), "pyfora.string_slicing_10mm.2_char_small_string.native")
        runTest(lambda: f(10000, 1000, 200), "pyfora.string_slicing_10mm.200_char_small_string.native")
    finally:
        sys.setcheckinterval(previous_interval)
开发者ID:WantonSoup,项目名称:ufora,代码行数:28,代码来源:StringTestCases.py
示例3: tidy_up
def tidy_up():
    """
    Put the bytecode back how it was. Good as new.

    Relies on ``pb``, ``where``, ``orig_bytes`` and ``old_check_interval``
    from the enclosing scope.
    """
    # Restore the three patched bytes while the previously raised check
    # interval is still in force, and only then lower it again.  The original
    # lowered the interval first, so the bytes were rewritten without it.
    try:
        for offset in xrange(3):
            pb[where + offset][0] = orig_bytes[offset]
    finally:
        sys.setcheckinterval(old_check_interval)
开发者ID:akubera,项目名称:rootpy,代码行数:7,代码来源:magic.py
示例4: __init__
def __init__(self):
    """Set up the thread object, its state flags and its ChannelList.

    Also sets the interpreter-wide check interval to 25 as a side effect.
    """
    threading.Thread.__init__(self)

    # Plain state flags.
    self.myOverlay = None
    self.paused = False
    self.fullUpdating = True

    # Interpreter-wide setting: applies to every thread, not only this one.
    sys.setcheckinterval(25)
    self.chanlist = ChannelList()
开发者ID:PseudoTV,项目名称:PseudoTV_Live,代码行数:7,代码来源:ChannelListThread.py
示例5: main
def main():
    """Main entry point for the application.

    Prepares the environment, initializes the application, sets the
    interpreter check interval and starts the workbench.  Any exception
    raised while starting is printed to ``ORIGINAL_STDERR``; in that case
    (or whenever no main window is produced) the function simply returns.
    Otherwise ``ORIGINAL_SYS_EXIT`` is called.
    """
    # Prepare for mantid import
    prepare_mantid_env()

    # todo: parse command arguments

    # general initialization
    app = initialize()

    # The default check interval leads to long lags when scripts are asked
    # to abort.
    sys.setcheckinterval(SYSCHECK_INTERVAL)

    window = None
    try:
        window = start_workbench(app)
    except BaseException:
        # Counted as a crash: this is the kind of failure we want captured
        # and reported.  Print to the original stderr because nothing else
        # can be relied on at this point.
        import traceback
        traceback.print_exc(file=ORIGINAL_STDERR)

    # Only exit through the original sys.exit when a window was created;
    # after an exception we just fall off the end.
    if window is not None:
        ORIGINAL_SYS_EXIT()
开发者ID:DanNixon,项目名称:mantid,代码行数:28,代码来源:mainwindow.py
示例6: setup
def setup(options):
    """Configure the process from ``options`` and return the opened database.

    In order: sets the check interval, installs product configurations,
    invokes the event/access log and logger factories, runs the site
    configuration (adding the ``devmode`` feature when requested), opens
    the databases, creates every server on a shared task dispatcher and
    fires the ``DatabaseOpened`` / ``ProcessStarting`` notifications.
    """
    sys.setcheckinterval(options.check_interval)

    zope.app.appsetup.product.setProductConfigurations(
        options.product_config)

    # Logging factories are plain callables.
    options.eventlog()
    options.accesslog()
    for make_logger in options.loggers:
        make_logger()

    # Provide the devmode, if activated
    devmode = options.devmode
    features = ('zserver', 'devmode') if devmode else ('zserver',)
    if devmode:
        logging.warning("Developer mode is enabled: this is a security risk "
            "and should NOT be enabled on production servers. Developer mode "
            "can be turned off in etc/zope.conf")

    zope.app.appsetup.config(options.site_definition, features=features)

    databases = zope.app.appsetup.appsetup.multi_database(options.databases)
    db = databases[0][0]
    notify(zope.processlifetime.DatabaseOpened(db))

    dispatcher = ThreadedTaskDispatcher()
    dispatcher.setThreadCount(options.threads)
    for server in options.servers:
        server.create(dispatcher, db)

    notify(zope.processlifetime.ProcessStarting())
    return db
开发者ID:zopefoundation,项目名称:zope.app.server,代码行数:33,代码来源:main.py
示例7: xdump
def xdump(path, show_scheme=True, show_data=True):
    """Return the contents found at ``path`` as a list of rows.

    Args:
        path: path resolved through ``list_path``.
        show_scheme: when true, the first row is the list of scheme keys
            (not applied to directories).
        show_data: forwarded to ``decompose_fields``.

    Returns:
        ``None`` when ``list_path`` finds no object; for a ``"DIR"`` a
        ``['dir']`` header row followed by one single-item row per child;
        for ``"LOGS"`` the optional scheme row followed by the object's
        entries; otherwise the optional scheme row followed by the rows
        from ``decompose_fields``.
    """
    xobj, scheme, ret_type = list_path(path)
    if xobj is None:
        return None

    if ret_type == "DIR":
        return [['dir']] + [
            [add_cross_if_dir(child_name, child)]
            for child_name, child in xobj.items()
        ]

    ret_fields = []
    if show_scheme:
        ret_fields.append(list(scheme.keys()))

    if ret_type == "LOGS":
        ret_fields.extend(xobj)
        return ret_fields

    def_interval = sys.getcheckinterval()
    # TODO: maybe copy before and no need to lock?
    # A huge check interval keeps other threads from being scheduled while
    # the object is decomposed.
    sys.setcheckinterval(1000000000)
    try:
        # The original wrapped this in ``except Exception as e: raise e``,
        # which adds nothing and truncates the traceback in Python 2.
        # ``finally`` alone restores the interval and lets errors propagate.
        ret_fields.extend(decompose_fields(xobj,
                                           show_scheme=False,
                                           show_data=show_data))
    finally:
        sys.setcheckinterval(def_interval)
    return ret_fields
开发者ID:kfir-drivenets,项目名称:pyxray,代码行数:28,代码来源:xnode.py
示例8: test_string_slicing_into_vector
def test_string_slicing_into_vector(self):
    """Benchmark slicing a string into a list of fixed-width chunks.

    ``testFunction`` builds a list of ``chars``-wide slices of a repeated
    string and sums their lengths.  It is first run twice through
    ``self.evaluateWithExecutor`` outside of any timing, then timed under
    ``PerformanceTestReporter`` both through the executor (``*.pyfora``
    names) and as plain Python (``*.native`` names).
    """
    def testFunction(ct, passCt, chars):
        x = "asdfasdf" * (ct / 8)
        res = 0
        for _ in xrange(passCt):
            v = [x[ix*chars:ix*chars+chars] for ix in xrange(len(x) / chars)]
            for e in v:
                res = res + len(e)
        return res

    f = testFunction

    # Untimed runs before the measured ones.
    self.evaluateWithExecutor(f, 1000000, 1, 2)
    self.evaluateWithExecutor(f, 10000, 1, 2)

    def runTest(func, name):
        PerformanceTestReporter.PerfTest(name)(func)()

    runTest(lambda: self.evaluateWithExecutor(f, 1000000, 10, 2), "pyfora.string_slicing_into_vector_10mm.2_char_large_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 1000000, 1000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_large_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 10000, 1000, 2), "pyfora.string_slicing_into_vector_10mm.2_char_small_string.pyfora")
    runTest(lambda: self.evaluateWithExecutor(f, 10000, 100000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_small_string.pyfora")

    # Raise the check interval for the native timings.  The original reset it
    # to a hard-coded 100 and skipped the reset entirely when a benchmark
    # raised; restore whatever value was active, unconditionally.
    previous_interval = sys.getcheckinterval()
    sys.setcheckinterval(100000)
    try:
        runTest(lambda: f(1000000, 10, 2), "pyfora.string_slicing_into_vector_10mm.2_char_large_string.native")
        runTest(lambda: f(1000000, 1000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_large_string.native")
        runTest(lambda: f(10000, 1000, 2), "pyfora.string_slicing_into_vector_10mm.2_char_small_string.native")
        runTest(lambda: f(10000, 100000, 200), "pyfora.string_slicing_into_vector_10mm.200_char_small_string.native")
    finally:
        sys.setcheckinterval(previous_interval)
开发者ID:WantonSoup,项目名称:ufora,代码行数:30,代码来源:StringTestCases.py
示例9: __init__
def __init__(self):
    """Set up the thread object, its state flags and its ChannelList.

    Also sets the interpreter-wide check interval to 25 as a side effect.
    """
    threading.Thread.__init__(self)

    # Plain state flags.
    self.shouldExit = False
    self.myOverlay = None

    # Interpreter-wide setting: applies to every thread, not only this one.
    sys.setcheckinterval(25)

    channels = ChannelList()
    self.chanlist = channels
    channels.sleepTime = 0.1
开发者ID:PseudoTV,项目名称:PseudoTV_Archive,代码行数:7,代码来源:ChannelListThread.py
示例10: inject_jump
def inject_jump(self, where, dest):
    """
    Monkeypatch bytecode at ``where`` to force it to jump to ``dest``.

    Three bytes starting at offset ``where`` are overwritten with a
    ``JUMP_ABSOLUTE`` opcode followed by ``dest`` packed as a little-endian
    unsigned short.

    Returns function which puts things back how they were.
    """
    # We're about to do dangerous things to a function's code content.
    # We can't make a lock to prevent the interpreter from using those
    # bytes, so the best we can do is to set the check interval to be high
    # and just pray that this keeps other threads at bay.
    old_check_interval = sys.getcheckinterval()
    sys.setcheckinterval(2**20)

    pb = ctypes.pointer(self.ob_sval)

    # Exactly three bytes are patched below ("<BH" packs to 1 + 2 bytes), so
    # exactly three must be saved.  The original iterated ``xrange(where)``,
    # which saved too few bytes for ``where < 3`` (IndexError on restore) and
    # read unrelated bytes for larger offsets.
    orig_bytes = [pb[where + i][0] for i in xrange(3)]

    v = struct.pack("<BH", opcode.opmap["JUMP_ABSOLUTE"], dest)

    # Overwrite code to cause it to jump to the target
    for i in xrange(3):
        pb[where + i][0] = ord(v[i])

    def tidy_up():
        """
        Put the bytecode back how it was. Good as new.
        """
        # Restore the bytes while the raised check interval is still in
        # force, then lower the interval again even if the restore fails.
        try:
            for i in xrange(3):
                pb[where + i][0] = orig_bytes[i]
        finally:
            sys.setcheckinterval(old_check_interval)

    return tidy_up
开发者ID:akubera,项目名称:rootpy,代码行数:31,代码来源:magic.py
示例11: test_setcheckinterval
def test_setcheckinterval(self):
    """setcheckinterval() requires an argument and round-trips its value."""
    import sys
    raises(TypeError, sys.setcheckinterval)
    starting_interval = sys.getcheckinterval()
    # The starting value goes last so the loop leaves the interpreter in the
    # state it found it in.
    candidates = (0, 100, 120, starting_interval)
    for interval in candidates:
        sys.setcheckinterval(interval)
        assert sys.getcheckinterval() == interval
开发者ID:yuyichao,项目名称:pypy,代码行数:7,代码来源:test_sysmodule.py
示例12: preprocessPercentileRatios
def preprocessPercentileRatios(self):
print "preprocessPercentileRatios start"
distributionsFile = self.getDatasetSlidingSizesFile()
if os.path.isfile(distributionsFile):
#Teh distributions file is processed
print "The distribution file exists"
return
self.initializePropertiesComputeStructures(False)
print "computing the ratios"
try:
zpa = zipfile.ZipFile(distributionsFile,"w",zipfile.ZIP_DEFLATED)
zpa.writestr("dummy.txt","dummy file")
zpa.close()
self.ziplock = threading.Lock()
# Create a pool with three worker threads
pool = ThreadPool.ThreadPool(5)
sys.setcheckinterval(1000)
for windowSize in self.slidingWindowSizes:
if self.useWindowThreading:
pool.queueTask(self.preprocessWindowSize, windowSize, None)
else:
self.preprocessWindowSize(windowSize)
# When all tasks are finished, allow the threads to terminate
if self.useWindowThreading:
print "Joining the threads"
pool.joinAll()
except:
os.unlink(distributionsFile)
raise
print "preprocessPercentileRatios end"
开发者ID:kosio,项目名称:EpiExplorer,代码行数:31,代码来源:NIHEpigenomeWig.py
示例13: test_setcheckinterval
def test_setcheckinterval(self):
    """setcheckinterval() requires an argument and round-trips its value."""
    with warnings.catch_warnings():
        # Silence any warnings raised by the calls below.
        warnings.simplefilter("ignore")
        self.assertRaises(TypeError, sys.setcheckinterval)
        saved_interval = sys.getcheckinterval()
        # The saved value goes last so the starting state is restored.
        for value in (0, 100, 120, saved_interval):
            sys.setcheckinterval(value)
            self.assertEqual(sys.getcheckinterval(), value)
开发者ID:MaximVanyushkin,项目名称:Sharp.RemoteQueryable,代码行数:8,代码来源:test_sys.py
示例14: _collect
def _collect(self):
    """Return the ``id()`` of every gc-tracked object not of an excluded type.

    Runs a garbage collection first, then raises the check interval while
    ``gc.get_objects()`` is walked; the previous interval is restored
    afterwards.

    Returns:
        A set of ``id()`` values for the objects from ``gc.get_objects()``
        that are not instances of ``EXCLUDE_TYPES``.
    """
    gc.collect()
    check_interval = sys.getcheckinterval()
    # sys.setcheckinterval() parses its argument as a C int.  On builds
    # where sys.maxint is 2**63 - 1 the original call raised OverflowError,
    # so clamp to the largest 32-bit signed value.  On builds where
    # sys.maxint is already 2**31 - 1 the value is unchanged.
    sys.setcheckinterval(min(sys.maxint, 2 ** 31 - 1))
    try:
        return {
            id(obj)
            for obj in gc.get_objects()
            if not isinstance(obj, EXCLUDE_TYPES)
        }
    finally:
        sys.setcheckinterval(check_interval)
开发者ID:VDOMBoxGroup,项目名称:runtime2.0,代码行数:8,代码来源:snapshots.py
示例15: test_setcheckinterval
def test_setcheckinterval(self):
    """setcheckinterval() requires an argument and round-trips its value.

    Skipped (by returning early) while the referenced IronPython bug is
    reported as open.
    """
    bug_url = "http://tkbgitvstfat01:8080/WorkItemTracking/WorkItem.aspx?artifactMoniker=148342"
    if test.test_support.due_to_ironpython_bug(bug_url):
        return
    self.assertRaises(TypeError, sys.setcheckinterval)
    saved_interval = sys.getcheckinterval()
    # The saved value goes last so the starting state is restored.
    for value in (0, 100, 120, saved_interval):
        sys.setcheckinterval(value)
        self.assertEqual(sys.getcheckinterval(), value)
开发者ID:BillyboyD,项目名称:main,代码行数:8,代码来源:test_sys.py
示例16: doStatProf
def doStatProf(*args):
    """Run ``runLimited(*args)`` between ``statprof.start()`` and ``stop()``.

    The check interval is set to 0 before profiling starts, and
    ``statprof.display()`` is called once the run has finished.
    """
    from bench import statprof

    sys.setcheckinterval(0)
    statprof.start()
    runLimited(*args)
    statprof.stop()
    statprof.display()
开发者ID:siebenmann,项目名称:dwiki,代码行数:8,代码来源:testbench.py
示例17: short_checkinterval
def short_checkinterval(request):
    """
    Sets a small interval using sys.setcheckinterval to cause many context
    switches.

    The previous interval is put back by a finalizer registered on
    ``request``.
    """
    previous = sys.getcheckinterval()

    def restore():
        return sys.setcheckinterval(previous)

    sys.setcheckinterval(0)
    request.addfinalizer(restore)
开发者ID:Bluehorn,项目名称:calltrace,代码行数:8,代码来源:test_current_frames.py
示例18: run
def run(self):
    """Serve one client connection: check its credentials and run a REPL.

    The peer's uid must be 0 or the current effective uid, otherwise
    ``SuspiciousClient`` is raised.  The ``sys`` standard streams (and
    their ``__dunder__`` counterparts) are replaced with unbuffered file
    objects opened on the client socket's descriptor for the duration of
    ``run_repl()``, and are put back in the ``finally`` block.
    """
    pthread_setname_np(self.ident, "Manhole ----")
    client = self.client
    # Switch the socket to blocking mode.
    client.settimeout(None)
    pid, uid, gid = get_peercred(client)
    euid = os.geteuid()
    client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
    # Only root or the user this process runs as may connect.
    if uid not in (0, euid):
        raise SuspiciousClient(
            "Can't accept client with %s. "
            "It doesn't match the current EUID:%s or ROOT." % (
                client_name, euid
            ))
    cry("Accepted connection %s from %s" % (client, client_name))
    pthread_setname_np(self.ident, "Manhole %s" % pid)
    client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
    client.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 0)
    # (name, original object) pairs for every sys stream replaced below.
    backup = []
    try:
        client_fd = client.fileno()
        for mode, names in (
            ('w', (
                'stderr',
                'stdout',
                '__stderr__',
                '__stdout__'
            )),
            ('r', (
                'stdin',
                '__stdin__'
            ))
        ):
            for name in names:
                backup.append((name, getattr(sys, name)))
                # Third argument 0: unbuffered file object on the socket fd.
                setattr(sys, name, os.fdopen(client_fd, mode, 0))
        run_repl()
        cry("DONE.")
    finally:
        cry("Cleaning up.")
        # Raise the check interval to the maximum while the streams are
        # swapped back; presumably to keep other threads from running in the
        # middle of the swap -- TODO confirm intent.
        old_interval = sys.getcheckinterval()
        sys.setcheckinterval(2147483647)
        junk = [] # keep the old file objects alive for a bit
        for name, fh in backup:
            junk.append(getattr(sys, name))
            setattr(sys, name, fh)
        del backup
        for fh in junk:
            try:
                fh.close()
            except IOError:
                # Best effort: a failed close is ignored.
                pass
            del fh
        del junk
        self.client = None
        sys.setcheckinterval(old_interval)
开发者ID:knzm,项目名称:python-manhole,代码行数:58,代码来源:manhole.py
示例19: thread_func
def thread_func(lock, acquire_array, lock_array, index):
    """Record ``index`` on attempting, and again on holding, ``lock``.

    After a random sleep of up to 0.1s, ``index`` is appended to
    ``acquire_array`` just before the lock is requested and to
    ``lock_array`` once it is held.
    """
    delay = random.uniform(0, 0.1)
    gevent.sleep(delay)

    # We use the fact that the GIL prevents CPython from context switching
    # between the acquire_array.append() and the lock.acquire() when we set
    # the check interval to a great value
    huge_interval = 10000000
    sys.setcheckinterval(huge_interval)
    acquire_array.append(index)
    with lock:
        # Lock is held: allow frequent switching again.
        sys.setcheckinterval(1)
        lock_array.append(index)
开发者ID:yalon,项目名称:gevent,代码行数:9,代码来源:test__native_friendly_rlock.py
示例20: test_trashcan_threads
def test_trashcan_threads(self):
    """Deallocate deeply nested containers from several threads at once.

    Passes when every ``C`` instance that was initialised has also had
    its ``__del__`` called by the time the threads have stopped and a
    final collection has run.
    """
    # Issue #13992: trashcan mechanism should be thread-safe
    NESTING = 60
    N_THREADS = 2

    def sleeper_gen():
        """A generator that releases the GIL when closed or dealloc'ed."""
        try:
            yield
        finally:
            time.sleep(0.000001)

    class C(list):
        # Appending to a list is atomic, which avoids the use of a lock.
        inits = []
        dels = []
        def __init__(self, alist):
            self[:] = alist
            C.inits.append(None)
        def __del__(self):
            # This __del__ is called by subtype_dealloc().
            C.dels.append(None)
            # `g` will release the GIL when garbage-collected. This
            # helps assert subtype_dealloc's behaviour when threads
            # switch in the middle of it.
            g = sleeper_gen()
            next(g)
            # Now that __del__ is finished, subtype_dealloc will proceed
            # to call list_dealloc, which also uses the trashcan mechanism.

    def make_nested():
        """Create a sufficiently nested container object so that the
        trashcan mechanism is invoked when deallocating it."""
        x = C([])
        for i in range(NESTING):
            x = [C([x])]
        del x

    def run_thread():
        """Exercise make_nested() in a loop."""
        # `exit` is the enclosing function's list (assigned below), not the
        # builtin: it is falsy while empty, so appending to it ends the loop.
        while not exit:
            make_nested()

    old_checkinterval = sys.getcheckinterval()
    # A small check interval so that threads switch often.
    sys.setcheckinterval(3)
    try:
        exit = []
        threads = []
        for i in range(N_THREADS):
            t = threading.Thread(target=run_thread)
            threads.append(t)
        # NOTE(review): start_threads presumably starts the threads on entry
        # and calls the lambda on exit to stop them -- confirm against the
        # helper.
        with start_threads(threads, lambda: exit.append(1)):
            time.sleep(1.0)
    finally:
        sys.setcheckinterval(old_checkinterval)
    gc.collect()
    self.assertEqual(len(C.inits), len(C.dels))
开发者ID:AtomicConductor,项目名称:conductor_client,代码行数:57,代码来源:test_gc.py
注：本文中的sys.setcheckinterval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论