本文整理汇总了Python中unittest.mock.mock_open函数的典型用法代码示例。如果您正苦于以下问题：Python mock_open函数的具体用法？Python mock_open怎么用？Python mock_open使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了mock_open函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_explicit_mock
def test_explicit_mock(self):
    """Check that a caller-supplied mock configured by mock_open can be patched in as open()."""
    opener = MagicMock()
    # mock_open configures the mock it is given; the return value is not needed here.
    mock_open(opener)

    target = '%s.open' % __name__
    with patch(target, opener, create=True) as patched:
        # patch must hand back exactly the object we passed in.
        self.assertIs(patched, opener)
        open('foo')

    opener.assert_called_once_with('foo')
开发者ID:johndpope,项目名称:sims4-ai-engine,代码行数:7,代码来源:testwith.py
示例2: test_using_gzip_if_header_present_and_file_available
def test_using_gzip_if_header_present_and_file_available(loop):
    """Assert that only the gzip variant is opened when gzip is accepted and available.

    The request carries ``Accept-Encoding: gzip`` and the mocked gzip path
    reports ``is_file() == True``; the plain path's ``open`` must stay
    untouched while the gzip path's ``open`` must be called.
    """
    request = make_mocked_request(
        'GET', URL('http://python.org/logo.png'), headers={
            hdrs.ACCEPT_ENCODING: 'gzip'
        }
    )

    gz_filepath = mock.Mock()
    gz_filepath.open = mock.mock_open()
    gz_filepath.is_file.return_value = True
    gz_filepath.stat.return_value = mock.MagicMock()
    # The size belongs on the object returned by stat(); setting it on the
    # stat method mock itself would leave stat().st_size as an auto-created mock.
    gz_filepath.stat.return_value.st_size = 1024

    filepath = mock.Mock()
    filepath.name = 'logo.png'
    filepath.open = mock.mock_open()
    # with_name() on the plain path hands back the gzip path mock.
    filepath.with_name.return_value = gz_filepath

    file_sender = FileSender()
    # Replace the actual transfer with a coroutine mock that returns None.
    file_sender._sendfile = make_mocked_coro(None)

    loop.run_until_complete(file_sender.send(request, filepath))

    assert not filepath.open.called
    assert gz_filepath.open.called
开发者ID:cr0hn,项目名称:aiohttp,代码行数:25,代码来源:test_web_sendfile.py
示例3: test_write_bed_file
def test_write_bed_file(self):
    """Load a mocked VCF, drop indels, and verify the lines written to the BED files."""
    variants = VariantFileObject(Variant)

    reader = mock.mock_open(read_data='\n'.join(self.vcf_content))
    # Make the mocked handle iterable line by line via readline().
    reader.return_value.__iter__ = lambda self: self
    reader.return_value.__next__ = lambda self: self.readline()
    with mock.patch(builtin_open, reader):
        variants.load_variant_file('test.vcf')
    variants.drop_indels()

    fh = mock.mock_open()
    with mock.patch(builtin_open, fh):
        # Default call: no extra positional arguments.
        variants.write_bed_file('output.bed')
        fh.assert_has_calls([mock.call('output.bed', 'w')], any_order=True)
        handle = fh()
        expected_writes = [
            mock.call("1\t10\t10\t1;A;C\n"),
            mock.call("2\t10\t10\t2;A;C\n"),
        ]
        handle.write.assert_has_calls(expected_writes)

        # Second call passes -3 and 3 as additional positional arguments.
        variants.write_bed_file('output_with_penalty.bed', -3, 3)
        fh.assert_has_calls(
            [mock.call('output_with_penalty.bed', 'w')], any_order=True)
        handle = fh()
        expected_writes = [
            mock.call("1\t7\t13\t1;A;C\n"),
            mock.call("2\t7\t13\t2;A;C\n"),
        ]
        handle.write.assert_has_calls(expected_writes)
开发者ID:xiuchengquek,项目名称:VCFtoMutect,代码行数:34,代码来源:test_variantFileObject.py
示例4: test_UploadedImagesTracker
def test_UploadedImagesTracker(self, pMockForOpen, pMockForFlock):
    """Exercise UploadedImagesTracker: lock failure, corrupt log, parsing, close and add.

    pMockForOpen and pMockForFlock are not referenced in the body; they are
    presumably injected by patch decorators outside this view (TODO confirm).
    """
    # A failing flock must raise the lock-acquisition exception.
    with patch("imguploader.open", mock_open(), create=True), \
            patch("fcntl.flock", MagicMock(side_effect=IOError)):
        self.assertRaises(
            imguploader.UploadedImagesTrackerLockAcquiringFailed,
            imguploader.UploadedImagesTracker,
            "directory")

    # A corrupted activity log must raise the tracker exception.
    with patch("imguploader.open", mock_open(read_data="noooooooooooo"), create=True), \
            patch("fcntl.flock", MagicMock(return_value=None)):
        self.assertRaises(
            imguploader.UploadedImagesTrackerException,
            imguploader.UploadedImagesTracker,
            "directory")

    # A well-formed log line must show up in getImageList().
    with patch("imguploader.open", mock_open(read_data="fileName<URLfull<URLthumb"), create=True), \
            patch("fcntl.flock", MagicMock(return_value=None)):
        entry = imguploader.UploadedImagesTracker("directory")
        assert entry.getImageList()[0].getImageFileName() == "fileName"
        assert entry.isImageAlreadyUploaded("fileName") == True

    # Leaving the tracker's context must close the activity log exactly once.
    lOpenMocked = mock_open(read_data="")
    with patch("imguploader.open", lOpenMocked, create=True):
        with imguploader.UploadedImagesTracker("directory"):
            pass
        lOpenMocked().close.assert_called_once_with()

    # addUploadedImage() must record file name, full URL and thumbnail URL.
    tracker = imguploader.UploadedImagesTracker("directory")
    tracker.addUploadedImage("imageFileName", "FullURL", "ThumbURL")
    first_image = tracker._uploadedImages[0]
    assert first_image.getImageFileName() == "imageFileName"
    assert tracker._uploadedImages[0].getURLFullImage() == "FullURL"
    assert tracker._uploadedImages[0].getURLThumbImage() == "ThumbURL"
开发者ID:lukka,项目名称:imguploader,代码行数:34,代码来源:tests.py
示例5: test_saving_and_loading
def test_saving_and_loading(hass):
    """Test that we're saving and loading correctly."""

    class TestFlow(data_entry_flow.FlowHandler):
        VERSION = 5

        @asyncio.coroutine
        def async_step_init(self, user_input=None):
            return self.async_create_entry(
                title='Test Title',
                data={'token': 'abcd'},
            )

    with patch.dict(config_entries.HANDLERS, {'test': TestFlow}):
        yield from hass.config_entries.flow.async_init('test')

    class Test2Flow(data_entry_flow.FlowHandler):
        VERSION = 3

        @asyncio.coroutine
        def async_step_init(self, user_input=None):
            return self.async_create_entry(
                title='Test 2 Title',
                data={'username': 'bla'},
            )

    json_path = 'homeassistant.util.json.open'

    # Second flow runs with the handler lookup mocked and SAVE_DELAY set to 0.
    with patch('homeassistant.config_entries.HANDLERS.get',
               return_value=Test2Flow):
        with patch.object(config_entries, 'SAVE_DELAY', 0):
            yield from hass.config_entries.flow.async_init('test')

    with patch(json_path, mock_open(), create=True) as mock_write:
        # To trigger the call_later
        yield from asyncio.sleep(0, loop=hass.loop)
        # To execute the save
        yield from hass.async_block_till_done()

    # Mock open calls are: open file, context enter, write, context leave
    write_call = mock_write.mock_calls[2]
    written = write_call[1][0]

    # Now load written data in new config manager
    manager = config_entries.ConfigEntries(hass, {})

    with patch('os.path.isfile', return_value=True):
        with patch(json_path, mock_open(read_data=written), create=True):
            yield from manager.async_load()

    # Ensure same order
    entry_pairs = zip(hass.config_entries.async_entries(),
                      manager.async_entries())
    for orig, loaded in entry_pairs:
        assert orig.version == loaded.version
        assert orig.domain == loaded.domain
        assert orig.title == loaded.title
        assert orig.data == loaded.data
        assert orig.source == loaded.source
