• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python tutils.tflow函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 tutils.tflow 函数的典型用法代码示例。如果您正苦于以下问题：Python tflow 函数的具体用法？Python tflow 怎么用？Python tflow 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 tflow 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_client_playback

    def test_client_playback(self):
        """Start client playback on a FlowMaster and tick it once.

        Both playback start calls are expected to return a falsy value. The
        state must hold no flows before the tick and at least one afterwards.
        Finally an errored flow is handed to ``handle_error``.
        """
        state = flow.State()

        errored = tutils.tflow(resp=True)
        playback = [tutils.tflow(resp=True), errored]
        master = flow.FlowMaster(DummyServer(ProxyConfig()), state)
        started = master.start_server_playback(
            playback, False, [], False, False, None, False, None, False
        )
        assert not started
        assert not master.start_client_playback(playback, False)
        master.client_playback.testing = True

        events = Queue.Queue()
        assert not master.state.flow_count()
        master.tick(events, 0)
        assert master.state.flow_count()

        # Attach an error to the second playback flow and report it.
        errored.error = Error("error")
        master.handle_error(errored)
开发者ID:keithsun80,项目名称:mitmproxy,代码行数:26,代码来源:test_flow.py


示例2: test_set_limit

    def test_set_limit(self):
        """Exercise ``State.set_limit`` and its effect on ``State.view``.

        The view length is checked after each limit change and after requests
        and responses are added. An invalid filter expression ("~") must give
        a return value containing "Invalid".
        """
        state = flow.State()

        first = tutils.tflow()
        assert len(state.view) == 0

        state.add_request(first)
        assert len(state.view) == 1

        # Under "~s" the view is expected to be empty until a response exists.
        state.set_limit("~s")
        assert state.limit_txt == "~s"
        assert len(state.view) == 0
        first.response = tutils.tresp()
        state.add_response(first)
        assert len(state.view) == 1
        state.set_limit(None)
        assert len(state.view) == 1

        second = tutils.tflow()
        state.add_request(second)
        assert len(state.view) == 2
        state.set_limit("~q")
        assert len(state.view) == 1
        state.set_limit("~s")
        assert len(state.view) == 1

        outcome = state.set_limit("~")
        assert "Invalid" in outcome
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:27,代码来源:test_flow.py


示例3: test_tick

    def test_tick(self):
        """Step the client playback state through two flows via ``tick``.

        Checks ``count()``/``done()`` after each tick and ``clear`` call, then
        ticks the master itself and verifies that stopping playback resets
        ``client_playback`` to a falsy value.
        """
        first = tutils.tflow()
        state = flow.State()
        master = flow.FlowMaster(None, state)
        master.start_client_playback([first, tutils.tflow()], True)
        playback = master.client_playback

        assert not playback.done()
        assert not state.flow_count()
        assert playback.count() == 2
        playback.tick(master, testing=True)
        assert state.flow_count()
        assert playback.count() == 1

        # A second tick without clearing the current flow leaves the count as is.
        playback.tick(master, testing=True)
        assert playback.count() == 1

        playback.clear(playback.current)
        playback.tick(master, testing=True)
        assert playback.count() == 0
        playback.clear(playback.current)
        assert playback.done()

        events = Queue.Queue()
        master.state.clear()
        master.tick(events)

        master.stop_client_playback()
        assert not master.client_playback
开发者ID:alxsoares,项目名称:mitmproxy,代码行数:29,代码来源:test_flow.py


