• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python models.HTTPResponse类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mitmproxy.models.HTTPResponse类的典型用法代码示例。如果您正在为以下问题困扰:Python HTTPResponse类的具体用法是什么?Python HTTPResponse怎么用?有哪些Python HTTPResponse的使用示例?那么,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了HTTPResponse类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_flow

    def test_flow(self):
        """
        Track two flows through the normal lifecycle.

        The lifecycle exercised is connect -> request -> response; the
        state's flow counters are checked after each step.
        """
        state = flow.State()
        first = tutils.tflow()
        state.add_flow(first)
        assert first
        assert state.flow_count() == 1
        assert state.active_flow_count() == 1

        second = tutils.tflow()
        assert state.add_flow(second)
        assert state.active_flow_count() == 2

        # Once the first flow carries a response, only one flow is expected
        # to remain active while the total count stays at two.
        first.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(first)
        assert state.flow_count() == 2
        assert state.active_flow_count() == 1

        # Updating with None must be refused and leave the counters alone.
        assert not state.update_flow(None)
        assert state.active_flow_count() == 1

        second.response = HTTPResponse.wrap(netlib.tutils.tresp())
        assert state.update_flow(second)
        assert state.active_flow_count() == 0
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:28,代码来源:test_flow.py


示例2: tflow

def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
    """
    Assemble an HTTPFlow fixture.

    Any argument passed as the literal ``True`` is replaced by a default
    fixture (``tclient_conn()``, ``tserver_conn()``, ``netlib.tutils.treq()``,
    ``netlib.tutils.tresp()`` and ``terr()`` respectively); every other value
    is used as given. Truthy ``req`` / ``resp`` values are passed through
    ``HTTPRequest.wrap`` / ``HTTPResponse.wrap`` before being attached, and
    the flow's ``reply`` is set to a ``controller.DummyReply()``.

    @type client_conn: bool | None | mitmproxy.proxy.connection.ClientConnection
    @type server_conn: bool | None | mitmproxy.proxy.connection.ServerConnection
    @type req:         bool | None | mitmproxy.protocol.http.HTTPRequest
    @type resp:        bool | None | mitmproxy.protocol.http.HTTPResponse
    @type err:         bool | None | mitmproxy.protocol.primitives.Error
    @return:           mitmproxy.protocol.http.HTTPFlow
    """
    # Identity checks against True on purpose: a caller-supplied object that
    # merely happens to be truthy must not be swapped for a default fixture.
    client_conn = tclient_conn() if client_conn is True else client_conn
    server_conn = tserver_conn() if server_conn is True else server_conn
    req = netlib.tutils.treq() if req is True else req
    resp = netlib.tutils.tresp() if resp is True else resp
    err = terr() if err is True else err

    wrapped_req = HTTPRequest.wrap(req) if req else req
    wrapped_resp = HTTPResponse.wrap(resp) if resp else resp

    built = HTTPFlow(client_conn, server_conn)
    built.request = wrapped_req
    built.response = wrapped_resp
    built.error = err
    built.reply = controller.DummyReply()
    return built
开发者ID:Amerge,项目名称:mitmproxy,代码行数:31,代码来源:tutils.py


示例3: test_set_view_filter

    def test_set_view_filter(self):
        """
        Check how the state's view reacts to changing the view filter.

        Uses the "~s" and "~q" filter expressions, clearing the filter with
        None, and finally an invalid expression ("~").
        """
        state = flow.State()

        first = tutils.tflow()
        assert len(state.view) == 0

        state.add_flow(first)
        assert len(state.view) == 1

        # With "~s" active the response-less flow drops out of the view ...
        state.set_view_filter("~s")
        assert state.filter_txt == "~s"
        assert len(state.view) == 0
        # ... and shows up again once a response is attached and the state
        # is told about the update.
        first.response = HTTPResponse.wrap(netlib.tutils.tresp())
        state.update_flow(first)
        assert len(state.view) == 1
        state.set_view_filter(None)
        assert len(state.view) == 1

        second = tutils.tflow()
        state.add_flow(second)
        assert len(state.view) == 2
        state.set_view_filter("~q")
        assert len(state.view) == 1
        state.set_view_filter("~s")
        assert len(state.view) == 1

        # An unparsable filter is reported through the return value.
        assert "Invalid" in state.set_view_filter("~")
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:27,代码来源:test_flow.py


