本文整理汇总了Python中mock.mock.patch.object函数的典型用法代码示例。如果您正苦于以下问题:Python object函数的具体用法?Python object怎么用?Python object使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了object函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: executeScript
def executeScript(self, path, classname=None, command=None, config_file=None,
# common mocks for all the scripts
config_overrides = None,
shell_mock_value = (0, "OK."),
os_type=('Suse','11','Final'),
kinit_path_local="/usr/bin/kinit"
):
norm_path = os.path.normpath(path)
src_dir = RMFTestCase._getSrcFolder()
stack_version = norm_path.split(os.sep)[0]
stacks_path = os.path.join(src_dir, PATH_TO_STACKS)
configs_path = os.path.join(src_dir, PATH_TO_STACK_TESTS, stack_version, "configs")
script_path = os.path.join(stacks_path, norm_path)
config_file_path = os.path.join(configs_path, config_file)
try:
with open(config_file_path, "r") as f:
self.config_dict = json.load(f)
except IOError:
raise RuntimeError("Can not read config file: "+ config_file_path)
if config_overrides:
for key, value in config_overrides.iteritems():
self.config_dict[key] = value
self.config_dict = ConfigDictionary(self.config_dict)
# append basedir to PYTHONPATH
scriptsdir = os.path.dirname(script_path)
basedir = os.path.dirname(scriptsdir)
sys.path.append(scriptsdir)
# get method to execute
try:
with patch.object(platform, 'linux_distribution', return_value=os_type):
script_module = imp.load_source(classname, script_path)
except IOError:
raise RuntimeError("Cannot load class %s from %s",classname, norm_path)
script_class_inst = RMFTestCase._get_attr(script_module, classname)()
method = RMFTestCase._get_attr(script_class_inst, command)
# Reload params import, otherwise it won't change properties during next import
if 'params' in sys.modules:
del(sys.modules["params"])
# run
with Environment(basedir, test_mode=True) as RMFTestCase.env:
with patch('resource_management.core.shell.checked_call', return_value=shell_mock_value): # we must always mock any shell calls
with patch.object(Script, 'get_config', return_value=self.config_dict): # mocking configurations
with patch.object(Script, 'install_packages'):
with patch('resource_management.libraries.functions.get_kinit_path', return_value=kinit_path_local):
with patch.object(platform, 'linux_distribution', return_value=os_type):
method(RMFTestCase.env)
sys.path.remove(scriptsdir)
开发者ID:wbear2,项目名称:ambari,代码行数:55,代码来源:RMFTestCase.py
示例2: test_create_repo_ubuntu_gpg_key_wrong_output
def test_create_repo_ubuntu_gpg_key_wrong_output(self, file_mock, execute_mock,
tempfile_mock, checked_call_mock):
"""
Checks that GPG key is extracted from output without \r sign
"""
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
checked_call_mock.return_value = 0, "The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 123ABCD\r\n"
with Environment('/') as env:
with patch.object(repository, "__file__", new='/ambari/test/repo/dummy/path/file'):
Repository('HDP',
base_url='http://download.base_url.org/rpm/',
repo_file_name='HDP',
repo_template = DEBIAN_DEFAUTL_TEMPLATE,
components = ['a','b','c']
)
call_content = file_mock.call_args_list[0]
template_name = call_content[0][0]
template_content = call_content[1]['content']
self.assertEquals(template_name, '/tmp/1.txt')
self.assertEquals(template_content, 'deb http://download.base_url.org/rpm/ a b c')
copy_item0 = str(file_mock.call_args_list[1])
copy_item1 = str(file_mock.call_args_list[2])
self.assertEqual(copy_item0, "call('/tmp/1.txt', content=StaticFile('/etc/apt/sources.list.d/HDP.list'))")
self.assertEqual(copy_item1, "call('/etc/apt/sources.list.d/HDP.list', content=StaticFile('/tmp/1.txt'))")
execute_command_item = execute_mock.call_args_list[0][0][0]
self.assertEqual(checked_call_mock.call_args_list[0][0][0], ['apt-get', 'update', '-qq', '-o', 'Dir::Etc::sourcelist=sources.list.d/HDP.list', '-o', 'Dir::Etc::sourceparts=-', '-o', 'APT::Get::List-Cleanup=0'])
self.assertEqual(execute_command_item, ('apt-key', 'adv', '--recv-keys', '--keyserver', 'keyserver.ubuntu.com', '123ABCD'))
开发者ID:maduhu,项目名称:HDP2.5-ambari,代码行数:33,代码来源:TestRepositoryResource.py
示例3: test_execute_retryable_command_with_time_lapse
def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
read_stack_version_mock, sleep_mock, time_mock
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
python_execution_result_dict = {
'exitcode': 1,
'stdout': 'out',
'stderr': 'stderr',
'structuredOut': '',
'status': 'FAILED'
}
time_mock.side_effect = [4, 8, 10, 14, 18, 22]
def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
return python_execution_result_dict
command = copy.deepcopy(self.retryable_command)
with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
runCommand_mock.side_effect = side_effect
actionQueue.execute_command(command)
#assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
sleep_mock.assert_has_calls([call(2)], False)
runCommand_mock.assert_has_calls([
call(command, '/tmp/ambari-agent/output-19.txt', '/tmp/ambari-agent/errors-19.txt', override_output_files=True, retry=False),
call(command, '/tmp/ambari-agent/output-19.txt', '/tmp/ambari-agent/errors-19.txt', override_output_files=False, retry=True)])
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:31,代码来源:TestActionQueue.py
示例4: test_create_repo_ubuntu_repo_exists
def test_create_repo_ubuntu_repo_exists(self, file_mock, execute_mock,
tempfile_mock, checked_call_mock):
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
checked_call_mock.return_value = 0, "The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 123ABCD"
with Environment('/') as env:
with patch.object(repository,"Template", new=DummyTemplate.create(DEBIAN_DEFAUTL_TEMPLATE)):
Repository('HDP',
base_url='http://download.base_url.org/rpm/',
repo_file_name='HDP',
repo_template = "dummy.j2",
components = ['a','b','c']
)
call_content = file_mock.call_args_list[0]
template_name = call_content[0][0]
template_content = call_content[1]['content']
self.assertEquals(template_name, '/tmp/1.txt')
self.assertEquals(template_content, 'deb http://download.base_url.org/rpm/ a b c\n')
copy_item = str(file_mock.call_args_list[1])
self.assertEqual(copy_item, "call('/etc/apt/sources.list.d/HDP.list', content=StaticFile('/tmp/1.txt'))")
#'apt-get update -qq -o Dir::Etc::sourcelist="sources.list.d/HDP.list" -o APT::Get::List-Cleanup="0"')
execute_command_item = execute_mock.call_args_list[0][0][0]
self.assertEqual(checked_call_mock.call_args_list[0][0][0], ['apt-get', 'update', '-qq', '-o', 'Dir::Etc::sourcelist=sources.list.d/HDP.list', '-o', 'APT::Get::List-Cleanup=0'])
self.assertEqual(execute_command_item, 'apt-key adv --recv-keys --keyserver keyserver.ubuntu.com 123ABCD')
开发者ID:fanzhidongyzby,项目名称:ambari,代码行数:29,代码来源:TestRepositoryResource.py
示例5: executeScript
def executeScript(self, path, classname=None, command=None, config_file=None,
config_dict=None,
# common mocks for all the scripts
config_overrides = None,
hdp_stack_version = None,
checked_call_mocks = itertools.cycle([(0, "OK.")]),
call_mocks = itertools.cycle([(0, "OK.")]),
os_type=('Suse','11','Final'),
kinit_path_local="/usr/bin/kinit",
os_env={'PATH':'/bin'},
target=TARGET_STACKS,
mocks_dict={},
try_install=False,
command_args=[]):
norm_path = os.path.normpath(path)
src_dir = RMFTestCase.get_src_folder()
if target == self.TARGET_STACKS:
stack_version = norm_path.split(os.sep)[0]
base_path = os.path.join(src_dir, PATH_TO_STACKS)
configs_path = os.path.join(src_dir, PATH_TO_STACK_TESTS, stack_version, "configs")
elif target == self.TARGET_CUSTOM_ACTIONS:
base_path = os.path.join(src_dir, PATH_TO_CUSTOM_ACTIONS)
configs_path = os.path.join(src_dir, PATH_TO_CUSTOM_ACTION_TESTS, "configs")
elif target == self.TARGET_COMMON_SERVICES:
base_path = os.path.join(src_dir, PATH_TO_COMMON_SERVICES)
configs_path = os.path.join(src_dir, PATH_TO_STACK_TESTS, hdp_stack_version, "configs")
else:
raise RuntimeError("Wrong target value %s", target)
script_path = os.path.join(base_path, norm_path)
if config_file is not None and config_dict is None:
config_file_path = os.path.join(configs_path, config_file)
try:
with open(config_file_path, "r") as f:
self.config_dict = json.load(f)
except IOError:
raise RuntimeError("Can not read config file: "+ config_file_path)
elif config_dict is not None and config_file is None:
self.config_dict = config_dict
else:
raise RuntimeError("Please specify either config_file_path or config_dict parameter")
if config_overrides:
for key, value in config_overrides.iteritems():
self.config_dict[key] = value
self.config_dict = ConfigDictionary(self.config_dict)
# append basedir to PYTHONPATH
scriptsdir = os.path.dirname(script_path)
basedir = os.path.dirname(scriptsdir)
sys.path.append(scriptsdir)
# get method to execute
try:
with patch.object(platform, 'linux_distribution', return_value=os_type):
script_module = imp.load_source(classname, script_path)
script_class_inst = RMFTestCase._get_attr(script_module, classname)()
method = RMFTestCase._get_attr(script_class_inst, command)
except IOError, err:
raise RuntimeError("Cannot load class %s from %s: %s" % (classname, norm_path, err.message))
开发者ID:biggeng,项目名称:ambari,代码行数:60,代码来源:RMFTestCase.py
示例6: test_execute_retryable_command_fail_and_succeed
def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
read_stack_version_mock, sleep_mock
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
execution_result_fail_dict = {
'exitcode': 1,
'stdout': 'out',
'stderr': 'stderr',
'structuredOut': '',
'status': 'FAILED'
}
execution_result_succ_dict = {
'exitcode': 0,
'stdout': 'out',
'stderr': 'stderr',
'structuredOut': '',
'status': 'COMPLETED'
}
command = copy.deepcopy(self.retryable_command)
with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
actionQueue.execute_command(command)
#assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
sleep_mock.assert_any_call(2)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:31,代码来源:TestActionQueue.py
示例7: test_create_repo_ubuntu_repo_exists
def test_create_repo_ubuntu_repo_exists(self, file_mock, execute_mock, tempfile_mock, checked_call_mock):
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
checked_call_mock.return_value = (
0,
"The following signatures couldn't be verified because the public key is not available: NO_PUBKEY 123ABCD",
)
with Environment("/") as env:
with patch.object(repository, "Template", new=DummyTemplate.create(DEBIAN_DEFAUTL_TEMPLATE)):
Repository(
"HDP",
base_url="http://download.base_url.org/rpm/",
repo_file_name="HDP",
repo_template="dummy.j2",
components=["a", "b", "c"],
)
template_item = file_mock.call_args_list[0]
template_name = template_item[0][0]
template_content = template_item[1]["content"].get_content()
self.assertEquals(template_name, "/tmp/1.txt")
self.assertEquals(template_content, "deb http://download.base_url.org/rpm/ a b c\n")
copy_item = str(file_mock.call_args_list[1])
self.assertEqual(copy_item, "call('/etc/apt/sources.list.d/HDP.list', content=StaticFile('/tmp/1.txt'))")
#'apt-get update -qq -o Dir::Etc::sourcelist="sources.list.d/HDP.list" -o APT::Get::List-Cleanup="0"')
execute_command_item = execute_mock.call_args_list[0][0][0]
self.assertEqual(
checked_call_mock.call_args_list[0][0][0],
'apt-get update -qq -o Dir::Etc::sourcelist="sources.list.d/HDP.list" -o APT::Get::List-Cleanup="0"',
)
self.assertEqual(execute_command_item, "apt-key adv --recv-keys --keyserver keyserver.ubuntu.com 123ABCD")
开发者ID:duxia,项目名称:ambari,代码行数:35,代码来源:TestRepositoryResource.py
示例8: test_create_repo_suse
def test_create_repo_suse(self, file_mock):
with Environment("/") as env:
with patch.object(repository, "Template", new=DummyTemplate.create(RHEL_SUSE_DEFAULT_TEMPLATE)):
Repository(
"hadoop",
base_url="http://download.base_url.org/rpm/",
mirror_list="https://mirrors.base_url.org/?repo=Repository&arch=$basearch",
repo_template="dummy.j2",
repo_file_name="Repository",
)
self.assertTrue("hadoop" in env.resources["Repository"])
defined_arguments = env.resources["Repository"]["hadoop"].arguments
expected_arguments = {
"repo_template": "dummy.j2",
"mirror_list": "https://mirrors.base_url.org/?repo=Repository&arch=$basearch",
"base_url": "http://download.base_url.org/rpm/",
"repo_file_name": "Repository",
}
expected_template_arguments = {
"mirror_list": "https://mirrors.base_url.org/?repo=Repository&arch=$basearch",
"base_url": "http://download.base_url.org/rpm/",
"repo_file_name": "Repository",
}
self.assertEqual(defined_arguments, expected_arguments)
self.assertEqual(file_mock.call_args[0][0], "/etc/zypp/repos.d/Repository.repo")
template_item = file_mock.call_args[1]["content"]
template = str(template_item.name)
expected_template_arguments.update({"repo_id": "hadoop"})
self.assertEqual(expected_template_arguments, template_item.context._dict)
self.assertEqual("dummy.j2", template)
开发者ID:duxia,项目名称:ambari,代码行数:34,代码来源:TestRepositoryResource.py
示例9: test_create_repo_debian_repo_exists
def test_create_repo_debian_repo_exists(self, file_mock, execute_mock, tempfile_mock):
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
with Environment('/') as env:
with patch.object(repository,"Template", new=DummyTemplate.create(DEBIAN_DEFAUTL_TEMPLATE)):
Repository('HDP',
base_url='http://download.base_url.org/rpm/',
repo_file_name='HDP',
repo_template = "dummy.j2",
components = ['a','b','c']
)
template_item = file_mock.call_args_list[0]
template_name = template_item[0][0]
template_content = template_item[1]['content'].get_content()
self.assertEquals(template_name, '/tmp/1.txt')
self.assertEquals(template_content, 'deb http://download.base_url.org/rpm/ a b c\n')
copy_item = str(file_mock.call_args_list[1])
self.assertEqual(copy_item, "call('/etc/apt/sources.list.d/HDP.list', content=StaticFile('/tmp/1.txt'))")
execute_command_item = execute_mock.call_args_list[0][0][0]
self.assertEqual(execute_command_item, 'apt-get update -o Dir::Etc::sourcelist="sources.list.d/HDP.list" -o APT::Get::List-Cleanup="0"')
开发者ID:wbear2,项目名称:ambari,代码行数:25,代码来源:TestRepositoryResource.py
示例10: test_create_repo_redhat
def test_create_repo_redhat(self, file_mock,
is_redhat_family, is_ubuntu_family, is_suse_family):
is_redhat_family.return_value = True
is_ubuntu_family.return_value = False
is_suse_family.return_value = False
with Environment('/') as env:
with patch.object(repository, "__file__", new='/ambari/test/repo/dummy/path/file'):
Repository('hadoop',
base_url='http://download.base_url.org/rpm/',
mirror_list='https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
repo_file_name='Repository',
repo_template=RHEL_SUSE_DEFAULT_TEMPLATE)
self.assertTrue('hadoop' in env.resources['Repository'])
defined_arguments = env.resources['Repository']['hadoop'].arguments
expected_arguments = {'repo_template': RHEL_SUSE_DEFAULT_TEMPLATE,
'base_url': 'http://download.base_url.org/rpm/',
'mirror_list': 'https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
'repo_file_name': 'Repository'}
expected_template_arguments = {'base_url': 'http://download.base_url.org/rpm/',
'mirror_list': 'https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
'repo_file_name': 'Repository'}
self.assertEqual(defined_arguments, expected_arguments)
self.assertEqual(file_mock.call_args[0][0], '/etc/yum.repos.d/Repository.repo')
template_item = file_mock.call_args[1]['content']
template = str(template_item.name)
expected_template_arguments.update({'repo_id': 'hadoop'})
self.assertEqual(expected_template_arguments, template_item.context._dict)
self.assertEqual(RHEL_SUSE_DEFAULT_TEMPLATE, template)
开发者ID:maduhu,项目名称:HDP2.5-ambari,代码行数:32,代码来源:TestRepositoryResource.py
示例11: test_create_repo_suse
def test_create_repo_suse(self, file_mock):
with Environment('/') as env:
with patch.object(repository,"Template", new=DummyTemplate.create(RHEL_SUSE_DEFAULT_TEMPLATE)):
Repository('hadoop',
base_url='http://download.base_url.org/rpm/',
mirror_list='https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
repo_template = "dummy.j2",
repo_file_name='Repository')
self.assertTrue('hadoop' in env.resources['Repository'])
defined_arguments = env.resources['Repository']['hadoop'].arguments
expected_arguments = {'repo_template': 'dummy.j2',
'mirror_list': 'https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
'base_url': 'http://download.base_url.org/rpm/',
'repo_file_name': 'Repository'}
expected_template_arguments = {'mirror_list': 'https://mirrors.base_url.org/?repo=Repository&arch=$basearch',
'base_url': 'http://download.base_url.org/rpm/',
'repo_file_name': 'Repository'}
self.assertEqual(defined_arguments, expected_arguments)
self.assertEqual(file_mock.call_args[0][0], '/etc/zypp/repos.d/Repository.repo')
template_item = file_mock.call_args[1]['content']
template = str(template_item.name)
expected_template_arguments.update({'repo_id': 'hadoop'})
self.assertEqual(expected_template_arguments, template_item.context._dict)
self.assertEqual('dummy.j2', template)
开发者ID:wbear2,项目名称:ambari,代码行数:28,代码来源:TestRepositoryResource.py
示例12: assertResourceCalled
def assertResourceCalled(self, resource_type, name, **kwargs):
with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"):
self.assertNotEqual(len(RMFTestCase.env.resource_list), 0, "There were no more resources executed!")
resource = RMFTestCase.env.resource_list.pop(0)
self.assertEquals(resource_type, resource.__class__.__name__)
self.assertEquals(name, resource.name)
self.assertEquals(kwargs, resource.arguments)
开发者ID:biggeng,项目名称:ambari,代码行数:8,代码来源:RMFTestCase.py
示例13: setUp
def setUp(self):
# disable stdout
out = StringIO.StringIO()
sys.stdout = out
# Create config
self.config = AmbariConfig().getConfig()
# Instantiate CachedHTTPSConnection (skip connect() call)
with patch.object(security.VerifiedHTTPSConnection, "connect"):
self.cachedHTTPSConnection = security.CachedHTTPSConnection(self.config)
开发者ID:adamosloizou,项目名称:fiware-cosmos-ambari,代码行数:9,代码来源:TestSecurity.py
示例14: main
def main():
if disable_python_and_puppet:
with patch.object(PuppetExecutor.PuppetExecutor, 'run_manifest') \
as run_manifest_method:
run_manifest_method.side_effect = \
lambda command, file, tmpout, tmperr: {
'exitcode' : 0,
'stdout' : "Simulated run of pp %s" % file,
'stderr' : 'None'
}
with patch.object(PythonExecutor.PythonExecutor, 'run_file') \
as run_file_py_method:
run_file_py_method.side_effect = \
lambda command, file, tmpoutfile, tmperrfile: {
'exitcode' : 0,
'stdout' : "Simulated run of py %s" % file,
'stderr' : 'None'
}
run_simulation()
else:
run_simulation()
开发者ID:adamosloizou,项目名称:fiware-cosmos-ambari,代码行数:22,代码来源:ControllerTester.py
示例15: test_get_list
def test_get_list(self):
# TODO - these should be generated from rc file!
dicts = ['wkv_variable', 'gcmd_instrument', 'gcmd_science_keyword',
'gcmd_provider', 'gcmd_platform', 'gcmd_location',
'gcmd_horizontalresolutionrange',
'gcmd_verticalresolutionrange',
'gcmd_temporalresolutionrange', 'gcmd_project',
'gcmd_rucontenttype', 'cf_standard_name',
'iso19115_topic_category']
for name in dicts:
if name != 'iso19115_topic_category':
resource = resource_string(__name__, '../basedata/'+name)
with patch.object(pti.json_vocabulary, 'openURL',
return_value=resource):
function = getattr(pti, 'get_' + name + '_list')
self.assertIsInstance(function(), list)
else:
function = getattr(pti, 'get_' + name + '_list')
self.assertIsInstance(function(), list)
开发者ID:nansencenter,项目名称:py-thesaurus-interface,代码行数:19,代码来源:test_pythesint.py
示例16: assertResourceCalledIgnoreEarlier
def assertResourceCalledIgnoreEarlier(self, resource_type, name, **kwargs):
"""
Fast fowards past earlier resources called, popping them off the list until the specified
resource is hit. If it's not found, then an assertion is thrown that there are no more
resources.
"""
with patch.object(UnknownConfiguration, '__getattr__', return_value=lambda: "UnknownConfiguration()"):
while len(RMFTestCase.env.resource_list) >= 0:
# no more items means exit the loop
self.assertNotEqual(len(RMFTestCase.env.resource_list), 0, "The specified resource was not found in the call stack.")
# take the next resource and try it out
resource = RMFTestCase.env.resource_list.pop(0)
try:
self.assertEquals(resource_type, resource.__class__.__name__)
self.assertEquals(name, resource.name)
self.assertEquals(kwargs, resource.arguments)
break
except AssertionError:
pass
开发者ID:biggeng,项目名称:ambari,代码行数:20,代码来源:RMFTestCase.py
示例17: test_enqueue_jobs_dont_skip
def test_enqueue_jobs_dont_skip(self):
broker = Standard(None)
with patch.object(Standard, 'add_job') as mock_add_job:
beat = Beat(broker, 10, False)
beat.register_job(Adder)
beat.register_job(Divider)
expired_job = PeriodicJob.objects.get(pk=2)
future_job = PeriodicJob.objects.get(pk=4)
future_next_execution = future_job.next_execution
self.assertGreater(
future_next_execution,
datetime.now(pytz.timezone('UTC'))
)
beat.enqueue_next_jobs(beat.get_expired_jobs())
expired_job = PeriodicJob.objects.get(pk=2)
self.assertEqual(
expired_job.next_execution.isoformat(),
"2015-03-29T23:01:00+00:00"
)
japanese_job = PeriodicJob.objects.get(pk=3)
self.assertEqual(
japanese_job.next_execution.isoformat(),
"2015-07-26T16:10:00+00:00"
)
future_job = PeriodicJob.objects.get(pk=4)
self.assertEqual(
future_job.next_execution,
future_next_execution
)
self.assertGreater(
future_job.next_execution,
datetime.now(pytz.timezone('UTC'))
)
assert mock_add_job.called_with(num1=1, num2=2, sqjobs_programmed_date='2015-07-26T17:01:00+02:00')
开发者ID:lonamiaec,项目名称:sqjobs,代码行数:41,代码来源:tests.py
示例18: test_create_repo_debian_doesnt_repo_exist
def test_create_repo_debian_doesnt_repo_exist(self, file_mock, execute_mock, tempfile_mock):
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
with Environment('/') as env:
with patch.object(repository,"Template", new=DummyTemplate.create(DEBIAN_DEFAUTL_TEMPLATE)):
Repository('HDP',
base_url='http://download.base_url.org/rpm/',
repo_file_name='HDP',
repo_template = "dummy.j2",
components = ['a','b','c']
)
template_item = file_mock.call_args_list[0]
template_name = template_item[0][0]
template_content = template_item[1]['content'].get_content()
self.assertEquals(template_name, '/tmp/1.txt')
self.assertEquals(template_content, 'deb http://download.base_url.org/rpm/ a b c\n')
self.assertEqual(file_mock.call_count, 1)
self.assertEqual(execute_mock.call_count, 0)
开发者ID:wbear2,项目名称:ambari,代码行数:22,代码来源:TestRepositoryResource.py
示例19: test_enqueue_jobs
def test_enqueue_jobs(self):
broker = Standard(None)
with patch.object(Standard, 'add_job') as mock_add_job:
beat = Beat(broker, 10, True)
beat.register_job(Adder)
beat.register_job(Divider)
expired_job = PeriodicJob.objects.get(pk=2)
expired_next_execution = expired_job.next_execution
expired_minute = expired_next_execution.minute
expired_second = expired_next_execution.second
future_job = PeriodicJob.objects.get(pk=4)
future_next_execution = future_job.next_execution
self.assertGreater(future_next_execution, datetime.now(pytz.timezone('UTC')))
beat.enqueue_next_jobs(beat.get_expired_jobs())
expired_job = PeriodicJob.objects.get(pk=2)
self.assertEqual(expired_minute, expired_job.next_execution.minute)
self.assertEqual(expired_second, expired_job.next_execution.second)
self.assertGreater(
expired_job.next_execution,
expired_next_execution
)
self.assertGreater(
expired_job.next_execution,
datetime.now(pytz.timezone('UTC'))
)
future_job = PeriodicJob.objects.get(pk=4)
self.assertEqual(future_job.next_execution, future_next_execution)
self.assertGreater(
future_job.next_execution,
datetime.now(pytz.timezone('UTC'))
)
assert mock_add_job.called_with(num1=1, num2=2, sqjobs_programmed_date='2015-07-26T17:01:00+02:00')
开发者ID:lonamiaec,项目名称:sqjobs,代码行数:39,代码来源:tests.py
示例20: test_create_repo_ubuntu_doesnt_repo_exist
def test_create_repo_ubuntu_doesnt_repo_exist(self, file_mock, execute_mock, tempfile_mock):
tempfile_mock.return_value = MagicMock(spec=file)
tempfile_mock.return_value.__enter__.return_value.name = "/tmp/1.txt"
with Environment('/') as env:
with patch.object(repository, "__file__", new='/ambari/test/repo/dummy/path/file'):
Repository('HDP',
base_url='http://download.base_url.org/rpm/',
repo_file_name='HDP',
repo_template = DEBIAN_DEFAUTL_TEMPLATE,
components = ['a','b','c']
)
call_content = file_mock.call_args_list[0]
template_name = call_content[0][0]
template_content = call_content[1]['content']
self.assertEquals(template_name, '/tmp/1.txt')
self.assertEquals(template_content, 'deb http://download.base_url.org/rpm/ a b c')
self.assertEqual(file_mock.call_count, 2)
self.assertEqual(execute_mock.call_count, 0)
开发者ID:maduhu,项目名称:HDP2.5-ambari,代码行数:22,代码来源:TestRepositoryResource.py
注:本文中的mock.mock.patch.object函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论