本文整理汇总了 Python 中 twisted.plugin.getPlugins 函数的典型用法代码示例。如果您正苦于以下问题:Python getPlugins 函数的具体用法?Python getPlugins 怎么用?Python getPlugins 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 getPlugins 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: write
def write(self):
    """
    Write the zsh completion function for C{mktap} to C{self.file}.

    Every L{IServiceMaker} plugin is listed as a sub-command, followed by
    the completions generated for C{self.optionsClass} and a C{case}
    statement holding the completions generated for each plugin's options.
    """
    emit = self.file.write
    emit('#compdef mktap\n')
    emit('_mktap_subcmds=(\n')

    from twisted import plugin as newplugin
    from twisted.scripts.mktap import IServiceMaker

    # One "tapname:description" entry per discovered service maker.
    for maker in newplugin.getPlugins(IServiceMaker):
        emit('"%s:%s"\n' % (maker.tapname, maker.description))
    emit(")\n\n")

    self.optionsClass.zsh_extras = ['*::subcmd:->subcmd']
    ArgumentsGenerator(self.cmd_name, self.optionsClass, self.file).write()

    emit("""if (( CURRENT == 1 )); then
_describe "tap to build" _mktap_subcmds && ret=0
fi
(( ret )) || return 0
service="$words[1]"
case $service in\n""")

    # The plugins are looked up a second time for the per-service branches.
    for maker in newplugin.getPlugins(IServiceMaker):
        emit(maker.tapname + ")\n")
        ArgumentsGenerator(maker.tapname, maker.options, self.file).write()
        emit(";;\n")
    emit("""*) _message "don't know how to complete $service";;\nesac""")
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:31,代码来源:zshcomp.py
示例2: test_detectFilesChanged
def test_detectFilesChanged(self):
    """
    When the content of a plugin module changes, L{plugin.getPlugins}
    notices the plugins that were added to it.
    """
    thisFile = FilePath(__file__)
    thisFile.sibling('plugin_extra1.py').copyTo(
        self.package.child('pluginextra.py'))
    try:
        found = list(plugin.getPlugins(ITestPlugin, self.module))
        # Sanity check
        self.assertEquals(len(found), 2)

        # Overwrite the module with a version declaring one more plugin.
        thisFile.sibling('plugin_extra2.py').copyTo(
            self.package.child('pluginextra.py'))

        # Fake out Python.
        self._unimportPythonModule(sys.modules['mypackage.pluginextra'])

        # Make sure additions are noticed
        found = list(plugin.getPlugins(ITestPlugin, self.module))
        self.assertEquals(len(found), 3)

        # list.remove raises ValueError for any unexpected plugin name.
        expected = ['TestPlugin', 'FourthTestPlugin', 'FifthTestPlugin']
        for plg in found:
            expected.remove(plg.__name__)
            plg.test1()
    finally:
        staleModule = sys.modules['mypackage.pluginextra']
        self._unimportPythonModule(staleModule, True)
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:31,代码来源:test_plugin.py
示例3: write
def write(self):
    """
    Emit the zsh completion function into the file handed to __init__.

    The plugins providing C{self.interface} become the sub-commands; each
    one gets its own branch in the generated C{case} statement.

    @return: C{None}
    """
    emit = self.file.write
    emit('#compdef %s\n' % (self.cmd_name,))
    emit('local _zsh_subcmds_array\n_zsh_subcmds_array=(\n')

    from twisted import plugin as newplugin

    # One "tapname:description" entry per discovered plugin.
    for plug in newplugin.getPlugins(self.interface):
        emit('"%s:%s"\n' % (plug.tapname, plug.description))
    emit(")\n\n")

    self.options.__class__.zsh_extras = ['*::subcmd:->subcmd']
    ArgumentsGenerator(self.cmd_name, self.options, self.file).write()

    emit("""if (( CURRENT == 1 )); then
_describe "%s" _zsh_subcmds_array && ret=0
fi
(( ret )) || return 0
service="$words[1]"
case $service in\n""" % (self.subcmdLabel,))

    # The plugins are looked up a second time for the per-service branches.
    for plug in newplugin.getPlugins(self.interface):
        emit(plug.tapname + ")\n")
        ArgumentsGenerator(plug.tapname, plug.options(), self.file).write()
        emit(";;\n")
    emit("*) _message \"don't know how to complete $service\";;\nesac")
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:35,代码来源:zshcomp.py
示例4: fromPlugins
def fromPlugins(cls, clock):
    """
    Build a :obj:`MimicCore` out of every :obj:`IAPIMock` and
    :obj:`IAPIDomainMock` plugin that can be discovered.

    :param clock: passed through unchanged as the first argument of ``cls``.
    :return: the object produced by calling ``cls``.
    """
    return cls(
        clock,
        getPlugins(IAPIMock, plugins),
        getPlugins(IAPIDomainMock, plugins),
    )
