本文整理汇总了Python中twisted.protocols.amp.AMP类的典型用法代码示例。如果您正苦于以下问题：Python AMP类的具体用法？Python AMP怎么用？Python AMP使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了AMP类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, agent):
    """
    Initialise the AMP base class with an ``_AgentLocator`` wrapping
    ``agent`` and remember the agent on the instance.

    :param IConvergenceAgent agent: Convergence agent to notify of changes.
    """
    AMP.__init__(self, locator=_AgentLocator(agent))
    self.agent = agent
开发者ID:ALSEDLAH,项目名称:flocker,代码行数:7,代码来源:_protocol.py
示例2: connected_amp_protocol
def connected_amp_protocol():
    """
    Create an ``AMP`` instance and connect it to a fresh
    ``StringTransport``.

    :return: ``AMP`` hooked up to transport.
    """
    protocol = AMP()
    transport = StringTransport()
    protocol.makeConnection(transport)
    return protocol
开发者ID:332054781,项目名称:flocker,代码行数:7,代码来源:amp.py
示例3: test_adds_user
def test_adds_user(self):
    """
    Calling L{AddUser} on a client connected to an L{IdentityAdmin} results
    in one recorded C{addUser} call carrying the Q2Q host, the username and
    the password, and the command's result is an empty dict.
    """
    admin = IdentityAdmin()
    admin.factory = self.adminFactory

    fakeServer = makeFakeServer(admin)
    # The recorded call asserted below carries this host as its first
    # argument.
    fakeServer.getQ2QHost = lambda: Q2QAddress('Q2Q Host')

    client = AMP()
    fakeClient = makeFakeClient(client)
    pump = connect(admin, fakeServer, client, fakeClient)

    result = client.callRemote(
        AddUser, name='q2q username', password='q2q password')
    pump.flush()

    # The username and password are recorded together with the Q2Q host.
    recorded = [call('Q2Q Host', 'q2q username', 'q2q password')]
    self.assertEqual(recorded, self.addUser.calls)

    # The server responds with {}
    self.assertEqual({}, self.successResultOf(result))
开发者ID:chellygel,项目名称:vertex,代码行数:26,代码来源:test_standalone.py
示例4: connectionLost
def connectionLost(self, reason):
    """
    Finish every outstanding producer stream, then let L{AMP} handle the
    disconnect.

    For each key in C{self._producers}: when C{self._consumers} has no
    entry for that key, C{_finishReceiving} is called; otherwise
    C{_finishSending} is called.

    @param reason: passed through unchanged to the finish helpers and to
        L{AMP.connectionLost}.
    """
    # Iterate over a snapshot of the keys.  If the finish helpers remove
    # entries from self._producers (presumably they do -- confirm against
    # their definitions), looping over the live dict would raise
    # RuntimeError on Python 3.
    for key in list(self._producers):
        if self._consumers.get(key) is None:
            self._finishReceiving(key, reason)
        else:
            self._finishSending(key, reason)
    AMP.connectionLost(self, reason)
开发者ID:habnabit,项目名称:txampproducer,代码行数:8,代码来源:__init__.py
示例5: __init__
def __init__(self, *a, **kw):
    """
    Initialise the AMP base class with the given arguments and create the
    empty per-instance bookkeeping containers.

    All positional and keyword arguments are forwarded untouched to
    L{AMP.__init__}.
    """
    AMP.__init__(self, *a, **kw)

    # Five empty dicts and one empty set; what they are keyed by is not
    # visible from this method.
    self._producers = {}
    self._consumers = {}
    self._buffers = {}
    self._pending = {}
    self._waitingOnCompletion = {}
    self._draining = set()
开发者ID:habnabit,项目名称:txampproducer,代码行数:8,代码来源:__init__.py
示例6: connectionLost
def connectionLost(self, reason):
    """
    Handle the disconnect and, if a login has happened, perform a logout.

    Afterwards both C{boxReceiver} and C{logout} are reset to C{None}.
    """
    AMP.connectionLost(self, reason)

    if self.logout is not None:
        self.logout()

    self.boxReceiver = None
    self.logout = None
开发者ID:bne,项目名称:squeal,代码行数:8,代码来源:ampauth.py
示例7: run_amp_command
def run_amp_command(description, command, args, reactor=None):
    """
    Connect to an endpoint and issue a single AMP command over it.

    :param description: client endpoint description, handed to
        ``endpoints.clientFromString``.
    :param command: the command passed to ``callRemote``.
    :param args: mapping expanded into keyword arguments for the command.
    :param reactor: reactor used to build the endpoint; the global Twisted
        reactor is imported and used when this is ``None``.
    :return: the Deferred returned by ``connectProtocol``, with the
        ``callRemote`` call added as a callback.
    """
    if reactor is None:
        from twisted.internet import reactor

    client_endpoint = endpoints.clientFromString(reactor, description)
    protocol = AMP()

    def send_command(_connected):
        # The callback argument is ignored; only the command is sent.
        return protocol.callRemote(command, **args)

    connecting = endpoints.connectProtocol(client_endpoint, protocol)
    connecting.addCallback(send_command)
    return connecting
开发者ID:habnabit,项目名称:passacre-urwid,代码行数:8,代码来源:purwid.py
示例8: startReceivingBoxes
def startReceivingBoxes(self, sender):
    """
    Start receiving boxes, then issue a random number (1 to 4) of L{Count}
    commands.

    Each result is passed to C{display} along with C{self.identifier}.  The
    gathered results are chained into C{self.finished}.
    """
    AMP.startReceivingBoxes(self, sender)

    howMany = random.randrange(1, 5)
    pending = []
    for _ in range(howMany):
        counting = self.callRemote(Count)
        counting.addCallback(display, self.identifier)
        pending.append(counting)

    allCounted = gatherResults(pending)
    allCounted.chainDeferred(self.finished)
开发者ID:pombredanne,项目名称:epsilon,代码行数:9,代码来源:route_client.py
示例9: __init__
def __init__(self, reactor, agent):
    """
    Initialise the AMP base class with an ``_AgentLocator`` for ``agent``,
    store the agent, and create a ``Pinger`` bound to ``reactor``.

    :param IReactorTime reactor: A reactor to use to schedule periodic ping
        operations.
    :param IConvergenceAgent agent: Convergence agent to notify of changes.
    """
    AMP.__init__(self, locator=_AgentLocator(agent))
    self.agent = agent
    self._pinger = Pinger(reactor)
开发者ID:achanda,项目名称:flocker,代码行数:10,代码来源:_protocol.py
示例10: __init__
def __init__(self, reactor, control_amp_service):
    """
    Initialise the AMP base class with a ``ControlServiceLocator``, store
    the service, and create a ``Pinger`` bound to ``reactor``.

    :param reactor: See ``ControlServiceLocator.__init__``.
    :param ControlAMPService control_amp_service: The service managing AMP
        connections to the control service.
    """
    AMP.__init__(
        self,
        locator=ControlServiceLocator(reactor, control_amp_service),
    )
    self.control_amp_service = control_amp_service
    self._pinger = Pinger(reactor)
开发者ID:stmcginnis,项目名称:flocker,代码行数:10,代码来源:_protocol.py
示例11: __init__
def __init__(self, reactor, agent):
    """
    Initialise the AMP base class with an ``_AgentLocator`` for ``agent``,
    store the agent, and create a ``Pinger`` bound to ``reactor``.

    The locator also receives the result of
    ``timeout_for_protocol(reactor, self)``.

    :param IReactorTime reactor: A reactor to use to schedule periodic ping
        operations.
    :param IConvergenceAgent agent: Convergence agent to notify of changes.
    """
    # timeout_for_protocol is handed ``self`` before AMP.__init__ has run,
    # exactly as in the original ordering.
    locator = _AgentLocator(agent, timeout_for_protocol(reactor, self))
    AMP.__init__(self, locator=locator)
    self.agent = agent
    self._pinger = Pinger(reactor)
开发者ID:Elenw,项目名称:flocker,代码行数:10,代码来源:_protocol.py
示例12: connectionLost
def connectionLost(self, reason):
    """
    Tell the associated L{conncache.ConnectionCache} that this protocol has
    been disconnected, then let L{AMP} handle the disconnect.
    """
    nexus = self.nexus
    q2qEndpoint = endpoint.Q2QEndpoint(
        nexus.svc,
        nexus.addr,
        self.transport.getQ2QPeer(),
        PROTOCOL_NAME)
    # The cache key is a 2-tuple of the endpoint and None.
    cacheKey = (q2qEndpoint, None)
    nexus.conns.connectionLostForKey(cacheKey)
    AMP.connectionLost(self, reason)
开发者ID:chellygel,项目名称:vertex,代码行数:11,代码来源:sigma.py
示例13: test_periodic_noops
def test_periodic_noops(self):
    """
    When connected, the protocol sends ``NoOp`` commands at a fixed
    interval: after advancing the clock by ``PING_INTERVAL`` three times,
    the peer's locator has counted three no-ops.
    """
    expected_pings = 3
    clock = Clock()
    counter = _NoOpCounter()
    peer = AMP(locator=counter)
    protocol = self.build_protocol(clock)
    pump = connectedServerAndClient(lambda: protocol, lambda: peer)[2]

    interval = PING_INTERVAL.total_seconds()
    for _ in range(expected_pings):
        clock.advance(interval)
        # Keep the other side alive past its timeout
        peer.callRemote(NoOp)
        pump.flush()

    self.assertEqual(counter.noops, expected_pings)
开发者ID:wangbinxiang,项目名称:flocker,代码行数:16,代码来源:test_protocol.py
示例14: test_stuff
def test_stuff(self):
    """
    Enregistering through a fake Q2Q service delivers an C{add_user} box
    with the expected name and password to the C{identity-admin} server,
    and the resulting Deferred has fired successfully.
    """
    service = FakeQ2QService()
    adminAddress = q2q.Q2QAddress("domain", "accounts")
    server = AMP()

    def checkAddUserBox(box):
        # Verify the incoming box, then answer with an empty box.
        self.assertEqual(box['_command'], "add_user")
        self.assertEqual(box['name'], "user")
        self.assertEqual(box['password'], "password")
        return AmpBox()

    server.amp_ADD_USER = checkAddUserBox

    adminFactory = Factory.forProtocol(lambda: server)
    protocols = {"identity-admin": adminFactory}
    service.listenQ2Q(adminAddress, protocols, "Admin")

    userAddress = q2q.Q2QAddress("domain", "user")
    enregistered = q2qclient.enregister(service, userAddress, "password")
    service.flush()
    self.successResultOf(enregistered)
开发者ID:derwolfe,项目名称:vertex,代码行数:21,代码来源:test_q2qclient.py
示例15: setUp
def setUp(self):
    """
    Connect a L{LocalWorkerAMP} manager and an L{AMP} worker to their own
    L{StringTransport}s, then start a run of the test named
    C{"twisted.doesnexist"} on the manager and clear the manager transport.
    """
    # Manager side.
    self.managerTransport = StringTransport()
    self.managerAMP = LocalWorkerAMP()
    self.managerAMP.makeConnection(self.managerTransport)
    self.result = TestResult()

    # Worker side.
    self.workerTransport = StringTransport()
    self.worker = AMP()
    self.worker.makeConnection(self.workerTransport)

    # Build a suite from the single test name and take one test out of it.
    config = trial.Options()
    self.testName = "twisted.doesnexist"
    config['tests'].append(self.testName)
    suite = trial._getSuite(config)
    self.testCase = suite._tests.pop()

    self.managerAMP.run(self.testCase, self.result)
    # Clear the manager transport once the run has been started.
    self.managerTransport.clear()
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:16,代码来源:test_worker.py
示例16: setUp
def setUp(self):
    """
    Create a L{CredReceiver} backed by a portal with one in-memory
    username/password, and connect it to a plain L{AMP} client with
    C{loopbackAsync}.
    """
    # NOTE(review): the username literal looks like e-mail obfuscation
    # residue from a web scrape rather than the author's original value;
    # it is kept byte-for-byte here -- confirm against the upstream file.
    self.username = '[email protected]'
    self.password = 'foo bar baz'

    checker = InMemoryUsernamePasswordDatabaseDontUse()
    checker.addUser(self.username, self.password)
    self.checker = checker

    self.avatar = StubAvatar()
    self.realm = StubRealm(self.avatar)
    self.portal = Portal(self.realm, [self.checker])

    self.server = CredReceiver()
    self.server.portal = self.portal
    self.client = AMP()
    self.finished = loopbackAsync(self.server, self.client)
开发者ID:pombredanne,项目名称:epsilon,代码行数:16,代码来源:test_ampauth.py
示例17: __init__
def __init__(self):
    """
    Run both base-class initialisers explicitly:
    L{MultiplexingCommandLocator} first, then L{AMP}.
    """
    # The tuple order is the initialisation order and must stay as is.
    for base in (MultiplexingCommandLocator, AMP):
        base.__init__(self)
开发者ID:BanzaiMan,项目名称:merlyn,代码行数:3,代码来源:service.py
示例18: startReceivingBoxes
def startReceivingBoxes(self, sender):
    """
    Start receiving boxes, then register C{self._emit} as a log observer so
    that log events can be inspected for stat events to send.
    """
    AMP.startReceivingBoxes(self, sender)
    observer = self._emit
    log.addObserver(observer)
开发者ID:fusionapp,项目名称:mantissa,代码行数:6,代码来源:stats.py
示例19: __init__
def __init__(self, identifier):
    """
    Initialise the AMP base class, then store C{identifier} and a fresh
    L{Deferred} as C{self.finished}.

    @param identifier: stored on the instance as C{self.identifier}.
    """
    AMP.__init__(self)
    self.finished = Deferred()
    self.identifier = identifier
开发者ID:pombredanne,项目名称:epsilon,代码行数:4,代码来源:route_client.py
示例20: connectionLost
def connectionLost(self, reason):
    """
    Handle the disconnect in L{AMP}, notify the agent that it has been
    disconnected, and stop the pinger.

    :param reason: passed through to ``AMP.connectionLost``.
    """
    AMP.connectionLost(self, reason)
    # Order matters: if disconnected() raises, the pinger is not stopped.
    agent = self.agent
    agent.disconnected()
    pinger = self._pinger
    pinger.stop()
开发者ID:Elenw,项目名称:flocker,代码行数:4,代码来源:_protocol.py
注：本文中的twisted.protocols.amp.AMP类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论