示例4: test_replace

 def test_replace(self):
     """
     Check that replace() substitutes a case-insensitive pattern.

     A wrapped test response gets a "Foo: fOo" header and the body
     b"afoob"; after replacing the pattern with "boo" the test expects a
     replacement count of 3, no b"foo" left in the body, and a "boo"
     header whose value is "boo".
     """
     r = HTTPResponse.wrap(netlib.tutils.tresp())
     r.headers["Foo"] = "fOo"
     r.content = b"afoob"
     # The global inline flag has to lead the pattern: a trailing "(?i)" is
     # deprecated since Python 3.6 and rejected with re.error from 3.11 on.
     # "(?i)foo" matches exactly what "foo(?i)" used to match.
     assert r.replace("(?i)foo", "boo") == 3
     assert b"foo" not in r.content
     assert r.headers["boo"] == "boo"
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:7,代码来源:test_flow.py


示例5: test_backup

 def test_backup(self):
     """
     Check the backup()/revert() round trip on a flow.

     The flow must report itself unmodified before backup(), modified after
     the request content changes, and revert() must restore the content
     that was present when backup() was called.
     """
     fl = tutils.tflow()
     fl.response = HTTPResponse.wrap(netlib.tutils.tresp())
     fl.request.content = b"foo"
     assert not fl.modified()

     fl.backup()
     fl.request.content = b"bar"
     assert fl.modified()

     fl.revert()
     assert fl.request.content == b"foo"
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:10,代码来源:test_flow.py


示例6: test_missing_content

 def test_missing_content(self):
     """
     Check that the dumper reports missing content.

     Both the request and the response content are set to CONTENT_MISSING,
     and the captured output must contain "content missing".
     """
     out = StringIO()
     opts = dump.Options(flow_detail=3)
     master = dump.DumpMaster(None, opts, outfile=out)

     fl = tutils.tflow()
     fl.request.content = CONTENT_MISSING
     master.handle_request(fl)

     fl.response = HTTPResponse.wrap(netlib.tutils.tresp())
     fl.response.content = CONTENT_MISSING
     master.handle_response(fl)

     assert "content missing" in out.getvalue()
开发者ID:Bifrozt,项目名称:mitmproxy,代码行数:11,代码来源:test_dump.py


示例7: test_server_playback

    def test_server_playback(self):
        """
        Exercise FlowMaster server playback from start to stop.

        A single recorded flow is replayed under several
        start_server_playback() configurations; the test then checks that a
        tick sets should_exit and that stop_server_playback() clears the
        playback state.
        """
        state = flow.State()

        recorded = tutils.tflow()
        recorded.response = HTTPResponse.wrap(
            netlib.tutils.tresp(content=recorded.request)
        )
        playback = [recorded]

        master = flow.FlowMaster(None, state)
        master.refresh_server_playback = True
        # No playback has been started yet, so nothing can be served.
        assert not master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], False, False, None, False, None, False
        )
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], True, False, None, False, None, False
        )
        # A request whose content differs from the recording is expected not
        # to be served, while an untouched fixture request still is.
        altered = tutils.tflow()
        altered.request.content = "gibble"
        assert not master.do_server_playback(altered)
        assert master.do_server_playback(tutils.tflow())

        master.start_server_playback(
            playback, False, [], True, False, None, False, None, False
        )
        tick_queue = Queue.Queue()
        master.tick(tick_queue, 0)
        assert master.should_exit.is_set()

        master.stop_server_playback()
        assert not master.server_playback
开发者ID:Anandkumar26,项目名称:mitmproxy,代码行数:54,代码来源:test_flow.py


示例8: _cycle

 def _cycle(self, m, content):
     """
     Drive the master ``m`` through one complete flow and return the flow.

     ``content`` is used as the body of both the test request and the test
     response. The handlers are invoked in this order: log, clientconnect,
     serverconnect, request, response, clientdisconnect. The returned flow
     is whatever ``m.handle_response`` gives back.
     """
     fl = tutils.tflow(req=netlib.tutils.treq(content=content))

     log_entry = Log("connect")
     log_entry.reply = mock.MagicMock()
     m.handle_log(log_entry)

     m.handle_clientconnect(fl.client_conn)
     m.handle_serverconnect(fl.server_conn)
     m.handle_request(fl)

     fl.response = HTTPResponse.wrap(netlib.tutils.tresp(content=content))
     fl = m.handle_response(fl)
     m.handle_clientdisconnect(fl.client_conn)
     return fl
开发者ID:Bifrozt,项目名称:mitmproxy,代码行数:12,代码来源:test_dump.py


示例9: request

def request(flow):
    """
    Answer or redirect a request depending on its (pretty) host name.

    Hosts ending in "example.com" get a locally generated 200 response;
    hosts ending in "example.org" have their request host rewritten to
    "mitmproxy.org". Any other request is left untouched.
    """
    # pretty_host takes the "Host" header of the request into account,
    # which is useful in transparent mode where we usually only have the IP
    # otherwise.
    req = flow.request

    # Option A: reply straight away with a response built here.
    if req.pretty_host.endswith("example.com"):
        canned = HTTPResponse.make(200, b"Hello World", {"Content-Type": "text/html"})
        flow.reply.send(canned)

    # Option B: point the request at another server.
    if req.pretty_host.endswith("example.org"):
        req.host = "mitmproxy.org"
开发者ID:lordillusions,项目名称:mitmproxy,代码行数:13,代码来源:redirect_requests.py


示例10: test_refresh_cookie

    def test_refresh_cookie(self):
        """
        Check _refresh_cookie() against three cookie strings.

        Covers an expires format seen from Reddit, a regular Expires
        attribute that must be shifted by the given delta, and a malformed
        cookie that must raise ValueError.
        """
        resp = HTTPResponse.wrap(netlib.tutils.tresp())

        # Invalid expires format, sent to us by Reddit.
        reddit_cookie = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
        assert resp._refresh_cookie(reddit_cookie, 60)

        # A 60 second delta moves 00:20:38 to 00:21:38.
        dated_cookie = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
        assert "00:21:38" in resp._refresh_cookie(dated_cookie, 60)

        # https://github.com/mitmproxy/mitmproxy/issues/773
        broken_cookie = ">=A"
        with tutils.raises(ValueError):
            resp._refresh_cookie(broken_cookie, 60)
开发者ID:Anandkumar26,项目名称:mitmproxy,代码行数:14,代码来源:test_flow.py


示例11: test_refresh

    def test_refresh(self):
        """
        Check that refresh() shifts the "date" header relative to a timestamp.

        Refreshing with the timestamp the header was generated from must
        leave the header unchanged; refreshing 60 seconds later must move it
        by roughly 60 seconds. A final argument-less refresh() is run with a
        set-cookie header present and no assertion on the result.
        """
        resp = HTTPResponse.wrap(netlib.tutils.tresp())
        now = time.time()
        resp.headers["date"] = email.utils.formatdate(now)
        before = resp.headers["date"]

        resp.refresh(now)
        assert before == resp.headers["date"]

        resp.refresh(now + 60)
        shifted = email.utils.mktime_tz(
            email.utils.parsedate_tz(resp.headers["date"])
        )
        # Weird that this is not exact...
        assert abs(60 - (shifted - now)) <= 1

        resp.headers["set-cookie"] = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
        resp.refresh()
开发者ID:Anandkumar26,项目名称:mitmproxy,代码行数:16,代码来源:test_flow.py


示例12: test_all

    def test_all(self):
        """
        Run one flow through every FlowMaster event handler.

        The handlers are called in this order: clientconnect, request,
        response, clientdisconnect, error, shutdown. The state must hold
        exactly one flow after the request and after the response.
        """
        state = flow.State()
        master = flow.FlowMaster(None, None, state)

        # Start without a request so it can be attached after the connect.
        fl = tutils.tflow(req=None)
        master.clientconnect(fl.client_conn)

        fl.request = HTTPRequest.wrap(netlib.tutils.treq())
        master.request(fl)
        assert state.flow_count() == 1

        fl.response = HTTPResponse.wrap(netlib.tutils.tresp())
        master.response(fl)
        assert state.flow_count() == 1

        master.clientdisconnect(fl.client_conn)

        fl.error = Error("msg")
        master.error(fl)

        master.shutdown()
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:19,代码来源:test_flow.py


示例13: test_all

    def test_all(self):
        """
        Run one flow through every FlowMaster event handler, then load a script.

        The handlers are called in this order: clientconnect, request,
        response, clientdisconnect, error. Afterwards the script
        "data/scripts/a.py" is loaded and the master is shut down. The state
        must hold exactly one flow after the request and after the response.
        """
        state = flow.State()
        master = flow.FlowMaster(None, None, state)

        # Start without a request so it can be attached after the connect.
        fl = tutils.tflow(req=None)
        master.clientconnect(fl.client_conn)

        fl.request = HTTPRequest.wrap(netlib.tutils.treq())
        master.request(fl)
        assert state.flow_count() == 1

        fl.response = HTTPResponse.wrap(netlib.tutils.tresp())
        master.response(fl)
        assert state.flow_count() == 1

        master.clientdisconnect(fl.client_conn)

        # The error gets its own dummy reply before being handed over.
        fl.error = Error("msg")
        fl.error.reply = controller.DummyReply()
        master.error(fl)

        master.load_script(tutils.test_data.path("data/scripts/a.py"))
        master.shutdown()
开发者ID:smartnetguru,项目名称:mitmproxy,代码行数:21,代码来源:test_flow.py


示例14: test_server_playback_kill

    def test_server_playback_kill(self):
        """
        Check that an unmatched request is killed during server playback.

        Playback is started with a single recorded flow; a new request to
        the host "nonexistent" must end up with "killed" in its error
        message.
        """
        state = flow.State()
        recorded = tutils.tflow()
        recorded.response = HTTPResponse.wrap(
            netlib.tutils.tresp(content=recorded.request)
        )
        playback = [recorded]

        master = flow.FlowMaster(None, state)
        master.refresh_server_playback = True
        master.start_server_playback(
            playback, True, [], False, False, None, False, None, False
        )

        incoming = tutils.tflow()
        incoming.request.host = "nonexistent"
        master.process_new_request(incoming)
        assert "killed" in incoming.error.msg
开发者ID:Amerge,项目名称:mitmproxy,代码行数:22,代码来源:test_flow.py


示例15: test_get_content_type

 def test_get_content_type(self):
     """
     Check that a Headers object built with ``content_type=`` can be read
     back through the "content-type" key.
     """
     response = HTTPResponse.wrap(netlib.tutils.tresp())
     response.headers = Headers(content_type="text/plain")
     assert response.headers["content-type"] == "text/plain"
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:4,代码来源:test_flow.py


示例16: handle_request

 def handle_request(self, f):
     """
     Reply to ``f`` with a wrapped test response whose content is set to
     CONTENT_MISSING.
     """
     response = HTTPResponse.wrap(netlib.tutils.tresp())
     response.content = CONTENT_MISSING
     f.reply(response)
开发者ID:007durgesh219,项目名称:mitmproxy,代码行数:4,代码来源:test_server.py


示例17: _add_response

 def _add_response(self, state):
     """
     Add a fresh test flow to ``state``, attach a wrapped test response to
     it, and notify ``state`` of the update.
     """
     fl = tutils.tflow()
     state.add_flow(fl)

     fl.response = HTTPResponse.wrap(netlib.tutils.tresp())
     state.update_flow(fl)
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:5,代码来源:test_flow.py


示例18: request

 def request(self, f):
     """
     Reply to ``f`` with a wrapped test response whose content is set to
     None.
     """
     response = HTTPResponse.wrap(netlib.tutils.tresp())
     response.content = None
     f.reply(response)
开发者ID:eftychis,项目名称:mitmproxy,代码行数:4,代码来源:test_server.py


示例19: request

def request(context, flow):
    print flow.request.path
    m = re.match("^/geoserver/i/([\d]+)/([\d]+)/([\d]+)$", flow.request.path)
    if m is not None:
        print "match success!"
        imagename = flow.request.path_components[1] + '_' + flow.request.path_components[2] + '_' + flow.request.path_components[3] + '_' + flow.request.path_components[4];
        imagequerycache = "/home/root/accesscache " + imagename;
        imagequerycacheresult = os.popen(imagequerycache).read();
        if (imagequerycacheresult != "No" and imagequerycacheresult != "ERROR"):
        	resp = HTTPResponse(
		"HTTP/1.1", 200, "OK", Headers(Content_Type="image/png"), "helloworld")
                with open(imagequerycacheresult, 'r') as f:
             		payload = f.read()
             		resp.content = payload
             		resp.headers["Content-Length"] = str(len(payload))
                        resp.headers["Access-Control-Allow-Origin"] = "*"
                        print len(payload)
                        f.close()
                flow.reply(resp)
                return
	else:
		imagefilepath = "/home/root/bpqrecv/" + imagename;
                randregid = randrange(0, 32768);
		print randregid;
		querycomm = "/home/root/DTN2/apps/dtnquery/dtnquery -m send -s dtn://dtn1.dtn -d dtn://dtn2.dtn -q " + imagename + " -i " + str(randregid);
                print querycomm;
		os.system(querycomm);
                for x in range(0, 60):
			imagequerycacheresult = os.popen(imagequerycache).read();
			if (imagequerycacheresult != "No" and imagequerycacheresult != "ERROR"):
				resp = HTTPResponse(
				"HTTP/1.1", 200, "OK", Headers(Content_Type="image/png"), "helloworld")
                		with open(imagequerycacheresult, 'r') as f:
             				payload = f.read()
             				resp.content = payload
             				resp.headers["Content-Length"] = str(len(payload))
					resp.headers["Access-Control-Allow-Origin"] = "*"
                       			print len(payload)
                        		f.close()
                		flow.reply(resp)
                		return
                	else:	
                		time.sleep(1);
    z = re.match("^/geoserver/z/([\d]+)/([\d]+)/([\d]+)$", flow.request.path)
    if z is not None:
        vectorname = flow.request.path_components[1] + '_' +  flow.request.path_components[2] + '_' + flow.request.path_components[3] + '_' + flow.request.path_components[4]
        vectorquerycache = "/tmp/ramdisk0/accesscache " + vectorname;
	vectorquerycacheresult = os.popen(vectorquerycache).read();
        if (vectorquerycacheresult != "No" and vectorquerycacheresult != "ERROR"):
        	resp = HTTPResponse(
		"HTTP/1.1", 200, "OK", Headers(Content_Type="application/octet-stream"), "helloworld")
                with open(vectorquerycacheresult, 'r') as f:
             		payload = f.read()
             		resp.content = payload
             		resp.headers["Content-Length"] = str(len(payload))
			resp.headers["Access-Control-Allow-Origin"] = "*"
                        resp.headers["Cache-Control"]= "public, max-age=86400"
                        print len(payload)
                        f.close()
                flow.reply(resp)
                return
        else:
                waitingcomm = "/tmp/ramdisk0/waiting " + vectorname;
                os.system(waitingcomm);
        	vectorquerycacheresult = os.popen(vectorquerycache).read();
                print "vectorquerycacheresult: " + vectorquerycacheresult;
                if (vectorquerycacheresult != "No" and vectorquerycacheresult != "ERROR"):
        		resp = HTTPResponse(
			"HTTP/1.1", 200, "OK", Headers(Content_Type="application/octet-stream"), "helloworld")
                	with open(vectorquerycacheresult, 'r') as f:
             			payload = f.read()
             			resp.content = payload
             			resp.headers["Content-Length"] = str(len(payload))
				resp.headers["Access-Control-Allow-Origin"] = "*"
                                resp.headers["Cache-Control"]= "public, max-age=86400"
                        	print len(payload)
                        	f.close()
                	flow.reply(resp)
                        return
    v = re.match("^/geoserver/v/([\d]+)/([\d]+)/([\d]+)$", flow.request.path)
    if v is not None:
        vectorname = flow.request.path_components[1] + '_' +  flow.request.path_components[2] + '_' + flow.request.path_components[3] + '_' + flow.request.path_components[4]
        print "vectorname: " + vectorname;
        vectorquerycache = "/tmp/ramdisk0/accesscache " + vectorname;
	vectorquerycacheresult = os.popen(vectorquerycache).read();
        if (vectorquerycacheresult != "No" and vectorquerycacheresult != "ERROR"):
        	resp = HTTPResponse(
		"HTTP/1.1", 200, "OK", Headers(Content_Type="application/json"), "helloworld")
                with open(vectorquerycacheresult, 'r') as f:
             		payload = f.read()
             		resp.content = payload
             		resp.headers["Content-Length"] = str(len(payload))
			resp.headers["Access-Control-Allow-Origin"] = "*"
                        resp.headers["Cache-Control"]= "public, max-age=86400"
                        print len(payload)
                        f.close()
                flow.reply(resp)
                return
        else:
                waitingcomm = "/tmp/ramdisk0/waiting " + vectorname;
#.........这里部分代码省略.........
开发者ID:lchao-bit,项目名称:NSSDTN2,代码行数:101,代码来源:script.py


示例20: request

 def request(self, f):
     """Attach a wrapped test response to the flow ``f``."""
     wrapped = HTTPResponse.wrap(netlib.tutils.tresp())
     f.response = wrapped
开发者ID:mkagenius,项目名称:mitmproxy,代码行数:2,代码来源:test_server.py



注:本文中的mitmproxy.models.HTTPResponse类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python http2.read_raw_frame函数代码示例发布时间:2022-05-27
下一篇:
Python models.HTTPRequest类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap