本文整理汇总了Python中test.mocking.return_value函数的典型用法代码示例。如果您正苦于以下问题:Python return_value函数的具体用法?Python return_value怎么用?Python return_value使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了return_value函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_get_socks_listeners_new
def test_get_socks_listeners_new(self):
"""
Exercises the get_socks_listeners() method as if talking to a newer tor
instance.
"""
# multiple SOCKS listeners
mocking.mock_method(Controller, "get_info", mocking.return_value(
'"127.0.0.1:1112" "127.0.0.1:1114"'
))
self.assertEqual(
[('127.0.0.1', 1112), ('127.0.0.1', 1114)],
self.controller.get_socks_listeners()
)
# no SOCKS listeners
mocking.mock_method(Controller, "get_info", mocking.return_value(""))
self.assertEqual([], self.controller.get_socks_listeners())
# check where GETINFO provides malformed content
invalid_responses = (
'"127.0.0.1"', # address only
'"1112"', # port only
'"5127.0.0.1:1112"', # invlaid address
'"127.0.0.1:991112"', # invalid port
)
for response in invalid_responses:
mocking.mock_method(Controller, "get_info", mocking.return_value(response))
self.assertRaises(stem.ProtocolError, self.controller.get_socks_listeners)
开发者ID:gsathya,项目名称:stem,代码行数:32,代码来源:controller.py
示例2: test_mirror_mirror_on_the_wall_1
def test_mirror_mirror_on_the_wall_1(self):
def tutorial_example():
from stem.control import Controller
with Controller.from_port(control_port = 9051) as controller:
controller.authenticate()
for desc in controller.get_network_statuses():
print "found relay %s (%s)" % (desc.nickname, desc.fingerprint)
controller = mocking.get_object(Controller, {
'authenticate': mocking.no_op(),
'close': mocking.no_op(),
'get_network_statuses': mocking.return_value(
[mocking.get_router_status_entry_v2()],
),
})
mocking.mock(
Controller.from_port, mocking.return_value(controller),
target_module = Controller,
is_static = True,
)
tutorial_example()
self.assertEqual("found relay caerSidi (A7569A83B5706AB1B1A9CB52EFF7D2D32E4553EB)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:26,代码来源:tutorial.py
示例3: test_mirror_mirror_on_the_wall_4
def test_mirror_mirror_on_the_wall_4(self):
def tutorial_example():
from stem.control import Controller
from stem.util import str_tools
# provides a mapping of observed bandwidth to the relay nicknames
def get_bw_to_relay():
bw_to_relay = {}
with Controller.from_port(control_port = 9051) as controller:
controller.authenticate()
for desc in controller.get_server_descriptors():
if desc.exit_policy.is_exiting_allowed():
bw_to_relay.setdefault(desc.observed_bandwidth, []).append(desc.nickname)
return bw_to_relay
# prints the top fifteen relays
bw_to_relay = get_bw_to_relay()
count = 1
for bw_value in sorted(bw_to_relay.keys(), reverse = True):
for nickname in bw_to_relay[bw_value]:
print "%i. %s (%s/s)" % (count, nickname, str_tools.get_size_label(bw_value, 2))
count += 1
if count > 15:
return
exit_descriptor = mocking.get_relay_server_descriptor({
'router': 'speedyexit 149.255.97.109 9001 0 0'
}, content = True).replace(b'reject *:*', b'accept *:*')
exit_descriptor = mocking.sign_descriptor_content(exit_descriptor)
exit_descriptor = RelayDescriptor(exit_descriptor)
controller = mocking.get_object(Controller, {
'authenticate': mocking.no_op(),
'close': mocking.no_op(),
'get_server_descriptors': mocking.return_value([
exit_descriptor,
mocking.get_relay_server_descriptor(), # non-exit
exit_descriptor,
exit_descriptor,
])
})
mocking.mock(
Controller.from_port, mocking.return_value(controller),
target_module = Controller,
is_static = True,
)
tutorial_example()
self.assertEqual(MIRROR_MIRROR_OUTPUT, self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:57,代码来源:tutorial.py
示例4: _mock_open
def _mock_open(content):
test_content = StringIO.StringIO(content)
mocking.support_with(test_content)
if stem.prereq.is_python_3():
import builtins
mocking.mock(builtins.open, mocking.return_value(test_content), builtins)
else:
mocking.mock(open, mocking.return_value(test_content))
开发者ID:ankitmodi,项目名称:Projects,代码行数:9,代码来源:reader.py
示例5: test_the_little_relay_that_could
def test_the_little_relay_that_could(self):
def tutorial_example():
from stem.control import Controller
with Controller.from_port(control_port = 9051) as controller:
controller.authenticate() # provide the password here if you set one
bytes_read = controller.get_info("traffic/read")
bytes_written = controller.get_info("traffic/written")
print "My Tor relay has read %s bytes and written %s." % (bytes_read, bytes_written)
controller = mocking.get_object(Controller, {
'authenticate': mocking.no_op(),
'close': mocking.no_op(),
'get_info': mocking.return_for_args({
('traffic/read',): '33406',
('traffic/written',): '29649',
}, is_method = True),
})
mocking.mock(
Controller.from_port, mocking.return_value(controller),
target_module = Controller,
is_static = True,
)
tutorial_example()
self.assertEqual("My Tor relay has read 33406 bytes and written 29649.\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:29,代码来源:tutorial.py
示例6: test_get_stats
def test_get_stats(self):
"""
Tests get_stats() with all combinations of stat_type arguments.
"""
# list of all combinations of args with respective return values
stat_combinations = mocking.get_all_combinations([
('command', 'test_program'),
('utime', '0.13'),
('stime', '0.14'),
('start time', '10.21'),
])
stat_path = "/proc/24062/stat"
stat = '1 (test_program) 2 3 4 5 6 7 8 9 10 11 12 13.0 14.0 15 16 17 18 19 20 21.0 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43'
mocking.mock(proc.get_system_start_time, mocking.return_value(10))
# tests the case where no stat_types are specified
mocking.mock(proc._get_line, mocking.return_for_args({
(stat_path, '24062', 'process '): stat
}))
self.assertEquals((), proc.get_stats(24062))
for stats in stat_combinations:
# the stats variable is...
# [(arg1, resp1), (arg2, resp2)...]
#
# but we need...
# (arg1, arg2...), (resp1, resp2...).
args, response = zip(*stats)
mocking.mock(proc._get_line, mocking.return_for_args({
(stat_path, '24062', 'process %s' % ', '.join(args)): stat
}))
self.assertEquals(response, proc.get_stats(24062, *args))
# tests the case where pid = 0
if 'start time' in args:
response = 10
else:
response = ()
for arg in args:
if arg == 'command':
response += ('sched',)
elif arg == 'utime':
response += ('0',)
elif arg == 'stime':
response += ('0',)
mocking.mock(proc._get_line, mocking.return_for_args({
('/proc/0/stat', '0', 'process %s' % ', '.join(args)): stat
}))
self.assertEquals(response, proc.get_stats(0, *args))
开发者ID:ankitmodi,项目名称:Projects,代码行数:60,代码来源:proc.py
示例7: test_get_streams
def test_get_streams(self):
"""
Exercises the get_streams() method.
"""
# get a list of fake, but good looking, streams
valid_streams = (
("1", "NEW", "4", "10.10.10.1:80"),
("2", "SUCCEEDED", "4", "10.10.10.1:80"),
("3", "SUCCEEDED", "4", "10.10.10.1:80")
)
responses = ["%s\r\n" % " ".join(entry) for entry in valid_streams]
mocking.mock_method(Controller, "get_info", mocking.return_value(
"".join(responses)
))
streams = self.controller.get_streams()
self.assertEqual(len(valid_streams), len(streams))
for index, stream in enumerate(streams):
self.assertEqual(valid_streams[index][0], stream.id)
self.assertEqual(valid_streams[index][1], stream.status)
self.assertEqual(valid_streams[index][2], stream.circ_id)
self.assertEqual(valid_streams[index][3], stream.target)
开发者ID:gsathya,项目名称:stem,代码行数:26,代码来源:controller.py
示例8: test_load_processed_files
def test_load_processed_files(self):
"""
Successful load of content.
"""
test_lines = (
"/dir/ 0",
"/dir/file 12345",
"/dir/file with spaces 7138743",
" /dir/with extra space 12345 ",
" \t ",
"",
"/dir/after empty line 12345",
)
expected_value = {
"/dir/": 0,
"/dir/file": 12345,
"/dir/file with spaces": 7138743,
"/dir/with extra space": 12345,
"/dir/after empty line": 12345,
}
test_content = StringIO.StringIO("\n".join(test_lines))
mocking.support_with(test_content)
mocking.mock(open, mocking.return_value(test_content))
self.assertEquals(expected_value, stem.descriptor.reader.load_processed_files(""))
开发者ID:ndanger000,项目名称:stem,代码行数:27,代码来源:reader.py
示例9: test_get_protocolinfo
def test_get_protocolinfo(self):
"""
Exercises the get_protocolinfo() method.
"""
# Use the handy mocked protocolinfo response.
mocking.mock(stem.connection.get_protocolinfo, mocking.return_value(
mocking.get_protocolinfo_response()
))
# Compare the str representation of these object, because the class
# does not have, nor need, a direct comparison operator.
self.assertEqual(str(mocking.get_protocolinfo_response()), str(self.controller.get_protocolinfo()))
# Raise an exception in the stem.connection.get_protocolinfo() call.
mocking.mock(stem.connection.get_protocolinfo, mocking.raise_exception(ProtocolError))
# Get a default value when the call fails.
self.assertEqual(
"default returned",
self.controller.get_protocolinfo(default = "default returned")
)
# No default value, accept the error.
self.assertRaises(ProtocolError, self.controller.get_protocolinfo)
开发者ID:gsathya,项目名称:stem,代码行数:25,代码来源:controller.py
示例10: test_event_listening
def test_event_listening(self):
"""
Exercises the add_event_listener and remove_event_listener methods.
"""
# set up for failure to create any events
mocking.mock_method(Controller, "get_version", mocking.return_value(stem.version.Version('0.1.0.14')))
self.assertRaises(InvalidRequest, self.controller.add_event_listener, mocking.no_op(), EventType.BW)
# set up to only fail newer events
mocking.mock_method(Controller, "get_version", mocking.return_value(stem.version.Version('0.2.0.35')))
# EventType.BW is one of the earliest events
self.controller.add_event_listener(mocking.no_op(), EventType.BW)
# EventType.SIGNAL was added in tor version 0.2.3.1-alpha
self.assertRaises(InvalidRequest, self.controller.add_event_listener, mocking.no_op(), EventType.SIGNAL)
开发者ID:abcdef123,项目名称:stem,代码行数:17,代码来源:controller.py
示例11: test_get_version
def test_get_version(self):
"""
Exercises the get_version() method.
"""
try:
# Use one version for first check.
version_2_1 = "0.2.1.32"
version_2_1_object = stem.version.Version(version_2_1)
mocking.mock_method(Controller, "get_info", mocking.return_value(version_2_1))
# Return a version with a cold cache.
self.assertEqual(version_2_1_object, self.controller.get_version())
# Use a different version for second check.
version_2_2 = "0.2.2.39"
version_2_2_object = stem.version.Version(version_2_2)
mocking.mock_method(Controller, "get_info", mocking.return_value(version_2_2))
# Return a version with a hot cache, so it will be the old version.
self.assertEqual(version_2_1_object, self.controller.get_version())
# Turn off caching.
self.controller._is_caching_enabled = False
# Return a version without caching, so it will be the new version.
self.assertEqual(version_2_2_object, self.controller.get_version())
# Raise an exception in the get_info() call.
mocking.mock_method(Controller, "get_info", mocking.raise_exception(InvalidArguments))
# Get a default value when the call fails.
self.assertEqual(
"default returned",
self.controller.get_version(default = "default returned")
)
# No default value, accept the error.
self.assertRaises(InvalidArguments, self.controller.get_version)
# Give a bad version. The stem.version.Version ValueError should bubble up.
version_A_42 = "0.A.42.spam"
mocking.mock_method(Controller, "get_info", mocking.return_value(version_A_42))
self.assertRaises(ValueError, self.controller.get_version)
finally:
# Turn caching back on before we leave.
self.controller._is_caching_enabled = True
开发者ID:gsathya,项目名称:stem,代码行数:46,代码来源:controller.py
示例12: test_load_processed_files_empty
def test_load_processed_files_empty(self):
"""
Tests the load_processed_files() function with an empty file.
"""
test_content = StringIO.StringIO("")
mocking.support_with(test_content)
mocking.mock(open, mocking.return_value(test_content))
self.assertEquals({}, stem.descriptor.reader.load_processed_files(""))
开发者ID:ndanger000,项目名称:stem,代码行数:9,代码来源:reader.py
示例13: test_load_processed_files_malformed_file
def test_load_processed_files_malformed_file(self):
"""
Tests the load_processed_files() function content that is malformed because
it has an invalid file path.
"""
test_content = StringIO.StringIO("not_an_absolute_file 12345")
mocking.support_with(test_content)
mocking.mock(open, mocking.return_value(test_content))
self.assertRaises(TypeError, stem.descriptor.reader.load_processed_files, "")
开发者ID:ndanger000,项目名称:stem,代码行数:10,代码来源:reader.py
示例14: test_load_processed_files_malformed_timestamp
def test_load_processed_files_malformed_timestamp(self):
"""
Tests the load_processed_files() function content that is malformed because
it has a non-numeric timestamp.
"""
test_content = StringIO.StringIO("/dir/file 123a")
mocking.support_with(test_content)
mocking.mock(open, mocking.return_value(test_content))
self.assertRaises(TypeError, stem.descriptor.reader.load_processed_files, "")
开发者ID:ndanger000,项目名称:stem,代码行数:10,代码来源:reader.py
示例15: test_attach_stream
def test_attach_stream(self):
"""
Exercises the attach_stream() method.
"""
# Response when the stream is in a state where it can't be attached (for
# instance, it's already open).
response = stem.response.ControlMessage.from_str("555 Connection is not managed by controller.\r\n")
mocking.mock_method(Controller, "msg", mocking.return_value(response))
self.assertRaises(UnsatisfiableRequest, self.controller.attach_stream, 'stream_id', 'circ_id')
开发者ID:axitkhurana,项目名称:stem,代码行数:12,代码来源:controller.py
示例16: test_mirror_mirror_on_the_wall
def test_mirror_mirror_on_the_wall(self):
from stem.descriptor.server_descriptor import RelayDescriptor
from stem.descriptor.reader import DescriptorReader
from stem.util import str_tools
exit_descriptor = mocking.get_relay_server_descriptor({
'router': 'speedyexit 149.255.97.109 9001 0 0'
}, content = True).replace('reject *:*', 'accept *:*')
exit_descriptor = mocking.sign_descriptor_content(exit_descriptor)
exit_descriptor = RelayDescriptor(exit_descriptor)
reader_wrapper = mocking.get_object(DescriptorReader, {
'__enter__': lambda x: x,
'__exit__': mocking.no_op(),
'__iter__': mocking.return_value(iter((
exit_descriptor,
mocking.get_relay_server_descriptor(), # non-exit
exit_descriptor,
exit_descriptor,
)))
})
# provides a mapping of observed bandwidth to the relay nicknames
def get_bw_to_relay():
bw_to_relay = {}
with reader_wrapper as reader:
for desc in reader:
if desc.exit_policy.is_exiting_allowed():
bw_to_relay.setdefault(desc.observed_bandwidth, []).append(desc.nickname)
return bw_to_relay
# prints the top fifteen relays
bw_to_relay = get_bw_to_relay()
count = 1
for bw_value in sorted(bw_to_relay.keys(), reverse = True):
for nickname in bw_to_relay[bw_value]:
expected_line = "%i. speedyexit (102.13 KB/s)" % count
printed_line = "%i. %s (%s/s)" % (count, nickname, str_tools.get_size_label(bw_value, 2))
self.assertEqual(expected_line, printed_line)
count += 1
if count > 15:
return
self.assertEqual(4, count)
开发者ID:bwrichte,项目名称:TorFinalProject,代码行数:50,代码来源:tutorial.py
示例17: test_mirror_mirror_on_the_wall_2
def test_mirror_mirror_on_the_wall_2(self):
def tutorial_example():
from stem.descriptor import parse_file
for desc in parse_file(open("/home/atagar/.tor/cached-consensus")):
print "found relay %s (%s)" % (desc.nickname, desc.fingerprint)
test_file = io.BytesIO(mocking.get_network_status_document_v3(
routers = [mocking.get_router_status_entry_v3()],
content = True,
))
mocking.support_with(test_file)
test_file.name = "/home/atagar/.tor/cached-consensus"
if is_python_3():
import builtins
mocking.mock(open, mocking.return_value(test_file), target_module = builtins)
else:
mocking.mock(open, mocking.return_value(test_file))
tutorial_example()
self.assertEqual("found relay caerSidi (A7569A83B5706AB1B1A9CB52EFF7D2D32E4553EB)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:23,代码来源:tutorial.py
示例18: test_mirror_mirror_on_the_wall_3
def test_mirror_mirror_on_the_wall_3(self):
def tutorial_example():
from stem.descriptor.reader import DescriptorReader
with DescriptorReader(["/home/atagar/server-descriptors-2013-03.tar"]) as reader:
for desc in reader:
print "found relay %s (%s)" % (desc.nickname, desc.fingerprint)
mocking.mock(
DescriptorReader.__iter__,
mocking.return_value(iter([mocking.get_relay_server_descriptor()])),
target_module = DescriptorReader
)
tutorial_example()
self.assertEqual("found relay caerSidi (None)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:16,代码来源:tutorial.py
示例19: test_take_ownership_via_pid
def test_take_ownership_via_pid(self):
"""
Checks that the tor process quits after we do if we set take_ownership. To
test this we spawn a process and trick tor into thinking that it is us.
"""
if not stem.prereq.is_python_26() and stem.util.system.is_windows():
test.runner.skip(self, "(unable to kill subprocesses)")
return
elif not stem.util.system.is_available("sleep"):
test.runner.skip(self, "('sleep' command is unavailable)")
return
elif test.runner.only_run_once(self, "test_take_ownership_via_pid"): return
elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP): return
# Have os.getpid provide the pid of a process we can safely kill. I hate
# needing to a _get_pid() helper but after much head scratching I haven't
# been able to mock os.getpid() or posix.getpid().
sleep_process = subprocess.Popen(['sleep', '60'])
mocking.mock(stem.process._get_pid, mocking.return_value(str(sleep_process.pid)))
tor_process = stem.process.launch_tor_with_config(
tor_cmd = test.runner.get_runner().get_tor_command(),
config = {
'SocksPort': '2777',
'ControlPort': '2778',
'DataDirectory': DATA_DIRECTORY,
},
completion_percent = 5,
take_ownership = True,
)
# Kill the sleep command. Tor should quit shortly after.
_kill_process(sleep_process)
# tor polls for the process every fifteen seconds so this may take a
# while...
for seconds_waited in xrange(30):
if tor_process.poll() == 0:
return # tor exited
time.sleep(1)
self.fail("tor didn't quit after the process that owned it terminated")
开发者ID:abcdef123,项目名称:stem,代码行数:47,代码来源:process.py
示例20: test_expand_path_unix
def test_expand_path_unix(self):
"""
Tests the expand_path function. This does not exercise home directory
expansions since that deals with our environment (that's left to integ
tests).
"""
mocking.mock(platform.system, mocking.return_value("Linux"))
mocking.mock(os.path.join, posixpath.join, os.path)
self.assertEquals("", system.expand_path(""))
self.assertEquals("/tmp", system.expand_path("/tmp"))
self.assertEquals("/tmp", system.expand_path("/tmp/"))
self.assertEquals("/tmp", system.expand_path(".", "/tmp"))
self.assertEquals("/tmp", system.expand_path("./", "/tmp"))
self.assertEquals("/tmp/foo", system.expand_path("foo", "/tmp"))
self.assertEquals("/tmp/foo", system.expand_path("./foo", "/tmp"))
开发者ID:bwrichte,项目名称:TorFinalProject,代码行数:17,代码来源:system.py
注:本文中的test.mocking.return_value函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论