开发者ID:ksheedlo,项目名称:mimic,代码行数:8,代码来源:core.py
示例5: test_detectFilesRemoved
def test_detectFilesRemoved(self):
    """
    Once a dropin file has been removed, L{plugin.getPlugins} no longer
    returns the plugins it used to provide.
    """
    extraSource = FilePath(__file__).sibling("plugin_extra1.py")
    extraSource.copyTo(self.package.child("pluginextra.py"))
    try:
        # Generate a cache with pluginextra in it.
        list(plugin.getPlugins(ITestPlugin, self.module))
    finally:
        staleModule = sys.modules["mypackage.pluginextra"]
        self._unimportPythonModule(staleModule, True)

    remaining = list(plugin.getPlugins(ITestPlugin, self.module))
    self.assertEqual(1, len(remaining))
开发者ID:ssilverek,项目名称:kodb,代码行数:14,代码来源:test_plugin.py
示例6: _init_faf_events
def _init_faf_events(self):
    """
    Create the 'system event' classes and attach their listeners.

    Event names come from ``events_lst`` plus the names supplied by the
    ``IEventList`` plugins. Please use global registration as last resort.
    """
    plugin_event_names = [
        evname
        for elst in getPlugins(IEventList, events)
        for evname in elst
    ]
    for klass_name in events_lst + plugin_event_names:
        self.registerEventFromName(klass_name, Evt)

    # Register Listeners
    for event_handler in getPlugins(IEventListener, events):
        declared = event_handler.__class__.EVENT
        # EVENT is either a single event name or a list of them.
        event_names = declared if isinstance(declared, list) else [declared]
        for event_name in event_names:
            self.addListener(event_name, event_handler)
开发者ID:PMcGoldrick,项目名称:Jane,代码行数:14,代码来源:__init__.py
示例7: _init_resp_events
def _init_resp_events(self):
    """
    Create the 'responding event' classes and attach their responders.

    Event names come from ``responding_events`` plus the names supplied by
    the ``IRespondingEventList`` plugins of the ``events`` package; each is
    registered with ``RespondingEvt``. The ``respond`` method of every
    ``IEventResponder`` plugin is then attached to the event name(s) its
    class declares in ``RESPEVENT``.
    """
    plugin_event_names = [
        evname
        for elst in getPlugins(IRespondingEventList, events)
        for evname in elst
    ]
    for klass_name in responding_events + plugin_event_names:
        self.registerEventFromName(klass_name, RespondingEvt)

    # Search the same ``events`` package as every other lookup here; when
    # the package argument is omitted getPlugins falls back to
    # twisted.plugins, so project-local responders would not be found.
    for event_handler in getPlugins(IEventResponder, events):
        declared = event_handler.__class__.RESPEVENT
        # RESPEVENT is either a single event name or a list of them.
        event_names = declared if isinstance(declared, list) else [declared]
        for event_name in event_names:
            self.addResponder(event_name, event_handler.respond)