开发者ID:tucka,项目名称:home-assistant,代码行数:60,代码来源:test_config_entries.py
示例6: test_all_node_names_missing
def test_all_node_names_missing(self):
    """Expect a library error when the loaded corosync.conf has no node names."""
    self.config.corosync_conf.load(
        filename="corosync-no-node-names.conf",
        instead="corosync_conf.load"
    )
    # Queue the expected reads of the pcsd certificate and key.
    self.config.fs.open(
        settings.pcsd_cert_location,
        mock.mock_open(read_data=self.pcsd_ssl_cert)(),
        name="fs.open.pcsd_ssl_cert"
    )
    self.config.fs.open(
        settings.pcsd_key_location,
        mock.mock_open(read_data=self.pcsd_ssl_key)(),
        name="fs.open.pcsd_ssl_key"
    )

    self.env_assist.assert_raise_library_error(
        lambda: pcsd.synchronize_ssl_certificate(self.env_assist.get_env()),
        []
    )

    expected_reports = [
        fixture.warn(
            report_codes.COROSYNC_CONFIG_MISSING_NAMES_OF_NODES,
            fatal=False,
        ),
        fixture.error(
            report_codes.COROSYNC_CONFIG_NO_NODES_DEFINED,
        ),
    ]
    self.env_assist.assert_reports(expected_reports)
开发者ID:tomjelinek,项目名称:pcs,代码行数:33,代码来源:test_pcsd.py
示例7: test_load
def test_load(self):
    """Load redirects from specified file or self.file_name."""
    redirects = [
        Redirect('/one', '/two', 302),
        Redirect('/three', '/four', 302),
        Redirect('/five', '/six', 302),
    ]
    rdf = RedirectsFile('/tmp/foo.json')
    for redirect in redirects:
        rdf.add_redirect(redirect)

    def serialized_redirects():
        """Return a rewound StringIO holding the JSON dump of rdf's redirects."""
        buffer = StringIO()
        buffer.write(json.dumps([rd.to_JSON() for rd in rdf.redirects]))
        buffer.seek(0)
        return buffer

    # Case 1: an explicit file name is passed to load().
    json_file = serialized_redirects()
    rdf2 = RedirectsFile('/tmp/foo.json')
    opener = mock.mock_open()
    with mock.patch('builtins.open', opener, create=True):
        opener.return_value = json_file
        rdf2.load('/tmp/bar.json')
    opener.assert_called_with('/tmp/bar.json', 'r', encoding='utf-8')

    # Case 2: load() without arguments uses the name given to the constructor.
    json_file = serialized_redirects()
    rdf2 = RedirectsFile('/tmp/foo.json')
    opener = mock.mock_open()
    with mock.patch('builtins.open', opener, create=True):
        opener.return_value = json_file
        rdf2.load()
    opener.assert_called_with('/tmp/foo.json', 'r', encoding='utf-8')
开发者ID:TheLetterN,项目名称:sgs-flask,代码行数:27,代码来源:test_redirects.py
示例8: test_generate_command_uwsgi_service_option_nginx_conf_redhat
def test_generate_command_uwsgi_service_option_nginx_conf_redhat(self, mock_os_path_isfile, mock_file, mock_env,
                                                                 mock_os_path_exists, mock_linux_distribution,
                                                                 mock_context):
    """Run generate_command for the uwsgi service option with 'redhat' as the distribution."""
    mock_args = mock.MagicMock()
    mock_args.type = GEN_UWSGI_SERVICE_OPTION
    mock_args.directory = None

    mock_os_path_isfile.return_value = False
    mock_env.side_effect = ['/foo/conda', 'conda_env']
    mock_os_path_exists.return_value = True
    mock_linux_distribution.return_value = ['redhat']

    # First open is for the Template, next two are for /etc/nginx/nginx.conf and /etc/passwd, and the final
    # open is to "write" out the resulting file. The middle two opens return information about a user, while
    # the first and last use MagicMock.
    default_handle = mock_file.return_value
    nginx_conf_handle = mock.mock_open(read_data='user foo_user').return_value
    passwd_handle = mock.mock_open(
        read_data='foo_user:x:1000:1000:Foo User,,,:/foo/nginx:/bin/bash').return_value
    mock_file.side_effect = (
        default_handle,
        nginx_conf_handle,
        passwd_handle,
        default_handle,
    )

    generate_command(args=mock_args)

    mock_os_path_isfile.assert_called_once()
    mock_file.assert_called()
    mock_env.assert_any_call('CONDA_HOME')
    mock_env.assert_called_with('CONDA_ENV_NAME')
    mock_os_path_exists.assert_called_once_with('/etc/nginx/nginx.conf')

    # First positional argument of the first update() call on the context mock.
    first_update_call = mock_context().update.call_args_list[0]
    context = first_update_call[0][0]
    self.assertEqual('http-', context['user_option_prefix'])
开发者ID:SarvaPulla,项目名称:tethys,代码行数:28,代码来源:test_gen_commands.py
示例9: test_fail_some_nodes_unknown
def test_fail_some_nodes_unknown(self):
    """Expect HOST_NOT_FOUND for the first node when only the remaining nodes are known."""
    # Every node except the first is registered as known.
    self.config.env.set_known_nodes(self.node_names[1:])
    self.config.fs.open(
        settings.pcsd_cert_location,
        mock.mock_open(read_data=self.pcsd_ssl_cert)(),
        name="fs.open.pcsd_ssl_cert"
    )
    self.config.fs.open(
        settings.pcsd_key_location,
        mock.mock_open(read_data=self.pcsd_ssl_key)(),
        name="fs.open.pcsd_ssl_key"
    )

    self.env_assist.assert_raise_library_error(
        lambda: pcsd.synchronize_ssl_certificate(self.env_assist.get_env()),
        []
    )

    unknown_host_report = fixture.error(
        report_codes.HOST_NOT_FOUND,
        force_code=report_codes.SKIP_OFFLINE_NODES,
        host_list=[self.node_names[0]]
    )
    self.env_assist.assert_reports([unknown_host_report])
开发者ID:tomjelinek,项目名称:pcs,代码行数:28,代码来源:test_pcsd.py
示例10: test_gzip_if_header_present_and_file_not_available
def test_gzip_if_header_present_and_file_not_available(loop) -> None:
    """Assert that the plain file is opened when gzip is accepted but no gzip file exists.

    The request carries ``Accept-Encoding: gzip`` while the mocked gzip path
    reports ``is_file() == False``; the plain path's ``open`` must be called
    and the gzip path's ``open`` must stay untouched.
    """
    request = make_mocked_request(
        'GET', 'http://python.org/logo.png', headers={
            hdrs.ACCEPT_ENCODING: 'gzip'
        }
    )

    gz_filepath = mock.Mock()
    gz_filepath.open = mock.mock_open()
    gz_filepath.is_file.return_value = False

    filepath = mock.Mock()
    filepath.name = 'logo.png'
    filepath.open = mock.mock_open()
    # with_name() on the plain path hands back the gzip path mock.
    filepath.with_name.return_value = gz_filepath
    filepath.stat.return_value = mock.MagicMock()
    # The size belongs on the object returned by stat(); setting it on the
    # stat method mock itself would leave stat().st_size as an auto-created mock.
    filepath.stat.return_value.st_size = 1024

    file_sender = FileResponse(filepath)
    # Replace the actual transfer with a coroutine mock that returns None.
    file_sender._sendfile = make_mocked_coro(None)

    loop.run_until_complete(file_sender.prepare(request))

    assert filepath.open.called
    assert not gz_filepath.open.called
开发者ID:KeepSafe,项目名称:aiohttp,代码行数:25,代码来源:test_web_sendfile.py
示例11: test_success
def test_success(self):
    """Certificate and key are read, sent to all nodes, and success is reported per node."""
    self.config.fs.open(
        settings.pcsd_cert_location,
        mock.mock_open(read_data=self.pcsd_ssl_cert)(),
        name="fs.open.pcsd_ssl_cert"
    )
    self.config.fs.open(
        settings.pcsd_key_location,
        mock.mock_open(read_data=self.pcsd_ssl_key)(),
        name="fs.open.pcsd_ssl_key"
    )
    self.config.http.host.send_pcsd_cert(
        cert=self.pcsd_ssl_cert,
        key=self.pcsd_ssl_key,
        node_labels=self.node_names
    )

    pcsd.synchronize_ssl_certificate(self.env_assist.get_env())

    # One "distribution started" report followed by one success report per node.
    expected_reports = [
        fixture.info(
            report_codes.PCSD_SSL_CERT_AND_KEY_DISTRIBUTION_STARTED,
            node_name_list=self.node_names
        )
    ]
    expected_reports.extend(
        fixture.info(
            report_codes.PCSD_SSL_CERT_AND_KEY_SET_SUCCESS,
            node=node,
        )
        for node in self.node_names
    )
    self.env_assist.assert_reports(expected_reports)
开发者ID:tomjelinek,项目名称:pcs,代码行数:35,代码来源:test_pcsd.py
示例12: test_loading_saving_data
def test_loading_saving_data(hass, registry):
    """Test that we load/save data correctly."""
    orig_entry1 = registry.async_get_or_create('light', 'hue', '1234')
    orig_entry2 = registry.async_get_or_create('light', 'hue', '5678')

    assert len(registry.entities) == 2

    with patch(YAML__OPEN_PATH, mock_open(), create=True) as mock_write:
        yield from registry._async_save()

    # Mock open calls are: open file, context enter, write, context leave
    write_call = mock_write.mock_calls[2]
    written = write_call[1][0]

    # Now load written data in new registry
    registry2 = entity_registry.EntityRegistry(hass)

    with patch('os.path.isfile', return_value=True):
        with patch(YAML__OPEN_PATH, mock_open(read_data=written), create=True):
            yield from registry2._async_load()

    # Ensure same order
    assert list(registry.entities) == list(registry2.entities)

    # NOTE(review): the entries are re-fetched from `registry`, not `registry2` — confirm intended.
    new_entry1 = registry.async_get_or_create('light', 'hue', '1234')
    new_entry2 = registry.async_get_or_create('light', 'hue', '5678')

    assert orig_entry1 == new_entry1
    assert orig_entry2 == new_entry2
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:27,代码来源:test_entity_registry.py
示例13: test_all_node_names_missing
def test_all_node_names_missing(self):
    """Expect config_sync to fail when the loaded corosync.conf has no node names."""
    auth_file = "auth.file"
    auth_file_path = os.path.join(settings.booth_config_dir, auth_file)
    config_content = "authfile={}".format(auth_file_path)
    auth_file_content = b"auth"

    # Queue the expected reads: booth config (text) then the auth file (binary mode).
    self.config.fs.open(
        self.config_path,
        mock.mock_open(read_data=config_content)(),
        name="open.conf"
    )
    self.config.fs.open(
        auth_file_path,
        mock.mock_open(read_data=auth_file_content)(),
        mode="rb",
        name="open.authfile",
    )
    self.config.corosync_conf.load(filename="corosync-no-node-names.conf")

    expected_errors = [
        fixture.error(
            report_codes.COROSYNC_CONFIG_NO_NODES_DEFINED,
        ),
    ]
    self.env_assist.assert_raise_library_error(
        lambda: commands.config_sync(self.env_assist.get_env(), self.name),
        expected_errors
    )

    expected_warnings = [
        fixture.warn(
            report_codes.COROSYNC_CONFIG_MISSING_NAMES_OF_NODES,
            fatal=False,
        ),
    ]
    self.env_assist.assert_reports(expected_warnings)
开发者ID:tomjelinek,项目名称:pcs,代码行数:34,代码来源:test_booth.py
示例14: test_parse_observing_summary_dbase_file_mock
def test_parse_observing_summary_dbase_file_mock():
    """
    Ensure that all required data are extracted from the RHESSI
    observing summary database file mocked in `hessi_data()`
    """
    # The mock is built differently for Python 3.7.0 and older: there the
    # handle's iterator is configured by hand instead of passing read_data.
    running_version = LooseVersion(platform.python_version())
    if running_version <= LooseVersion("3.7.0"):
        mock_file = mock.mock_open()
        mock_file.return_value.__iter__.return_value = hessi_data().splitlines()
    else:
        mock_file = mock.mock_open(read_data=hessi_data())

    dbase_data = {}
    with mock.patch('sunpy.instr.rhessi.open', mock_file, create=True):
        dbase_data = rhessi.parse_observing_summary_dbase_file(None)

    assert len(dbase_data.keys()) == 7

    # verify each of the 7 fields
    expected_filenames = ['hsi_obssumm_19721101_139.fit',
                          'hsi_obssumm_19721102_144.fit']
    assert dbase_data['filename'] == expected_filenames
    assert dbase_data['orb_st'] == [7, 9]
    assert dbase_data['orb_end'] == [8, 10]
    assert dbase_data['start_time'] == [parse_time((1972, 11, 1, 0, 0)),
                                        parse_time((1972, 11, 2, 0, 0))]
    assert dbase_data['end_time'] == [parse_time((1972, 11, 2, 0, 0)),
                                      parse_time((1972, 11, 3, 0, 0))]
    assert dbase_data['status_flag'] == [3, 4]
    assert dbase_data['npackets'] == [2, 1]
开发者ID:Cadair,项目名称:sunpy,代码行数:29,代码来源:test_rhessi.py
示例15: setUp
def setUp(self):
    """Create a mocked xrandr helper and an XrProfiler built while open() is patched."""
    helper = MagicMock()
    helper.get_current_setup = MagicMock(return_value=xr_settings)
    helper.run_xrandr = MagicMock()
    self.xrhelper = helper

    self.mo = mock_open()

    # Init xrprofiler without init data.
    # NOTE(review): the patch uses a fresh mock_open(), not self.mo — confirm intended.
    with patch('xrprofiler.open', mock_open(), create=True):
        self.xrprofiler = xrprofiler.XrProfiler('test.yaml', helper)
开发者ID:gpedic,项目名称:xrandr_profiler,代码行数:9,代码来源:test.py
示例16: setUp
def setUp(self):
    """Build an LDAPController from mocked timer/statsd factories and a test config path."""
    # NOTE(review): the mock returned here is discarded and nothing is patched,
    # so this call has no visible effect on open() — confirm whether a
    # mock.patch(...) around the controller construction was intended.
    mock.mock_open(read_data='{}')

    settings = dict(
        timer=mock.MagicMock(),
        ldap_config_file='testdata/test-ldap-config.json',
        statsd_factory=mock.MagicMock(),
        statsd_host_factory=mock.MagicMock(),
    )
    self.ldap = controller.LDAPController(settings)
开发者ID:feideconnect,项目名称:core-apis,代码行数:9,代码来源:test_ldap_controller.py
示例17: test_fail_communication
def test_fail_communication(self):
    """Expect a library error when the first node answers the cert upload with HTTP 400."""
    error = "an error"

    self.config.fs.open(
        settings.pcsd_cert_location,
        mock.mock_open(read_data=self.pcsd_ssl_cert)(),
        name="fs.open.pcsd_ssl_cert"
    )
    self.config.fs.open(
        settings.pcsd_key_location,
        mock.mock_open(read_data=self.pcsd_ssl_key)(),
        name="fs.open.pcsd_ssl_key"
    )

    # The first node responds with code 400; the remaining nodes use defaults.
    failing_node = {
        "label": self.node_names[0],
        "response_code": 400,
        "output": error,
    }
    remaining_nodes = [dict(label=node) for node in self.node_names[1:]]
    self.config.http.host.send_pcsd_cert(
        cert=self.pcsd_ssl_cert,
        key=self.pcsd_ssl_key,
        communication_list=[failing_node] + remaining_nodes
    )

    self.env_assist.assert_raise_library_error(
        lambda: pcsd.synchronize_ssl_certificate(self.env_assist.get_env()),
        []
    )

    # Expected order: start report, successes for nodes 1.., then the failure.
    expected_reports = [
        fixture.info(
            report_codes.PCSD_SSL_CERT_AND_KEY_DISTRIBUTION_STARTED,
            node_name_list=self.node_names
        )
    ]
    expected_reports.extend(
        fixture.info(
            report_codes.PCSD_SSL_CERT_AND_KEY_SET_SUCCESS,
            node=node,
        )
        for node in self.node_names[1:]
    )
    expected_reports.append(
        fixture.error(
            report_codes.NODE_COMMUNICATION_COMMAND_UNSUCCESSFUL,
            node=self.node_names[0],
            command="remote/set_certs",
            reason=error
        )
    )
    self.env_assist.assert_reports(expected_reports)
开发者ID:tomjelinek,项目名称:pcs,代码行数:56,代码来源:test_pcsd.py
示例18: test_proc
def test_proc(self):
    '''Test reading proc stats with mock data.'''
    # Nothing to do when the mock module is unavailable.
    if mock is None:
        return

    stat_content = '22411 (cat) R 22301 22411 22301 34818 22411 4194304 82 0 0 0 0 0 0 0 20 0 1 0 709170 8155136 221 18446744073709551615 94052544688128 94052544719312 140729623469552 0 0 0 0 0 0 0 0 0 17 6 0 0 0 0 0 94052546816624 94052546818240 94052566347776 140729623473446 140729623473466 140729623473466 140729623478255 0'
    status_content = 'Name: cat\n\nVmData: 2 kB\nMultiple colons: 1:1'
    mock_stat = mock.mock_open(read_data=stat_content)
    mock_status = mock.mock_open(read_data=status_content)

    with mock.patch('builtins.open', new_callable=mock.mock_open) as mock_file:
        # Successive open() calls return the stat handle, then the status handle.
        mock_file.side_effect = [mock_stat.return_value, mock_status.return_value]
        procinfo = process._ProcessMemoryInfoProc()
        self.assertTrue(procinfo.available)
        self.assertEqual(procinfo.vsz, 8155136)
        if sys.version_info >= (3, 4):  # Python 3.3 doesn't support mock_open.readlines
            self.assertEqual(procinfo.data_segment, 2048)
开发者ID:pympler,项目名称:pympler,代码行数:13,代码来源:test_process.py
示例19: test_12_add__guess
def test_12_add__guess(self):
    """Run the add action with auto_slug=True while stdin is a mock that yields '1'.

    sys.stdin is replaced by a mock_open file handle whose read data is
    ``'1\\n'``, and action_add is called with three GitHub-style remotes.
    """
    # Use the configured handle as stdin. The original discarded the
    # mock_open(...) result and patched in the bare mock_open factory, which
    # has no read()/readline().
    stdin_handle = mock_open(read_data='1\n')()
    with patch('sys.stdin', stdin_handle):
        self.action_add(
            namespace=self.local_namespace,
            repository='git-repo',
            alone=True,
            tracking='github',
            auto_slug=True,
            remotes={
                'origin': 'https://github.com/foo/bar',
                # SSH-style remote; restored from an e-mail-obfuscation
                # artifact ('[email protected]') — presumably git@github.com.
                'wootwoot': 'git@github.com:w00t/w00t',
                'duckling': 'ssh://github.com/duck/duck',
            },
        )
开发者ID:guyzmo,项目名称:git-repo,代码行数:14,代码来源:test_github.py
示例20: test_complete_subjob_parses_payload_and_stores_value_in_atom_objects
def test_complete_subjob_parses_payload_and_stores_value_in_atom_objects(self):
    """Completing a subjob reads the exit-code file and stores its value on the atom."""
    fake_atom_exit_code = 777
    # Configure the existing self.mock_open so reads return the exit code as text.
    mock_open(mock=self.mock_open, read_data=str(fake_atom_exit_code))

    build = self._create_test_build(
        BuildStatus.BUILDING, num_subjobs=1, num_atoms_per_subjob=1)
    subjob = build.all_subjobs()[0]

    build.complete_subjob(subjob.subjob_id(), payload=self._FAKE_PAYLOAD)

    expected_payload_sys_path = join(
        Configuration['results_directory'], '1', 'artifact_0_0')
    exit_code_file = join(expected_payload_sys_path, BuildArtifact.EXIT_CODE_FILE)
    self.mock_open.assert_called_once_with(exit_code_file, 'r')
    self.assertEqual(subjob.atoms[0].exit_code, fake_atom_exit_code)
开发者ID:fengshao0907,项目名称:ClusterRunner,代码行数:14,代码来源:test_build.py
注：本文中的unittest.mock.mock_open函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论