本文整理汇总了Python中utils.Utils类的典型用法代码示例。如果您正苦于以下问题:Python Utils类的具体用法?Python Utils怎么用?Python Utils使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Utils类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_sync_files
def test_sync_files(self):
sourceFiles = ['/movie/transformers.mp4']
target = '/users/willsam100/Desktop/'
ftpMock = FTP('')
ftpMock.retrbinary = MagicMock()
now = util.now()
targetFile = util.join(target, util.basename(sourceFiles[0]))
self.__cleanUp__(targetFile)
utilMock = Utils();
utilMock.splitDirectoriesAndFiles = MagicMock(return_value=([], sourceFiles))
utilMock.exists = MagicMock(return_value=False)
utilMock.computeSpeed = MagicMock(return_value=40)
utilMock.now = MagicMock(return_value=now)
transfer = Transfer(ftpMock, utilMock, target)
transfer.sync(sourceFiles)
utilMock.splitDirectoriesAndFiles.assert_called_with(ftpMock, sourceFiles)
utilMock.exists.assert_called_with(targetFile)
ftpMock.retrbinary.assert_called_with('RETR ' + sourceFiles[0], mock.ANY)
self.assertTrue(util.exists(targetFile))
self.__cleanUp__(targetFile)
utilMock.computeSpeed.assert_called_with(now, targetFile)
开发者ID:willsam100,项目名称:FSync,代码行数:25,代码来源:fsyncTest.py
示例2: create_new_user
def create_new_user(self, rawname, role, email=""):
""" Creates a new Person record with a unique username, or returns a person wiht the matching username"""
ut = Utils()
username = ut.cleanUserName(rawname)
if ut.validateUserName(username):
# check for username
person = self.get_person(username)
# create a new person if we have not got this one
if person == None:
person = Person()
person.email = email
person.passhash = ""
# no passhash if not got a direct logon
person.username = username
person.displayname = ut.tidyUserName(
rawname
) # allow the username to be updated on the myHoots or profile or summink
person.before_put()
person.role = role
person.put()
self._person = person
return username
else:
return ""
开发者ID:Carhoots,项目名称:blog,代码行数:30,代码来源:user.py
示例3: send_file_multicast
def send_file_multicast(s, filename):
connections = {}
filesize = os.stat(filename).st_size
try:
while True:
readable, _, _ = select.select([s], [], [])
for rd in readable:
bytes_sent = 0
package, client_address = s.recvfrom(Constants.FILE_CHUNK_SIZE)
unpacked_package = Utils.unpack_package(package)
if not connections.has_key(client_address) or connections[client_address] is None:
connections[client_address] = open(filename, 'rb')
if unpacked_package['command'] == Constants.INIT_TRANSMIT:
bytes_sent = int(unpacked_package['payload'])
connections[client_address].seek(bytes_sent)
data = connections[client_address].read(Constants.FILE_CHUNK_SIZE)
if not data:
rd.sendto(Utils.pack_package(Constants.FIN, ''), client_address)
connections[client_address].close()
connections[client_address] = None
else:
rd.sendto(Utils.pack_package(Constants.ACK, data), client_address)
bytes_sent += len(data)
percent = int(float(bytes_sent) * 100 / float(filesize))
print "{0} / {1} Kb sent to client {2}({3}%)".format(Utils.to_kilobytes(bytes_sent),
Utils.to_kilobytes(filesize), client_address,
percent)
sys.stdout.write('\033M')
except socket.error, value:
print value
开发者ID:VadzimBura,项目名称:SPOLKS,代码行数:35,代码来源:file_transmitter.py
示例4: processTR35
def processTR35(self):
utils = Utils()
for i in range(0, 3):
year = str(2013 + i)
r = requests.get('http://www.technologyreview.com/lists/innovators-under-35/' + year)
soup = BeautifulSoup(r.text)
ul = soup.find('ul', class_='people')
soup = BeautifulSoup(ul.prettify())
file_name = self.get_file_name(self.subject + "/mit-tr35/tr35-" + year + "#", '')
file_name = file_name[0 : file_name.find('#')]
file_lines = self.countFileLineNum(file_name)
f = self.open_db(file_name + ".tmp")
self.count = 0
for li in soup.find_all('li'):
data = utils.removeDoubleSpace(li.text.strip().replace('\t', '').replace('\n', ''))
title = data[0 : data.find(',')].strip()
desc = 'description:' + data[data.find(',') + 1 :].strip()
print title
print desc
self.count += 1
self.write_db(f, 'tr35-' + year + '-' + str(self.count), title, 'http://www.technologyreview.com/' + li.a['href'], desc)
self.close_db(f)
if file_lines != self.count and self.count > 0:
self.do_upgrade_db(file_name)
print "before lines: " + str(file_lines) + " after update: " + str(self.count) + " \n\n"
else:
self.cancel_upgrade(file_name)
print "no need upgrade\n"
开发者ID:amitahire,项目名称:numberWave,代码行数:30,代码来源:update_rank.py
示例5: requestBuilder
def requestBuilder(data):
prop=Properties.getProperties()
if isinstance(data,dict):
try:
req=APIRequest()
if IS_FULL_URL(data['requestPath'].strip()):
req.url=data['requestPath'].strip()
else:
req.url='{}{}'.format(_url(prop.get('tomcat.host'),prop.get('tomcat.port')),data['requestPath'].strip())
if str(data.get('requestMethod')).upper() in METHODS:
req.method=data['requestMethod']
else:
req.method=DEFUALT_METHOD
if data.get('requestParameters'):
if data.get('requestParameters')!='':req.params=Utils.to_json(data['requestParameters'].strip())
if data.get('requestBody'):
if data.get('requestBody')!='':req.data=Utils.to_json(data['requestBody'].strip())
if data.get('requestJson'):
if data.get('requestJson')!='':req.json=Utils.to_json(data['requestJson'].strip())
if data.get('requestHeader'):
if data.get('requestHeader')!='':req.headers=Utils.to_json(data['requestHeader'].strip())
if data.get('keywords'):
if 'needUserToken:Y'.upper() in data.get('keywords').upper():
if 'mogoroom-partner' in data['requestPath'] and not prop.get('status.partner'):
if req.data:
for key,value in prop.get('partner.token').items():
req.data[key]=value
else:
req.data=prop.get('partner.token')
return req.prepare()
except:
raise ValueError("Worng args. for build HTTP request.")
开发者ID:leons1220,项目名称:APITestFramework,代码行数:32,代码来源:RequestBuilder.py
示例6: updateAllTimeTop10Collection
def updateAllTimeTop10Collection(top10):
if top10 != None:
if len(top10) > 0:
result = ttDB['alltime'].replace_one(
{'replace_key': 'AllTime'},
{
'replace_key': 'AllTime',
'top10' : [
top10[0].getObject(),
top10[1].getObject(),
top10[2].getObject(),
top10[3].getObject(),
top10[4].getObject(),
top10[5].getObject(),
top10[6].getObject(),
top10[7].getObject(),
top10[8].getObject(),
top10[9].getObject()
]
},
True,
False
)
if result.acknowledged == False:
Utils.emitWarning([str(datetime.utcnow()),"Failed to save alltime top10. Acknowledgment was False."])
else:
Utils.emitWarning([str(datetime.utcnow()),"Params were not as expected when trying to save alltime top10."])
开发者ID:JoshuaPortelance,项目名称:Twending,代码行数:27,代码来源:TrendingTop10DB.py
示例7: updateMonthlyTop10Collection
def updateMonthlyTop10Collection(top10, month, year):
if top10 != None and month != None and year != None:
if len(top10) > 0:
result = ttDB['month'].replace_one(
{'month': month, 'year': year},
{
'month': month,
'year': year,
'top10' : [
top10[0].getObject(),
top10[1].getObject(),
top10[2].getObject(),
top10[3].getObject(),
top10[4].getObject(),
top10[5].getObject(),
top10[6].getObject(),
top10[7].getObject(),
top10[8].getObject(),
top10[9].getObject()
]
},
True,
False
)
if result.acknowledged == False:
Utils.emitWarning([str(datetime.utcnow()),"Failed to save monthly top10. Acknowledgment was False."])
else:
Utils.emitWarning([str(datetime.utcnow()),"Params were not as expected when trying to save monthly top10."])
开发者ID:JoshuaPortelance,项目名称:Twending,代码行数:28,代码来源:TrendingTop10DB.py
示例8: test
def test(X, y, learned_params):
N = np.shape(X)[0] #no of instances
X = np.append(np.ones((N,1)), X,1) #appending a column of ones as bias (used in logistic regression weights prediction)
F = np.shape(X)[1] #no of features+1
p_old = 1
class_prob = []
for w in learned_params.keys():
p = Utils.logistic_transformation( learned_params[w], X )
class_prob.append(p_old-p)
p_old = p
class_prob.append(p_old)
max_prob = np.max(class_prob, 0)
predicted_y = []
output_label = range(min_class_label, max_class_label+1)
for i in xrange(np.size(max_prob)):
class_label = np.where(class_prob == max_prob[i])[0]
#print class_label
predicted_y.append(output_label[class_label[0]])
#print "predicted y :", predicted_y
#print "Actual y:", y
accuracy = Utils.calculate_accuracy(np.array(y), np.array(predicted_y))
f_score_mean, f_score_std = Utils.calculate_average_F1score(np.array(y), np.array(predicted_y), min_class_label, max_class_label)
return (accuracy, f_score_mean, f_score_std)
开发者ID:meyyar,项目名称:WoC,代码行数:30,代码来源:em_good_bad_experts_test.py
示例9: play
def play(self):
if len(self.hand) > 1:
pick = int(Utils.read('Card to Play? '))
played = self.hand[pick]
self.discard(played)
Utils.write('')
played.effect(self, self._other_players())
开发者ID:jawsthegame,项目名称:loveletter,代码行数:7,代码来源:player.py
示例10: create_server
def create_server(cls, nova_client, name, network_id, data):
image = "5cebb13a-f783-4f8c-8058-c4182c724ccd" # Ubuntu 12.04
flavor = 6 # 8GB
server = None
while server is None:
try:
server = nova_client.servers.create(
name = name,
image = image,
flavor = flavor,
nics = [
{"net-id": default_nics[0]},
{"net-id": default_nics[1]},
{"net-id": network_id},
],
files = {
"/root/.ssh/authorized_keys": \
Utils.read_data("/root/.ssh/id_rsa.pub"),
"/etc/prep.sh": \
Utils.read_data("vm_scripts/prep.sh"),
"/root/upgrade.sh": \
Utils.read_data("vm_scripts/upgrade.sh"),
"/root/install_oc.sh": data,
}
)
msg = "Scheduled server creation: %s | %s" % \
(server.id, server.name)
logger.info(msg)
except Exception,e:
logger.error(str(e))
logger.error("Retrying in 10 secs...")
sleep(10)
开发者ID:metral,项目名称:build_a_cloud,代码行数:34,代码来源:rackspace.py
示例11: draw
def draw(self):
if self.buttons[0].selected: #PvP
self.buttons[0].selected = False
self.window.player1.isComputer = False
self.window.player1.name = "Player 1"
self.window.player2.isComputer = False
self.window.player2.name = "Player 2"
self.window.setupScreenP1.resetBottomText()
self.window.setupScreenP2.resetBottomText()
self.window.currentScreen = self.window.setupScreenP1
elif self.buttons[1].selected: #PvPC
self.window.player1.isComputer = False
self.window.player1.name = "Player"
self.window.player2.isComputer = True
self.window.player2.name = "Computer"
self.window.setupScreenP1.resetBottomText()
self.window.currentScreen = self.window.setupScreenP1
self.buttons[1].selected = False
elif self.buttons[2].selected: #PCvPC
self.window.player1.isComputer = True
self.window.player1.name = "Computer 1"
self.window.player2.isComputer = True
self.window.player2.name = "Computer 2"
self.window.currentScreen = self.window.setupScreenP1
self.buttons[2].selected = False
else:
for button in self.buttons:
Utils.drawButton(button)
pyglet.text.Label('Stratego-Ascension',
font_name='Arial',
font_size=30,
x=self.width/2, y= self.height-200,
anchor_x='center', anchor_y='center').draw()
开发者ID:BungeeBuddies,项目名称:Stratego-Ascension,代码行数:33,代码来源:startscreen.py
示例12: convert_to_file_structure
def convert_to_file_structure(self, object_type=None, cur_path='./', console_print=False):
convert_dict = self._get_objects_from_context(object_type)
Utils.dict_to_filesystem({'visited_vertexes':convert_dict['visited_vertexes']},
cur_path=cur_path,
console=console_print, depth=3)
Utils.dict_to_filesystem({'summary_of_visited_vertexes': convert_dict['summary_of_visited_vertexes']},
cur_path=cur_path, console=console_print, depth=1)
开发者ID:miriyalar,项目名称:debug_infra,代码行数:7,代码来源:vertex_print.py
示例13: convert_json
def convert_json(self, object_type=None, detail=True, file_name='contrail_debug_output.json'):
print_list = self._get_objects_from_context(object_type)
print_list = Utils.remove_none(print_list)
print_list = Utils.remove_none(print_list)
with open(file_name, 'w') as fp:
json.dump(print_list, fp)
fp.close()
开发者ID:miriyalar,项目名称:debug_infra,代码行数:7,代码来源:vertex_print.py
示例14: test_sync_directories
def test_sync_directories(self):
sourceFiles = ['/movie/transformers', '/movie/transformers/transformers.mp4']
target = '/users/willsam100/Desktop/'
ftpMock = FTP('')
ftpMock.retrbinary = MagicMock()
ftpMock.nlst = MagicMock(return_value=[])
now = util.now()
targetDir = util.join(target, util.basename(sourceFiles[0]))
targetFile = util.join(targetDir, util.basename(sourceFiles[1]))
self.__cleanUp__(targetDir)
utilMock = Utils();
def splitDirectoriesAndFiles(*args):
def secondCall_splitDirectoriesAndFiles(*args):
return ([], sourceFiles[1:])
utilMock.splitDirectoriesAndFiles.side_effect = secondCall_splitDirectoriesAndFiles
return ([sourceFiles[0]], [])
utilMock.splitDirectoriesAndFiles = MagicMock(side_effect=splitDirectoriesAndFiles)
utilMock.exists = MagicMock(return_value=False)
utilMock.computeSpeed = MagicMock(return_value=40)
utilMock.now = MagicMock(return_value=now)
transfer = Transfer(ftpMock, utilMock, target)
transfer.sync(sourceFiles)
utilMock.splitDirectoriesAndFiles.call_args_list == (mock.call(ftpMock, targetDir), mock.call(ftpMock, targetFile))
utilMock.splitDirectoriesAndFiles.assert_called_with(ftpMock, [])
utilMock.exists.call_args_list == [mock.call(targetDir), mock.call(targetFile)]
ftpMock.retrbinary.assert_called_with('RETR ' + sourceFiles[1], mock.ANY)
self.assertTrue(util.exists(targetFile))
self.__cleanUp__(targetDir)
utilMock.computeSpeed.assert_called_with(now, targetFile)
开发者ID:willsam100,项目名称:FSync,代码行数:35,代码来源:fsyncTest.py
示例15: snapshot
class snapshot(unittest.TestCase):
def setUp(self):
self.configfile = sys.argv[1]
self.utils = Utils()
self.all_config = self.utils.init_allconfig(self.configfile)
self.utils.deploy_usx(self.all_config)
self.amc_ip = self.all_config['amc_ip']
self.tests = Ha(self.configfile)
self.tools = Tools(self.amc_ip)
def tearDown(self):
clean_testbed_op = ["clean_testbed:"]
self.tests._exec(clean_testbed_op)
print("done!!!!!!!!!!!!")
def snapshot(self):
volume_type_list = self.tests._get_volume_type_list_from_config()
for volume_type in volume_type_list:
self.assertEqual(self.test_snapshot(volume_type), True)
# self.assertEqual(self.test_snapshot('simplememory'), True)
def test_snapshot(self, volume):
daily_schedule_snapshot_op = ["create_daily_schedule_snapshot:'vols'[" + volume + "][0]:"]
self.tests._exec(daily_schedule_snapshot_op)
check_snapshot_by_lvs_op = ["check_snapshot_by_lvs:'vols'[" + volume + "][0]:"]
self.tests._exec(check_snapshot_by_lvs_op)
check_snapshot_number_op = ["check_snapshot_number:'vols'[" + volume + "][0]:1"]
return self.tests._exec(check_snapshot_number_op)
开发者ID:yuzhenjiea,项目名称:usx,代码行数:33,代码来源:test_26385-26389.py
示例16: clicked
def clicked(self, event):
"""Moving the user-defined FoV footprint."""
run_sequence = ShowSkyCoverage()
if event.widget == self.B02:
run_sequence.north() # north
if event.widget == self.B12:
move_fov = ShiftFoV()
move_fov.north_shift() # ↕↔
if event.widget == self.B30:
run_sequence.east() # east
if event.widget == self.B31:
move_fov = ShiftFoV()
move_fov.east_shift() # ↕↔
if event.widget == self.B32:
new_starting_fov = StartingFoV() # start FoV
new_starting_fov
if event.widget == self.B33:
move_fov = ShiftFoV()
move_fov.west_shift() # ↕↔
if event.widget == self.B34:
run_sequence.west() # west
if event.widget == self.B42:
move_fov = ShiftFoV()
move_fov.south_shift() # ↕↔
if event.widget == self.B52:
run_sequence.south() # south
if event.widget == self.B60: # ↞
adj = Adjustments()
adj.adj_east()
if event.widget == self.B61: # ↠
adj = Adjustments()
adj.adj_west()
if event.widget == self.B63: # ↟
adj = Adjustments()
adj.adj_north()
if event.widget == self.B64: # ↡
adj = Adjustments()
adj.adj_south()
if event.widget == self.B62: # ✓ Accept
adj = Adjustments()
adj.adj_accept()
if event.widget == self.B72: # ▶ Folder
Utils.move_to_folder(planes=['Q:*','P:*'],
folders=['Queries','FoV'])
开发者ID:ggreco77,项目名称:Multi-Order-Coverage-of-probability-skymaps,代码行数:60,代码来源:coverage31M.py
示例17: updateDailyTop10Collection
def updateDailyTop10Collection(top10, date):
if date['dateStr'] != None and date['day'] != None and date['month'] != None and date['year'] != None and top10 != None:
if len(top10) > 0:
result = ttDB['day'].replace_one(
date,
{
'dateStr': str(date['dateStr']),
'day': str(date['day']),
'month': str(date['month']),
'year': str(date['year']),
'top10' : [
top10[0].getObject(),
top10[1].getObject(),
top10[2].getObject(),
top10[3].getObject(),
top10[4].getObject(),
top10[5].getObject(),
top10[6].getObject(),
top10[7].getObject(),
top10[8].getObject(),
top10[9].getObject()
]
},
True,
False
)
if result.acknowledged == False:
Utils.emitWarning([str(datetime.utcnow()),"Failed to save daily top10. Acknowledgment was False."])
else:
Utils.emitWarning([str(datetime.utcnow()),"Params were not as expected when trying to save daily top10."])
开发者ID:JoshuaPortelance,项目名称:Twending,代码行数:30,代码来源:TrendingTop10DB.py
示例18: adj_south
def adj_south(self):
"""Adjustments FoV position -> south direction"""
entries_GWsky = self.load_entries("GWsky_entries")
fov_center_ra, fov_center_dec = entries_GWsky[0::2], entries_GWsky[1::2]
for ra_start, dec_start in zip (fov_center_ra, fov_center_dec):
ra_start, dec_start = float(ra_start), float(dec_start)
aladin.select("P:"+str(ra_start) + ',' + str(dec_start))
dist = self.intercardinal_distance(ra_start, dec_start,
self.shift_down, shift_right_left=0)
south_adj = [(dist),
(dec_start + 0 - self.shift_down)]
ra, dec = south_adj[0], south_adj[1]
aladin.set_target(ra, dec)
aladin.set_plane_id("P:"+str(ra) + ',' + str(dec))
new_sky_pos = [ra,dec] # cycle variables
self.entries_GWsky_new.extend(new_sky_pos)
#aladin.remove("Q:"+str(ra_start)+"/"+str(dec_start))
Utils.delete_pointing(infile="GWsky_pointings.txt",
ra=str(ra_start), dec=str(dec_start))
with open('GWsky_entries', 'wb') as data:
pickle.dump(self.entries_GWsky_new, data)
开发者ID:ggreco77,项目名称:Multi-Order-Coverage-of-probability-skymaps,代码行数:30,代码来源:coverage31M.py
示例19: updateWeeklyTop10Collection
def updateWeeklyTop10Collection(top10, startDate, endDate):
if top10 != None and startDate != None and endDate != None:
if len(top10) > 0:
result = ttDB['week'].replace_one(
{'startDateStr': startDate['dateStr'], 'endDateStr': endDate['dateStr']},
{
'startDateStr': str(startDate['dateStr']),
'startDay': str(startDate['day']),
'startMonth': str(startDate['month']),
'startYear': str(startDate['year']),
'endDateStr': str(endDate['dateStr']),
'endDay': str(endDate['day']),
'endMonth': str(endDate['month']),
'endYear': str(endDate['year']),
'top10' : [
top10[0].getObject(),
top10[1].getObject(),
top10[2].getObject(),
top10[3].getObject(),
top10[4].getObject(),
top10[5].getObject(),
top10[6].getObject(),
top10[7].getObject(),
top10[8].getObject(),
top10[9].getObject()
]
},
True,
False
)
if result.acknowledged == False:
Utils.emitWarning([str(datetime.utcnow()),"Failed to save weekly top10. Acknowledgment was False."])
else:
Utils.emitWarning([str(datetime.utcnow()),"Params were not as expected when trying to save weekly top10."])
开发者ID:JoshuaPortelance,项目名称:Twending,代码行数:34,代码来源:TrendingTop10DB.py
示例20: adj_west
def adj_west(self):
"""Adjustments FoV position -> weast direction"""
entries_GWsky = self.load_entries("GWsky_entries")
fov_center_ra, fov_center_dec = entries_GWsky[0::2], entries_GWsky[1::2]
for ra_start, dec_start in zip (fov_center_ra, fov_center_dec):
ra_start, dec_start = float(ra_start), float(dec_start)
aladin.select("P:"+str(ra_start) + ',' + str(dec_start))
ra_distance = self.ra0ra1((0 - self.SHIFT_CORRECTION + self.shift_right),
float(dec_start), float(dec_start))
aladin.select("P:"+str(ra_start) + ',' + str(dec_start))
west_adj = [(float(ra_start) - ra_distance), (float(dec_start) + 0)]
ra, dec = west_adj[0], west_adj[1]
aladin.set_target(ra, dec)
aladin.set_plane_id("P:"+str(ra) + ',' + str(dec))
new_sky_pos = [ra,dec] # cycle variables
self.entries_GWsky_new.extend(new_sky_pos)
Utils.delete_pointing(infile="GWsky_pointings.txt",
ra=str(ra_start), dec=str(dec_start))
with open('GWsky_entries', 'wb') as data:
pickle.dump(self.entries_GWsky_new, data)
开发者ID:ggreco77,项目名称:Multi-Order-Coverage-of-probability-skymaps,代码行数:30,代码来源:coverage31M.py
注:本文中的utils.Utils类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论