• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python tracemalloc.take_snapshot函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tracemalloc.take_snapshot 函数的典型用法代码示例。如果您正苦于以下问题:Python take_snapshot 函数的具体用法?Python take_snapshot 怎么用?Python take_snapshot 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文共展示了 take_snapshot 函数的 20 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_memory_usage_coordinates

def test_memory_usage_coordinates():
    """
    Watch out for high memory usage on huge spatial files.

    Generates a "flat" 1 x 2000 x 2000 cube and compares tracemalloc
    snapshots to check three things in order: the generated data is resident
    right after creation, deleting the second return value releases memory,
    and printing the cube allocates less than 100 kB.
    """
    # Only switch tracing off at the end if this test switched it on, so a
    # session that was started with tracing enabled is left untouched.
    already_tracing = tracemalloc.is_tracing()
    tracemalloc.start()

    try:
        snap1 = tracemalloc.take_snapshot()

        # create a "flat" cube
        cube, data = utilities.generate_gaussian_cube(shape=[1, 2000, 2000])
        sz = data.dtype.itemsize

        snap1b = tracemalloc.take_snapshot()
        diff = snap1b.compare_to(snap1, 'lineno')
        diffvals = np.array([dd.size_diff for dd in diff])
        # at this point, the generated cube should still exist in memory
        assert diffvals.max()*u.B >= 2000**2*sz*u.B

        # Drop our reference to the second return value; the largest change
        # between the two snapshots must then be a release of memory.
        del data
        snap2 = tracemalloc.take_snapshot()
        diff = snap2.compare_to(snap1b, 'lineno')
        assert diff[0].size_diff*u.B < -0.3*u.MB

        print(cube)

        # printing the cube should not occupy any more memory
        # (it will allocate a few bytes for the cache, but should *not*
        # load the full 2000x2000 coordinate arrays for RA, Dec)
        snap3 = tracemalloc.take_snapshot()
        diff = snap3.compare_to(snap2, 'lineno')
        assert sum([dd.size_diff for dd in diff])*u.B < 100*u.kB
    finally:
        if not already_tracing:
            tracemalloc.stop()
开发者ID:e-koch,项目名称:spectral-cube,代码行数:34,代码来源:test_performance.py


示例2: req_debug_handler

    def req_debug_handler(self):
        """Answer a debug request with a plain-text tracemalloc memory report.

        Query-string flags read from ``self.path``:

        * ``reset``   -- replace the stored baseline snapshot (``mem_stat``)
          with a fresh one.
        * ``compare`` -- report differences against the baseline instead of
          absolute statistics.

        The report covers at most the first 100 statistics, followed by a
        "Thread stat" section when ``threading._start_trace`` exists.  Any
        exception is logged and answered with the text "no mem_top".
        """
        global mem_stat
        req = urlparse.urlparse(self.path).query
        reqs = urlparse.parse_qs(req, keep_blank_values=True)

        try:
            import tracemalloc
            import gc
            # Collect garbage first so unreachable objects are not counted.
            gc.collect()

            # Take the baseline on first use or when explicitly requested.
            if not mem_stat or "reset" in reqs:
                mem_stat = tracemalloc.take_snapshot()

            snapshot = tracemalloc.take_snapshot()

            if "compare" in reqs:
                top_stats = snapshot.compare_to(mem_stat, 'traceback')
            else:
                top_stats = snapshot.statistics('traceback')

            # NOTE(review): presumably the bundled interpreter's library
            # directory; frames under it stop the scan below -- confirm.
            python_lib = os.path.join(root_path, "python27")

            dat = ""
            for stat in top_stats[:100]:
                print("%s memory blocks: %.1f KiB" % (stat.count, stat.size / 1024))
                lines = stat.traceback.format()
                # Default description is the whole formatted traceback; the
                # scan below may replace it with one frame.
                ll = "\n".join(lines)
                ln = len(lines)
                # Most recently seen line that does not start with "  File".
                pl = ""
                # Walk the formatted traceback from its last line to its first.
                for i in xrange(ln, 0, -1):
                    line = lines[i - 1]
                    print(line)
                    # line[8:] skips the first 8 characters -- presumably the
                    # '  File "' prefix, leaving the path at the front.
                    if line[8:].startswith(python_lib):
                        break
                    if not line.startswith("  File"):
                        # Not a "  File" line: remember it for pairing with
                        # the next "  File" line encountered.
                        pl = line
                        continue
                    if not line[8:].startswith(root_path):
                        break
                    # A frame under root_path: describe the entry by this
                    # location plus the remembered line.
                    ll = line[8:] + "\n" + pl

                # No-op check; has no effect other than evaluating ll[0].
                if ll[0] == "[":
                    pass

                dat += "%d KB, count:%d %s\n" % (stat.size / 1024, stat.count, ll)

            # threading._start_trace is not a standard attribute; presumably
            # installed elsewhere as a mapping of path -> count -- verify.
            if hasattr(threading, "_start_trace"):
                dat += "\n\nThread stat:\n"
                for path in threading._start_trace:
                    n = threading._start_trace[path]
                    # Only entries with a count above one are reported.
                    if n <= 1:
                        continue
                    dat += "%s => %d\n\n" % (path, n)

            self.send_response("text/plain", dat)
        except Exception as e:
            # Covers every failure above, including a missing tracemalloc
            # module or tracing not being active.
            xlog.exception("debug:%r", e)
            self.send_response("text/html", "no mem_top")
开发者ID:DMJackZ,项目名称:XX-Net,代码行数:58,代码来源:web_control.py


示例3: main

def main():
    """Parse the ninja file named on the command line and print its contents.

    Prints every rule, build (with each of its path lists), pool and default
    target of the parsed manifest.  When ``DEBUG_ALLOC`` is true, the
    allocations made while parsing are written to ``tracemalloc.log``.
    """
    # Command line interface
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('input_file', help='input ninja file')
    arg_parser.add_argument('--encoding', default='utf-8',
                            help='ninja file encoding')
    arg_parser.add_argument('--ninja-deps', help='.ninja_deps file')
    options = arg_parser.parse_args()

    if DEBUG_ALLOC:
        tracemalloc.start(25)
        before_parse = tracemalloc.take_snapshot()

    # Load the manifest
    manifest = Parser().parse(
        options.input_file, options.encoding, options.ninja_deps)

    if DEBUG_ALLOC:
        after_parse = tracemalloc.take_snapshot()

    for rule in manifest.rules:
        print('rule', rule.name)

    # (label, attribute) pairs, in the order they are printed for each build.
    build_sections = (
        ('  explicit_out:', 'explicit_outs'),
        ('  implicit_out:', 'implicit_outs'),
        ('  explicit_in:', 'explicit_ins'),
        ('  implicit_in:', 'implicit_ins'),
        ('  prerequisites:', 'prerequisites'),
        ('  depfile_implicit_in:', 'depfile_implicit_ins'),
    )
    for build in manifest.builds:
        print('build')
        for label, attribute in build_sections:
            for path in getattr(build, attribute):
                print(label, path)

    for pool in manifest.pools:
        print('pool', pool.name)

    for default in manifest.defaults:
        print('default')
        for path in default.outs:
            print('  out:', path)

    if not DEBUG_ALLOC:
        return

    # Dump every allocation difference between the two snapshots.
    separator = '========================================'
    with open('tracemalloc.log', 'w') as log_file:
        for entry in after_parse.compare_to(before_parse, 'traceback'):
            print('', file=log_file)
            print(separator, file=log_file)
            print(entry, file=log_file)
            for text in entry.traceback.format():
                print(text, file=log_file)
开发者ID:endian11,项目名称:platform_development,代码行数:54,代码来源:ninja.py


示例4: measure_memory_diff

 def measure_memory_diff(self, func):
     """Call ``func`` under tracemalloc and return the per-line differences.

     The return value is what ``Snapshot.compare_to(..., 'lineno')`` yields
     for a snapshot taken after the call against one taken before it.
     Tracing is stopped again before returning, even if ``func`` raises.
     """
     import tracemalloc

     tracemalloc.start()
     try:
         baseline = tracemalloc.take_snapshot()
         # The result must stay referenced until the second snapshot has
         # been taken; it is released only afterwards.
         outcome = func()
         current = tracemalloc.take_snapshot()
         del outcome
         return current.compare_to(baseline, 'lineno')
     finally:
         tracemalloc.stop()
开发者ID:Alexhuszagh,项目名称:numba,代码行数:12,代码来源:test_nrt.py


示例5: flush_and_send

 def flush_and_send(self, cursor=None, wal_end=None):
     """Compress the pending batch, POST it, and clear it.

     Does nothing beyond the optional debug memory report when the batch is
     empty.  The POST is retried every 2 seconds for as long as it fails
     with ``requests.exceptions.ConnectionError``.

     :param cursor: optional cursor; used to send replication feedback.
     :param wal_end: optional position passed as ``flush_lsn`` in that feedback.
     """
     # Read the environment switches once per call.
     debug_mode = os.getenv("DEBUG_MODE") == "yes"
     verbose = debug_mode or os.getenv("ROW_LEVEL_MODE") == "yes"
     stamp_format = "%Y-%m-%d %H:%M:%S.%f"

     if debug_mode:
         snapshot = tracemalloc.take_snapshot()
         display_top(snapshot)

     if not self.batch_msgs:
         return

     # Serialise once and reuse the text for both the payload and the log
     # line; the batch used to be dumped to JSON twice.
     serialized = json.dumps(self.batch_msgs)
     comp_batch_msgs = zlib.compress(serialized.encode('utf-8'))
     if verbose:
         print(datetime.datetime.now().strftime(stamp_format) + " : sending batch, length: " + str(len(self.batch_msgs)))
         print(datetime.datetime.now().strftime(stamp_format) + " : uncompressed batch, length: " + str(len(serialized)))
         print(datetime.datetime.now().strftime(stamp_format) + " : compressed batch, length: " + str(len(comp_batch_msgs)))

     # Retry until the endpoint accepts the request.
     while True:
         try:
             requests.post(self.rest_api_endpoint, headers=self.headers, data=comp_batch_msgs)
             break
         except requests.exceptions.ConnectionError:
             print("can't send POST Request. Retrying in 2 seconds ...")
             time.sleep(2)
     if verbose:
         print(datetime.datetime.now().strftime(stamp_format) + " : sending batch successfull ...")

     self.batch_msgs.clear()

     if cursor is not None and wal_end is not None:
         try:
             if verbose:
                 print("syncing with pg server single")
                 # NOTE(review): the feedback is only sent when one of the
                 # debug switches is set.  This mirrors the existing
                 # behaviour, but it looks like an indentation slip --
                 # confirm whether it should run unconditionally.
                 cursor.send_replication_feedback(flush_lsn=wal_end)
         except AttributeError:
             print("cannot sync wal_log, since no valuable cursor was provided ...")
     signal.alarm(TIMEOUT)
开发者ID:zalando-incubator,项目名称:saiki-banyu,代码行数:31,代码来源:LogicalWriter.py


示例6: memory_obj

