• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python mocking.return_value函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 test.mocking.return_value 函数的典型用法代码示例。如果您正苦于以下问题:Python return_value 函数的具体用法?Python return_value 怎么用?Python return_value 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 return_value 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_get_socks_listeners_new

  def test_get_socks_listeners_new(self):
    """
    Checks get_socks_listeners() against GETINFO replies made of quoted
    'address:port' entries, the format this test attributes to newer tor
    instances.
    """

    def stub_get_info(reply):
      # swaps Controller.get_info for a stub that always provides this reply
      mocking.mock_method(Controller, "get_info", mocking.return_value(reply))

    # a pair of SOCKS listeners

    stub_get_info('"127.0.0.1:1112" "127.0.0.1:1114"')
    self.assertEqual(
      [('127.0.0.1', 1112), ('127.0.0.1', 1114)],
      self.controller.get_socks_listeners(),
    )

    # an empty reply, meaning that there are no SOCKS listeners

    stub_get_info("")
    self.assertEqual([], self.controller.get_socks_listeners())

    # malformed GETINFO content, each of which should be rejected

    malformed_replies = (
      '"127.0.0.1"',         # address only
      '"1112"',              # port only
      '"5127.0.0.1:1112"',   # invalid address
      '"127.0.0.1:991112"',  # invalid port
    )

    for reply in malformed_replies:
      stub_get_info(reply)
      self.assertRaises(stem.ProtocolError, self.controller.get_socks_listeners)
开发者ID:gsathya,项目名称:stem,代码行数:32,代码来源:controller.py


示例2: test_mirror_mirror_on_the_wall_1

  def test_mirror_mirror_on_the_wall_1(self):
    """
    Runs the tutorial snippet that lists the relays in the controller's
    network statuses against a mocked Controller, then checks what it printed.
    """

    def tutorial_example():
      from stem.control import Controller

      with Controller.from_port(control_port = 9051) as controller:
        controller.authenticate()

        for desc in controller.get_network_statuses():
          # Single-argument print call: parses under both python 2 and 3 and
          # emits the same text as the old print statement.
          print("found relay %s (%s)" % (desc.nickname, desc.fingerprint))

    # Controller stand-in whose network statuses consist of one mock entry.

    controller = mocking.get_object(Controller, {
      'authenticate': mocking.no_op(),
      'close': mocking.no_op(),
      'get_network_statuses': mocking.return_value(
        [mocking.get_router_status_entry_v2()],
      ),
    })

    # Controller.from_port() hands back our stand-in rather than connecting.

    mocking.mock(
      Controller.from_port, mocking.return_value(controller),
      target_module = Controller,
      is_static = True,
    )

    tutorial_example()
    self.assertEqual("found relay caerSidi (A7569A83B5706AB1B1A9CB52EFF7D2D32E4553EB)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:26,代码来源:tutorial.py


示例3: test_mirror_mirror_on_the_wall_4

  def test_mirror_mirror_on_the_wall_4(self):
    """
    Runs the tutorial snippet that ranks exit relays by observed bandwidth
    against a mocked Controller, then checks what it printed.
    """

    def tutorial_example():
      from stem.control import Controller
      from stem.util import str_tools

      # provides a mapping of observed bandwidth to the relay nicknames
      def get_bw_to_relay():
        bw_to_relay = {}

        with Controller.from_port(control_port = 9051) as controller:
          controller.authenticate()

          for desc in controller.get_server_descriptors():
            if desc.exit_policy.is_exiting_allowed():
              bw_to_relay.setdefault(desc.observed_bandwidth, []).append(desc.nickname)

        return bw_to_relay

      # prints the top fifteen relays

      bw_to_relay = get_bw_to_relay()
      count = 1

      for bw_value in sorted(bw_to_relay.keys(), reverse = True):
        for nickname in bw_to_relay[bw_value]:
          # Single-argument print call: parses under both python 2 and 3 and
          # emits the same text as the old print statement.
          print("%i. %s (%s/s)" % (count, nickname, str_tools.get_size_label(bw_value, 2)))
          count += 1

          if count > 15:
            return

    # Descriptor for a relay named 'speedyexit' with its 'reject *:*' policy
    # swapped for 'accept *:*'. The content is signed after being edited.

    exit_descriptor = mocking.get_relay_server_descriptor({
      'router': 'speedyexit 149.255.97.109 9001 0 0'
    }, content = True).replace(b'reject *:*', b'accept *:*')

    exit_descriptor = mocking.sign_descriptor_content(exit_descriptor)
    exit_descriptor = RelayDescriptor(exit_descriptor)

    # Controller stand-in that provides three exits and one non-exit.

    controller = mocking.get_object(Controller, {
      'authenticate': mocking.no_op(),
      'close': mocking.no_op(),
      'get_server_descriptors': mocking.return_value([
        exit_descriptor,
        mocking.get_relay_server_descriptor(),  # non-exit
        exit_descriptor,
        exit_descriptor,
      ])
    })

    # Controller.from_port() hands back our stand-in rather than connecting.

    mocking.mock(
      Controller.from_port, mocking.return_value(controller),
      target_module = Controller,
      is_static = True,
    )

    tutorial_example()
    self.assertEqual(MIRROR_MIRROR_OUTPUT, self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:57,代码来源:tutorial.py


示例4: _mock_open

def _mock_open(content):
  """
  Replaces the open() builtin with a stub that returns an in-memory file
  holding the given content.

  :param content: what the mocked file should contain
  """

  fake_file = StringIO.StringIO(content)
  mocking.support_with(fake_file)  # presumably enables use in 'with' blocks
  open_stub = mocking.return_value(fake_file)

  if not stem.prereq.is_python_3():
    mocking.mock(open, open_stub)
  else:
    # python 3 branch: the mock is registered against the builtins module
    import builtins
    mocking.mock(builtins.open, open_stub, builtins)
开发者ID:ankitmodi,项目名称:Projects,代码行数:9,代码来源:reader.py


示例5: test_the_little_relay_that_could

  def test_the_little_relay_that_could(self):
    """
    Runs the tutorial snippet that reports the bytes tor has read and written
    against a mocked Controller, then checks what it printed.
    """

    def tutorial_example():
      from stem.control import Controller

      with Controller.from_port(control_port = 9051) as controller:
        controller.authenticate()  # provide the password here if you set one

        bytes_read = controller.get_info("traffic/read")
        bytes_written = controller.get_info("traffic/written")

        # Single-argument print call: parses under both python 2 and 3 and
        # emits the same text as the old print statement.
        print("My Tor relay has read %s bytes and written %s." % (bytes_read, bytes_written))

    # Controller stand-in with canned replies for the two GETINFO queries.

    controller = mocking.get_object(Controller, {
      'authenticate': mocking.no_op(),
      'close': mocking.no_op(),
      'get_info': mocking.return_for_args({
        ('traffic/read',): '33406',
        ('traffic/written',): '29649',
      }, is_method = True),
    })

    # Controller.from_port() hands back our stand-in rather than connecting.

    mocking.mock(
      Controller.from_port, mocking.return_value(controller),
      target_module = Controller,
      is_static = True,
    )

    tutorial_example()
    self.assertEqual("My Tor relay has read 33406 bytes and written 29649.\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:29,代码来源:tutorial.py


示例6: test_get_stats

  def test_get_stats(self):
    """
    Tests get_stats() with all combinations of stat_type arguments, both for
    an ordinary pid and for pid 0.
    """

    # list of all combinations of args with respective return values
    stat_combinations = mocking.get_all_combinations([
      ('command', 'test_program'),
      ('utime', '0.13'),
      ('stime', '0.14'),
      ('start time', '10.21'),
    ])

    stat_path = "/proc/24062/stat"
    stat = '1 (test_program) 2 3 4 5 6 7 8 9 10 11 12 13.0 14.0 15 16 17 18 19 20 21.0 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43'

    mocking.mock(proc.get_system_start_time, mocking.return_value(10))

    # tests the case where no stat_types are specified
    mocking.mock(proc._get_line, mocking.return_for_args({
      (stat_path, '24062', 'process '): stat
    }))

    self.assertEqual((), proc.get_stats(24062))

    for stats in stat_combinations:
      # the stats variable is...
      #   [(arg1, resp1), (arg2, resp2)...]
      #
      # but we need...
      #   (arg1, arg2...), (resp1, resp2...).

      args, response = zip(*stats)

      mocking.mock(proc._get_line, mocking.return_for_args({
        (stat_path, '24062', 'process %s' % ', '.join(args)): stat
      }))

      self.assertEqual(response, proc.get_stats(24062, *args))

      # tests the case where pid = 0, for which different results are expected

      if 'start time' in args:
        response = 10
      else:
        response = ()

        for arg in args:
          if arg == 'command':
            response += ('sched',)
          elif arg == 'utime':
            response += ('0',)
          elif arg == 'stime':
            response += ('0',)

      mocking.mock(proc._get_line, mocking.return_for_args({
        ('/proc/0/stat', '0', 'process %s' % ', '.join(args)): stat
      }))

      self.assertEqual(response, proc.get_stats(0, *args))
开发者ID:ankitmodi,项目名称:Projects,代码行数:60,代码来源:proc.py


示例7: test_get_streams

  def test_get_streams(self):
    """
    Checks that get_streams() parses a GETINFO reply listing several streams.
    """

    # plausible looking stream entries: (id, status, circuit id, target)
    expected_streams = (
      ("1", "NEW", "4", "10.10.10.1:80"),
      ("2", "SUCCEEDED", "4", "10.10.10.1:80"),
      ("3", "SUCCEEDED", "4", "10.10.10.1:80")
    )

    # the reply has one CRLF terminated line per stream
    reply_lines = []

    for fields in expected_streams:
      reply_lines.append("%s\r\n" % " ".join(fields))

    getinfo_reply = "".join(reply_lines)
    mocking.mock_method(Controller, "get_info", mocking.return_value(getinfo_reply))

    streams = self.controller.get_streams()
    self.assertEqual(len(expected_streams), len(streams))

    # the lengths match, so pairing the entries up covers every stream
    for (stream_id, status, circ_id, target), stream in zip(expected_streams, streams):
      self.assertEqual(stream_id, stream.id)
      self.assertEqual(status, stream.status)
      self.assertEqual(circ_id, stream.circ_id)
      self.assertEqual(target, stream.target)
开发者ID:gsathya,项目名称:stem,代码行数:26,代码来源:controller.py


示例8: test_load_processed_files

    def test_load_processed_files(self):
        """
        Successful load of content, including entries with spaces in the path,
        surrounding whitespace, and blank lines.
        """

        test_lines = (
            "/dir/ 0",
            "/dir/file 12345",
            "/dir/file with spaces 7138743",
            "  /dir/with extra space 12345   ",
            "   \t   ",
            "",
            "/dir/after empty line 12345",
        )

        # Blank and whitespace-only lines have no counterpart in the result.
        expected_value = {
            "/dir/": 0,
            "/dir/file": 12345,
            "/dir/file with spaces": 7138743,
            "/dir/with extra space": 12345,
            "/dir/after empty line": 12345,
        }

        # Have open() provide our in-memory content rather than a real file.
        test_content = StringIO.StringIO("\n".join(test_lines))
        mocking.support_with(test_content)
        mocking.mock(open, mocking.return_value(test_content))
        self.assertEqual(expected_value, stem.descriptor.reader.load_processed_files(""))
开发者ID:ndanger000,项目名称:stem,代码行数:27,代码来源:reader.py


示例9: test_get_protocolinfo

  def test_get_protocolinfo(self):
    """
    Checks get_protocolinfo() both when the underlying query succeeds and
    when it raises.
    """

    # Have stem.connection.get_protocolinfo() provide the mock response.

    success_stub = mocking.return_value(mocking.get_protocolinfo_response())
    mocking.mock(stem.connection.get_protocolinfo, success_stub)

    # The response class has no comparison operator of its own (nor does it
    # need one), so the string representations are compared instead.

    expected_str = str(mocking.get_protocolinfo_response())
    observed_str = str(self.controller.get_protocolinfo())
    self.assertEqual(expected_str, observed_str)

    # From here on stem.connection.get_protocolinfo() raises.

    mocking.mock(stem.connection.get_protocolinfo, mocking.raise_exception(ProtocolError))

    # With a default the failure is masked...

    fallback = self.controller.get_protocolinfo(default = "default returned")
    self.assertEqual("default returned", fallback)

    # ... and without one the error propagates.

    self.assertRaises(ProtocolError, self.controller.get_protocolinfo)
开发者ID:gsathya,项目名称:stem,代码行数:25,代码来源:controller.py


示例10: test_event_listening

 def test_event_listening(self):
   """
   Checks that add_event_listener() rejects event types when the mocked tor
   version is too old for them.
   """

   def pretend_tor_version(version_str):
     # makes Controller.get_version() report the given tor version
     reported = stem.version.Version(version_str)
     mocking.mock_method(Controller, "get_version", mocking.return_value(reported))

   # a version for which even the BW event is rejected

   pretend_tor_version('0.1.0.14')
   self.assertRaises(InvalidRequest, self.controller.add_event_listener, mocking.no_op(), EventType.BW)

   # a version for which only newer events should be rejected

   pretend_tor_version('0.2.0.35')

   # EventType.BW is one of the earliest events, so this is accepted

   self.controller.add_event_listener(mocking.no_op(), EventType.BW)

   # EventType.SIGNAL was added in tor version 0.2.3.1-alpha

   self.assertRaises(InvalidRequest, self.controller.add_event_listener, mocking.no_op(), EventType.SIGNAL)
开发者ID:abcdef123,项目名称:stem,代码行数:17,代码来源:controller.py


示例11: test_get_version

  def test_get_version(self):
    """
    Checks get_version() with a cold cache, a warm cache, caching disabled,
    a failing get_info(), and an unparsable version string.
    """

    def stub_get_info(behavior):
      # swaps Controller.get_info for the given stub
      mocking.mock_method(Controller, "get_info", behavior)

    try:
      # The first lookup has nothing cached, so we get this version.

      old_version_str = "0.2.1.32"
      old_version = stem.version.Version(old_version_str)
      stub_get_info(mocking.return_value(old_version_str))
      self.assertEqual(old_version, self.controller.get_version())

      # get_info() now reports something newer, but the cached value wins.

      new_version_str = "0.2.2.39"
      new_version = stem.version.Version(new_version_str)
      stub_get_info(mocking.return_value(new_version_str))
      self.assertEqual(old_version, self.controller.get_version())

      # With caching disabled we see the newer version.

      self.controller._is_caching_enabled = False
      self.assertEqual(new_version, self.controller.get_version())

      # A failing get_info() is masked when a default is provided...

      stub_get_info(mocking.raise_exception(InvalidArguments))
      fallback = self.controller.get_version(default = "default returned")
      self.assertEqual("default returned", fallback)

      # ... and propagates when there is none.

      self.assertRaises(InvalidArguments, self.controller.get_version)

      # A malformed version makes the stem.version.Version ValueError bubble up.

      malformed_version_str = "0.A.42.spam"
      stub_get_info(mocking.return_value(malformed_version_str))
      self.assertRaises(ValueError, self.controller.get_version)
    finally:
      # Re-enable caching regardless of how the checks above went.
      self.controller._is_caching_enabled = True
开发者ID:gsathya,项目名称:stem,代码行数:46,代码来源:controller.py


示例12: test_load_processed_files_empty

    def test_load_processed_files_empty(self):
        """
        Tests the load_processed_files() function with an empty file, which
        should provide an empty dictionary.
        """

        # Have open() provide our empty in-memory content rather than a real file.
        test_content = StringIO.StringIO("")
        mocking.support_with(test_content)
        mocking.mock(open, mocking.return_value(test_content))
        self.assertEqual({}, stem.descriptor.reader.load_processed_files(""))
开发者ID:ndanger000,项目名称:stem,代码行数:9,代码来源:reader.py


示例13: test_load_processed_files_malformed_file

    def test_load_processed_files_malformed_file(self):
        """
        Checks that load_processed_files() raises a TypeError for content that
        is malformed because its file path is invalid.
        """

        malformed_entry = "not_an_absolute_file 12345"

        fake_file = StringIO.StringIO(malformed_entry)
        mocking.support_with(fake_file)
        mocking.mock(open, mocking.return_value(fake_file))

        load_processed_files = stem.descriptor.reader.load_processed_files
        self.assertRaises(TypeError, load_processed_files, "")
开发者ID:ndanger000,项目名称:stem,代码行数:10,代码来源:reader.py


示例14: test_load_processed_files_malformed_timestamp

    def test_load_processed_files_malformed_timestamp(self):
        """
        Checks that load_processed_files() raises a TypeError for content that
        is malformed because its timestamp is not numeric.
        """

        malformed_entry = "/dir/file 123a"

        fake_file = StringIO.StringIO(malformed_entry)
        mocking.support_with(fake_file)
        mocking.mock(open, mocking.return_value(fake_file))

        load_processed_files = stem.descriptor.reader.load_processed_files
        self.assertRaises(TypeError, load_processed_files, "")
开发者ID:ndanger000,项目名称:stem,代码行数:10,代码来源:reader.py


示例15: test_attach_stream

  def test_attach_stream(self):
    """
    Checks that attach_stream() raises UnsatisfiableRequest when the
    controller's msg() provides a 555 reply.
    """

    # Reply given when the stream is in a state where it can't be attached,
    # for instance because it's already open.

    refusal = stem.response.ControlMessage.from_str(
      "555 Connection is not managed by controller.\r\n"
    )

    mocking.mock_method(Controller, "msg", mocking.return_value(refusal))

    attach_args = ('stream_id', 'circ_id')
    self.assertRaises(UnsatisfiableRequest, self.controller.attach_stream, *attach_args)
开发者ID:axitkhurana,项目名称:stem,代码行数:12,代码来源:controller.py


示例16: test_mirror_mirror_on_the_wall

  def test_mirror_mirror_on_the_wall(self):
    """
    Walks through the tutorial logic that ranks exit relays by observed
    bandwidth, reading descriptors from a mocked DescriptorReader and
    checking each line that would be printed.
    """

    from stem.descriptor.server_descriptor import RelayDescriptor
    from stem.descriptor.reader import DescriptorReader
    from stem.util import str_tools

    # Descriptor for a relay named 'speedyexit' with its 'reject *:*' policy
    # swapped for 'accept *:*'. The content is signed after being edited.

    exit_content = mocking.get_relay_server_descriptor({
     'router': 'speedyexit 149.255.97.109 9001 0 0'
    }, content = True).replace('reject *:*', 'accept *:*')

    exit_descriptor = RelayDescriptor(mocking.sign_descriptor_content(exit_content))

    # Reader stand-in that works as a context manager and iterates over three
    # exits and one non-exit.

    mock_descriptors = iter((
      exit_descriptor,
      mocking.get_relay_server_descriptor(),  # non-exit
      exit_descriptor,
      exit_descriptor,
    ))

    reader_wrapper = mocking.get_object(DescriptorReader, {
      '__enter__': lambda x: x,
      '__exit__': mocking.no_op(),
      '__iter__': mocking.return_value(mock_descriptors)
    })

    # maps each observed bandwidth to the nicknames of the exits that have it

    relays_by_bandwidth = {}

    with reader_wrapper as reader:
      for desc in reader:
        if desc.exit_policy.is_exiting_allowed():
          relays_by_bandwidth.setdefault(desc.observed_bandwidth, []).append(desc.nickname)

    # checks the lines for the top fifteen relays, fastest first

    count = 1

    for bandwidth in sorted(relays_by_bandwidth, reverse = True):
      for nickname in relays_by_bandwidth[bandwidth]:
        size_label = str_tools.get_size_label(bandwidth, 2)

        self.assertEqual(
          "%i. speedyexit (102.13 KB/s)" % count,
          "%i. %s (%s/s)" % (count, nickname, size_label),
        )

        count += 1

        if count > 15:
          return

    # three exits were listed, so the counter ends one past that

    self.assertEqual(4, count)
开发者ID:bwrichte,项目名称:TorFinalProject,代码行数:50,代码来源:tutorial.py


示例17: test_mirror_mirror_on_the_wall_2

  def test_mirror_mirror_on_the_wall_2(self):
    """
    Runs the tutorial snippet that lists the relays in a cached consensus
    file, with open() mocked to provide an in-memory document, then checks
    what it printed.
    """

    def tutorial_example():
      from stem.descriptor import parse_file

      for desc in parse_file(open("/home/atagar/.tor/cached-consensus")):
        # Single-argument print call: parses under both python 2 and 3 and
        # emits the same text as the old print statement. This matters here
        # since the test has an explicit python 3 branch below.
        print("found relay %s (%s)" % (desc.nickname, desc.fingerprint))

    # In-memory network status document containing a single mock router entry.

    test_file = io.BytesIO(mocking.get_network_status_document_v3(
      routers = [mocking.get_router_status_entry_v3()],
      content = True,
    ))

    mocking.support_with(test_file)
    test_file.name = "/home/atagar/.tor/cached-consensus"

    # Under python 3 the mock is registered against the builtins module.

    if is_python_3():
      import builtins
      mocking.mock(open, mocking.return_value(test_file), target_module = builtins)
    else:
      mocking.mock(open, mocking.return_value(test_file))

    tutorial_example()
    self.assertEqual("found relay caerSidi (A7569A83B5706AB1B1A9CB52EFF7D2D32E4553EB)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:23,代码来源:tutorial.py


示例18: test_mirror_mirror_on_the_wall_3

  def test_mirror_mirror_on_the_wall_3(self):
    """
    Runs the tutorial snippet that lists the relays provided by a
    DescriptorReader, with the reader's iteration mocked, then checks what it
    printed.
    """

    def tutorial_example():
      from stem.descriptor.reader import DescriptorReader

      with DescriptorReader(["/home/atagar/server-descriptors-2013-03.tar"]) as reader:
        for desc in reader:
          # Single-argument print call: parses under both python 2 and 3 and
          # emits the same text as the old print statement.
          print("found relay %s (%s)" % (desc.nickname, desc.fingerprint))

    # Iterating over any DescriptorReader provides a single mock descriptor.

    mocking.mock(
      DescriptorReader.__iter__,
      mocking.return_value(iter([mocking.get_relay_server_descriptor()])),
      target_module = DescriptorReader
    )

    tutorial_example()
    self.assertEqual("found relay caerSidi (None)\n", self.stdout.getvalue())
开发者ID:ankitmodi,项目名称:Projects,代码行数:16,代码来源:tutorial.py


示例19: test_take_ownership_via_pid

 def test_take_ownership_via_pid(self):
   """
   Checks that the tor process quits after we do if we set take_ownership. To
   test this we spawn a process and trick tor into thinking that it is us.
   """

   # Conditions under which this test is skipped.

   if not stem.prereq.is_python_26() and stem.util.system.is_windows():
     test.runner.skip(self, "(unable to kill subprocesses)")
     return
   elif not stem.util.system.is_available("sleep"):
     test.runner.skip(self, "('sleep' command is unavailable)")
     return
   elif test.runner.only_run_once(self, "test_take_ownership_via_pid"): return
   elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP): return

   # Have stem.process._get_pid() provide the pid of a process we can safely
   # kill. The helper exists because, after much head scratching, mocking
   # os.getpid() or posix.getpid() directly didn't work out.

   sleep_process = subprocess.Popen(['sleep', '60'])
   mocking.mock(stem.process._get_pid, mocking.return_value(str(sleep_process.pid)))

   tor_process = stem.process.launch_tor_with_config(
     tor_cmd = test.runner.get_runner().get_tor_command(),
     config = {
       'SocksPort': '2777',
       'ControlPort': '2778',
       'DataDirectory': DATA_DIRECTORY,
     },
     completion_percent = 5,
     take_ownership = True,
   )

   # Kill the sleep command. Tor should quit shortly after.

   _kill_process(sleep_process)

   # tor polls for the process every fifteen seconds so this may take a
   # while...
   #
   # range() rather than xrange() so this runs under both python 2 and 3. With
   # only thirty entries the list that python 2 builds is of no consequence.

   for seconds_waited in range(30):
     if tor_process.poll() == 0:
       return # tor exited

     time.sleep(1)

   self.fail("tor didn't quit after the process that owned it terminated")
开发者ID:abcdef123,项目名称:stem,代码行数:47,代码来源:process.py


示例20: test_expand_path_unix

  def test_expand_path_unix(self):
    """
    Tests the expand_path function. This does not exercise home directory
    expansions since that deals with our environment (that's left to integ
    tests).
    """

    # Report Linux as the platform and have os.path.join be the posix
    # implementation.

    mocking.mock(platform.system, mocking.return_value("Linux"))
    mocking.mock(os.path.join, posixpath.join, os.path)

    # When present, the second argument is the directory that relative paths
    # are expected to resolve against.

    self.assertEqual("", system.expand_path(""))
    self.assertEqual("/tmp", system.expand_path("/tmp"))
    self.assertEqual("/tmp", system.expand_path("/tmp/"))
    self.assertEqual("/tmp", system.expand_path(".", "/tmp"))
    self.assertEqual("/tmp", system.expand_path("./", "/tmp"))
    self.assertEqual("/tmp/foo", system.expand_path("foo", "/tmp"))
    self.assertEqual("/tmp/foo", system.expand_path("./foo", "/tmp"))
开发者ID:bwrichte,项目名称:TorFinalProject,代码行数:17,代码来源:system.py



注:本文中的test.mocking.return_value函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python output.println函数代码示例发布时间:2022-05-27
下一篇:
Python mocking.mock函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap