本文整理汇总了Python中util.Util类的典型用法代码示例。如果您正苦于以下问题:Python Util类的具体用法?Python Util怎么用?Python Util使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Util类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: optimizeSubdirectory
def optimizeSubdirectory(self, path, callback, new_path, extension = "", minimize = False):
    """Optimize a subdirectory within a directory being optimized.

    The subdirectory is mirrored underneath ``new_path``; every entry found
    in it is either recursed into (directories) or handed to
    ``self.optimizeFile`` (files).

    Arguments:
    path -- path to the source subdirectory (a trailing "/" is tolerated)
    callback -- function to run each file through
    new_path -- path to the optimized parent directory
    extension -- extension to search for in the directory
    minimize -- whether or not we should minimize the file contents (html)

    Returns:
    void
    """
    # Strip trailing slashes first: "css/".split("/").pop() would be "" and
    # the output would land directly in new_path instead of new_path/css.
    subdir_name = path.rstrip("/").split("/")[-1]
    subdir_path = new_path + "/" + subdir_name

    # A True result from prepareDirectory() means this directory is skipped.
    if self.prepareDirectory(subdir_path) is True:
        return

    for dir_file in Util.getFilesFromDir(path, extension):
        if Util.isDir(dir_file):
            # nested directory: mirror it below the directory just prepared
            self.optimizeSubdirectory(dir_file, callback, subdir_path, extension, minimize)
            continue

        new_file_path = subdir_path + "/" + Util.getFileName(dir_file)
        self.optimizeFile(dir_file, callback, minimize, new_file_path)
开发者ID:Mondego,项目名称:pyreco,代码行数:26,代码来源:allPythonContent.py
示例2: callback
def callback(self, cmd, action, target, msg):
    """Wait for the time described by ``msg``, then play the bell sound.

    ``msg`` is a time expression whose suffix selects how it is converted
    to a number of seconds:
      * ending in u'点' or u'分'  -> Util.gap_for_timestring(msg)
      * ending in u'秒' (seconds) -> Util.cn2dig() of the text before it
      * ending in u'分钟' (minutes) -> Util.cn2dig() value times 60
      * ending in u'小时' (hours)   -> Util.cn2dig() value times 60*60

    Arguments:
    cmd -- forwarded to self._home.publish_msg() for every message
    action, target -- concatenated with msg for the confirmation message
    msg -- the time expression, or None

    Returns:
    True once the bell has been played; False (or ``(False, None)``) when
    the expression is not understood or the waiting thread was stopped.

    NOTE(review): failure paths return either ``False`` or ``(False, None)``;
    confirm which shape the caller expects before unifying them.
    """
    if msg is None:
        self._home.publish_msg(cmd, u"时间格式错误")
        return False, None

    if msg.endswith((u'点', u'分')):
        delay = Util.gap_for_timestring(msg)
    elif msg.endswith(u"秒"):
        delay = int(Util.cn2dig(msg[:-1]))
    elif msg.endswith(u"分钟"):
        delay = int(Util.cn2dig(msg[:-2])) * 60
    elif msg.endswith(u"小时"):
        delay = int(Util.cn2dig(msg[:-2])) * 60 * 60
    else:
        self._home.publish_msg(cmd, u"时间格式错误")
        return False

    if delay is None:
        self._home.publish_msg(cmd, u"时间格式错误")
        return False, None

    DEBUG("thread wait for %d sec" % (delay, ))
    self._home.publish_msg(cmd, action + target + msg)

    # The current thread object provides waitUtil()/stopped() itself.
    worker = threading.current_thread()
    worker.waitUtil(delay)
    if worker.stopped():
        return False

    self._home.setResume(True)
    repeat = 7
    bell = Res.get_res_path("sound/com_bell")
    Sound.play(bell, True, repeat)
    self._home.setResume(False)
    return True
