本文整理汇总了Python中tracemalloc.take_snapshot函数的典型用法代码示例。如果您正苦于以下问题:Python take_snapshot函数的具体用法?Python take_snapshot怎么用?Python take_snapshot使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了take_snapshot函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_memory_usage_coordinates
def test_memory_usage_coordinates():
"""
Watch out for high memory usage on huge spatial files
"""
ntf = tempfile.NamedTemporaryFile()
tracemalloc.start()
snap1 = tracemalloc.take_snapshot()
# create a "flat" cube
cube,_ = utilities.generate_gaussian_cube(shape=[1,2000,2000])
sz = _.dtype.itemsize
snap1b = tracemalloc.take_snapshot()
diff = snap1b.compare_to(snap1, 'lineno')
diffvals = np.array([dd.size_diff for dd in diff])
# at this point, the generated cube should still exist in memory
assert diffvals.max()*u.B >= 2000**2*sz*u.B
del _
snap2 = tracemalloc.take_snapshot()
diff = snap2.compare_to(snap1b, 'lineno')
assert diff[0].size_diff*u.B < -0.3*u.MB
print(cube)
# printing the cube should not occupy any more memory
# (it will allocate a few bytes for the cache, but should *not*
# load the full 2000x2000 coordinate arrays for RA, Dec
snap3 = tracemalloc.take_snapshot()
diff = snap3.compare_to(snap2, 'lineno')
assert sum([dd.size_diff for dd in diff])*u.B < 100*u.kB
开发者ID:e-koch,项目名称:spectral-cube,代码行数:34,代码来源:test_performance.py
示例2: req_debug_handler
def req_debug_handler(self):
global mem_stat
req = urlparse.urlparse(self.path).query
reqs = urlparse.parse_qs(req, keep_blank_values=True)
try:
import tracemalloc
import gc
gc.collect()
if not mem_stat or "reset" in reqs:
mem_stat = tracemalloc.take_snapshot()
snapshot = tracemalloc.take_snapshot()
if "compare" in reqs:
top_stats = snapshot.compare_to(mem_stat, 'traceback')
else:
top_stats = snapshot.statistics('traceback')
python_lib = os.path.join(root_path, "python27")
dat = ""
for stat in top_stats[:100]:
print("%s memory blocks: %.1f KiB" % (stat.count, stat.size / 1024))
lines = stat.traceback.format()
ll = "\n".join(lines)
ln = len(lines)
pl = ""
for i in xrange(ln, 0, -1):
line = lines[i - 1]
print(line)
if line[8:].startswith(python_lib):
break
if not line.startswith(" File"):
pl = line
continue
if not line[8:].startswith(root_path):
break
ll = line[8:] + "\n" + pl
if ll[0] == "[":
pass
dat += "%d KB, count:%d %s\n" % (stat.size / 1024, stat.count, ll)
if hasattr(threading, "_start_trace"):
dat += "\n\nThread stat:\n"
for path in threading._start_trace:
n = threading._start_trace[path]
if n <= 1:
continue
dat += "%s => %d\n\n" % (path, n)
self.send_response("text/plain", dat)
except Exception as e:
xlog.exception("debug:%r", e)
self.send_response("text/html", "no mem_top")
开发者ID:DMJackZ,项目名称:XX-Net,代码行数:58,代码来源:web_control.py
示例3: main
def main():
# Parse command line options
parser = argparse.ArgumentParser()
parser.add_argument('input_file', help='input ninja file')
parser.add_argument('--encoding', default='utf-8',
help='ninja file encoding')
parser.add_argument('--ninja-deps', help='.ninja_deps file')
args = parser.parse_args()
if DEBUG_ALLOC:
tracemalloc.start(25)
tc_start = tracemalloc.take_snapshot()
# Parse ninja file
manifest = Parser().parse(args.input_file, args.encoding, args.ninja_deps)
if DEBUG_ALLOC:
tc_end = tracemalloc.take_snapshot()
for rule in manifest.rules:
print('rule', rule.name)
for build in manifest.builds:
print('build')
for path in build.explicit_outs:
print(' explicit_out:', path)
for path in build.implicit_outs:
print(' implicit_out:', path)
for path in build.explicit_ins:
print(' explicit_in:', path)
for path in build.implicit_ins:
print(' implicit_in:', path)
for path in build.prerequisites:
print(' prerequisites:', path)
for path in build.depfile_implicit_ins:
print(' depfile_implicit_in:', path)
for pool in manifest.pools:
print('pool', pool.name)
for default in manifest.defaults:
print('default')
for path in default.outs:
print(' out:', path)
if DEBUG_ALLOC:
top_stats = tc_end.compare_to(tc_start, 'traceback')
with open('tracemalloc.log', 'w') as fp:
for s in top_stats:
print('', file=fp)
print('========================================', file=fp)
print(s, file=fp)
for line in s.traceback.format():
print(line, file=fp)
开发者ID:endian11,项目名称:platform_development,代码行数:54,代码来源:ninja.py
示例4: measure_memory_diff
def measure_memory_diff(self, func):
import tracemalloc
tracemalloc.start()
try:
before = tracemalloc.take_snapshot()
# Keep the result and only delete it after taking a snapshot
res = func()
after = tracemalloc.take_snapshot()
del res
return after.compare_to(before, 'lineno')
finally:
tracemalloc.stop()
开发者ID:Alexhuszagh,项目名称:numba,代码行数:12,代码来源:test_nrt.py
示例5: flush_and_send
def flush_and_send(self, cursor = None, wal_end = None):
if os.getenv("DEBUG_MODE") == "yes":
snapshot = tracemalloc.take_snapshot()
display_top(snapshot)
if len(self.batch_msgs) > 0:
comp_batch_msgs = zlib.compress(json.dumps(self.batch_msgs).encode('utf-8'))
if os.getenv("DEBUG_MODE") == "yes" or os.getenv("ROW_LEVEL_MODE") == "yes":
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " : sending batch, length: " + str(len(self.batch_msgs)))
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " : uncompressed batch, length: " + str(len(json.dumps(self.batch_msgs))))
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " : compressed batch, length: " + str(len(comp_batch_msgs)))
sending_not_successfull = True
while sending_not_successfull:
try:
requests.post(self.rest_api_endpoint, headers=self.headers, data=comp_batch_msgs)
sending_not_successfull = False
except requests.exceptions.ConnectionError:
print("can't send POST Request. Retrying in 2 seconds ...")
time.sleep(2)
if os.getenv("DEBUG_MODE") == "yes" or os.getenv("ROW_LEVEL_MODE") == "yes":
print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " : sending batch successfull ...")
self.batch_msgs.clear()
if cursor is not None and wal_end is not None:
try:
if os.getenv("DEBUG_MODE") == "yes" or os.getenv("ROW_LEVEL_MODE") == "yes":
print("syncing with pg server single")
cursor.send_replication_feedback(flush_lsn=wal_end)
except AttributeError:
print("cannot sync wal_log, since no valuable cursor was provided ...")
signal.alarm(TIMEOUT)
开发者ID:zalando-incubator,项目名称:saiki-banyu,代码行数:31,代码来源:LogicalWriter.py
示例6: memory_obj
def memory_obj(args: tuple, packet: ircp.Packet, ___: dict):
""" Print the biggest memory hogs """
if not _IS_TRACING:
return packet.notice(
"Sorry, but tracing is currently disabled. "
'Please restart probot with the "PYTHONTRACEMALLOC=NFRAME" '
"environment variable."
)
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("filename")
num = 0
if len(args) >= 2:
try:
num = int(args[1])
except ValueError:
return packet.notice("Your argument must be an integer")
else:
return packet.notice("You must specify an object to inspect!")
if len(top_stats) >= num:
output = [packet.notice("Memory hog #{}".format(num))]
obj = top_stats[num]
trace = tracemalloc.get_object_traceback(obj)
for line in trace:
output.append(packet.notice(line))
return output
else:
return packet.notice("Sorry, but that object does not exist")
开发者ID:camconn,项目名称:probot,代码行数:30,代码来源:stats.py
示例7: wrapper
def wrapper(*args, **kwargs):
import sys
do_prof, do_tracemalloc, do_traceopen = 3 * [False]
if len(sys.argv) > 1:
do_prof = sys.argv[1] == "prof"
do_tracemalloc = sys.argv[1] == "tracemalloc"
do_traceopen = sys.argv[1] == "traceopen"
if do_prof or do_tracemalloc or do_traceopen: sys.argv.pop(1)
if do_prof:
print("Entering profiling mode...")
import pstats, cProfile, tempfile
prof_file = kwargs.pop("prof_file", None)
if prof_file is None:
_, prof_file = tempfile.mkstemp()
print("Profiling data stored in %s" % prof_file)
sortby = kwargs.pop("sortby", "time")
cProfile.runctx("main()", globals(), locals(), prof_file)
s = pstats.Stats(prof_file)
s.strip_dirs().sort_stats(sortby).print_stats()
return 0
elif do_tracemalloc:
print("Entering tracemalloc mode...")
# Requires py3.4
try:
import tracemalloc
except ImportError:
print("Error while trying to import tracemalloc (requires py3.4)")
raise SystemExit(1)
tracemalloc.start()
retcode = main(*args, **kwargs)
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
n = min(len(top_stats), 20)
print("[Top %d]" % n)
for stat in top_stats[:20]:
print(stat)
elif do_traceopen:
try:
import psutil
except ImportError:
print("traceopen requires psutil module")
raise SystemExit(1)
import os
p = psutil.Process(os.getpid())
retcode = main(*args, **kwargs)
print("open_files", p.open_files())
else:
retcode = main(*args, **kwargs)
return retcode
开发者ID:gmatteo,项目名称:monty,代码行数:58,代码来源:functools.py
示例8: sigterm_handler
def sigterm_handler(signal, frame):
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
if tracemallocfile:
with open(tracemallocfile, 'w+') as f:
f.write("SIGTERM tracemalloc before shutdown [ Top 10 ]\n")
for stat in top_stats[:10]:
f.write("{}\n".format(stat))
sys.exit(0)
开发者ID:badgley,项目名称:luigi,代码行数:9,代码来源:slurm_runner.py
示例9: display_biggest_traceback
def display_biggest_traceback():
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('traceback')
# pick the biggest memory block
stat = top_stats[0]
print("%s memory blocks: %.1f KiB" % (stat.count, stat.size / 1024))
for line in stat.traceback.format():
print(line)
开发者ID:ekw,项目名称:cpppo,代码行数:9,代码来源:history_test.py
示例10: tracemalloc_tool
def tracemalloc_tool():
# .. cross-platform but but requires Python 3.4 or higher ..
stat = next(filter(lambda item: str(item).startswith(filename),
tracemalloc.take_snapshot().statistics('filename')))
mem = stat.size / _TWO_20
if timestamps:
return mem, time.time()
else:
return mem
开发者ID:bbengfort,项目名称:memory_profiler,代码行数:9,代码来源:memory_profiler.py
示例11: take_snapshots
def take_snapshots():
all_snapshots = []
for loop in range(NGET_SNAPSHOT):
objs = [alloc_object() for index in range(NOBJECTS)]
snapshot = tracemalloc.take_snapshot()
objs = None
all_snapshots.append(snapshot)
snapshots = None
all_snapshots = None
开发者ID:haypo,项目名称:pytracemalloc,代码行数:9,代码来源:bench.py
示例12: test
def test(self):
timings = []
memory_usage = []
tracemalloc.start()
for i in range(self.repeat):
before_memory = tracemalloc.take_snapshot()
start_time = time.time()
self.bench()
end_time = time.time()
after_memory = tracemalloc.take_snapshot()
timings.append(end_time - start_time)
memory_usage.append(sum([t.size for t in after_memory.compare_to(before_memory, 'filename')]))
print("time min:", min(timings), "max:", max(timings), "avg:", sum(timings) / len(timings)) # NOQA
print("memory min:", min(memory_usage), "max:", max(memory_usage), "avg:", sum(memory_usage) / len(memory_usage)) # NOQA
开发者ID:BertrandBordage,项目名称:wagtail,代码行数:18,代码来源:benchmark.py
示例13: take_snapshot
def take_snapshot():
snapshot = tracemalloc.take_snapshot()
return snapshot.filter_traces(
(
tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
tracemalloc.Filter(False, "tracemalloc"),
tracemalloc.Filter(False, "<unknown>"),
)
)
开发者ID:podhmo,项目名称:individual-sandbox,代码行数:9,代码来源:02loop.py
示例14: test_snapshot_save_attr
def test_snapshot_save_attr(self):
# take a snapshot with a new attribute
snapshot = tracemalloc.take_snapshot()
snapshot.test_attr = "new"
snapshot.dump(support.TESTFN)
self.addCleanup(support.unlink, support.TESTFN)
# load() should recreate the attribute
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
self.assertEqual(snapshot2.test_attr, "new")
开发者ID:asvetlov,项目名称:cpython,代码行数:10,代码来源:test_tracemalloc.py
示例15: test_does_not_leak_too_much
def test_does_not_leak_too_much(self):
tracemalloc.start()
gc.collect()
series = []
snapshot1 = tracemalloc.take_snapshot()
for i in range(100):
try:
execute_script(self.feature, self)
except Exception:
pass
gc.collect()
snapshot2 = tracemalloc.take_snapshot()
stats = snapshot2.compare_to(snapshot1, "lineno")
snapshot1 = snapshot2
series.append(sum(stat.size / 1024 for stat in stats))
tracemalloc.stop()
series = series[1:] # ignore first run, which creates regex
cv = statistics.stdev(series) / statistics.mean(series)
assert cv < 0.1
开发者ID:kidosoft,项目名称:Morelia,代码行数:19,代码来源:test_traceback_hiding.py
示例16: _stop_memory_tracing
def _stop_memory_tracing():
try:
import tracemalloc
except ImportError:
return
snapshot = tracemalloc.take_snapshot()
_log_memory_top(snapshot)
tracemalloc.stop()
开发者ID:lu-zero,项目名称:aqualid,代码行数:11,代码来源:aql_main.py
示例17: print_malloc_context
def print_malloc_context(**kwargs):
"""
:param \**kwargs: see print_malloc_snapshot
"""
if tracemalloc is None:
logger.error('tracemalloc required')
return
tracemalloc.start()
yield
snapshot = tracemalloc.take_snapshot()
print_malloc_snapshot(snapshot, **kwargs)
开发者ID:maurov,项目名称:pymca,代码行数:11,代码来源:ProfilingUtils.py
示例18: test_leak_in_transport
async def test_leak_in_transport(zipkin_url, client, loop):
tracemalloc.start()
endpoint = az.create_endpoint('simple_service')
tracer = await az.create(zipkin_url, endpoint, sample_rate=1,
send_interval=0.0001, loop=loop)
await asyncio.sleep(5)
gc.collect()
snapshot1 = tracemalloc.take_snapshot()
await asyncio.sleep(10)
gc.collect()
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
count = sum(s.count for s in top_stats)
await tracer.close()
assert count < 400 # in case of leak this number is around 901452
开发者ID:nicholasamorim,项目名称:aiozipkin,代码行数:20,代码来源:test_zipkin.py
示例19: test_snapshot
def test_snapshot(self):
obj, source = allocate_bytes(123)
# take a snapshot
snapshot = tracemalloc.take_snapshot()
# write on disk
snapshot.dump(support.TESTFN)
self.addCleanup(support.unlink, support.TESTFN)
# load from disk
snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
self.assertEqual(snapshot2.traces, snapshot.traces)
# tracemalloc must be tracing memory allocations to take a snapshot
tracemalloc.stop()
with self.assertRaises(RuntimeError) as cm:
tracemalloc.take_snapshot()
self.assertEqual(str(cm.exception),
"the tracemalloc module must be tracing memory "
"allocations to take a snapshot")
开发者ID:asvetlov,项目名称:cpython,代码行数:21,代码来源:test_tracemalloc.py
示例20: dump_tracemalloc
def dump_tracemalloc():
"""
Dumps memory usage information to file
"""
gc.collect()
snapshot = tracemalloc.take_snapshot()
output_file = PROFILING_OUTPUT_FMT % get_filename_fmt()
with open(output_file, 'wb') as fp:
cPickle.dump(snapshot, fp, 2)
# Make sure the snapshot goes away
snapshot = None
开发者ID:0x554simon,项目名称:w3af,代码行数:13,代码来源:pytracemalloc.py
注:本文中的tracemalloc.take_snapshot函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论