本文整理汇总了Python中util.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: makeContext
def makeContext ( self, ctx ):
"""generate a dictionary containing the bindings for all objects from the
NamingContext"""
dict = {}
how_many = 10
try:
namecontext = ctx._narrow( CosNaming.NamingContext )
(binding_list, binding_iterator) = namecontext.list( how_many )
# process returned bindinglist
for binding in binding_list:
name = binding.binding_name
dict[name[0].id] = namecontext.resolve( name )
# process iterator
if binding_iterator:
(cond, binding) = binding_iterator.next_one()
while cond:
name = binding.binding_name
dict[name[0].id] = namecontext.resolve( name )
(cond, binding) = binding_iterator.next_one()
except :
util.debug( "makeContext" )
return dict
开发者ID:BackupTheBerlios,项目名称:qedo,代码行数:25,代码来源:Script.py
示例2: inventory_generator
def inventory_generator(self, sections=['pack', 'equiped', 'spells']):
"""
>>> c = Character({})
>>> mhand = Item(load_yaml('items', 'mainhand_dagger.yaml'))
>>> mhand = c.acquire_item(mhand)
>>> len(list(c.inventory_generator(['pack'])))
1
>>> c.equip_item(mhand)
(True, '[ ] has equiped Dagger')
>>> len(list(c.inventory_generator(['pack'])))
0
>>> len(list(c.inventory_generator(['equiped'])))
1
>>> for item in c.inventory_generator(['equiped']): item[1].displayname()
'Dagger'
"""
for section in sections:
if section in ['pack', 'spells']:
items = self.get('inventory/%s' % section, [])
idx = -1
for item in items:
idx += 1
yield (section, Item(item), idx)
else:
item = self.getsubtree('inventory/%s' % section)
for slot in FlattenedDict(item).subkeys():
debug('Getting inventory from ', slot)
itemdata = self.getsubtree('inventory/equiped/%s' %slot)
if itemdata:
yield(section, Item(itemdata),slot)
开发者ID:ajventer,项目名称:mirthless,代码行数:30,代码来源:character.py
示例3: install
def install(host, src, dstdir):
if isLocal(host):
if not exists(host, src):
util.output("file does not exist: %s" % src)
return False
dst = os.path.join(dstdir, os.path.basename(src))
if exists(host, dst):
# Do not clobber existing files/dirs (this is not an error)
return True
util.debug(1, "cp %s %s" % (src, dstdir))
try:
if os.path.isfile(src):
shutil.copy2(src, dstdir)
elif os.path.isdir(src):
shutil.copytree(src, dst)
except OSError:
# Python 2.6 has a bug where this may fail on NFS. So we just
# ignore errors.
pass
else:
util.error("install() not yet supported for remote hosts")
return True
开发者ID:cubic1271,项目名称:broctl,代码行数:27,代码来源:execute.py
示例4: handle_viola_packet
def handle_viola_packet(packet, parsed, server):
"""
Handle a generic viola packet. Parse its opcode and call the correct
function for this specific type of packet.
"""
util.debug("Parsing viola packet.")
# XXX terrible functionify
opcode = packet[:2]
packet_payload = packet[2:]
if opcode == "00":
msg = introduction.handle_introduction_packet(packet_payload, parsed)
elif opcode == "01":
msg = handle_room_join_packet(packet_payload, parsed, server)
elif opcode == "02":
msg = handle_key_transport_packet(packet_payload, parsed, server)
elif opcode == "03":
msg = handle_room_message_packet(packet_payload, parsed, server)
else:
util.debug("Received viola packet with opcode: %s" % opcode)
raise NotImplementedError("wtf")
return msg
开发者ID:adrianhust,项目名称:viola,代码行数:25,代码来源:viola.py
示例5: user_left_channel
def user_left_channel(irc_msg, server):
"""A user left a channel we are in. Remove them from the channel if we are captain."""
account = accounts.get_my_account()
parsed = util.parse_irc_quit_kick_part(irc_msg, server)
nick = parsed['from_nick']
channel = parsed['channel']
command = parsed['command']
assert(command.upper() == "PART")
util.debug("Received %s from %s in channel %s." % (command, nick, channel))
try:
viola_room = account.get_viola_room(channel, server)
except accounts.NoSuchRoom:
util.debug("No viola room at %s. Sending plaintext." % channel)
return
try:
viola_room.remove_member_and_rekey(nick)
except room.NoSuchMember:
util.control_msg("A non-existent nick left the room. WTF.") # XXX i think this also catches ourselves
return
开发者ID:adrianhust,项目名称:viola,代码行数:25,代码来源:viola.py
示例6: main_sql_to_excel
def main_sql_to_excel():
"""Read from database then write in excel"""
# To do
# read database
# database variable
db_host = 'localhost'
db_user = 'root'
db_password = ''
db_name = 'rimus'
conn = MySQLdb.connect(db_host, db_user, db_password, db_name)
cursor = conn.cursor()
query = "SELECT * FROM `tweets`"
try:
cursor.execute(query)
result = cursor.fetchall()
# return result
except Exception, e:
util.debug('db_control.read error' + str(e))
conn.rollback()
result = None
开发者ID:ismailsunni,项目名称:f3-factor-finder,代码行数:25,代码来源:run_it.py
示例7: submit_fragment
def submit_fragment(self, fragment, sender, server, target):
"""
Submit fragment for assembling. If this fragment completes the packet, then
return the packet payload.
"""
peer_sid = "%s:%s:%s" % (sender, server, target)
n_current_fragment, n_total_fragments = self.get_fragment_details(fragment)
if n_current_fragment == n_total_fragments == 1:
# Message was only one fragment long. Return it immediately.
return get_fragment_payload(fragment)
# Check if this is the beginning of a new fragmented message.
if peer_sid not in self.active_incomplete_msgs:
if n_current_fragment != 1:
util.debug("Received fragment %d from unknown peer. Ignoring." % n_current_fragment)
return None
incomplete_msg = IncompleteMessage(peer_sid, fragment, n_total_fragments)
self.register_new_incomplete_msg(peer_sid, incomplete_msg)
return None
# This is an old fragmented message. Submit newly found fragments.
incomplete_msg = self.active_incomplete_msgs[peer_sid]
complete_msg = incomplete_msg.submit_new_message_fragment(fragment, n_current_fragment, n_total_fragments)
# Check if we just completed this message and if so, return the full
# thing to our caller. Also clean up.
if complete_msg:
self.active_incomplete_msgs.pop(peer_sid) # cleanup
util.debug("Completed fragmented message (%d)! Returning!" % n_total_fragments)
return complete_msg
else:
return None
开发者ID:Mandragorian,项目名称:flute,代码行数:35,代码来源:transport.py
示例8: printBatteryProbabilities
def printBatteryProbabilities(self):
debug("***BATTERY PROBABILITIES***")
for y in range(len(self.robotSquares)):
p = []
for x in range(len(self.robotSquares[0])):
p.append(self.robotSquares[x][y].probBat)
debug(p)
开发者ID:JoshAudibert,项目名称:IntroToAI,代码行数:7,代码来源:robot.py
示例9: download
def download(self, urls=None):
total_downloaded = 0
if urls is None:
connections = [self.connect(self.host) for i in range(self.runs)]
else:
connections = [self.connect(h['host']) for h in urls]
total_start_time = time()
for current_file in self.DOWNLOAD_FILES:
threads = []
for run in range(self.runs):
thread = Thread(
target=self.downloadthread,
args=(connections[run],
'%s?x=%d' % (current_file, int(time() * 1000))
if urls is None else urls[run]['url']))
thread.run_number = run + 1
thread.start()
threads.append(thread)
for thread in threads:
try:
thread.join()
total_downloaded += thread.downloaded
util.debug('[SC] Run %d for %s finished' %
(thread.run_number, current_file))
except:
pass
total_ms = (time() - total_start_time) * 1000
for connection in connections:
connection.close()
util.info('[SC] Took %d ms to download %d bytes' % (total_ms,
total_downloaded))
return total_downloaded * 8000 / total_ms
开发者ID:milokmet,项目名称:plugin.video.stream-cinema,代码行数:32,代码来源:speedtest.py
示例10: move
def move(self, world_square):
if world_square.loc != self.loc:
# if not initialization, calculate battery usage
path_len = self.findPath(world_square)
self.changeBattery(-path_len)
self.loc = world_square.loc
# search location
# update RobotSquare
if world_square.bomb or self.battery == 0:
self.explode()
return
# found a battery! Update the current bombStates
if world_square.battery:
# update robot map and checkedSquares
print "SHOULDN'T GET HERE"
self.robotMap.removeBat(self.loc)
self.robotMap.filterBatteryStates(self.loc)
self.robotMap.getSquare(self.loc).setChecked()
# add WorldSquare to checkedSquares
self.robotMap.checkedSquares.append(world_square)
# remove current location from fringe
self.robotMap.fringe.remove(self.robotMap.getSquare(self.loc))
# add neighbors of current location to fringe
self.robotMap.updateProbabilities(world_square)
neighbors = self.robotMap.getNeighbors(self.loc)
for neighbor in neighbors:
if not neighbor.checked and not neighbor.flagged and neighbor not in self.robotMap.fringe:
self.robotMap.fringe.append(neighbor)
debug("Fringe:")
for square in self.robotMap.fringe:
debug(square)
开发者ID:JoshAudibert,项目名称:IntroToAI,代码行数:35,代码来源:robot.py
示例11: printBombProbabilities
def printBombProbabilities(self):
debug("***BOMB PROBABILITIES***")
for y in range(len(self.robotSquares)):
p = []
for x in range(len(self.robotSquares[0])):
p.append(self.robotSquares[x][y].probBomb)
debug(p)
开发者ID:JoshAudibert,项目名称:IntroToAI,代码行数:7,代码来源:robot.py
示例12: __init__
def __init__( self, deck ):
self.deck = deck
self.deckName = str( deck.name() )
self.deckPath = str( deck.path )
self.dbsPath = deckDbPath + os.sep + self.deckName
self.cfgPath = self.dbsPath + os.sep + 'config'
self.allPath = self.dbsPath + os.sep + 'all.db'
self.cfg = {
# user configurable
'mature threshold':21,
'learnt threshold':3,
'known threshold':3,
'interval dbs to make':range(1,21)+[30,100,365],
'morph fields':['Expression'],
'i+N known field':'iPlusN',
'i+N mature field':'iPlusNmature',
'unknowns field':'unknowns',
'unmatures field':'unmatures',
'vocab rank field':'vocabRank',
'morph man index field':'morphManIndex',
'copy i+1 known to':'vocabExpression',
'copy i+0 mature to':'sentenceExpression',
'whitelist':u'',
'blacklist':u'記号,UNKNOWN',
'enabled':'no',
# internal
'last deck update':0, # TimeStamp
'last deck update took':0, # Seconds
'last all.db update took':0, # Seconds
'last db update':{}, # Map DbPath TimeStamp
}
self.loadCfg()
debug( 'Loaded DeckMgr for %s' % self.deckName )
开发者ID:fanatic84,项目名称:JapaneseStudy,代码行数:35,代码来源:auto.py
示例13: activate
def activate(self):
debug("in launchconfig.py activate")
conn = util.as_conn()
name = self.name()
if self.exists():
pprint("in launchconfig.py self.exists()")
# NOTE: I don't think program logic ever gets here
if not util.confirm("LaunchConfig {} already exists, overwrite?".format(name)):
pprint("in launchconfig.py activate: Confirmed overwriting LaunchConfig")
return True
# delete existing
pprint("in launchconfig.py activate: deleting LaunchConfig")
conn.delete_launch_configuration(LaunchConfigurationName=name)
# get configuration for this LC
cfg = self.role_config
# NOTE: wrap the following in a try block to catch errors
lc = conn.create_launch_configuration(
AssociatePublicIpAddress = True, # this is required to make your stuff actually work
LaunchConfigurationName = name,
IamInstanceProfile = cfg.get('iam_profile'),
ImageId = cfg.get('ami'),
InstanceType = cfg.get('instance_type'),
KeyName = cfg.get('keypair_name'),
UserData = self.cloud_init_script(),
SecurityGroups = cfg.get('security_groups')
)
#if not conn.create_launch_configuration(lc):
# print "Error creating LaunchConfig {}".format(name)
# return False
util.message_integrations("Activated LaunchConfig {}".format(name))
return lc
开发者ID:joegross,项目名称:udo,代码行数:34,代码来源:launchconfig.py
示例14: cloud_init_script
def cloud_init_script(self):
debug("in launchconfig.py cloud_init")
# load cloud-init script template
cloud_init_template = LCTemplate(cloud_init_script)
cloud_init_config = _cfg.get_root()
# add extra template vars
cloud_init_config['base_packages'] = " ".join(_cfg.get('packages')) or ''
cloud_init_config['yum_plugin_url'] = _cfg.get('repo', 'plugin_url') or ''
# from role config
cloud_init_config['role_packages'] = " ".join(self.role_config.get('packages')) or ''
cloud_init_config['repo_url'] = self.role_config.get('repo_url') or ''
# append extra commands from config
cloud_init_config['cloud_init_pre'] = _cfg.get('cloud_init') or _cfg.get('cloud_init_pre') or ''
cloud_init_config['cloud_init_post'] = _cfg.get('cloud_init_post') or ''
# cluster/role configurable extra cloud-init stuff
cloud_init_config['cloud_init_extra'] = self.role_config.get('cloud_init_extra') or ''
cloud_init_config['cluster_name'] = self.cluster_name or ''
cloud_init_config['role_name'] = self.role_name or ''
cloud_init = cloud_init_template.substitute(**cloud_init_config)
return cloud_init
开发者ID:joegross,项目名称:udo,代码行数:25,代码来源:launchconfig.py
示例15: handle_introduction_packet
def handle_introduction_packet(packet_payload, parsed):
util.debug("Parsing introduction packet")
payload = base64.b64decode(packet_payload)
if len(payload) < INTRODUCTION_PAYLOAD_LEN:
raise IncompletePacket
# Get packet fields
signature = payload[:64]
rcvd_pubkey = crypto.parse_signing_pubkey(payload[64:64+32])
# Verify signature
try:
rcvd_pubkey.verify(payload)
except nacl.exceptions.BadSignatureError:
util.control_msg("Could not verify signature of INTRODUCTION packet.")
return ""
hexed_key = crypto.get_hexed_key(rcvd_pubkey)
irc_nick = parsed['from_nick']
util.control_msg("You received an introduction from %s with identity key:" % irc_nick)
util.control_msg("\t%s" % otrlib.colorize(hexed_key, "green"))
util.control_msg("If you trust that key, please type:")
util.control_msg("\t /viola trust-key <name> %s" % hexed_key)
util.control_msg("where <name> is the nickname you want to assign to the key.")
util.control_msg("Example: /viola trust-key alice %s" % hexed_key)
util.control_msg("-" * 100)
return ""
开发者ID:piratas-ar,项目名称:viola,代码行数:30,代码来源:introduction.py
示例16: install
def install(host, src, dst):
if isLocal(host):
if not exists(host, src):
util.output("file does not exist: %s" % src)
return False
if os.path.isfile(dst):
try:
os.remove(dst)
except OSError, e:
print 'install: os.remove(%s): %s' % (dst, e.strerror)
sys.exit(1)
util.debug(1, "cp %s %s" % (src, dst))
try:
if os.path.isfile(src):
shutil.copy2(src, dst)
elif os.path.isdir(src):
shutil.copytree(src, dst)
except OSError:
# Python 2.6 has a bug where this may fail on NFS. So we just
# ignore errors.
pass
return True
开发者ID:decanio,项目名称:broctl,代码行数:26,代码来源:execute.py
示例17: list
def list(self, url):
print("Examining url", url)
if self.is_most_popular(url):
if "movie" in url:
return self.list_movies_by_letter(url)
if "tv" in url:
return self.list_tv_shows_by_letter(url)
if self.is_recently_added(url):
util.debug("is recently added")
if "movie" in url:
return self.list_movie_recently_added(url)
if "tv" in url:
util.debug("is TV")
return self.list_tv_recently_added(url)
if self.is_search(url):
return self.list_search(url)
if self.is_base_url(url):
self.base_url = url
if "movie" in url:
return self.a_to_z(MOVIES_A_TO_Z_TYPE)
if "tv" in url:
return self.a_to_z(TV_SHOWS_A_TO_Z_TYPE)
if self.particular_letter(url):
if "movie" in url:
return self.list_movies_by_letter(url)
if "tv" in url:
return self.list_tv_shows_by_letter(url)
if self.has_tv_show_flag(url):
return self.list_tv_show(self.remove_flags(url))
return [self.dir_item(title="I failed", url="fail")]
开发者ID:slavkodemeter,项目名称:archivczsk-doplnky,代码行数:33,代码来源:sosac.py
示例18: _say_new
def _say_new(self, what):
debug("say: new")
item_name = self.cache_new(what)
what = urllib.quote(urllib.unquote(what).encode('utf8'), '')
url = "http://translate.google.com/translate_tts?tl=cs&q=%s" % what
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
#c.setopt(pycurl.RETEURNTRANSFER, 1)
c.setopt(pycurl.HEADER, 0)
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.ENCODING, "")
c.setopt(pycurl.USERAGENT, "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.2 (KHTML, like Gecko) Chrome/15.0.872.0 Safari/535.2")
c.setopt(pycurl.CONNECTTIMEOUT, 120)
c.setopt(pycurl.TIMEOUT, 120)
c.setopt(pycurl.MAXREDIRS, 10)
b = StringIO.StringIO()
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.perform()
content = b.getvalue()
mp3name = self._mk_mp3name(item_name)
with open(mp3name, "w") as f_out:
f_out.write(content)
return item_name
开发者ID:ticcky,项目名称:hearyou.webreader,代码行数:32,代码来源:webreader.py
示例19: _run_warning_cleanup
def _run_warning_cleanup(self, zpool):
util.debug("Performing warning level cleanup on %s" % \
zpool.name, \
self.verbose)
self._run_cleanup(zpool, "daily", self._warningLevel)
if zpool.get_capacity() > self._warningLevel:
self._run_cleanup(zpool, "hourly", self._warningLevel)
开发者ID:Lalufu,项目名称:time-slider,代码行数:7,代码来源:timesliderd.py
示例20: submit_new_message_fragment
def submit_new_message_fragment(self, new_fragment, n_current_fragment, n_total_fragments):
"""
Submit new message fragment:
If the message is finally complete, return it.
If message is still pending, return None.
Can also raise FragmentationError if corrupted format.
"""
if n_total_fragments != self.n_total_fragments:
raise FragmentationError("New fragment piece has different parameters!")
if n_current_fragment != (self.n_current_fragment + 1):
raise FragmentationError("Received fragment piece in wrong order (%d/%d)!",
n_current_fragment, (self.n_current_fragment + 1))
util.debug("Submitting new fragment (%d/%d) for %s" % (n_current_fragment, n_total_fragments, self.peer_sid))
# Increase fragment counter
self.n_current_fragment += 1
# Submit new payload
fragment_payload = get_fragment_payload(new_fragment)
self.collected_payloads.append(fragment_payload)
# If message is complete, return it. Otherwise return None.
if self.n_current_fragment == self.n_total_fragments:
print "LOL: %s" % str(self.collected_payloads)
return "".join(self.collected_payloads)
else:
return None
开发者ID:Mandragorian,项目名称:flute,代码行数:28,代码来源:transport.py
注:本文中的util.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论