开发者ID:euwen,项目名称:LEHome,代码行数:31,代码来源:tools.py
示例3: parse_args
def parse_args(args_list):
    """Parse the Stetl command line.

    Arguments:
    args_list -- list of argument strings, handed to argparse as-is

    Returns:
    The argparse namespace with ``config_file``, ``config_section``,
    ``config_args`` and ``doc_args``. When at least one ``-a`` option was
    given, ``config_args`` is replaced by a single dict merged from all of
    them, in the order they appeared on the command line.
    """
    log.info("Stetl version = %s" % __version__)

    argparser = argparse.ArgumentParser(description='Invoke Stetl')

    # The short flags were registered with a trailing space ('-c ' etc.), so
    # '-c' only matched via argparse's prefix matching. Register the real flags.
    argparser.add_argument('-c', '--config', type=str, help='ETL config file in .ini format', dest='config_file',
                           required=False)

    argparser.add_argument('-s', '--section', type=str, help='Section in the config file to execute, default is [etl]',
                           dest='config_section', required=False)

    argparser.add_argument('-a', '--args', type=str,
                           help='Arguments or .properties files to be substituted for symbolic {argN}s in Stetl config file, '
                                'as -a "arg1=foo arg2=bar" and/or -a args.properties, multiple -a options are possible',
                           dest='config_args', required=False, action='append')

    argparser.add_argument('-d', '--doc', type=str,
                           help='Get component documentation like its configuration parameters, e.g. stetl doc stetl.inputs.fileinput.FileInput',
                           dest='doc_args', required=False)

    args = argparser.parse_args(args_list)

    if args.config_args:
        args_total = dict()
        for arg in args.config_args:
            if os.path.isfile(arg):
                # an existing file is read as a .properties file
                log.info('Found args file at: %s' % arg)
                args_total = Util.merge_two_dicts(args_total, Util.propsfile_to_dict(arg))
            else:
                # Convert string to dict: http://stackoverflow.com/a/1248990
                args_total = Util.merge_two_dicts(args_total, Util.string_to_dict(arg))

        args.config_args = args_total

    return args
开发者ID:fsteggink,项目名称:stetl,代码行数:34,代码来源:main.py
示例4: processMaps
def processMaps(self):
    """Loop through the counted classes and ids, pick a shorter name for each
    and store the mapping in ``self.class_map`` / ``self.id_map``.

    Names are processed in descending order of their counter value, so the
    entries with the biggest savings are handled first.

    Returns:
    void
    """
    # reverse sort so we can figure out the biggest savings
    # (sorted() instead of items().sort(): dict views have no sort() on Python 3)
    classes = sorted(self.class_counter.items(), key=itemgetter(1), reverse=True)

    for class_name, _savings in classes:
        small_class = "." + VarFactory.getNext("class")

        # adblock extensions may block class "ad" so we should never generate it
        # also if the generated class already exists as a class to be processed
        # we can't use it or bad things will happen
        while small_class == ".ad" or Util.keyInTupleList(small_class, classes):
            small_class = "." + VarFactory.getNext("class")

        self.class_map[class_name] = small_class

    ids = sorted(self.id_counter.items(), key=itemgetter(1), reverse=True)

    for id_name, _savings in ids:
        small_id = "#" + VarFactory.getNext("id")

        # same holds true for ids as classes
        while small_id == "#ad" or Util.keyInTupleList(small_id, ids):
            small_id = "#" + VarFactory.getNext("id")

        self.id_map[id_name] = small_id