开发者ID:PMcGoldrick,项目名称:Jane,代码行数:15,代码来源:__init__.py
示例8: handlePlugins
def handlePlugins(self, info):
    """Give control to plugins.

    The L{ICollector} plugins that accept the equipment's OID are run one
    after another; C{generic} is used when none of them accepts it.

    @param info: C{(proxy, equipment)} tuple
    @return: the deferred chain that runs every selected plugin and finally
        writes C{equipment} through L{DatabaseWriter}
    """
    proxy, equipment = info
    proxy.version = 2       # Switch to version 2. Plugins should
                            # switch back to version 1 if needed.
    oid = str(equipment.oid)
    # Filter out plugins that do not handle our equipment
    plugins = [collector for collector
               in getPlugins(ICollector,
                             wiremaps.collector.equipment)
               if collector.handleEquipment(oid)]
    if not plugins:
        print("No plugin found for OID %s, using generic one" % oid)
        plugins = [generic]
    print("Using %s to collect data from %s" % (
        [str(collector.__class__) for collector in plugins],
        proxy.ip))
    d = defer.succeed(None)
    # Run each plugin to complete C{equipment}
    for collector in plugins:
        collector.config = self.config
        # Bind the current plugin as a default argument. A plain closure
        # looks the loop variable up when the callback finally runs, so
        # every callback delayed by an earlier unfired deferred would call
        # the *last* plugin instead of its own.
        d.addCallback(
            lambda _, collector=collector:
                collector.collectData(equipment, proxy))
    # At the end, write C{equipment} to the database
    d.addCallback(
        lambda _: DatabaseWriter(equipment, self.config).write(self.dbpool))
    return d
开发者ID:ragingCow2,项目名称:wiremaps,代码行数:27,代码来源:core.py
示例9: subCommands
def subCommands(self):
    """
    Yield one sub-command entry per L{service.IServiceMaker} plugin.

    Each plugin is also recorded in C{self.loadedPlugins}, keyed by its
    C{tapname}, as it is yielded.

    @return: a generator of C{(tapname, None, optionsFactory, description)}
        tuples, where C{optionsFactory} is a callable needing no arguments
        that returns C{plug.options()} for that entry's plugin.
    """
    from twisted import plugin
    plugins = plugin.getPlugins(service.IServiceMaker)
    self.loadedPlugins = {}
    for plug in plugins:
        self.loadedPlugins[plug.tapname] = plug
        # Bind ``plug`` as a default argument: a plain closure would look
        # the variable up only when called, so once the generator has
        # advanced every factory would build the options of the plugin
        # iterated last.
        yield (plug.tapname,
               None,
               lambda plug=plug: plug.options(),
               plug.description)
开发者ID:antong,项目名称:twisted,代码行数:7,代码来源:app.py
示例10: loadApps
def loadApps():
    """
    Discover the IAppFactory plugins of goonsite.plugins.

    Returns a dict mapping each plugin's name attribute to the plugin.
    """
    import goonsite.plugins
    return dict(
        (factory.name, factory)
        for factory in getPlugins(IAppFactory, goonsite.plugins)
    )
开发者ID:corydodt,项目名称:Personal,代码行数:7,代码来源:app.py
示例11: test_detectNewFiles
def test_detectNewFiles(self):
    """
    Plugins added at runtime are detected by L{plugin.getPlugins}.
    """
    thisFile = FilePath(__file__)
    thisFile.sibling('plugin_extra1.py').copyTo(
        self.package.child('pluginextra.py'))
    try:
        # Check that the current situation is clean
        self.failIfIn('mypackage.pluginextra', sys.modules)
        alreadyLoaded = hasattr(sys.modules['mypackage'], 'pluginextra')
        self.failIf(alreadyLoaded, "mypackage still has pluginextra module")

        found = list(plugin.getPlugins(ITestPlugin, self.module))

        # We should find 2 plugins: the one in testplugin, and the one in
        # pluginextra
        self.assertEquals(len(found), 2)

        # list.remove raises ValueError for any unexpected plugin name.
        expected = ['TestPlugin', 'FourthTestPlugin']
        for plg in found:
            expected.remove(plg.__name__)
            plg.test1()
    finally:
        staleModule = sys.modules['mypackage.pluginextra']
        self._unimportPythonModule(staleModule, True)
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:27,代码来源:test_plugin.py
示例12: __init__
def __init__(self, config):
    """
    Set up the plugin loader, the database session and the plugins.

    Reads the 'channels', 'nickname' and 'plugins' keys of config, plus the
    optional 'db' key holding the URL given to sqlalchemy.create_engine;
    an in-memory SQLite database is used when 'db' is absent.
    """
    self.channels = config['channels']
    self.nickname = config['nickname']
    self.history = {}

    self.loader = PluginLoader()
    self.loader.config = config

    if 'db' in config:
        print('Loading db from config: ' + config['db'])
        db_url = config['db']
    else:
        print('Using in-memory db')
        db_url = 'sqlite:///:memory:'
    db_engine = sqlalchemy.create_engine(db_url)
    session = orm.sessionmaker(db_engine)()
    self.loader.db = DB(db_engine, session)

    # Load all plugins mentioned in the configuration. Allow globbing.
    config_matches = set()
    for plugin in getPlugins(BaseInterface, package=plugins):
        for pattern in config['plugins']:
            if not fnmatch(plugin.name, pattern):
                continue
            # Only the first matching pattern is recorded for a plugin.
            self.loader.registerPlugin(plugin)
            config_matches.add(pattern)
            break

    unmatched = [
        pattern for pattern in config['plugins']
        if pattern not in config_matches
    ]
    for pattern in unmatched:
        log.warning('No plugin matched pattern "%s"', pattern)
开发者ID:CMB,项目名称:hamper,代码行数:30,代码来源:commander.py
示例13: loadModule
def loadModule(self, name):
    """
    Load the IBotModule plugin whose name equals ``name``, ignoring case.

    The Python module defining the plugin is imported and passed to
    ``rebuild`` before the plugin goes to ``self._loadModuleData``.

    Returns the name attribute of the loaded plugin.
    Raises ModuleLoaderError when no plugin has a matching name.
    """
    for candidate in getPlugins(IBotModule, heufybot.modules):
        # Plugins without a usable name can never match.
        if not candidate.name:
            continue
        if candidate.name.lower() != name.lower():
            continue
        rebuild(importlib.import_module(candidate.__module__))
        self._loadModuleData(candidate)
        return candidate.name
    raise ModuleLoaderError(name, "The module could not be found.", ModuleLoadType.LOAD)
开发者ID:Heufneutje,项目名称:PyHeufyBot,代码行数:7,代码来源:modulehandler.py
示例14: _scan_sites
def _scan_sites(env):
    """
    Search for available site configuration objects.

    Each ISite plugin found in modu.sites is instantiated, wrapped in an
    Application and registered in host_tree under its domain and base path.
    Diagnostics go to env['wsgi.errors'] when debug_site_scan is set.
    """
    global host_tree, debug_site_scan

    import modu.sites
    reload(modu.sites)

    for site_plugin in plugin.getPlugins(ISite, modu.sites):
        if debug_site_scan:
            env['wsgi.errors'].write('Found %r\n' % (site_plugin,))

        app = Application(site_plugin())

        # Re-register '/' as a static file resource wrapping the old data.
        root = app.tree.get_data_at('/')
        webroot = os.path.join(app.approot, app.webroot)
        static_root = (static.FileResource, (webroot, root), {})
        app.tree.register('/', static_root, clobber=True)

        # Domains without an explicit port get the port of this request.
        domain = app.base_domain
        if ':' not in domain:
            domain = domain + ':' + env['SERVER_PORT']

        host_node = host_tree.setdefault(domain, url.URLNode())
        base_path = app.base_path or '/'

        if debug_site_scan:
            env['wsgi.errors'].write('Loaded %r for %r\n' % (site_plugin, domain))
        host_node.register(base_path, app, clobber=True)
开发者ID:philchristensen,项目名称:modu,代码行数:35,代码来源:app.py
示例15: activate
def activate(self):
    """
    Sort the discovered plugins into installable and uninstallable ones.

    Plugins whose path has an InstalledPlugin record in self.store go to
    self.uninstallable, all others to self.installable. Each entry is a dict
    with the keys 'plugin', 'version' and 'path'. A warning is logged when
    the version of an installed plugin differs from the recorded one, and a
    message is printed for every recorded plugin that is no longer
    available.
    """
    self.installable = []
    self.uninstallable = []

    installed = {}
    for record in self.store.query(InstalledPlugin):
        installed[record.path] = record.version

    available = {}
    for candidate in getPlugins(isqueal.ISquealPlugin, plugins):
        # NOTE(review): the key is built as name + "." + module; confirm
        # this is the format stored in InstalledPlugin.path.
        path = candidate.__name__ + "." + candidate.__module__
        available[path] = candidate

    for path, plugin in available.items():
        # Record the version of *this* plugin. The previous code read it
        # from the variable left over by the discovery loop, so every entry
        # carried the version of the plugin discovered last.
        entry = {
            'plugin': plugin,
            'version': plugin.version,
            'path': path,
        }
        if path in installed:
            self.uninstallable.append(entry)
            if plugin.version > installed[path]:
                log.warning("Plugin %s has been upgraded" % path)
            elif plugin.version < installed[path]:
                log.warning("Plugin %s has been downgraded" % path)
        else:
            self.installable.append(entry)

    for path in installed:
        if path not in available:
            # Single-argument form prints the same text on Python 2 and 3.
            print("Plugin %s has been removed" % path)
开发者ID:bne,项目名称:squeal,代码行数:33,代码来源:extension.py
示例16: retrieve_plugins
def retrieve_plugins(interface, cached=True, cache={}):
    """
    Look up all plugins for a certain interface.

    If the plugin cache is enabled, this function will not attempt to reload
    plugins from disk or discover new plugins.

    Plugins failing ``verifyObject`` are reported through ``log`` and left
    out of the result.

    NOTE(review): earlier documentation promised a PluginException when no
    plugin is found; nothing in this function raises it. Confirm the
    intended behaviour.

    :param interface interface: the interface to use
    :param bool cached: whether to use the in-memory plugin cache
    :param dict cache: the memo shared between calls; the mutable default is
        deliberate, it *is* the in-memory cache
    :returns: a dict of plugins, keyed by name
    """
    if cached and interface in cache:
        return cache[interface]

    log.msg("Discovering %s..." % interface)
    d = {}
    for p in getPlugins(interface, bravo.plugins):
        try:
            verifyObject(interface, p)
            log.msg(" ( ^^) Plugin: %s" % p.name)
            d[p.name] = p
        except BrokenImplementation as bi:
            if hasattr(p, "name"):
                log.msg(" ( ~~) Plugin %s is missing attribute %r!" %
                        (p.name, bi.name))
            else:
                log.msg(" ( >&) Plugin %s is unnamed and useless!" % p)
        except BrokenMethodImplementation as bmi:
            log.msg(" ( Oo) Plugin %s has a broken %s()!" % (p.name,
                                                             bmi.method))
            log.err()

    # Remember and hand back the result. Without these two lines the
    # function returned None and the cache was never filled.
    cache[interface] = d
    return d
开发者ID:iamjagman,项目名称:bravo,代码行数:34,代码来源:plugin.py
示例17: __init__
def __init__(self, config):
    """
    Set up the plugin loader, the database session and the plugins.

    Reads the 'plugins' key of config (a list of glob patterns matched
    against plugin names) and the optional 'db' key holding the URL given
    to sqlalchemy.create_engine; an in-memory SQLite database is used when
    'db' is absent.
    """
    self.loader = PluginLoader()
    self.loader.config = config
    self.history = {}

    if 'db' in config:
        print('Loading db from config: ' + config['db'])
        db_engine = sqlalchemy.create_engine(config['db'])
    else:
        print('Using in-memory db')
        db_engine = sqlalchemy.create_engine('sqlite:///:memory:')
    DBSession = orm.sessionmaker(db_engine)
    session = DBSession()
    self.loader.db = DB(db_engine, session)

    # Load all plugins mentioned in the configuration. Allow globbing.
    # Single-argument form prints the same text on Python 2 and 3.
    print("Loading plugins %s" % (config["plugins"],))
    for plugin in getPlugins(BaseInterface, package=hamper.plugins):
        for pattern in config["plugins"]:
            if fnmatch(plugin.name, pattern):
                self.loader.registerPlugin(plugin)
                # Stop at the first match so a plugin matched by several
                # patterns is registered once, not once per pattern.
                break
开发者ID:CMB,项目名称:hamper,代码行数:25,代码来源:cli.py
示例18: _put_plugin_resources
def _put_plugin_resources(wcommon, client_resource):
    """Attach plugin-defined resources and the client configuration.

    Every ``_IClientResourceDef`` plugin of ``shinysdr.plugins`` is served
    under the ``plugins`` child of ``client_resource`` and the URLs of its
    optional CSS and JS files are collected. Those lists, together with a
    table built from ``get_modes()``, are published as the
    ``client-configuration`` child of ``client_resource``.

    wcommon -- passed through unchanged to ClientConfigurationResource.
    client_resource -- the resource receiving both children.
    """
    load_list_css = []
    load_list_js = []
    mode_table = {}
    plugin_resources = Resource()
    client_resource.putChild('plugins', plugin_resources)
    for resource_def in getPlugins(_IClientResourceDef, shinysdr.plugins):
        # Add the plugin's resource to static serving
        plugin_resources.putChild(resource_def.key, resource_def.resource)
        plugin_resource_url = '/client/plugins/' + urllib.parse.quote(resource_def.key, safe='') + '/'
        # Tell the client to load the plugins
        # TODO constrain path values to be relative (not on a different origin, to not leak urls)
        if resource_def.load_css_path is not None:
            # Read the attribute that was just checked; the misspelt
            # ``load_cs_path`` raised AttributeError for every plugin
            # shipping a stylesheet.
            load_list_css.append(plugin_resource_url + resource_def.load_css_path)
        if resource_def.load_js_path is not None:
            # TODO constrain value to be in the directory
            load_list_js.append(plugin_resource_url + resource_def.load_js_path)
    for mode_def in get_modes():
        mode_table[mode_def.mode] = {
            u'info_enum_row': mode_def.info.to_json(),
            u'can_transmit': mode_def.mod_class is not None
        }
    plugin_index = {
        'css': load_list_css,
        'js': load_list_js,
        'modes': mode_table,
    }
    client_resource.putChild('client-configuration', ClientConfigurationResource(wcommon, plugin_index))
开发者ID:kpreid,项目名称:shinysdr,代码行数:30,代码来源:webapp.py
示例19: retrieve_plugins
def retrieve_plugins(interface, cached=True, cache={}):
    """
    Look up all plugins for a certain interface.

    If the plugin cache is enabled, this function will not attempt to reload
    plugins from disk or discover new plugins.

    NOTE(review): earlier documentation promised a PluginException when no
    plugin is found; nothing in this function raises it. Confirm the
    intended behaviour.

    :param interface interface: the interface to use
    :param bool cached: whether to use the in-memory plugin cache
    :param dict cache: the memo shared between calls; the mutable default is
        deliberate, it *is* the in-memory cache
    :returns: a dict of plugins, keyed by name
    """
    if cached and interface in cache:
        return cache[interface]

    # Single-argument print calls emit the same text on Python 2 and 3,
    # unlike the print statement, which is a syntax error on Python 3.
    print("Discovering %s..." % interface)
    d = {}
    for p in getPlugins(interface, beta.plugins):
        print(" ~ Plugin: %s" % p.name)
        d[p.name] = p

    cache[interface] = d
    return d
开发者ID:welterde,项目名称:beta,代码行数:25,代码来源:plugin.py
示例20: retrieve_plugins
def retrieve_plugins(interface, cached=True, cache={}):
    """
    Collect every plugin available for an interface.

    With the plugin cache enabled, an interface that was looked up before is
    answered from memory: nothing is reloaded from disk and no new plugins
    are discovered. Plugins for which ``verify_plugin`` raises
    PluginException are left out of the result.

    :param interface interface: the interface to use
    :param bool cached: whether to use the in-memory plugin cache
    :param dict cache: the memo shared between calls
    :returns: a dict of plugins, keyed by name
    """
    use_cache = cached and interface in cache
    if use_cache:
        return cache[interface]

    log.msg("Discovering %s..." % interface)

    found = {}
    for candidate in getPlugins(interface, bravo.plugins):
        try:
            verify_plugin(interface, candidate)
            found[candidate.name] = candidate
        except PluginException:
            # Plugins failing verification are skipped.
            pass

    # Sortable plugins need their edges mirrored.
    if issubclass(interface, ISortedPlugin):
        found = add_plugin_edges(found)

    cache[interface] = found
    return found
开发者ID:gr8firedragon,项目名称:bravo,代码行数:32,代码来源:plugin.py
注:本文中的 twisted.plugin.getPlugins 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论