示例4: test_server_playback

    def test_server_playback(self):
        """Check ``do_server_playback`` before and after playback is started.

        Also verifies that ticking the master after a fresh playback start
        sets ``should_exit``, and that ``stop_server_playback`` clears
        ``server_playback``.
        """
        state = flow.State()

        recorded = tutils.tflow()
        recorded.response = HTTPResponse.wrap(
            netlib.tutils.tresp(content=recorded.request)
        )
        playback = [recorded]

        master = flow.FlowMaster(None, state)
        master.refresh_server_playback = True
        # No playback has been started yet, so nothing can be replayed.
        assert not master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], False, False, None, False, None, False
        )
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], True, False, None, False, None, False
        )
        altered = tutils.tflow()
        altered.request.content = "gibble"
        assert not master.do_server_playback(altered)
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], True, False, None, False, None, False
        )
        events = Queue.Queue()
        master.tick(events, 0)
        assert master.should_exit.is_set()

        master.stop_server_playback()
        assert not master.server_playback
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:30,代码来源:test_flow.py


示例5: test_stream

    def test_stream(self):
        """Stream flows to a file through a FlowMaster and read them back.

        A flow with a response is streamed first, then a flow without one.
        After each streaming session the file is re-read and the presence or
        absence of the response on the stored flow is asserted.
        """
        with tutils.tmpdir() as tdir:
            p = os.path.join(tdir, "foo")

            def read_flows():
                # Close the read handle deterministically; the original
                # helper left the file object open.
                with open(p, "rb") as fp:
                    return list(flow.FlowReader(fp).stream())

            s = flow.State()
            fm = flow.FlowMaster(None, s)
            f = tutils.tflow(resp=True)

            # open() replaces the Python-2-only file() builtin.
            # NOTE(review): the write handle is presumably closed by
            # stop_stream()/shutdown() -- confirm against FlowMaster.
            fm.start_stream(open(p, "ab"), None)
            fm.handle_request(f)
            fm.handle_response(f)
            fm.stop_stream()

            assert read_flows()[0].response

            f = tutils.tflow()
            fm.start_stream(open(p, "ab"), None)
            fm.handle_request(f)
            fm.shutdown()

            assert not read_flows()[1].response
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:25,代码来源:test_flow.py


示例6: test_flow

    def test_flow(self):
        """
            Normal flow lifecycle:

                connect -> request -> response

            Tracks ``flow_count`` and ``active_flow_count`` as two flows are
            added to the state and then updated with responses.
        """
        state = flow.State()
        first = tutils.tflow()
        state.add_flow(first)
        assert first
        assert state.flow_count() == 1
        assert state.active_flow_count() == 1

        second = tutils.tflow()
        assert state.add_flow(second)
        assert state.active_flow_count() == 2

        first.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(first)
        assert state.flow_count() == 2
        assert state.active_flow_count() == 1

        # The wrapped response built here is discarded; updating with None
        # must be rejected and leave the active count unchanged.
        _ = HTTPResponse.wrap(netlib.tutils.tresp())
        assert not state.update_flow(None)
        assert state.active_flow_count() == 1

        second.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(second)
        assert state.active_flow_count() == 0
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:29,代码来源:test_flow.py


示例7: test_set_limit

    def test_set_limit(self):
        """Exercise ``State.set_limit`` and its effect on ``State.view``.

        The view length is checked after each limit change and after flows
        are added or updated. An invalid filter expression ("~") must give a
        return value containing "Invalid".
        """
        state = flow.State()

        first = tutils.tflow()
        assert len(state.view) == 0

        state.add_flow(first)
        assert len(state.view) == 1

        # Under "~s" the view is expected to be empty until a response exists.
        state.set_limit("~s")
        assert state.limit_txt == "~s"
        assert len(state.view) == 0
        first.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(first)
        assert len(state.view) == 1
        state.set_limit(None)
        assert len(state.view) == 1

        second = tutils.tflow()
        state.add_flow(second)
        assert len(state.view) == 2
        state.set_limit("~q")
        assert len(state.view) == 1
        state.set_limit("~s")
        assert len(state.view) == 1

        outcome = state.set_limit("~")
        assert "Invalid" in outcome
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:27,代码来源:test_flow.py