开发者ID:Mondego,项目名称:pyreco,代码行数:34,代码来源:allPythonContent.py
示例5: __init__
def __init__(self, name, ip, iface, hostname="", loginfo=""):
    """
    Create a new host object, with the given name, IP specification and interface

    name -- Nickname for the host
    ip -- IP specification for the host or subnet (e.g. "127.0.0.1 10.0.0.0/24")
    iface -- Interface nickname this is connected to (only one!)
    hostname -- "real" hostname, which may be longer than the nickname
    loginfo -- text appended to error messages to locate the definition

    Raises PyromanException when the name is empty, invalid or already
    registered, when no IP is given, when one of the IP specifications is
    invalid, or when no interface is given.
    """
    # verify and store name
    # ("or", not "and": a non-empty but invalid name must be rejected too)
    if name == "" or not Util.verify_name(name):
        # the message has two placeholders, so pass exactly two values
        raise PyromanException("Host '%s' lacking a valid name at %s" \
            % (name, loginfo))
    if name in Firewall.hosts:
        raise PyromanException("Duplicate host specification: '%s' at %s" % (name, loginfo))
    self.name = name

    # verify and store IPs
    if ip == "":
        raise PyromanException("Host '%s' definition lacking IP address at %s" % (name, loginfo))
    self.ip = Util.splitter.split(ip)
    for i in self.ip:
        if not Util.verify_ipnet(i):
            raise PyromanException("IP specification '%s' invalid for host '%s' at %s" \
                % (i, name, loginfo))

    # verify and store interface
    self.iface = iface
    if iface == "":
        raise PyromanException("Host definition '%s' lacking kernel interfaces at %s" \
            % (name, loginfo))

    # store "real" hostname (which may be longer than nick)
    # this is used for "localhost detection"
    self.hostname = hostname
    # store loginfo
    self.loginfo = loginfo
    # register with firewall
    Firewall.hosts[name] = self
开发者ID:wil,项目名称:pyroman,代码行数:35,代码来源:host.py
示例6: __del__
def __del__(self):
    """Stop, invalidate and release the FSEvents stream held in ``_streamRef``.

    Does nothing beyond tracing when no stream exists (attribute missing or
    None), so tearing down an instance whose stream was never created does
    not fail.
    """
    Util.trace("destroy DirCheck instance")
    streamRef = getattr(self, "_streamRef", None)
    if streamRef is None:
        # no stream was created, nothing to tear down
        return
    # Stop / Invalidate / Release
    FSEventStreamStop(streamRef)
    FSEventStreamInvalidate(streamRef)
    FSEventStreamRelease(streamRef)
开发者ID:nyakagawan,项目名称:vfiler,代码行数:7,代码来源:dirCheck.py
示例7: _get_uid
def _get_uid(cls):
    """Return the stored user id, requesting and storing a new one if needed.

    When Util.get_user_id() yields nothing, the body of a GET request to
    ``cls.get_user_id_url`` is used as the new id and saved through
    Util.set_user_id().
    """
    known_id = Util.get_user_id()
    if known_id:
        return known_id

    print("welcome, creating id for you.")
    # NOTE(review): the response text is stored without checking the HTTP status
    fresh_id = requests.get(cls.get_user_id_url).text
    Util.set_user_id(fresh_id)
    return fresh_id
开发者ID:dineshkumar-cse,项目名称:ShareFile,代码行数:7,代码来源:service_api.py
示例8: main
def main():
    """Fetch one Spark application through Worm and print a summary dict.

    The summary holds the application name, total time, status, property
    and a list of per-stage dicts (finished stages first, then running
    ones). Nothing is printed when Worm returns no application.
    """
    worm = Worm("http://11.11.0.64:8099/", "http://11.11.0.64:4041/stages/", True, False, "Flint")

    # NOTE: despite the variable name, this is the result of get_finish_spark()
    running_spark = worm.get_finish_spark()
    if running_spark is None:
        return

    running_stages = running_spark.get_running_stages()
    finished_stages = running_spark.get_finished_stages()

    stages = []
    for finished_stage in finished_stages:
        stages.append({
            "stage_id": finished_stage.get_stage_id(),
            "stage_duration": Util.format_time(finished_stage.get_duration()),
            "submit_time": finished_stage.get_submit_time(),
            # finished stages are always reported as complete
            "tasks_percent": 100.0,
            "gc_time": round(finished_stage.get_gc_time(), 1),
        })
    for running_stage in running_stages:
        stages.append({
            "stage_id": running_stage.get_stage_id(),
            "stage_duration": Util.format_time(running_stage.get_duration()),
            "submit_time": running_stage.get_submit_time(),
            "tasks_percent": Util.format_tasks_percent(running_stage.get_tasks_percent()),
            "gc_time": round(running_stage.get_gc_time(), 1),
        })

    format_spark = {}
    format_spark["app_name"] = running_spark.get_app_name()
    format_spark["total_time"] = Util.format_time(running_spark.get_total_time())
    format_spark["status"] = running_spark.get_status()
    format_spark["property"] = running_spark.get_property()
    format_spark["stages"] = stages
    # parenthesised form prints the same on Python 2 and is valid on Python 3
    print(format_spark)
开发者ID:kzx1025,项目名称:SparkMonitor,代码行数:35,代码来源:worm.py
示例9: on_search_enter_key
def on_search_enter_key(self, entry):
    """Run a search for the text in ``entry`` and show the result tree.

    A query starting with '@' is a tag search: the rest of the text is
    passed to the index manager. Any other non-empty query is split on
    whitespace and every term is matched as ``*term*`` against the names of
    the non-hidden files below HOME. An empty query shows the HOME tree
    again. The notebook is cleared in every case.

    entry -- the text entry widget holding the query
    """
    # find /home/shercoder/ \( ! -regex '.*/\..*' \) | grep "soccer"
    query = entry.get_text()
    is_tag_search = False
    if query.startswith('@'):
        search_terms = query[1:]
        is_tag_search = True
    else:
        # split() without an argument drops empty terms; split(' ') turned
        # "foo  bar" or "foo " into a '' term whose pattern "**" matches
        # every file.
        search_terms = query.split()

    if query:
        allfiles = []
        if is_tag_search:
            results = self._index_manager.search_documents(search_terms)
            for hit in results:
                allfiles.append((hit['filename'], hit['filepath']))
        else:
            for root, dirs, files in os.walk(HOME):
                # skip hidden files; pruning dirs in place keeps os.walk
                # from descending into hidden directories
                files = [f for f in files if not f[0] == '.']
                dirs[:] = [d for d in dirs if not d[0] == '.']
                for term in search_terms:
                    for filename in fnmatch.filter(files, "*{}*".format(term)):
                        allfiles.append((filename, os.path.join(root, filename)))
        self._treeview.get_model().generate_search_tree(allfiles)
    else:
        self._treeview.get_model().generate_tree(HOME)
    Util.clear_notebook(self._notebook)
开发者ID:shercoder,项目名称:ingress,代码行数:27,代码来源:main_window.py
示例10: kernel_version
def kernel_version(min=None, max=None):
    """
    Return kernel version or test for a minimum and/or maximum version

    min -- minimal kernel version required
    max -- maximum kernel version allowed

    Without min and max the version itself is returned; otherwise the
    result is True when the version lies within the given bounds and
    False when it does not.
    """
    if not Firewall._kernelversion:
        # not cached yet: run the kernel version command and keep its output
        query = subprocess.Popen(Firewall.kernelversioncmd,
            stdout=subprocess.PIPE)
        output = query.communicate()[0]
        Firewall._kernelversion = output.strip()
        # still no version number? - raise an exception
        # NOTE(review): "Error" is not defined in the visible code - confirm
        # it exists in this module.
        if not Firewall._kernelversion:
            raise Error("Couldn't get kernel version!")

    current = Firewall._kernelversion
    if not (min or max):
        return current
    if min and Util.compare_versions(current, min) < 0:
        return False
    if max and Util.compare_versions(current, max) > 0:
        return False
    return True
开发者ID:wil,项目名称:pyroman,代码行数:25,代码来源:pyroman.py
示例11: _dumpVariable
def _dumpVariable( self ):
    """Trace a separator line followed by index, name, mtime and bgColor."""
    Util.trace( "---------------------------" )
    # (label, attribute) pairs, traced in this order; "ext_" is not dumped
    traced = (
        ( "index: ", "index_" ),
        ( "name: ", "name_" ),
        ( "mtime: ", "mtime_" ),
        ( "bgColor: ", "bgColor_" ),
    )
    for label, attr in traced:
        Util.trace( label + str( getattr( self, attr ) ) )
开发者ID:nyakagawan,项目名称:vfiler,代码行数:7,代码来源:element.py
示例12: build
def build(self, is_main=False, is_method=False, is_class=False):
    '''
    Copies the base java file, moves the changes into it and compiles it.
    If an Exception occurs, the change is rolled back.

    is_main -- the latest change was main input; on success the program is run
    is_method -- the latest change was a method
    is_class -- the latest change was a class (its file is removed on failure)
    '''
    shutil.copyfile(START_FILE, MAIN_FILE)
    self.parse_file(MAIN_FILE)

    # Attempt to compile the new change
    try:
        Util.compile_java([MAIN_FILE] + self.full_class_names)
    except Exception as e:
        # print(...) with one argument behaves the same on Python 2 and 3
        if verbose_output:
            print('Exception: %s' % e)
        else:
            print('An Exception occurred compiling the program')

        # Determine who caused the exception and remove the code
        if is_main is True:
            self.main_input.pop()
        elif is_method is True:
            self.methods.pop()
            self.method_names.pop()
        elif is_class is True:
            self.classes.pop()
            self.class_names.pop()
            filename = self.full_class_names.pop()
            Util.remove_file(filename)

        return

    if is_main is True:
        self.run()
开发者ID:aeggum,项目名称:JavaConsole,代码行数:33,代码来源:console.py
示例13: post
def post(self, answerId):
    """Add a comment to the answer identified by ``answerId``.

    The comment text is read from the "message" request argument. An error
    response (code -1) is written when the answer does not exist, the
    message is empty, or the question of the answer is locked; otherwise
    the comment is stored and a success response (code 1) is written.
    """
    message = self.get_argument("message")
    userId = self.get_current_user_id()
    answer = Answer.queryByAnswerId(answerId)

    # error 1: the answer does not exist
    if not answer :
        self.write(Util.response(None, -1, u'回复不存在'))
        return

    # the comment text is required
    if message == "":
        self.write(Util.response(None, -1, u'请输入评论内容'))
        return

    # locked questions cannot be commented on
    # NOTE(review): question is used without a None check - confirm that
    # Question.queryById always finds the question of an existing answer.
    question = Question.queryById(answer.question_id)
    if question.lock == 1 :
        self.write(Util.response(None, -1, u'不能评论锁定的问题'))
        return

    # permission check for posting comments (omitted)

    # insert the comment
    AnswerComment.addAnswerComment(answerId, userId, message)
    rsm = {'item_id':answerId, 'type_name':'answer'}
    self.write(Util.response(rsm, 1, None))
开发者ID:hellowego,项目名称:auto,代码行数:28,代码来源:questionHandler.py
示例14: update
def update():
    '''
    Sets target's siteurl, blogname, and homepage

    The three rows of the wp_options table are updated in one transaction
    over a local tunnel to the target database.
    '''
    Util.validate_role()
    config = env.config
    db_table = 'wp_options'
    entries = {
        'siteurl': config['site_url'],
        'blogname': config['site_name'],
        'home': config['site_url']
    }

    with local_tunnel(config['db_port']):
        cnx = mysql.connector.connect(user=config['db_user'],
                                      password=config['db_pass'],
                                      host='127.0.0.1',
                                      port=config['db_port'],
                                      database=config['db_name'])
        try:
            cnx.start_transaction()
            cursor = cnx.cursor()

            # the table name is formatted in; the values stay bound parameters
            update_option = ("UPDATE `{db_table}` "
                             "SET `option_value`=%s "
                             "WHERE `option_name` LIKE %s".format(db_table=db_table))

            # items() works on Python 2 and 3 (iteritems() is Python 2 only)
            for key, value in entries.items():
                cursor.execute(update_option, (value, key))

            cnx.commit()
        finally:
            # always release the connection, even when a statement fails
            cnx.close()
开发者ID:cgspeck,项目名称:fabric_press,代码行数:33,代码来源:db.py
示例15: on_post_save
def on_post_save(self, view):
    """Request a type check of the saved file when it is Scala or Java.

    The request is only sent when an environment exists for the window, is
    connected, and its client reports the analyzer as ready.
    """
    file = view.file_name()
    if not (Util.is_scala(file) or Util.is_java(file)):
        return
    env = getEnvironment(view.window())
    if env and env.is_connected() and env.client.analyzer_ready:
        # "async" is a reserved word since Python 3.7, so writing async=True
        # is a SyntaxError there; passing it through ** sends the same
        # keyword argument on every version.
        TypeCheckFilesReq([view.file_name()]).run_in(env, **{"async": True})
开发者ID:ensime,项目名称:ensime-sublime,代码行数:7,代码来源:commands.py
示例16: __init__
def __init__(self):
    '''
    Sets up:
    1. the graph database connection (from NEO4J_URL when set, otherwise
       the GraphDatabaseService default)
    2. the datastore connection
    3. the graph database indices that are required
    4. the url and template helpers for the graph database REST API
    '''
    Util.__init__(self)

    neo4j_url = os.environ.get('NEO4J_URL')
    if neo4j_url:
        parsed = urlparse(neo4j_url)
        host_and_port = "{host}:{port}".format(host = parsed.hostname, port = parsed.port)
        neo4j.authenticate(host_and_port, parsed.username, parsed.password)
        self.graphdb = neo4j.GraphDatabaseService(
            'http://{host}:{port}/db/data'.format(host = parsed.hostname, port = parsed.port)
        )
    else:
        self.graphdb = neo4j.GraphDatabaseService()

    self.node_index = self.graphdb.get_or_create_index(neo4j.Node, 'NODE')
    self.disambiguation_index = self.graphdb.get_or_create_index(neo4j.Node, self.DISAMBIGUATION)

    # NOTE(review): node urls always point at localhost:7474, even when
    # NEO4J_URL names another host - confirm this is intended.
    def node_url(present_node):
        return 'http://localhost:7474/db/data/node/{0}'.format(present_node)

    def path_request(target_node):
        # request body: "dijkstra" over "sibling" relationships, cost in "weight"
        return {
            "to" : self._url(target_node),
            "relationships": {
                "type": "sibling"
            },
            "cost_property": "weight",
            "algorithm" : "dijkstra"
        }

    self._url = node_url
    self._template = path_request

    self.DataM = RelDataStore()
开发者ID:saurabhkb,项目名称:tailor,代码行数:32,代码来源:learner.py
示例17: run
def run(self):
    """Create and start an FSEvents stream for ``self._path``, then enter
    the run loop.

    The stream is stored in ``self._streamRef``. A trace message is written
    and the method returns early when the stream cannot be created or
    started.
    """
    watched = self._path
    stream = FSEventStreamCreate(
        kCFAllocatorDefault,
        self.eventsCallback,
        watched,
        [watched],
        kFSEventStreamEventIdSinceNow,  # sinceWhen
        1.0,  # latency
        0,
    )
    self._streamRef = stream
    if stream is None:
        Util.trace("FSEventStreamCreate is failed")
        return

    # For debugging, FSEventStreamShow(self._streamRef) can be called here.

    FSEventStreamScheduleWithRunLoop(stream, CFRunLoopGetCurrent(), kCFRunLoopDefaultMode)

    if not FSEventStreamStart(stream):
        Util.trace("failed to start the FSEventStream")
        return

    CFRunLoopRun()
开发者ID:nyakagawan,项目名称:vfiler,代码行数:34,代码来源:dirCheck.py
示例18: printScreen
def printScreen(self):
    """Redraw the screen: border, title, exit hint and the next matchup."""
    screen = self.__screen
    screen.clear()
    screen.border(0)

    # Title
    screen.addstr(0, 35, "PIZZA CLUB", curses.A_STANDOUT)

    # inner window, placed one cell inside the screen's top-left corner
    top, left = 1, 1
    rows, cols = 22, 78
    inner = curses.newwin(rows, cols, top, left)

    # Menu hint on the last row, ending one column before the right edge
    hint = "press q to exit"
    inner.addstr(rows - 1, cols - len(hint) - 1, hint)

    # Brewskeeball
    league = self.__brewskeeball
    inner.addstr(3, 0, Util.centerString("Next Matchup", cols))
    teams = league.matchupTeams()
    inner.addstr(5, 0, Util.centerString(teams, cols))
    schedule = "%s at %s" % (league.matchupDate(), league.matchupTime())
    inner.addstr(7, 0, Util.centerString(schedule, cols))

    # Render screen
    screen.refresh()
    inner.refresh()
开发者ID:trg,项目名称:raspberrystatus,代码行数:25,代码来源:pizzaclub.py
示例19: _get_day_link
def _get_day_link(self):
    '''
    Get the share links published for the current day.

    Links dated today are looked up first. When there are none, links
    dated the day before are looked up, restricted by a title filter
    built from self._gen_choices(cur_day).

    Returns a list of links; an empty list when the current time or the
    main page cannot be obtained (the former is also recorded in
    self.errors).
    '''
    cur_day = Util.get_time()
    if cur_day is None:
        self.errors.append('Get sys-time error')
        return []

    mainpage = self._get_content(self.config.get_vip_url())
    if mainpage is None:
        return []

    # first try to find entries published on year-month-day
    # (raw strings: '\.' in a plain literal is an invalid escape sequence)
    time_str = '-'.join(cur_day)
    re_str = r'<em>%s</em>.*</a></label> <a href="(.*\.html)"' % time_str
    links = self._get_links(mainpage, re_str)
    if len(links) != 0:
        # ok, no need for more
        return links

    # second try: entries published on year-month-(day-1) for the next day
    day_before = Util.get_day_before(cur_day)
    chars = self._gen_choices(cur_day)
    time_str = '-'.join(day_before)
    # NOTE(review): inside [...] the '|' is a literal character, not an
    # alternation - confirm whether (?:a|b) was intended here.
    re_str = r'<em>%s</em>.*</a></label> <a href="(.*\.html)" title=".*[%s].*"' % (time_str, '|'.join(chars))
    return self._get_links(mainpage, re_str)
开发者ID:Augus-Wang,项目名称:xunlei_vip,代码行数:28,代码来源:crawl.py
示例20: create_grid
def create_grid(self):
    """Fill the grid with the "GENERAL" header and one row per attribute.

    Row 0 holds the header; rows 1-5 hold name, file size, location, last
    modified time and last access time, each as a label in column 0 and
    its value widget in column 1.
    """
    header = Util.create_label("<big>GENERAL</big>",
                               align=Gtk.Align.START)
    header.set_use_markup(True)
    self.attach(header, 0, 0, 1, 1)

    # attribute labels, in row order
    captions = ("Name:", "Filesize:", "Location:", "Last Modified:", "Last Access:")
    labels = [Util.create_label(caption) for caption in captions]

    # attribute values, in the same row order
    values = [
        Util.create_info_label(os.path.basename(self._filepath)),
        self.file_or_folder_size_widget(),
        Util.create_info_label(os.path.dirname(self._filepath), ellipsize=True),
        Util.create_info_label(time.ctime(self._filestat.st_mtime)),
        Util.create_info_label(time.ctime(self._filestat.st_atime)),
    ]

    # add all widgets to the grid: label in column 0, value in column 1
    for row, (label, value) in enumerate(zip(labels, values), 1):
        self.attach(label, 0, row, 1, 1)
        self.attach(value, 1, row, 1, 1)
开发者ID:shercoder,项目名称:ingress,代码行数:32,代码来源:fileinfo.py
注:本文中的util.Util类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论