本文整理汇总了Python中taskflow.utils.reflection.get_callable_name函数的典型用法代码示例。如果您正苦于以下问题:Python get_callable_name函数的具体用法?Python get_callable_name怎么用?Python get_callable_name使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_callable_name函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_inner_class
def test_inner_class(self):
obj = self.InnerCallableClass()
name = reflection.get_callable_name(obj)
expected_name = '.'.join((__name__,
'GetCallableNameTestExtended',
'InnerCallableClass'))
self.assertEqual(expected_name, name)
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:7,代码来源:test_utils.py
示例2: _trigger
def _trigger(self, event, *args, **kwargs):
"""Execute all handlers for the given event type."""
for (handler, event_data) in self._events_listeners.get(event, []):
try:
handler(self, event_data, *args, **kwargs)
except Exception:
LOG.exception("Failed calling `%s` on event '%s'",
reflection.get_callable_name(handler), event)
开发者ID:toabctl,项目名称:taskflow,代码行数:8,代码来源:task.py
示例3: __init__
def __init__(self, name=None, provides=None, requires=None,
auto_extract=True, rebind=None):
"""Initialize task instance"""
if name is None:
name = reflection.get_callable_name(self)
super(Task, self).__init__(name,
provides=provides)
self.requires = _build_arg_mapping(self.name, requires, rebind,
self.execute, auto_extract)
开发者ID:pombredanne,项目名称:taskflow,代码行数:9,代码来源:task.py
示例4: _with_connection
def _with_connection(self, functor, *args, **kwargs):
# NOTE(harlowja): Activate the given function with a backend
# connection, if a backend is provided in the first place, otherwise
# don't call the function.
if self._backend is None:
LOG.debug("No backend provided, not calling functor '%s'",
reflection.get_callable_name(functor))
return
with contextlib.closing(self._backend.get_connection()) as conn:
functor(conn, *args, **kwargs)
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:10,代码来源:storage.py
示例5: test_static_method
def test_static_method(self):
name = reflection.get_callable_name(Class.static_method)
if six.PY3:
self.assertEqual(name,
'.'.join((__name__, 'Class', 'static_method')))
else:
# NOTE(imelnikov): static method are just functions, class name
# is not recorded anywhere in them.
self.assertEqual(name,
'.'.join((__name__, 'static_method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:10,代码来源:test_utils.py
示例6: test_update_progress_handler_failure
def test_update_progress_handler_failure(self, mocked_exception):
def progress_callback(*args, **kwargs):
raise Exception("Woot!")
task = ProgressTask()
with task.autobind("update_progress", progress_callback):
task.execute([0.5])
mocked_exception.assert_called_once_with(
mock.ANY, reflection.get_callable_name(progress_callback), "update_progress"
)
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:10,代码来源:test_task.py
示例7: _trigger
def _trigger(self, event, *args, **kwargs):
"""Execute all handlers for the given event type."""
if event in self._events_listeners:
for handler in self._events_listeners[event]:
event_data = self._events_listeners[event][handler]
try:
handler(self, event_data, *args, **kwargs)
except Exception:
LOG.exception("Failed calling `%s` on event '%s'",
reflection.get_callable_name(handler), event)
开发者ID:SEJeff,项目名称:taskflow,代码行数:10,代码来源:task.py
示例8: _make_logger
def _make_logger(self, level=logging.DEBUG):
log = logging.getLogger(
reflection.get_callable_name(self._get_test_method()))
log.propagate = False
for handler in reversed(log.handlers):
log.removeHandler(handler)
handler = test.CapturingLoggingHandler(level=level)
log.addHandler(handler)
log.setLevel(level)
self.addCleanup(handler.reset)
self.addCleanup(log.removeHandler, handler)
return (log, handler)
开发者ID:TheSriram,项目名称:taskflow,代码行数:12,代码来源:test_listeners.py
示例9: test_inner_callable_function
def test_inner_callable_function(self):
def a():
def b():
pass
return b
name = reflection.get_callable_name(a())
expected_name = '.'.join((__name__, 'GetCallableNameTestExtended',
'test_inner_callable_function', '<locals>',
'a', '<locals>', 'b'))
self.assertEqual(expected_name, name)
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:13,代码来源:test_utils.py
示例10: _fetch_validate_factory
def _fetch_validate_factory(flow_factory):
if isinstance(flow_factory, six.string_types):
factory_fun = _fetch_factory(flow_factory)
factory_name = flow_factory
else:
factory_fun = flow_factory
factory_name = reflection.get_callable_name(flow_factory)
try:
reimported = _fetch_factory(factory_name)
assert reimported == factory_fun
except (ImportError, AssertionError):
raise ValueError("Flow factory %r is not reimportable by name %s" % (factory_fun, factory_name))
return (factory_name, factory_fun)
开发者ID:yanheven,项目名称:OpenStackInAction,代码行数:13,代码来源:helpers.py
示例11: __init__
def __init__(self, execute, name=None, provides=None,
requires=None, auto_extract=True, rebind=None, revert=None,
version=None):
assert six.callable(execute), ("Function to use for executing must be"
" callable")
if revert:
assert six.callable(revert), ("Function to use for reverting must"
" be callable")
if name is None:
name = reflection.get_callable_name(execute)
super(FunctorTask, self).__init__(name, provides=provides)
self._execute = execute
self._revert = revert
if version is not None:
self.version = version
self._build_arg_mapping(execute, requires, rebind, auto_extract)
开发者ID:toabctl,项目名称:taskflow,代码行数:16,代码来源:task.py
示例12: __init__
def __init__(self, volume_id):
# Note here that the volume name is composed of the name of the class
# along with the volume id that is being created, since a name of a
# task uniquely identifies that task in storage it is important that
# the name be relevant and identifiable if the task is recreated for
# subsequent resumption (if applicable).
#
# UUIDs are *not* used as they can not be tied back to a previous tasks
# state on resumption (since they are unique and will vary for each
# task that is created). A name based off the volume id that is to be
# created is more easily tied back to the original task so that the
# volume create can be resumed/revert, and is much easier to use for
# audit and tracking purposes.
base_name = reflection.get_callable_name(self)
super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
volume_id))
self._volume_id = volume_id
开发者ID:celttechie,项目名称:taskflow,代码行数:17,代码来源:create_parallel_volume.py
示例13: load_from_factory
def load_from_factory(
flow_factory, factory_args=None, factory_kwargs=None, store=None, book=None, engine_conf=None, backend=None
):
"""Load flow from factory function into engine
Gets flow factory function (or name of it) and creates flow with
it. Then, flow is loaded into engine with load(), and factory
function fully qualified name is saved to flow metadata so that
it can be later resumed with resume.
:param flow_factory: function or string: function that creates the flow
:param factory_args: list or tuple of factory positional arguments
:param factory_kwargs: dict of factory keyword arguments
:param store: dict -- data to put to storage to satisfy flow requirements
:param book: LogBook to create flow detail in
:param engine_conf: engine type and configuration configuration
:param backend: storage backend to use or configuration
:returns: engine
"""
if isinstance(flow_factory, six.string_types):
factory_fun = importutils.import_class(flow_factory)
factory_name = flow_factory
else:
factory_fun = flow_factory
factory_name = reflection.get_callable_name(flow_factory)
try:
reimported = importutils.import_class(factory_name)
assert reimported == factory_fun
except (ImportError, AssertionError):
raise ValueError("Flow factory %r is not reimportable by name %s" % (factory_fun, factory_name))
args = factory_args or []
kwargs = factory_kwargs or {}
flow = factory_fun(*args, **kwargs)
factory_data = dict(name=factory_name, args=args, kwargs=kwargs)
if isinstance(backend, dict):
backend = p_backends.fetch(backend)
flow_detail = p_utils.create_flow_detail(flow, book=book, backend=backend, meta={"factory": factory_data})
return load(flow=flow, flow_detail=flow_detail, store=store, book=book, engine_conf=engine_conf, backend=backend)
开发者ID:ntt-sic,项目名称:taskflow,代码行数:41,代码来源:helpers.py
示例14: test_callable_class_call
def test_callable_class_call(self):
name = reflection.get_callable_name(CallableClass().__call__)
self.assertEqual(name, '.'.join((__name__, 'CallableClass',
'__call__')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:4,代码来源:test_utils.py
示例15: test_constructor
def test_constructor(self):
name = reflection.get_callable_name(Class)
self.assertEqual(name, '.'.join((__name__, 'Class')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py
示例16: test_class_method
def test_class_method(self):
name = reflection.get_callable_name(Class.class_method)
self.assertEqual(name, '.'.join((__name__, 'Class', 'class_method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py
示例17: test_instance_method
def test_instance_method(self):
name = reflection.get_callable_name(Class().method)
self.assertEqual(name, '.'.join((__name__, 'Class', 'method')))
开发者ID:codybum,项目名称:OpenStackInAction,代码行数:3,代码来源:test_utils.py
示例18: test_static_method
def test_static_method(self):
# NOTE(imelnikov): static method are just functions, class name
# is not recorded anywhere in them.
name = reflection.get_callable_name(Class.static_method)
self.assertEqual(name, ".".join((__name__, "static_method")))
开发者ID:zhujzhuo,项目名称:trove-1.0.10.4,代码行数:5,代码来源:test_utils.py
示例19: test_callable_class
def test_callable_class(self):
name = reflection.get_callable_name(CallableClass())
self.assertEquals(name, '.'.join((__name__, 'CallableClass')))
开发者ID:jessicalucci,项目名称:TaskManagement,代码行数:3,代码来源:test_utils.py
示例20: __init__
def __init__(self, timeout, functors):
self._timeout = timeout
self._functors = []
for f in functors:
self._functors.append((f, reflection.get_callable_name(f)))
开发者ID:devananda,项目名称:taskflow,代码行数:5,代码来源:executor.py
注:本文中的taskflow.utils.reflection.get_callable_name函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论