示例8: test_ignore_content

    def test_ignore_content(self):
        """Compare request hashes with the sixth constructor argument off and on.

        With that argument False, different request content must give
        different hashes. With it True, the hashes must stay equal for
        differing, empty and ``None`` content.
        """
        playback = flow.ServerPlaybackState(
            None, [], False, False, None, False, None, False
        )
        left = tutils.tflow(resp=True)
        right = tutils.tflow(resp=True)

        left.request.content = "foo"
        right.request.content = "foo"
        assert playback._hash(left) == playback._hash(right)
        right.request.content = "bar"
        assert not (playback._hash(left) == playback._hash(right))

        # Same checks again, this time with the sixth argument set to True.
        playback = flow.ServerPlaybackState(
            None, [], False, False, None, True, None, False
        )
        left = tutils.tflow(resp=True)
        right = tutils.tflow(resp=True)
        left.request.content = "foo"
        right.request.content = "foo"
        assert playback._hash(left) == playback._hash(right)
        right.request.content = "bar"
        assert playback._hash(left) == playback._hash(right)
        right.request.content = ""
        assert playback._hash(left) == playback._hash(right)
        right.request.content = None
        assert playback._hash(left) == playback._hash(right)
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:26,代码来源:test_flow.py


示例9: test_copy

    def test_copy(self):
        """Verify that ``copy()`` yields an equal-state but distinct flow.

        Covers a flow with a response and a flow with an error. States must
        match once the "id" key is removed, while the flow and its request,
        headers, response and error objects must be different objects.
        """
        original = tutils.tflow(resp=True)
        # The result is not used afterwards; the call is kept as in the original.
        initial_state = original.get_state()
        duplicate = original.copy()
        state_a = original.get_state()
        state_b = duplicate.get_state()
        del state_a["id"]
        del state_b["id"]
        assert state_a == state_b
        assert not (original == duplicate)
        assert original is not duplicate
        assert original.request.get_state() == duplicate.request.get_state()
        assert original.request is not duplicate.request
        assert original.request.headers == duplicate.request.headers
        assert original.request.headers is not duplicate.request.headers
        assert original.response.get_state() == duplicate.response.get_state()
        assert original.response is not duplicate.response

        original = tutils.tflow(err=True)
        duplicate = original.copy()
        assert original is not duplicate
        assert original.request is not duplicate.request
        assert original.request.headers == duplicate.request.headers
        assert original.request.headers is not duplicate.request.headers
        assert original.error.get_state() == duplicate.error.get_state()
        assert original.error is not duplicate.error
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:26,代码来源:test_flow.py


示例10: test_ignore_payload_params

 def test_ignore_payload_params(self):
     """Check hashing of form bodies when "param1"/"param2" are listed.

     Changing, dropping or adding "param1" must not change the hash, while
     changing or dropping "paramx" must change it.
     """
     playback = flow.ServerPlaybackState(
         None, [], False, False, None, False, ["param1", "param2"], False
     )
     base = tutils.tflow(resp=True)
     base.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
     base.request.content = "paramx=x&param1=1"
     other = tutils.tflow(resp=True)
     other.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
     other.request.content = "paramx=x&param1=1"
     # identical parameters
     assert playback._hash(base) == playback._hash(other)
     # a listed parameter has a different value
     other.request.content = "paramx=x&param1=2"
     assert playback._hash(base) == playback._hash(other)
     # a listed parameter is absent
     other.request.content = "paramx=x"
     assert playback._hash(base) == playback._hash(other)
     # a listed parameter is present again
     other.request.content = "paramx=x&param1=2"
     assert playback._hash(base) == playback._hash(other)
     # an unlisted parameter has a different value
     other.request.content = "paramx=y&param1=1"
     assert not (playback._hash(base) == playback._hash(other))
     # an unlisted parameter is absent
     other.request.content = "param1=1"
     assert not (playback._hash(base) == playback._hash(other))
开发者ID:tracyhatemice,项目名称:mitmproxy,代码行数:27,代码来源:test_flow.py


示例11: test_server_playback

    def test_server_playback(self):
        """Check ``do_server_playback`` before and after playback is started.

        Also verifies that ticking the master after a fresh playback start
        sets ``controller.should_exit``, and that ``stop_server_playback``
        clears ``server_playback``.
        """
        # Reset the module-level flag so the final assertion is meaningful.
        controller.should_exit = False
        state = flow.State()

        recorded = tutils.tflow()
        recorded.response = tutils.tresp(recorded.request)
        playback = [recorded]

        master = flow.FlowMaster(None, state)
        master.refresh_server_playback = True
        assert not master.do_server_playback(tutils.tflow())

        master.start_server_playback(playback, False, [], False, False)
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(playback, False, [], True, False)
        altered = tutils.tflow()
        altered.request.content = "gibble"
        assert not master.do_server_playback(altered)
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(playback, False, [], True, False)
        events = Queue.Queue()
        master.tick(events)
        assert controller.should_exit

        master.stop_server_playback()
        assert not master.server_playback
开发者ID:alxsoares,项目名称:mitmproxy,代码行数:28,代码来源:test_flow.py


示例12: test_flow

    def test_flow(self):
        """
            Normal flow lifecycle:

                connect -> request -> response

            Tracks ``flow_count`` and ``active_flow_count`` as two requests
            are added to the state and then answered.
        """
        state = flow.State()
        first = tutils.tflow()
        state.add_request(first)
        assert first
        assert state.flow_count() == 1
        assert state.active_flow_count() == 1

        second = tutils.tflow()
        assert state.add_request(second)
        assert state.active_flow_count() == 2

        first.response = tutils.tresp()
        assert state.add_response(first)
        assert state.flow_count() == 2
        assert state.active_flow_count() == 1

        # The response built here is discarded; adding None must be rejected
        # and leave the active count unchanged.
        _ = tutils.tresp()
        assert not state.add_response(None)
        assert state.active_flow_count() == 1

        second.response = tutils.tresp()
        assert state.add_response(second)
        assert state.active_flow_count() == 0
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:29,代码来源:test_flow.py


示例13: test_match

    def test_match(self):
        """Check ``match`` against a body filter, ``None``, "~e" and a bad filter."""
        with_response = tutils.tflow(resp=True)
        assert not with_response.match("~b test")
        assert with_response.match(None)
        # Same filter again after the None match; it must still not match.
        assert not with_response.match("~b test")

        with_error = tutils.tflow(err=True)
        assert with_error.match("~e")

        # A bare "~" is expected to raise ValueError.
        tutils.raises(ValueError, with_error.match, "~")
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:10,代码来源:test_flow.py


示例14: test_handle_response

    def test_handle_response(self):
        """Check that StickyAuthState stores and re-applies an authorization header.

        The first request carries the header and must register "address" in
        ``hosts``. A second request without the header must end up with the
        same header value after ``handle_request``.
        """
        sticky = flow.StickyAuthState(filt.parse(".*"))
        authed = tutils.tflow(resp=True)
        authed.request.headers["authorization"] = ["foo"]
        sticky.handle_request(authed)
        assert "address" in sticky.hosts

        # No header is set on this flow before it is handled.
        plain = tutils.tflow(resp=True)
        sticky.handle_request(plain)
        assert plain.request.headers["authorization"] == ["foo"]
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:10,代码来源:test_flow.py


示例15: test_ignore_host

    def test_ignore_host(self):
        """Check that hashes match regardless of host when the last argument is True."""
        playback = flow.ServerPlaybackState(
            None, [], False, False, None, False, None, True
        )
        left = tutils.tflow(resp=True)
        right = tutils.tflow(resp=True)

        left.request.host = "address"
        right.request.host = "address"
        assert playback._hash(left) == playback._hash(right)
        # A different host must still hash the same.
        right.request.host = "wrong_address"
        assert playback._hash(left) == playback._hash(right)
开发者ID:Nolski,项目名称:mitmproxy,代码行数:10,代码来源:test_flow.py


示例16: test_ignore_payload_wins_over_params

 def test_ignore_payload_wins_over_params(self):
     """Check that hashes match when the sixth argument is True and params are listed.

     The two requests differ in "paramx", which is not in the listed
     parameters, yet their hashes must be equal.
     """
     # NOTE: parameters are mutually exclusive in options
     playback = flow.ServerPlaybackState(
         None, [], False, False, None, True, ["param1", "param2"]
     )
     left = tutils.tflow(resp=True)
     left.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
     left.request.content = "paramx=y"
     right = tutils.tflow(resp=True)
     right.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
     right.request.content = "paramx=x"
     # the bodies differ, but the hashes must not
     assert playback._hash(left) == playback._hash(right)
开发者ID:gabe-k,项目名称:mitmproxy,代码行数:11,代码来源:test_flow.py


示例17: test_hash

    def test_hash(self):
        """Check that the hash ignores an added header but changes with the path."""
        playback = flow.ServerPlaybackState(None, [], False, False)
        left = tutils.tflow()
        right = tutils.tflow()

        assert playback._hash(left)
        assert playback._hash(left) == playback._hash(right)
        # Adding a header leaves the hash unchanged.
        left.request.headers["foo"] = ["bar"]
        assert playback._hash(left) == playback._hash(right)
        # Changing the path must change the hash.
        left.request.path = "voing"
        assert playback._hash(left) != playback._hash(right)
开发者ID:alxsoares,项目名称:mitmproxy,代码行数:11,代码来源:test_flow.py


示例18: test_load_with_nopop

    def test_load_with_nopop(self):
        """Check that ``next_flow`` leaves ``count()`` at 2 in this configuration.

        NOTE(review): the fourth positional argument (True) is presumably the
        "nopop" switch the test name refers to -- confirm against
        ServerPlaybackState.
        """
        first = tutils.tflow(resp=True)
        first.request.headers["key"] = ["one"]

        second = tutils.tflow(resp=True)
        second.request.headers["key"] = ["two"]

        playback = flow.ServerPlaybackState(
            None, [first, second], False, True, None, False
        )

        assert playback.count() == 2
        playback.next_flow(first)
        assert playback.count() == 2
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:12,代码来源:test_flow.py


示例19: _treader

    def _treader(self):
        """Return a FlowReader over a buffer holding six freshly written flows.

        Three flows with a response are written first, followed by three
        flows with an error. The buffer is rewound before being wrapped.
        """
        buf = StringIO()
        writer = flow.FlowWriter(buf)
        for _ in range(3):
            writer.add(tutils.tflow(resp=True))
        for _ in range(3):
            writer.add(tutils.tflow(err=True))

        buf.seek(0)
        return flow.FlowReader(buf)
开发者ID:AmineCherrai,项目名称:mitmproxy,代码行数:12,代码来源:test_flow.py


示例20: test_server_playback_kill

    def test_server_playback_kill(self):
        """Check that a request with a different host ends with a "killed" error.

        NOTE(review): the second positional argument (True) passed to
        ``start_server_playback`` is presumably the kill switch -- confirm
        against FlowMaster.
        """
        state = flow.State()
        recorded = tutils.tflow()
        recorded.response = tutils.tresp(recorded.request)
        playback = [recorded]
        master = flow.FlowMaster(None, state)
        master.refresh_server_playback = True
        master.start_server_playback(playback, True, [], False, False)

        stranger = tutils.tflow()
        stranger.request.host = "nonexistent"
        master.process_new_request(stranger)
        assert "killed" in stranger.error.msg
开发者ID:alxsoares,项目名称:mitmproxy,代码行数:13,代码来源:test_flow.py



注:本文中的tutils.tflow函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python tutils.tflow_err函数代码示例发布时间:2022-05-27
下一篇:
Python tutils.raises函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap