本文整理汇总了Python中stopwatch.Stopwatch类的典型用法代码示例。如果您正苦于以下问题:Python Stopwatch类的具体用法?Python Stopwatch怎么用?Python Stopwatch使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Stopwatch类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_sanity
def test_sanity(self):
    """Check counters and total run time for threads using ``synchronize_using``.

    Starts ``n`` threads for each of two separate locks. Every thread sleeps
    for ``sleep_time`` inside ``synchronize_using(lock)`` and then increments
    a counter stored on that lock. The test asserts that both counters reach
    ``n`` and that the total duration is within 10% of ``n * sleep_time``,
    i.e. the time one lock's threads need when they run one after another.
    """
    lock1 = threading.RLock()
    lock2 = threading.RLock()

    def sleep_and_inc(lock, sleep_time):
        with synchronize_using(lock):
            time.sleep(sleep_time)
            # NOTE(review): the counter lives as an attribute on the lock
            # object itself; this relies on the lock type accepting arbitrary
            # attributes - confirm for the target Python version.
            lock.sync_test_counter = getattr(lock, 'sync_test_counter', 0) + 1

    sleep_time = 0.5
    n = 4
    s = Stopwatch()
    lst_threads = []
    for lock in [lock1, lock2]:
        for _ in range(n):
            t = threading.Thread(target=sleep_and_inc, args=[lock, sleep_time])
            lst_threads.append(t)
            t.start()
    # wait for all threads, then check results
    for t in lst_threads:
        t.join()
    duration = s.duration()
    self.assertEqual(lock1.sync_test_counter, n)
    self.assertEqual(lock2.sync_test_counter, n)
    ideal_time = n * sleep_time
    self.assertTrue(
        ideal_time * 0.9 < duration < ideal_time * 1.1,
        "duration=%s, ideal=%s" % (duration, ideal_time),
    )
开发者ID:idobarkan,项目名称:my-code,代码行数:29,代码来源:test_synchronization.py
示例2: timeTrial
def timeTrial(n):
    """Time one ``threesum.countTriples`` call on ``n`` random integers.

    The array is filled with values drawn by
    ``stdrandom.uniformInt(-1000000, 1000000)``. The stopwatch is created
    only after the array has been filled.

    Returns:
        Whatever ``Stopwatch.elapsedTime()`` reports right after the count
        completes.
    """
    values = stdarray.create1D(n, 0)
    for index in range(n):
        values[index] = stdrandom.uniformInt(-1000000, 1000000)
    timer = Stopwatch()
    # Only the running time matters here; the triple count is discarded.
    threesum.countTriples(values)
    return timer.elapsedTime()
开发者ID:davidhuizhou,项目名称:python,代码行数:7,代码来源:doublingtest.py
示例3: RaisArmTime
class RaisArmTime(Command):
    """Command that moves the robot arm at a fixed speed for a fixed time."""

    def __init__(self, robot, raise_speed, stop_time, name=None, timeout=None):
        """Store the run parameters and declare the arm as a requirement.

        Args:
            robot: Object exposing an ``arm`` attribute with ``move_arm``.
            raise_speed: Value passed to ``arm.move_arm`` on every execute.
            stop_time: Threshold compared against
                ``Stopwatch.elapsed_time_in_secs()`` to decide completion.
            name: Forwarded to the base ``Command`` constructor.
            timeout: Forwarded to the base ``Command`` constructor.
        """
        super().__init__(name, timeout)
        self.requires(robot.arm)
        self._robot = robot
        self._raise_speed = raise_speed
        self._stop_time = stop_time
        self._stopwatch = Stopwatch()

    def initialize(self):
        """Start the stopwatch just before the command first runs."""
        self._stopwatch.start()

    def execute(self):
        """Drive the arm at the configured speed; called on each scheduler pass."""
        arm = self._robot.arm
        arm.move_arm(self._raise_speed)

    def isFinished(self):
        """Return True once the elapsed time has reached ``stop_time``."""
        elapsed = self._stopwatch.elapsed_time_in_secs()
        return elapsed >= self._stop_time

    def end(self):
        """Stop the arm by commanding a speed of 0."""
        arm = self._robot.arm
        arm.move_arm(0)

    def interrupted(self):
        """Handle pre-emption by another command the same way as a normal end."""
        self.end()
开发者ID:TechnoJays,项目名称:robot2016,代码行数:32,代码来源:raise_arm_time.py
示例4: GetDataForCatsFromServer
def GetDataForCatsFromServer(cat, desc, orgId, orgName, action):
    """POST a category search request, logging in again on HTTP 401.

    The request is repeated until the server answers 200. After a 401 the
    function calls ``ensureLogin()``, creates a fresh stopwatch and retries.
    NOTE(review): there is no upper bound on the number of 401 retries.

    Args:
        cat: Category sent as the only entry of ``BuiltInCategories``.
        desc: Only used in the optional debug print.
        orgId: Sent as ``OrgId`` and echoed in the failure diagnostics.
        orgName: Only echoed in the failure diagnostics.
        action: Final URL path segment appended to ``/api/Search/``.

    Returns:
        Tuple ``(r, m)``: the 200 response and the result of
        ``measure(silent=True)`` on the stopwatch created before the final
        attempt.

    Raises:
        RuntimeError: If the server answers with a status other than 200
            or 401.
    """
    reqUrl = base_url + '/api/Search/' + action
    SearchDataRequest = {
        'ContextTypes': 'Organization',
        'BuiltInCategories': [cat],
        'Version': '2019',
        'OrgId': orgId
    }
    verbose = False  # flip to True to trace every outgoing request
    sw = Stopwatch()
    while True:
        if verbose:
            print("before GetData", SearchDataRequest['BuiltInCategories'], desc, reqUrl)
        r = requests.post(reqUrl, json=SearchDataRequest, headers=headers)
        if r.status_code == 200:
            break
        print("after GetData", r.status_code, orgId, orgName)
        if r.status_code != 401:
            print(r.text)
            # Raised explicitly with a message: a bare ``raise`` is only
            # meaningful inside an ``except`` block.
            raise RuntimeError(
                "GetData failed with HTTP status %s" % r.status_code
            )
        ensureLogin()
        sw = Stopwatch()  # start new stopwatch.
    m = sw.measure(silent=True)
    return (r, m)
开发者ID:pylgrym,项目名称:first_app,代码行数:34,代码来源:bimtok.py
示例5: test_n_threads
def test_n_threads(self):
    """Check that ``AO.AO`` spreads calls over ``n_threads`` worker threads.

    Wraps a ``Sleeper`` in ``AO.AO`` with ``n_threads`` threads, issues
    ``n_calls`` ``sleep`` calls and waits on every returned future. Asserts:
    the total duration is between 0.9x and 1.2x of
    ``sleep_time * n_calls / n_threads``; every call ran; exactly
    ``n_threads`` distinct threads executed the calls; and none of them was
    the calling thread.
    """
    sleep_time = 0.1

    class Sleeper(object):
        def __init__(self):
            self._lock = threading.RLock()  # used by @synchronized decorator
            self.i = 0
            self.thread_ids = set()

        @synchronized
        def _critical_section(self):
            self.thread_ids.add(id(threading.current_thread()))
            self.i += 1

        def sleep(self):
            time.sleep(sleep_time)
            self._critical_section()

    n_threads = 5
    n_calls = 20
    ao = AO.AO(Sleeper(), n_threads)
    ao.start()
    # Registered so the AO gets stopped later - presumably in tearDown; confirm.
    self.aos_to_stop.append(ao)
    s = Stopwatch()
    futures = [ao.sleep() for _ in range(n_calls)]
    for f in futures:
        f.get()
    duration = s.duration()
    expected = sleep_time * n_calls / float(n_threads)
    self.assertTrue(
        0.9 * expected < duration < 1.2 * expected,
        "duration=%s, expected=%s" % (duration, expected),
    )
    self.assertEqual(ao.obj.i, n_calls)
    self.assertEqual(len(ao.obj.thread_ids), n_threads)
    self.assertFalse(id(threading.current_thread()) in ao.obj.thread_ids)
开发者ID:giltayar,项目名称:Python-Exercises,代码行数:34,代码来源:test_AO.py
示例6: do_GET
def do_GET(self):
    """Serve the data slice selected by the integer query string.

    The request path is split on ``?`` and the text after the first ``?`` is
    parsed as an int, which is used as the key into ``ourData.slices``. A
    successful response body is ``[<size>]`` plus a newline, followed by the
    stored slice, or ``missing`` when no slice exists for that key.

    Status codes:
        200: Size parsed; body written as described above.
        400: The path contains no ``?`` part.
        500: The size argument is not a valid integer; the body carries the
            exception text.
    """
    watch = Stopwatch()
    args = self.path.split('?')
    if len(args) < 2:
        print("no size arg", args)
        # Always answer the client instead of dropping the request silently.
        self.send_response(400)
        self.end_headers()
        self.wfile.write(b'no size arg')
        return
    try:
        sizearg = int(args[1])
    except ValueError as e:
        self.send_response(500)
        self.end_headers()
        self.wfile.write(str(e).encode())
        return
    front = '[%s]\n' % sizearg
    self.send_response(200)
    self.end_headers()
    self.wfile.write(front.encode())
    # wfile only accepts bytes, so the fallback must be bytes as well.
    # NOTE(review): assumes the stored slices are bytes objects - confirm.
    self.wfile.write(ourData.slices.get(sizearg, b'missing'))
    watch.measure()
开发者ID:pylgrym,项目名称:first_app,代码行数:33,代码来源:netloader.py
示例7: test_no_sync
def test_no_sync(self):
    """Check ``add`` against a concurrent ``add`` call on another thread.

    A background thread calls ``self.s.add(10)``; 0.05 s later this thread
    calls ``self.s.add(5)``. The test asserts the second call returns 0 and
    that everything finished in under 1.2x ``self.s.sleep_time``.
    """
    stopwatch = Stopwatch()
    threading.Thread(target=self.s.add, args=[10]).start()
    time.sleep(0.05)  # make sure other thread gets head start
    val = self.s.add(5)
    self.assertEqual(val, 0)
    duration = stopwatch.duration()
    self.assertTrue(
        duration < 1.2 * self.s.sleep_time,
        'calls took too long. duration=%s' % duration,
    )
开发者ID:idobarkan,项目名称:my-code,代码行数:9,代码来源:test_synchronization.py
示例8: test_normal
def test_normal(self):
    """Check ``sync_add`` against a concurrent ``sync_add`` on another thread.

    A background thread calls ``self.s.sync_add(10)``; 0.05 s later this
    thread calls ``self.s.sync_add(5)``. The test asserts the second call
    returns 10 and that the whole sequence took more than 1.9x
    ``self.s.sleep_time``.
    """
    stopwatch = Stopwatch()
    threading.Thread(target=self.s.sync_add, args=[10]).start()
    time.sleep(0.05)  # make sure other thread gets head start (and the mutex)
    val = self.s.sync_add(5)
    self.assertEqual(val, 10)
    duration = stopwatch.duration()
    self.assertTrue(
        duration > 1.9 * self.s.sleep_time,
        'calls completed too quickly. duration=%s' % duration,
    )
开发者ID:idobarkan,项目名称:my-code,代码行数:9,代码来源:test_synchronization.py
示例9: test_timeout
def test_timeout(self):
    """Check ``Future.get`` with a timeout for preset and unset futures.

    A preset future must return its value. An unset future must raise
    ``FutureTimeoutException``, and the wait must last between 0.9x and
    1.3x of the requested timeout.
    """
    # preset future - should return within timeout
    f = future.Future.preset(3)
    self.assertEqual(f.get(100), 3)
    # unset future - should raise exception
    timeout = 1  # timeout in seconds
    f = future.Future()
    stopwatch = Stopwatch()
    stopwatch.start()
    # NOTE(review): the * 1000 suggests ``get`` takes milliseconds - confirm.
    self.assertRaises(future.FutureTimeoutException, f.get, timeout * 1000)
    duration = stopwatch.duration()
    self.assertTrue(
        0.9 * timeout < duration < 1.3 * timeout,
        'duration=%s' % duration,
    )
开发者ID:idobarkan,项目名称:my-code,代码行数:13,代码来源:test_future.py
示例10: doDB_Work
def doDB_Work(limit=9999):
    """Run ``selectFromDB`` once on a fresh connection and time the call.

    Opens a connection via ``getConn``, times ``selectFromDB`` with a
    ``Stopwatch`` and prints the ``len`` of its result.
    """
    # NOTE(review): ``limit`` is accepted but never used in this function.
    # NOTE(review): server name and credentials are hard-coded below;
    # consider loading them from configuration instead.
    theDBCfg = {
        'server': r'jg-pc\jg1',
        'db': 'Ajour_System_A/S_10aee6e8b7dd4f4a8fd0cbf9cedf91cb',
        'user': 'sa',
        'pwd': 'morOg234',
    }
    with getConn(theDBCfg) as conn:
        cursor = conn.cursor()
        timer = Stopwatch()
        rows = selectFromDB(cursor)
        timer.measure()
        print(len(rows))
开发者ID:pylgrym,项目名称:first_app,代码行数:13,代码来源:sqls_tiny2.py
示例11: getBCbyVsId
def getBCbyVsId(
    vsId='B14462',
    releaseId='624d9489-0f8c-48db-99f9-67701a040742'
):
    """GET one building component by VSID and release, timing the request.

    Builds the URL from the module-level ``url``, sends the request with the
    module-level ``headers``, passes the response to ``logIfBad``, parses
    the body as JSON and prints the length of the response text.
    Returns nothing.
    """
    path_template = '/api/v1/BuildingComponent/VSID/{vsId}/Release/{releaseId}'
    bcUrl = url + path_template.format(vsId=vsId, releaseId=releaseId)

    timer = Stopwatch('req')
    response = requests.get(bcUrl, headers=headers)
    timer.measure()
    logIfBad(response)

    body = response.text
    exes = json.loads(body)
    print(len(body))
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(exes)
开发者ID:pylgrym,项目名称:first_app,代码行数:14,代码来源:mol1.py
示例12: getBC_chunk
def getBC_chunk(chunkSet):
    """POST a building-component lookup for one pre-built query string.

    ``chunkSet`` is appended verbatim after the ``?`` of the fixed release
    URL. The request is timed, the response is passed to ``logIfBad`` and
    the parsed JSON is written to ``BCs.json`` with an indent of 2.
    Returns nothing.
    """
    release_path = '/api/v1/BuildingComponent/VSIDs/Release/624d9489-0f8c-48db-99f9-67701a040742?'
    bcUrl = url + release_path + chunkSet
    print(bcUrl)

    timer = Stopwatch('req')
    response = requests.post(bcUrl, headers=headers)
    timer.measure()
    logIfBad(response)

    body = response.text
    exes = json.loads(body)
    print('len:', len(body))
    with open('BCs.json', "w") as outfile:
        json.dump(exes, outfile, indent=2)
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(exes)
开发者ID:pylgrym,项目名称:first_app,代码行数:15,代码来源:mol1.py
示例13: getBCs
def getBCs(
    releaseId='624d9489-0f8c-48db-99f9-67701a040742'
):
    """POST a building-component lookup for nine fixed VSIDs of a release.

    The request is timed, the response is passed to ``logIfBad``, the length
    of the response text is printed and the parsed JSON is written to
    ``BCs.json`` with an indent of 2. Returns nothing.
    """
    path_template = '/api/v1/BuildingComponent/VSIDs/Release/{releaseId}?VSID1=B33321&VSID2=B96932&VSID3=B75326&VSID4=B36547&VSID5=B20553&VSID6=B59061&VSID7=B58491&VSID8=B80296&VSID9=B81223'
    bcUrl = url + path_template.format(releaseId=releaseId)
    print(bcUrl)

    timer = Stopwatch('req')
    response = requests.post(bcUrl, headers=headers)
    timer.measure()
    logIfBad(response)

    body = response.text
    exes = json.loads(body)
    print(len(body))
    with open('BCs.json', "w") as outfile:
        json.dump(exes, outfile, indent=2)
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(exes)
开发者ID:pylgrym,项目名称:first_app,代码行数:16,代码来源:mol1.py
示例14: getPrices
def getPrices():
    """POST a price calculation for three fixed component ids.

    The URL is assembled from the module-level ``url``, a fixed release id,
    three ids with amounts of 10 each and a fixed calculation id. The
    request is timed, the response is passed to ``logIfBad`` and the parsed
    JSON is written to ``prices.json`` with an indent of 2. Returns nothing.
    """
    releaseID = '624d9489-0f8c-48db-99f9-67701a040742'
    url_parts = [
        url + '/api/v1/BuildingComponent/Calculate/Release/%s?' % releaseID,
        'id1=d03b8b89-fb70-47da-9a18-011132b22921&id2=32e920bb-e927-4a10-ad85-14c32bd197ff&id3=3dbf3ce5-f7e5-4745-bf18-1bd766c1b54d',
        '&amount1=10&amount2=10&amount3=10',
        "&yourCalculationId=1246",  # {myRandomCalculationNumber}
    ]
    priceUrl = ''.join(url_parts)
    print(priceUrl)

    timer = Stopwatch('req')
    response = requests.post(priceUrl, headers=headers)
    timer.measure()
    logIfBad(response)

    prices = json.loads(response.text)
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(prices)
    with open('prices.json', "w") as outfile:
        json.dump(prices, outfile, indent=2)
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py
示例15: getPrice_chunk
def getPrice_chunk(chunkSet, chunkIx):
    """POST a price calculation for one pre-built query string.

    ``chunkSet`` is appended verbatim after the ``?`` of the calculation
    URL; ``chunkIx`` is only printed next to the URL. The request is timed,
    the response is passed to ``logIfBad`` and the parsed JSON is written to
    ``prices.json`` with an indent of 2. Returns nothing.
    """
    # NOTE(review): every call writes the same ``prices.json``, so a later
    # chunk overwrites an earlier one - confirm this is intended.
    releaseID = '624d9489-0f8c-48db-99f9-67701a040742'
    base = url + '/api/v1/BuildingComponent/Calculate/Release/%s?' % releaseID
    priceUrl = base + chunkSet
    print(chunkIx, priceUrl)

    timer = Stopwatch('req')
    response = requests.post(priceUrl, headers=headers)
    timer.measure()
    logIfBad(response)

    prices = json.loads(response.text)
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(prices)
    with open('prices.json', "w") as outfile:
        json.dump(prices, outfile, indent=2)
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py
示例16: getBCbyGroup
def getBCbyGroup(  # get the BCs within a BC-group.
    groupId='B14462',
    releaseId='624d9489-0f8c-48db-99f9-67701a040742'
):
    """GET the building components that belong to one group of a release.

    The request is timed with ``measure(show=False)`` and the response is
    passed to ``logIfBad`` before its body is parsed as JSON.

    Returns:
        The value stored under ``'buildingComponents'`` in the parsed JSON.
    """
    path_template = '/api/v1/BuildingComponent/BuildingComponentGroup/{groupId}/Release/{releaseId}'
    bcUrl = url + path_template.format(groupId=groupId, releaseId=releaseId)

    timer = Stopwatch('req')
    response = requests.get(bcUrl, headers=headers)
    timer.measure(show=False)
    logIfBad(response)

    payload = json.loads(response.text)
    if 0:  # disabled debug output: size of the raw body
        print(len(response.text))
    if 0:  # disabled debug dump of the parsed payload
        pprint.pprint(payload)
    return payload['buildingComponents']
开发者ID:pylgrym,项目名称:first_app,代码行数:17,代码来源:mol1.py
示例17: __init__
def __init__(self, name=None):
    """Initialise the meter with an empty FPS history and a new stopwatch.

    Args:
        name: Optional label. When given it is stored as ``"(<name>)"``,
            otherwise an empty string is stored.
    """
    self._name_argument_string = "" if name is None else "(%s)" % name
    # Bounded history: only the 10 most recent entries are retained.
    self._fps_history = collections.deque(maxlen=10)
    self._previous_time = None
    self._previous_calculated_fps_time = None
    self._stopwatch = Stopwatch()
    self._fps = None
开发者ID:gaborpapp,项目名称:AIam,代码行数:10,代码来源:fps_meter.py
示例18: init
def init(self):
    """Run the one-time setup steps and switch to the main menu.

    Initialises styles, maps, the ``Terrain`` images and sounds, the menus
    and the help, creates the stopwatch, and finally selects ``"menu"`` mode
    with ``self.mainMenu`` as the active menu.
    """
    self.initStyles()
    self.initMaps()
    Terrain.initImages()
    Terrain.initSounds()
    # NOTE(review): presumably initMenus() is what creates self.mainMenu,
    # which is read below - confirm before reordering these calls.
    self.initMenus()
    self.stopwatch = Stopwatch()
    self.initHelp()
    self.mode = "menu"
    self.menu = self.mainMenu
开发者ID:daniel-wen,项目名称:bouncier,代码行数:10,代码来源:main.py
示例19: __init__
def __init__(self, robot, raise_speed, stop_time, name=None, timeout=None):
    """Store the run parameters and declare the arm as a requirement.

    Args:
        robot: Object exposing an ``arm`` attribute.
        raise_speed: Stored as ``_raise_speed`` for later use.
        stop_time: Stored as ``_stop_time`` for later use.
        name: Forwarded to the base-class constructor.
        timeout: Forwarded to the base-class constructor.
    """
    super().__init__(name, timeout)
    self.requires(robot.arm)
    self._robot = robot
    self._raise_speed = raise_speed
    self._stop_time = stop_time
    self._stopwatch = Stopwatch()
开发者ID:TechnoJays,项目名称:robot2016,代码行数:10,代码来源:raise_arm_time.py
示例20: GetToken_Password
def GetToken_Password():  # oauth, exchange a password for a token.
    """Request an OAuth token using the password grant.

    Reads ``username``, ``password``, ``client_id`` and ``client_secret``
    from the module-level ``env`` mapping and posts them as form data to
    ``<url>/token``. The request is timed and the response is passed to
    ``logIfBad``.

    Returns:
        The response object returned by ``requests.post``.
    """
    # Only the username is echoed; the password must never reach the console.
    print(env['username'])
    tokenReq = {
        'grant_type': 'password',
        'username': env['username'],
        'password': env['password'],
        'client_id': env['client_id'],
        'client_secret': env['client_secret'],
    }
    token_url = url + '/token'
    print('before', token_url)
    watch = Stopwatch('req')
    # ``data=`` sends the dict form-encoded, as a token endpoint expects.
    r = requests.post(token_url, data=tokenReq)
    watch.measure()
    logIfBad(r)
    return r
开发者ID:pylgrym,项目名称:first_app,代码行数:19,代码来源:mol1.py
注:本文中的stopwatch.Stopwatch类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论