def memory_obj(args: tuple, packet: ircp.Packet, ___: dict):
    """ Print the biggest memory hogs

    ``args[1]`` selects which entry of the per-file statistics to inspect
    (0-based).  Returns a single notice on any error, otherwise a list made
    of a header notice followed by one notice per traceback line.
    """
    if not _IS_TRACING:
        return packet.notice(
            "Sorry, but tracing is currently disabled. "
            'Please restart probot with the "PYTHONTRACEMALLOC=NFRAME" '
            "environment variable."
        )

    # Validate the argument before paying for a snapshot.
    if len(args) < 2:
        return packet.notice("You must specify an object to inspect!")
    try:
        num = int(args[1])
    except ValueError:
        return packet.notice("Your argument must be an integer")

    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics("filename")

    # Valid indexes are 0 .. len-1.  The previous ``len(top_stats) >= num``
    # check let ``num == len(top_stats)`` through to an IndexError and
    # silently accepted negative numbers.
    if not 0 <= num < len(top_stats):
        return packet.notice("Sorry, but that object does not exist")

    output = [packet.notice("Memory hog #{}".format(num))]
    obj = top_stats[num]
    trace = tracemalloc.get_object_traceback(obj)
    # get_object_traceback() returns None when the allocation of the object
    # was not traced; in that case only the header is reported.
    if trace is not None:
        for line in trace:
            output.append(packet.notice(line))
    return output
开发者ID:camconn,项目名称:probot,代码行数:30,代码来源:stats.py


示例7: wrapper

    def wrapper(*args, **kwargs):
        """Run ``main``, optionally under a diagnostic mode chosen by ``sys.argv[1]``.

        Recognised first arguments (removed from ``sys.argv`` when matched):

        * ``prof``        -- run ``main()`` under cProfile and print the stats.
          ``prof_file`` and ``sortby`` are taken from ``kwargs``; the call is
          made without arguments and 0 is returned.
        * ``tracemalloc`` -- run ``main`` with tracing and print the top 20
          allocation sites by line.
        * ``traceopen``   -- run ``main`` and print the files the process has
          open afterwards (needs ``psutil``).

        Otherwise ``main(*args, **kwargs)`` is called directly.  Except in
        ``prof`` mode the return value of ``main`` is returned.
        """
        import sys
        do_prof, do_tracemalloc, do_traceopen = 3 * [False]
        if len(sys.argv) > 1:
            do_prof = sys.argv[1] == "prof"
            do_tracemalloc = sys.argv[1] == "tracemalloc"
            do_traceopen = sys.argv[1] == "traceopen"

        # Hide the mode selector from main's own argument parsing.
        if do_prof or do_tracemalloc or do_traceopen:
            sys.argv.pop(1)

        if do_prof:
            print("Entering profiling mode...")
            import os
            import pstats
            import cProfile
            import tempfile
            prof_file = kwargs.pop("prof_file", None)
            if prof_file is None:
                # mkstemp() returns an open OS-level descriptor; close it,
                # because only the path is needed and it would otherwise leak.
                fd, prof_file = tempfile.mkstemp()
                os.close(fd)
                print("Profiling data stored in %s" % prof_file)

            sortby = kwargs.pop("sortby", "time")
            # ``main`` is resolved through locals(), where it appears as a
            # free variable of this function.
            cProfile.runctx("main()", globals(), locals(), prof_file)
            s = pstats.Stats(prof_file)
            s.strip_dirs().sort_stats(sortby).print_stats()
            return 0

        elif do_tracemalloc:
            print("Entering tracemalloc mode...")
            # Requires py3.4
            try:
                import tracemalloc
            except ImportError:
                print("Error while trying to import tracemalloc (requires py3.4)")
                raise SystemExit(1)

            tracemalloc.start()
            retcode = main(*args, **kwargs)
            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics('lineno')

            n = min(len(top_stats), 20)
            print("[Top %d]" % n)
            for stat in top_stats[:n]:
                print(stat)

        elif do_traceopen:
            try:
                import psutil
            except ImportError:
                print("traceopen requires psutil module")
                raise SystemExit(1)
            import os
            p = psutil.Process(os.getpid())
            retcode = main(*args, **kwargs)
            print("open_files", p.open_files())

        else:
            retcode = main(*args, **kwargs)

        return retcode
开发者ID:gmatteo,项目名称:monty,代码行数:58,代码来源:functools.py


示例8: sigterm_handler

def sigterm_handler(signal, frame):
    """Write a tracemalloc report if one is configured, then exit with status 0.

    When ``tracemallocfile`` is set, the ten largest allocation sites (by
    line) are written to that file.  The process exits in every case; before
    this fix it only exited when a report file was configured, and it took a
    snapshot even when no report was wanted.

    :param signal: signal number passed in by the signal machinery (unused).
    :param frame: interrupted stack frame (unused).
    """
    if tracemallocfile:
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')
        with open(tracemallocfile, 'w+') as f:
            f.write("SIGTERM tracemalloc before shutdown [ Top 10 ]\n")
            for stat in top_stats[:10]:
                f.write("{}\n".format(stat))
    sys.exit(0)
开发者ID:badgley,项目名称:luigi,代码行数:9,代码来源:slurm_runner.py


示例9: display_biggest_traceback

 def display_biggest_traceback():
     """Print the size, block count and traceback of the largest allocation group."""
     by_traceback = tracemalloc.take_snapshot().statistics('traceback')

     # Statistics come back sorted, so the first entry is the biggest one.
     biggest = by_traceback[0]
     summary = "%s memory blocks: %.1f KiB" % (biggest.count, biggest.size / 1024)
     print(summary)
     for text in biggest.traceback.format():
         print(text)
开发者ID:ekw,项目名称:cpppo,代码行数:9,代码来源:history_test.py


示例10: tracemalloc_tool

 def tracemalloc_tool():
     """Return the traced memory attributed to ``filename``, scaled by ``_TWO_20``.

     Picks the first per-file statistic whose string form starts with
     ``filename``.  Returns ``(mem, time.time())`` when ``timestamps`` is
     true, otherwise just ``mem``.
     """
     # .. cross-platform but but requires Python 3.4 or higher ..
     per_file = tracemalloc.take_snapshot().statistics('filename')
     matching = next(entry for entry in per_file
                     if str(entry).startswith(filename))
     # _TWO_20 is presumably 2**20, giving MiB -- confirm at its definition.
     mem = matching.size / _TWO_20
     return (mem, time.time()) if timestamps else mem
开发者ID:bbengfort,项目名称:memory_profiler,代码行数:9,代码来源:memory_profiler.py


示例11: take_snapshots

def take_snapshots():
    """Take ``NGET_SNAPSHOT`` snapshots, each while ``NOBJECTS`` objects are alive.

    Every snapshot is kept in a list until the loop finishes; all references
    are dropped before returning.  Returns ``None``.
    """
    all_snapshots = []
    for _ in range(NGET_SNAPSHOT):
        objs = [alloc_object() for _ in range(NOBJECTS)]
        snapshot = tracemalloc.take_snapshot()
        # Release the freshly allocated objects before the next round.
        objs = None
        all_snapshots.append(snapshot)
        # Drop the local reference; the list keeps the snapshot alive.
        # (This line used to assign to a misspelled, otherwise unused name.)
        snapshot = None
    all_snapshots = None
开发者ID:haypo,项目名称:pytracemalloc,代码行数:9,代码来源:bench.py


示例12: test

    def test(self):
        """Run ``self.bench()`` ``self.repeat`` times and print time and memory figures.

        For every run the wall-clock duration and a memory figure derived
        from tracemalloc snapshots taken around the call are recorded; the
        minimum, maximum and average of both series are printed.
        """
        timings = []
        memory_usage = []
        # Only switch tracing off afterwards if this method switched it on.
        already_tracing = tracemalloc.is_tracing()
        tracemalloc.start()

        try:
            for i in range(self.repeat):
                before_memory = tracemalloc.take_snapshot()
                start_time = time.time()

                self.bench()

                end_time = time.time()
                after_memory = tracemalloc.take_snapshot()
                timings.append(end_time - start_time)
                # NOTE(review): this sums ``size`` (total size in the newer
                # snapshot), not ``size_diff`` -- confirm that the total
                # rather than the per-run growth is what should be reported.
                memory_usage.append(sum([t.size for t in after_memory.compare_to(before_memory, 'filename')]))
        finally:
            # Tracing slows down everything that runs afterwards, so do not
            # leave it enabled once the measurements are done.
            if not already_tracing:
                tracemalloc.stop()

        print("time min:", min(timings), "max:", max(timings), "avg:", sum(timings) / len(timings))  # NOQA
        print("memory min:", min(memory_usage), "max:", max(memory_usage), "avg:", sum(memory_usage) / len(memory_usage))  # NOQA
开发者ID:BertrandBordage,项目名称:wagtail,代码行数:18,代码来源:benchmark.py


示例13: take_snapshot

def take_snapshot():
    """Return a tracemalloc snapshot with three exclusion filters applied.

    Traces matching the import bootstrap, ``tracemalloc`` and ``<unknown>``
    filename patterns are filtered out.
    """
    raw = tracemalloc.take_snapshot()
    excluded_patterns = (
        "<frozen importlib._bootstrap>",
        "tracemalloc",
        "<unknown>",
    )
    exclusions = tuple(
        tracemalloc.Filter(False, pattern) for pattern in excluded_patterns
    )
    return raw.filter_traces(exclusions)
开发者ID:podhmo,项目名称:individual-sandbox,代码行数:9,代码来源:02loop.py


示例14: test_snapshot_save_attr

    def test_snapshot_save_attr(self):
        """An extra attribute set on a snapshot must survive dump() and load()."""
        # attach a custom attribute, then write the snapshot to disk
        original = tracemalloc.take_snapshot()
        original.test_attr = "new"
        original.dump(support.TESTFN)
        self.addCleanup(support.unlink, support.TESTFN)

        # the reloaded snapshot must carry the same attribute
        reloaded = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(reloaded.test_attr, "new")
开发者ID:asvetlov,项目名称:cpython,代码行数:10,代码来源:test_tracemalloc.py


示例15: test_does_not_leak_too_much

 def test_does_not_leak_too_much(self):
     """Run the feature 100 times and require a stable per-run memory figure.

     After each run the current snapshot is compared with the previous one
     and the summed size (in KiB) is recorded.  The coefficient of variation
     of that series, first run excluded, must stay below 0.1.
     """
     tracemalloc.start()
     gc.collect()
     kib_per_run = []
     previous = tracemalloc.take_snapshot()
     for _ in range(100):
         try:
             execute_script(self.feature, self)
         except Exception:
             # Failures of the script itself are irrelevant here.
             pass
         gc.collect()
         current = tracemalloc.take_snapshot()
         delta = current.compare_to(previous, "lineno")
         kib_per_run.append(sum(entry.size / 1024 for entry in delta))
         previous = current
     tracemalloc.stop()
     steady = kib_per_run[1:]  # ignore first run, which creates regex
     spread = statistics.stdev(steady)
     average = statistics.mean(steady)
     cv = spread / average
     assert cv < 0.1
开发者ID:kidosoft,项目名称:Morelia,代码行数:19,代码来源:test_traceback_hiding.py


示例16: _stop_memory_tracing

def _stop_memory_tracing():
    """Log the top memory users from a final snapshot and stop tracing.

    Does nothing when the tracemalloc module cannot be imported.
    """
    try:
        import tracemalloc
    except ImportError:
        pass
    else:
        final_snapshot = tracemalloc.take_snapshot()
        _log_memory_top(final_snapshot)
        tracemalloc.stop()
开发者ID:lu-zero,项目名称:aqualid,代码行数:11,代码来源:aql_main.py


示例17: print_malloc_context

def print_malloc_context(**kwargs):
    r"""
    Start tracemalloc, yield once, then print a snapshot of the allocations.

    Presumably wrapped with ``contextlib.contextmanager`` (the decorator is
    not visible here).  When tracemalloc is unavailable an error is logged
    and the body still runs, just without a report.

    :param \**kwargs: see print_malloc_snapshot
    """
    if tracemalloc is None:
        logger.error('tracemalloc required')
        # A generator-based context manager must yield exactly once;
        # returning without yielding makes the ``with`` statement itself
        # fail with "generator didn't yield".
        yield
        return
    tracemalloc.start()
    yield
    snapshot = tracemalloc.take_snapshot()
    print_malloc_snapshot(snapshot, **kwargs)
开发者ID:maurov,项目名称:pymca,代码行数:11,代码来源:ProfilingUtils.py


示例18: test_leak_in_transport

async def test_leak_in_transport(zipkin_url, client, loop):
    """Check that an idle tracer does not keep allocating memory blocks.

    Creates a tracer with a very short send interval, takes one snapshot
    after 5 seconds and another 10 seconds later, and requires the summed
    block count of the comparison to stay below 400.  The tracer is closed
    and tracing is switched off again even when the body fails.
    """
    # Only switch tracing off afterwards if this test switched it on.
    already_tracing = tracemalloc.is_tracing()
    tracemalloc.start()

    try:
        endpoint = az.create_endpoint('simple_service')
        tracer = await az.create(zipkin_url, endpoint, sample_rate=1,
                                 send_interval=0.0001, loop=loop)
        try:
            await asyncio.sleep(5)
            gc.collect()
            snapshot1 = tracemalloc.take_snapshot()

            await asyncio.sleep(10)
            gc.collect()
            snapshot2 = tracemalloc.take_snapshot()

            top_stats = snapshot2.compare_to(snapshot1, 'lineno')
            count = sum(s.count for s in top_stats)
        finally:
            # Close the tracer before asserting so that a failing assertion
            # cannot leave it running.
            await tracer.close()
    finally:
        if not already_tracing:
            tracemalloc.stop()

    assert count < 400  # in case of leak this number is around 901452
开发者ID:nicholasamorim,项目名称:aiozipkin,代码行数:20,代码来源:test_zipkin.py


示例19: test_snapshot

    def test_snapshot(self):
        """Snapshots round-trip through dump()/load() and need active tracing."""
        # keep an allocation alive while the snapshot is taken
        obj, source = allocate_bytes(123)

        taken = tracemalloc.take_snapshot()

        # round-trip through a file on disk
        taken.dump(support.TESTFN)
        self.addCleanup(support.unlink, support.TESTFN)
        restored = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(restored.traces, taken.traces)

        # once tracing is stopped, taking a snapshot must be refused
        tracemalloc.stop()
        expected_message = ("the tracemalloc module must be tracing memory "
                            "allocations to take a snapshot")
        with self.assertRaises(RuntimeError) as caught:
            tracemalloc.take_snapshot()
        self.assertEqual(str(caught.exception), expected_message)
开发者ID:asvetlov,项目名称:cpython,代码行数:21,代码来源:test_tracemalloc.py


示例20: dump_tracemalloc

def dump_tracemalloc():
    """
    Pickle a snapshot of the current memory allocations to the profiling
    output file
    """
    gc.collect()
    snapshot = tracemalloc.take_snapshot()

    target_path = PROFILING_OUTPUT_FMT % get_filename_fmt()
    with open(target_path, 'wb') as out:
        cPickle.dump(snapshot, out, 2)

    # Release the snapshot right away
    del snapshot
开发者ID:0x554simon,项目名称:w3af,代码行数:13,代码来源:pytracemalloc.py



注:本文中的tracemalloc.take_snapshot函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ray_bundle.RayBundle类代码示例发布时间:2022-05-27
下一篇:
Python tracemalloc.stop函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap