本文整理汇总了Python中mpx.service.subscription_manager.SUBSCRIPTION_MANAGER类的典型用法代码示例。如果您正苦于以下问题:Python SUBSCRIPTION_MANAGER类的具体用法?Python SUBSCRIPTION_MANAGER怎么用?Python SUBSCRIPTION_MANAGER使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SUBSCRIPTION_MANAGER类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
DefaultTestFixture.setUp(self)
self.__event_updated_values = {}
self.new_node_tree()
root = as_internal_node('/')
self._cov_counter = 0
GetException().configure({'name':'exception', 'parent':'/services'})
SUBSCRIPTION_MANAGER.configure({'debug':0,
'_normal_pool_size':2,
'_slow_pool_size':2,
'_prime_pool_size':2,
'_minimum_poll_interval':0.001,
'_slow_poll_threshold':0.500,
}
)
for i in range(0,10):
f = FastNode()
f.configure({'parent':root, 'name':"FastNode-%03d"%i})
s = SlowNode()
s.configure({'parent':root, 'name':"SlowNode-%03d"%i})
e = ErrorNode()
e.configure({'parent':root, 'name':"ErrorNode-%03d"%i})
b = BatchNode(i & 1)
b.configure({'parent':root, 'name':"BatchNode-%03d"%i})
root.start()
return
开发者ID:mcruse,项目名称:monotone,代码行数:26,代码来源:_test_case_subscription_manager.py
示例2: test_timeout
def test_timeout(self):
sids = []
for i in range(2):
if not i:
timeout = 1.0
else:
timeout = None
# ID3 is /services/time/UTC/milliseconds which should
# change "really fast."
sid = SUBSCRIPTION_MANAGER.create_polled({self.ID3:self.ID3},
timeout)
# Make sure it comes up.
t1 = time.time()
self.__values_changing(sid)
sids.append(sid)
# Double check the values are changing and that the subscriptions
# stay valid while we poll for values.
t1 = time.time()
while (time.time() - t1) < 2.0:
for sid in sids:
self.__values_changing(sid)
time.sleep(0.1)
# Now ensure that sid[0] times out...
sid = sids.pop(0)
t1 = time.time()
while sid in SUBSCRIPTION_MANAGER.diag_get_sids():
if (time.time()-t1) > 2.0:
raise "%r did not timeout." % sid
time.sleep(0.1)
# Finally, make sure that the other subscription is valid.
sid = sids.pop(0)
self.__values_changing(sid)
return
开发者ID:mcruse,项目名称:monotone,代码行数:33,代码来源:_test_case_subscription_manager.py
示例3: test_poll_all
def test_poll_all(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to4:
raise "Initial node reference table mismatch."
# Check that each invokation gets all values.
for i in range(0,10):
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
if len(all_values) != len(self.nrt1to4):
# We did not get all 4 values!
raise (
"poll_all(self.nrt1to4) did not return all values."
" (%d out of %d)" % (len(all_values),len(self.nrt1to4))
)
# Check that (eventually) all the values are result dictionaries.
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
t1 = time.time()
while (time.time() - t1) < 1.0:
if None not in all_values.values():
return
time.sleep(0.1)
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
if None in all_values.values():
raise (
"Never got changes for all four result dictionaries, %d." %
len(all_values)
)
return
开发者ID:mcruse,项目名称:monotone,代码行数:28,代码来源:_test_case_subscription_manager.py
示例4: test_targeted_event_handling
def test_targeted_event_handling(self):
event_maker = EventProducerTestClass()
event_maker.configure({'name':'EventProducerTester','parent':'/'})
event_maker.start()
nr = {1:event_maker}
sid = SUBSCRIPTION_MANAGER.create_delivered(self, nr)
# Wait for polling to start and verify value made it without any events
t1 = time.time()
while (time.time() - t1) < 1.0:
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
time.sleep(0.1)
# Check that subscription value is the initial value of 100
if all_values[1]['value'] != 100:
raise ("polled_event_handling did not return inital value: " +
str(all_values[1]['value']))
# make a rapid series of changes to the node value
for i in range(10):
event_maker._cov_check(i)
time.sleep(0.1)
# check change count, should be approx 10
value_updates = self.__event_updated_values[1]['changes']
cov_counts = self._cov_counter
if value_updates < cov_counts:
raise (
"Targeted event handling event count did not match %d vs %d"
% (value_updates, cov_counts)
)
开发者ID:mcruse,项目名称:monotone,代码行数:28,代码来源:_test_case_subscription_manager.py
示例5: test_replace
def test_replace(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to2)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to2:
raise "Initial node reference table mismatch."
SUBSCRIPTION_MANAGER.replace(sid, self.nrt3to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt3to4:
raise "Replaced node reference table mismatch."
return
开发者ID:mcruse,项目名称:monotone,代码行数:10,代码来源:_test_case_subscription_manager.py
示例6: test_empty
def test_empty(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to4:
raise "Initial node reference table mismatch."
SUBSCRIPTION_MANAGER.empty(sid)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != {}:
raise "Node reference table not empty."
return
开发者ID:mcruse,项目名称:monotone,代码行数:10,代码来源:_test_case_subscription_manager.py
示例7: __values_changing
def __values_changing(self, sid):
r1 = SUBSCRIPTION_MANAGER.poll_all(sid)
t1 = time.time()
while 1:
changed_values = SUBSCRIPTION_MANAGER.poll_changed(sid)
if len(changed_values):
return
if (time.time() - t1) > 1.0:
raise "Never got changes for any of the values."
time.sleep(0.1)
assert 1, "Can't reach here."
开发者ID:mcruse,项目名称:monotone,代码行数:11,代码来源:_test_case_subscription_manager.py
示例8: cancel_batch_async
def cancel_batch_async(self, sessionID):
if self._subscriptions.has_key(sessionID):
try:
subscription = self._subscriptions[sessionID]
del self._subscriptions[sessionID]
del self._qualified_method_list[sessionID]
del self._services[sessionID]
del self._methods[sessionID]
SUBSCRIPTION_MANAGER.destroy(subscription)
except:
msglog.log('RNA_XMLRPC_Handler',msglog.types.WARN,
"Error destroying subscription %r." % (subscription,))
msglog.exception()
return
开发者ID:mcruse,项目名称:monotone,代码行数:14,代码来源:rna_xmlrpc.py
示例9: test_add_and_get
def test_add_and_get(self):
st_time = time.time()
rdict = SUBSCRIPTION_MANAGER.create_polled_and_get(self.nrt1to2)
# Ensure we got back values for everything
assert rdict['sid'] != None, "sid is not set in results dictionary."
for x in rdict['values'].values():
assert x != None, "Got None in values: %s." % str(rdict['values'])
开发者ID:mcruse,项目名称:monotone,代码行数:7,代码来源:_test_case_subscription_manager.py
示例10: _print_subscriptions
def _print_subscriptions(self):
print ""
print "*"*60
for s in SUBSCRIPTION_MANAGER.diag_get_subscriptions():
print s
print "*"*60
return
开发者ID:mcruse,项目名称:monotone,代码行数:7,代码来源:_test_case_subscription_manager.py
示例11: _print_mnbs
def _print_mnbs(self):
print ""
print "*"*60
for s in SUBSCRIPTION_MANAGER.diag_get_mnbs():
print s
print "*"*60
return
开发者ID:mcruse,项目名称:monotone,代码行数:7,代码来源:_test_case_subscription_manager.py
示例12: test_poll_changed
def test_poll_changed(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to4:
raise "Initial node reference table mismatch."
all_values = {}
time.sleep(0.1)
t1 = time.time()
while (time.time() - t1) < 1.0:
time.sleep(0.1)
changed_values = SUBSCRIPTION_MANAGER.poll_changed(sid)
all_values.update(changed_values)
if len(all_values) == 4:
# We got all 4 values!
return
raise "Never got changes for all four values, %d." % len(all_values)
return
开发者ID:mcruse,项目名称:monotone,代码行数:17,代码来源:_test_case_subscription_manager.py
示例13: test_poll_all_plus_exceptions
def test_poll_all_plus_exceptions(self):
SUBSCRIPTION_MANAGER._set_tunable_parameters({
'minimum_poll_interval':0.0,
})
nrt1to4bad5to6 = {}
nrt1to4bad5to6.update(self.nrt1to4)
nrt1to4bad5to6['/services/time/is/an/illusion'] = (
'/services/time/is/an/illusion'
)
nrt1to4bad5to6['/services/exception'] = '/services/exception'
sid = SUBSCRIPTION_MANAGER.create_polled(nrt1to4bad5to6)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != nrt1to4bad5to6:
raise "Initial node reference table mismatch."
# Check that each invokation gets all values.
for i in range(0,10):
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
if len(all_values) != len(nrt1to4bad5to6):
# We did not get all 4 values!
raise (
"poll_all(self.nrt1to4) did not return all values."
" (%d out of %d)" % (len(all_values),len(nrt1to4bad5to6))
)
# Check that (eventually) all the values are result dictionaries.
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
t1 = time.time()
while (time.time() - t1) < 1.0:
if None not in all_values.values():
self.__all_plus_exceptions_check(all_values)
# Finally, test that a new subscription gets the correct
# results.
time.sleep(0.1)
sid = SUBSCRIPTION_MANAGER.create_polled(nrt1to4bad5to6)
time.sleep(0.1)
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
self.__all_plus_exceptions_check(all_values)
time.sleep(0.1)
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
self.__all_plus_exceptions_check(all_values)
return
time.sleep(0.1)
all_values = SUBSCRIPTION_MANAGER.poll_all(sid)
if None in all_values.values():
raise ("Never got values for all nodes: %r." % all_values)
return
开发者ID:mcruse,项目名称:monotone,代码行数:45,代码来源:_test_case_subscription_manager.py
示例14: test_create_delivered
def test_create_delivered(self):
sid = SUBSCRIPTION_MANAGER.create_delivered(self, self.nrt1to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to4:
raise "Initial node reference table mismatch."
time.sleep(0.1)
t1 = time.time()
while (time.time() - t1) < 1.0:
self.__event_lock.acquire()
try:
if len(self.__event_updated_values) == 4:
# We got all 4 values!
return
finally:
self.__event_lock.release()
time.sleep(0.1)
if len(self.__event_updated_values) != 4:
raise (("Never got changes for all four values, only %d.\n"
"Values: %r") % (len(self.__event_updated_values),
self.__event_updated_values))
开发者ID:mcruse,项目名称:monotone,代码行数:20,代码来源:_test_case_subscription_manager.py
示例15: __init__
def __init__(self):
ServiceNode.__init__(self)
# Instantiate implicit children.
from mpx.service.interactive import InteractiveService
InteractiveService().configure({'name':'Interactive Service',
'parent':self,'debug':0,
'port':9666,'interface':'localhost'})
from mpx.service.time import Time
Time().configure({'name':'time','parent':self})
from mpx.service.session import SessionManager
SessionManager().configure({'name':'session_manager', 'parent':self})
from mpx.service.user_manager import UserManager
UserManager().configure({'name':'User Manager', 'parent':self})
from mpx.service.subscription_manager import SUBSCRIPTION_MANAGER
SUBSCRIPTION_MANAGER.configure({'name':'Subscription Manager',
'parent':self})
# Guarantee that /services/garbage_collector will exist, whether or not
# there is an entry in the XML file.
from mpx.service.garbage_collector import GARBAGE_COLLECTOR
# @todo Make ReloadableSingleton!
GARBAGE_COLLECTOR.configure({'name':'garbage_collector',
'parent':self})
开发者ID:mcruse,项目名称:monotone,代码行数:22,代码来源:__init__.py
示例16: test_modify
def test_modify(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to2)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to2:
raise "Initial node reference table mismatch."
SUBSCRIPTION_MANAGER.modify(sid, self.ID2, self.ID3)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt == self.nrt1to2:
raise "Modified node reference table not modified."
if nrt != {self.ID1:self.ID1,self.ID2:self.ID3}:
raise "Modified node reference table mismatch."
try:
SUBSCRIPTION_MANAGER.modify(sid, self.ID3, self.ID2)
except ENoSuchNodeID:
pass
else:
raise "No such NodeID not detected."
return
开发者ID:mcruse,项目名称:monotone,代码行数:18,代码来源:_test_case_subscription_manager.py
示例17: test_add
def test_add(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to2)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to2:
raise "Initial node reference table mismatch."
SUBSCRIPTION_MANAGER.add(sid, self.ID5, self.ID5)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
nrt125 = {}
nrt125.update(self.nrt1to2)
nrt125[self.ID5] = self.ID5
if nrt != nrt125:
raise "Node reference table mismatch."
try:
SUBSCRIPTION_MANAGER.add(sid, self.ID5, self.ID5)
except ENodeIDExists:
pass
else:
raise "Node ID in use not detected."
return
开发者ID:mcruse,项目名称:monotone,代码行数:19,代码来源:_test_case_subscription_manager.py
示例18: test_remove
def test_remove(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to4)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to4:
raise "Initial node reference table mismatch."
SUBSCRIPTION_MANAGER.remove(sid, self.ID2)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
nrt134 = {}
nrt134.update(self.nrt1to4)
del nrt134[self.ID2]
if nrt != nrt134:
raise "Node reference table mismatch."
try:
SUBSCRIPTION_MANAGER.remove(sid, self.ID2)
except ENoSuchNodeID:
pass
else:
raise "No such NodeID not detected."
return
开发者ID:mcruse,项目名称:monotone,代码行数:19,代码来源:_test_case_subscription_manager.py
示例19: test_destroy_batch
def test_destroy_batch(self):
sids = []
for i in range(2):
# BatchNodes change "really fast."
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrtB10)
# Make sure it comes up.
t1 = time.time()
self.__values_changing(sid)
sids.append(sid)
# Double check the values are changing.
for sid in sids:
self.__values_changing(sid)
# Now nuke one of the suscriptions and see that the other stays valid.
sid = sids.pop(0)
SUBSCRIPTION_MANAGER.destroy(sid)
try:
SUBSCRIPTION_MANAGER.destroy(sid)
except ENoSuchSubscription:
pass
else:
raise "No such subscription not detected."
# Make sure that the other subscription is valid.
sid = sids.pop(0)
self.__values_changing(sid)
if len(SUBSCRIPTION_MANAGER.diag_get_mnrs()) != 10:
raise (
"Bogus test, there should be 10 mnr at this point, not %r." %
len(SUBSCRIPTION_MANAGER.diag_get_mnrs()))
if len(SUBSCRIPTION_MANAGER.diag_get_mnbs()) != 1:
raise (
"Bogus test, there should be 1 mnb at this point, not %r." %
len(SUBSCRIPTION_MANAGER.diag_get_mnbs()))
SUBSCRIPTION_MANAGER.destroy(sid)
# Make sure that the mnr is removed when the last snr is deleted.
if len(SUBSCRIPTION_MANAGER.diag_get_mnrs()) != 0:
raise (
"There should not be any mnrs at this point,"
" but there are %r." %
len(SUBSCRIPTION_MANAGER.diag_get_mnrs()))
# Finally, make sure that the mnb is removed when the last mnr is
# deleted.
if len(SUBSCRIPTION_MANAGER.diag_get_mnbs()) != 0:
raise (
"There should not be any mnbs at this point,"
" but there are %r." %
len(SUBSCRIPTION_MANAGER.diag_get_mnbs()))
return
开发者ID:mcruse,项目名称:monotone,代码行数:47,代码来源:_test_case_subscription_manager.py
示例20: test_node_reference_table
def test_node_reference_table(self):
sid = SUBSCRIPTION_MANAGER.create_polled(self.nrt1to2)
nrt = SUBSCRIPTION_MANAGER.node_reference_table(sid)
if nrt != self.nrt1to2:
raise "Node reference table mismatch."
return
开发者ID:mcruse,项目名称:monotone,代码行数:6,代码来源:_test_case_subscription_manager.py
注:本文中的mpx.service.subscription_manager.SUBSCRIPTION_MANAGER类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论