本文整理汇总了Python中top.utils.log.log.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: send_table
def send_table(self,
recipients,
table_data,
identifier=None,
files=None,
template='proc_err',
dry=False):
"""Send an table-structured email message based on the list
*table_data*.
Acts as a wrapper that will create the MIME message including
a list of *files* to attach and send to the list of *recipients*.
E-mail is sent via the SMTP gateway.
E-mail message is based on the *template*..
**Args:**
*recipients*: list of email addresses to send e-mail to
*messages*: list of messages to be sent. List will be
converted into a HTML table
**Kwargs:**
*identifier*: identifying token that will be displayed in the
message. For example, :mod:`top.LoaderDaemon` processing
errors are usually identified by the T1250 EDI file that
caused the error
*files*: list of files to send as an attachment
*dry*: do not send, only report what would happen
**Returns:**
``True`` for successful email send
``False`` otherwise
"""
log.debug('Received table_data list: "%s"' % table_data)
status = False
if len(table_data):
alert_table = self.create_table(table_data)
data = {'file': file,
'identifier': identifier,
'facility': self.facility,
'err_table': alert_table}
mime = self.emailer.create_comms(data=data,
template=template,
files=files,
prod=self.prod)
self.emailer.set_recipients(recipients)
status = self.emailer.send(mime_message=mime, dry=dry)
else:
log.debug('No table data generated -- suppressing comms')
return status
开发者ID:loum,项目名称:top,代码行数:60,代码来源:daemonservice.py
示例2: sanitise_longitude
def sanitise_longitude(self, longitude):
"""Sanitise *longitude* by checking that it is within the
acceptable decimal range set for longitudes of -90 and 90.
**Args:**
*longitude*: decimal longitude value to santise
**Returns:**
sanitised longitude value on success or None otherwise
"""
log.debug("Sanitising agent.longitude '%s' ..." % longitude)
lng = None
if (longitude is not None and len(str(longitude))):
lng = float(longitude)
if lng < -180 or lng > 180:
log.error('Longitude value %s outside range' %
str(longitude))
lng = None
log.debug('Longitude %s sanitised to %s' %
(str(longitude), str(lng)))
return lng
开发者ID:loum,项目名称:top,代码行数:26,代码来源:adp.py
示例3: _cleanse
def _cleanse(self, row):
"""Runs over the "jobitem" record and modifies to suit the
requirements of the report.
**Args:**
row: tuple representing the columns from the "jobitem" table
record.
**Returns:**
tuple representing the cleansed data suitable for
exporter output.
"""
log.debug('cleansing row: "%s"' % str(row))
row_list = list(row)
# "pickup_ts" column should have microseconds removed.
# Handle sqlite and MSSQL dates differently.
pickup_ts = row[2]
if isinstance(row[2], str):
m = re.match('(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d*',
row[2])
try:
pickup_ts = m.group(1)
except AttributeError, err:
log.error('Cannot cleanse pickup_ts "%s": %s' %
(row[2], err))
开发者ID:loum,项目名称:top,代码行数:27,代码来源:exporter.py
示例4: set_comms_delivery_partners
def set_comms_delivery_partners(self, values):
self._comms_delivery_partners.clear()
if values is not None:
self._comms_delivery_partners = values
log.debug('%s comms_delivery_partners set to: "%s"' %
(self.facility, self._comms_delivery_partners))
开发者ID:loum,项目名称:top,代码行数:7,代码来源:loaderdaemon.py
示例5: flag_comms_previous
def flag_comms_previous(self, action, id, service, dry=False):
"""Check if the comms file flag has already been set.
Additionally, if the comms has errored (``*.err`` file exists)
then comms event file should not be created.
**Args:**
*action*: type of communication (either ``sms`` or ``email``)
*id*: the ``job_item.id`` for comms
*service*: the comms service template
**Returns:**
``True`` comms flag file has previously been set
``False`` comms flag has not been previously set
"""
status = False
comms_file = "%s.%d.%s" % (action, id, service)
abs_comms_file = os.path.join(self.comms_dir, comms_file)
log.debug('Checking if comms file "%s" set previously' %
abs_comms_file)
if (os.path.exists(abs_comms_file + '.err') or
os.path.exists(abs_comms_file)):
status = True
log.debug('Comms file "%s" previously set' % abs_comms_file)
return status
开发者ID:loum,项目名称:top,代码行数:32,代码来源:service.py
示例6: process
def process(self, file, column='JOB_KEY', dry=False):
"""Translated *column* values in *file* according to the
:meth:`token_generator` method. Upon success, will created a new
file with the extension ``.xlated`` appended.
**Args:**
*file*: fully qualified name to the report file
*column*: the column header name that typically relates to the
signature file (default ``JOB_KEY``)
**Returns:**
list of new, translated tokens
"""
log.info('Translating file "%s" ...' % file)
# Create a temporary file that will hold our translated content.
temp_fh = tempfile.NamedTemporaryFile()
log.debug('Created temp file "%s"' % temp_fh.name)
# Open the existing report file.
fh = None
try:
fh = open(file)
except IOError, err:
log.error('Could not open file "%s"' % file)
开发者ID:loum,项目名称:top,代码行数:27,代码来源:podtranslator.py
示例7: validate_file
def validate_file(self, filename):
"""Parse the T1250-format filename string and attempt to extract
the Business Unit and file timestamp.
**Kwargs:**
filename: the filename string to parse
**Returns:**
tuple stucture as (<business_unit>, <timestamp>)
"""
log.debug('Validating filename: "%s"' % filename)
m = re.search('T1250_(TOL.*)_(\d{14})\.txt', filename)
bu = None
dt_formatted = None
if m is None:
log.error('Could not parse BU/time from file "%s"' % filename)
else:
bu = m.group(1).lower()
file_timestamp = m.group(2)
parsed_time = time.strptime(file_timestamp, "%Y%m%d%H%M%S")
log.debug('parsed_time: %s' % parsed_time)
dt = datetime.datetime.fromtimestamp(time.mktime(parsed_time))
dt_formatted = dt.isoformat(' ')
log.info('Parsed BU/time "%s/%s" from file "%s"' %
(bu, dt_formatted, filename))
return (bu, dt_formatted)
开发者ID:loum,项目名称:top,代码行数:28,代码来源:loaderdaemon.py
示例8: set_on_delivery
def set_on_delivery(self, db=None, ts_db_kwargs=None, comms_dir=None):
"""Create a OnDelivery object,
**Kwargs:**
*db*: :mod:`top.DbSession` object
*ts_db_kwargs*: dictionary of key/value pairs representing
the TransSend connection
*comms_dir*: directory where to place comms events file
for further processing
"""
log.debug('Setting the OnDelivery object ...')
if db is None:
db = self.db_kwargs
if ts_db_kwargs is None:
ts_db_kwargs = self.ts_db_kwargs
if comms_dir is None:
comms_dir = self.comms_dir
if self._od is None:
self._od = top.OnDelivery(db_kwargs=db,
ts_db_kwargs=ts_db_kwargs,
comms_dir=comms_dir)
if self.config is not None:
self._od.set_delivered_header(self.config.delivered_header)
event_key = self.config.delivered_event_key
self._od.set_delivered_event_key(event_key)
self._od.set_scan_desc_header(self.config.scan_desc_header)
self._od.set_scan_desc_keys(self.config.scan_desc_keys)
开发者ID:loum,项目名称:top,代码行数:34,代码来源:ondeliverydaemon.py
示例9: delivery_partner_lookup
def delivery_partner_lookup(self, bu_id):
"""Lookup method that identifies the business unit name associated
with *bu_id* and returns a tuple of Delivery Partners to filter
uncollected parcels against.
**Args:**
*bu_id*: the Business Unit ID that typically relates to the
business_unit.id table column
**Returns**:
tuple of Delivery Partners that are enabled to have comms event
files triggered
"""
dps = ()
bu = None
for k, v in self.business_units.iteritems():
if v == bu_id:
bu = k
break
if bu is not None:
tmp_dps = self.comms_delivery_partners.get(bu)
if tmp_dps is not None:
dps = tuple(tmp_dps)
else:
log.warn('BU name for ID %d was not identified' % bu_id)
log.debug('BU ID: %d Delivery Partner lookup produced: %s' %
(bu_id, str(dps)))
return dps
开发者ID:loum,项目名称:top,代码行数:33,代码来源:ondeliverydaemon.py
示例10: daemonize
def daemonize(self):
"""Prepare the daemon environment.
Does the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177).
The second child PID is written out to the PID file. This acts
as a persistent state check, where no file implies that the
daemon process is idle.
The daemon process environment acts as a container for your process
logic. As such, once the daemon process ends it will remove its
associated PID file automatically.
"""
log.debug('attempting first fork')
try:
pid = os.fork()
# Exit first parent.
if pid > 0:
if self.term_parent:
sys.exit(0)
else:
return
except OSError, e:
sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno,
e.strerror))
sys.exit(1)
开发者ID:loum,项目名称:top,代码行数:29,代码来源:daemon.py
示例11: stop
def stop(self):
"""Stop the daemon.
Will run a series of checks around the existence of a PID file
before attempting to terminate the daemon.
**Returns:**
boolean::
``True`` -- success
``False`` -- failure
"""
stop_status = False
if self.pid:
# OK to terminate.
log.debug('stopping daemon process with PID: %s' % self.pid)
try:
os.kill(int(self.pid), signal.SIGTERM)
except OSError, err:
log.warn('err: %s' % str(err))
log.warn('PID "%s" does not exist' % self.pid)
if err.errno == 3:
# For a daemon process, remove the PID file.
if self.pidfile is not None:
log.warn('Removing PID file "%s"' % self.pidfile)
remove_files(self.pidfile)
else:
stop_status = True
self.pid = None
开发者ID:loum,项目名称:top,代码行数:32,代码来源:daemon.py
示例12: bu_ids_with_set_condition
def bu_ids_with_set_condition(self, flag):
"""Given condition *flag*, will traverse the condition map for
all Business Units and return the Business Unit IDs for the set
flags.
**Args:**
*flag*: string-based condition map flag to check. For example,
``pe_comms``
**Returns:**
tuple structure with the Business Unit ID's whose *flag* is
set
"""
log.debug('Getting BU ids for set "%s" condition' % flag)
set_bu_ids = []
for bu_name, id in self.business_units.iteritems():
if self.condition(self.bu_to_file(id), flag):
set_bu_ids.append(int(id))
set_bu_ids = tuple(sorted(set_bu_ids))
log.debug('"%s" flag set BU ids: %s' % (flag, str(set_bu_ids)))
return set_bu_ids
开发者ID:loum,项目名称:top,代码行数:25,代码来源:__init__.py
示例13: comms_map
def comms_map(self, bu):
"""The comms trigger has become somewhat of a convoluted mess
with various use-cases. This method returns the *bu* based comms
flags in a simpler dictionary lookup.
**Args:**
*bu*: the name of the Business Unit. Only the first 4
characters are accepted. For example, ``tolf_nsw`` will be
interpreted as ``tolf``.
**Returns:**
dictionary structure of the comms flags similar to::
{'send_email': False,
'send_sms': True}
"""
log.debug('Generating comms_map for Business Unit "%s"' % bu)
comms_map = {'send_email': False,
'send_sms': False}
for k in comms_map.keys():
comms_map[k] = self.condition(bu, k)
return comms_map
开发者ID:loum,项目名称:top,代码行数:25,代码来源:__init__.py
示例14: get_files
def get_files(self, dir=None, formats=None):
"""Identifies files that are to be processed.
**Args:**
*dir*: override :attr:`in_dirs` for directories to search
*formats*: list of regular expressions to apply across
directory files
**Returns:**
list of files found
"""
if formats is None:
formats = []
files_to_process = []
dirs_to_check = self.in_dirs
if dir is not None:
dirs_to_check = dir
for dir_to_check in dirs_to_check:
log.debug('Looking for files at: %s ...' % dir_to_check)
for file in get_directory_files(dir_to_check):
for format in formats:
if (check_filename(file, format)):
log.info('Found file: %s' % file)
files_to_process.append(file)
files_to_process.sort()
log.debug('Files set to be processed: "%s"' % str(files_to_process))
return files_to_process
开发者ID:loum,项目名称:top,代码行数:34,代码来源:daemonservice.py
示例15: get_return_date
def get_return_date(self, ts):
"""Creates the return date in a nicely formatted output.
Dates could be string based ("2013-09-19 08:52:13.308266") or
a :class:`datetime.datetime` object.
**Args:**
*ts*: the date the parcel was created
**Returns:**
string representation of the "return to sender" date in the
format "<Day full name> <day of month> <month> <year>". For
example::
Sunday 15 September 2013
"""
return_date = None
log.debug('Preparing return date against "%s" ...' % ts)
created_str = None
if ts is not None:
# Handle sqlite and MSSQL dates differently.
if isinstance(ts, str):
r = re.compile('(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.\d*')
m = r.match(ts)
try:
created_str = m.group(1)
except AttributeError, err:
log.error('Date not found "%s": %s' % (ts, err))
else:
created_str = ts.strftime("%Y-%m-%d %H:%M:%S")
开发者ID:loum,项目名称:top,代码行数:32,代码来源:comms.py
示例16: set_in_files
def set_in_files(self, values):
del self._in_files[:]
self._in_files = []
if values is not None:
self._in_files.extend(values)
log.debug('Set input files to: %s' % self._in_files)
开发者ID:loum,项目名称:top,代码行数:7,代码来源:stopparser.py
示例17: parse_comms_filename
def parse_comms_filename(self, filename):
"""Parse the comm *filename* and extract the job_item.id
and template.
Tokens that are supported must be listed in
:attr:`template_tokens`
**Args:**
*filename*: the filename to parse
**Returns:**
tuple representation of the *filename* in the form::
(<action>, <id>, "<template>")
For example::
("email", 1, "pe")
"""
comm_parse = ()
log.debug('Parsing comms filename: "%s"' % filename)
r = re.compile('(email|sms)\.(\d+)\.(%s)' %
'|'.join(self.template_tokens))
m = r.match(filename)
if m:
try:
comm_parse = (m.group(1), int(m.group(2)), m.group(3))
except IndexError, err:
log.error('Unable to parse filename "%s": %s' %
(filename, err))
开发者ID:loum,项目名称:top,代码行数:32,代码来源:comms.py
示例18: delivery_partner_comms_trigger
def delivery_partner_comms_trigger(self, agent_id, delivery_partners):
"""Helper method to check whether the current Agent's Delivery
Partner and Business Unit are configured to trigger comms.
**Args:**
*agent_id*: Agent ID as per the ``agent.id`` column value
*delivery_partners*: list of strings that represent the
delivery partners for which comms are to be sent
"""
trigger_comms = False
if not len(delivery_partners):
log.debug('delivery_partners list empty')
trigger_comms = True
else:
# Get the Agent's Delivery Partner.
agent_dp = self.get_agent_delivery_partner(agent_id)
# Match against the list of delivery_partners.
if agent_dp in delivery_partners:
trigger_comms = True
log.debug('Agent ID %d delivery Partner trigger?: "%s"' %
(agent_id, trigger_comms))
return trigger_comms
开发者ID:loum,项目名称:top,代码行数:27,代码来源:loader.py
示例19: get_files
def get_files(self):
"""Checks inbound directories (defined by the
:attr:`top.b2cconfig.in_dirs` config option) for valid
T1250 files to be processed. In this context, valid is interpreted
as:
* T1250 files that conform to the T1250 syntax
* have not already been archived
* contain the T1250 EOF flag
**Returns:**
list of fully qualified and sorted (oldest first) T1250 files
to be processed
"""
files_to_process = []
for dir in self.config.in_dirs:
log.info('Looking for files at: %s ...' % dir)
for file in get_directory_files(dir):
if (check_filename(file, self.file_format) and
check_eof_flag(file)):
log.info('Found file: "%s" ' % file)
archive_path = self.get_customer_archive(file)
if (archive_path is not None and
os.path.exists(archive_path)):
log.error('File %s is archived' % file)
else:
files_to_process.append(file)
files_to_process.sort()
log.debug('Files set to be processed: "%s"' % str(files_to_process))
return files_to_process
开发者ID:loum,项目名称:top,代码行数:35,代码来源:loaderdaemon.py
示例20: get_agent_id
def get_agent_id(self, agent):
"""Helper method to verify if an Agent ID is defined.
**Args:**
*agent*: the raw agent string (for example, ``N001``)
**Returns:**
on success, integer value taken from ``agent.id`` representing
the *agent* details. ``None`` otherwise.
"""
log.info('Checking if Agent Id "%s" exists in system ...' % agent)
self.db(self.db._agent.check_agent_id(agent_code=agent))
agent_id_row_id = self.db.row
if agent_id_row_id is not None:
agent_id_row_id = agent_id_row_id[0]
log.info('Agent Id "%s" found: %s' % (agent, agent_id_row_id))
else:
self.set_alerts('Agent Id "%s" does not exist' % agent)
# For testing only, create the record.
# First insert will fail the test -- subsequent checks should
# be OK. This will ensure we get a mix during testing.
if self.db.host is None:
log.debug('TEST: Creating Agent Id "%s" record ...' % agent)
agent_fields = {'code': agent}
sql = self.db._agent.insert_sql(agent_fields)
# Just consume the new row id.
id = self.db.insert(sql)
return agent_id_row_id
开发者ID:loum,项目名称:top,代码行数:32,代码来源:loader.py
注:本文中的top.utils.log.log.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论