本文整理汇总了Python中pyLibrary.strings.expand_template函数的典型用法代码示例。如果您正苦于以下问题:Python expand_template函数的具体用法?Python expand_template怎么用?Python expand_template使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了expand_template函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: assertAlmostEqualValue
def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
"""
Snagged from unittest/case.py, then modified (Aug2014)
"""
if expected == None: # None has no expectations
return
if test == expected:
# shortcut
return
if not Math.is_number(expected):
# SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL
if isinstance(expected, list) and len(expected)==0 and test == None:
return
if isinstance(expected, Mapping) and not expected.keys() and test == None:
return
if test != expected:
raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
return
num_param = 0
if digits != None:
num_param += 1
if places != None:
num_param += 1
if delta != None:
num_param += 1
if num_param>1:
raise TypeError("specify only one of digits, places or delta")
if digits is not None:
with suppress_exception:
diff = Math.log10(abs(test-expected))
if diff < digits:
return
standardMsg = expand_template("{{test}} != {{expected}} within {{digits}} decimal places", locals())
elif delta is not None:
if abs(test - expected) <= delta:
return
standardMsg = expand_template("{{test}} != {{expected}} within {{delta}} delta", locals())
else:
if places is None:
places = 15
with suppress_exception:
diff = Math.log10(abs(test-expected))
if diff < Math.ceiling(Math.log10(abs(test)))-places:
return
standardMsg = expand_template("{{test|json}} != {{expected|json}} within {{places}} places", locals())
raise AssertionError(coalesce(msg, "") + ": (" + standardMsg + ")")
开发者ID:klahnakoski,项目名称:TestFailures,代码行数:55,代码来源:fuzzytestcase.py
示例2: _aggop
def _aggop(self, query):
"""
SINGLE ROW RETURNED WITH AGGREGATES
"""
if isinstance(query.select, list):
# RETURN SINGLE OBJECT WITH AGGREGATES
for s in query.select:
if s.aggregate not in aggregates:
Log.error("Expecting all columns to have an aggregate: {{select}}", select=s)
selects = DictList()
for s in query.select:
selects.append(aggregates[s.aggregate].replace("{{code}}", s.value) + " AS " + self.db.quote_column(s.name))
sql = expand_template("""
SELECT
{{selects}}
FROM
{{table}}
{{where}}
""", {
"selects": SQL(",\n".join(selects)),
"table": self._subquery(query["from"])[0],
"where": self._where2sql(query.filter)
})
return sql, lambda sql: self.db.column(sql)[0] # RETURNING SINGLE OBJECT WITH AGGREGATE VALUES
else:
# RETURN SINGLE VALUE
s0 = query.select
if s0.aggregate not in aggregates:
Log.error("Expecting all columns to have an aggregate: {{select}}", select=s0)
select = aggregates[s0.aggregate].replace("{{code}}", s0.value) + " AS " + self.db.quote_column(s0.name)
sql = expand_template("""
SELECT
{{selects}}
FROM
{{table}}
{{where}}
""", {
"selects": SQL(select),
"table": self._subquery(query["from"])[0],
"where": self._where2sql(query.where)
})
def post(sql):
result = self.db.column_query(sql)
return result[0][0]
return sql, post # RETURN SINGLE VALUE
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:52,代码来源:jx_usingMySQL.py
示例3: column_query
def column_query(self, sql, param=None):
"""
RETURN RESULTS IN [column][row_num] GRID
"""
self._execute_backlog()
try:
old_cursor = self.cursor
if not old_cursor: # ALLOW NON-TRANSACTIONAL READS
self.cursor = self.db.cursor()
self.cursor.execute("SET TIME_ZONE='+00:00'")
self.cursor.close()
self.cursor = self.db.cursor()
if param:
sql = expand_template(sql, self.quote_param(param))
sql = self.preamble + outdent(sql)
if self.debug:
Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))
self.cursor.execute(sql)
grid = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
# columns = [utf8_to_unicode(d[0]) for d in coalesce(self.cursor.description, [])]
result = zip(*grid)
if not old_cursor: # CLEANUP AFTER NON-TRANSACTIONAL READS
self.cursor.close()
self.cursor = None
return result
except Exception, e:
if isinstance(e, InterfaceError) or e.message.find("InterfaceError") >= 0:
Log.error("Did you close the db connection?", e)
Log.error("Problem executing SQL:\n{{sql|indent}}", sql= sql, cause=e,stack_depth=1)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:33,代码来源:mysql.py
示例4: quote_value
def quote_value(self, value):
"""
convert values to mysql code for the same
mostly delegate directly to the mysql lib, but some exceptions exist
"""
try:
if value == None:
return "NULL"
elif isinstance(value, SQL):
if not value.param:
# value.template CAN BE MORE THAN A TEMPLATE STRING
return self.quote_sql(value.template)
param = {k: self.quote_sql(v) for k, v in value.param.items()}
return expand_template(value.template, param)
elif isinstance(value, basestring):
return self.db.literal(value)
elif isinstance(value, datetime):
return "str_to_date('" + value.strftime("%Y%m%d%H%M%S") + "', '%Y%m%d%H%i%s')"
elif hasattr(value, '__iter__'):
return self.db.literal(json_encode(value))
elif isinstance(value, Mapping):
return self.db.literal(json_encode(value))
elif Math.is_number(value):
return unicode(value)
else:
return self.db.literal(value)
except Exception, e:
Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:28,代码来源:mysql.py
示例5: execute
def execute(
self,
command,
param=None,
retry=True # IF command FAILS, JUST THROW ERROR
):
if param:
command = expand_template(command, self.quote_param(param))
output = None
done = False
while not done:
try:
with self.locker:
if not self.connection:
self._connect()
with Closer(self.connection.cursor()) as curs:
curs.execute(command)
if curs.rowcount >= 0:
output = curs.fetchall()
self.connection.commit()
done = True
except Exception, e:
try:
self.connection.rollback()
# TODO: FIGURE OUT WHY rollback() DOES NOT HELP
self.connection.close()
except Exception, f:
pass
self.connection = None
self._connect()
if not retry:
Log.error("Problem with command:\n{{command|indent}}", command= command, cause=e)
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:34,代码来源:redshift.py
示例6: write
def write(self, template, params):
with self.locker:
if params.params.warning.template or params.params.warning.template:
self.accumulation.append(expand_template(template, params))
if Date.now() > self.last_sent + WAIT_TO_SEND_MORE:
self._send_email()
开发者ID:klahnakoski,项目名称:Activedata-ETL,代码行数:7,代码来源:log_usingEmail.py
示例7: json2value
def json2value(json_string, params={}, flexible=False, leaves=False):
"""
:param json_string: THE JSON
:param params: STANDARD JSON PARAMS
:param flexible: REMOVE COMMENTS
:param leaves: ASSUME JSON KEYS ARE DOT-DELIMITED
:return: Python value
"""
if isinstance(json_string, str):
Log.error("only unicode json accepted")
try:
if flexible:
# REMOVE """COMMENTS""", # COMMENTS, //COMMENTS, AND \n \r
# DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py# L58
json_string = re.sub(r"\"\"\".*?\"\"\"", r"\n", json_string, flags=re.MULTILINE)
json_string = "\n".join(remove_line_comment(l) for l in json_string.split("\n"))
# ALLOW DICTIONARY'S NAME:VALUE LIST TO END WITH COMMA
json_string = re.sub(r",\s*\}", r"}", json_string)
# ALLOW LISTS TO END WITH COMMA
json_string = re.sub(r",\s*\]", r"]", json_string)
if params:
# LOOKUP REFERENCES
json_string = expand_template(json_string, params)
try:
value = wrap(json_decoder(unicode(json_string)))
except Exception, e:
Log.error("can not decode\n{{content}}", content=json_string, cause=e)
if leaves:
value = wrap_leaves(value)
return value
开发者ID:klahnakoski,项目名称:TestFailures,代码行数:35,代码来源:convert.py
示例8: forall
def forall(self, sql, param=None, _execute=None):
assert _execute
num = 0
self._execute_backlog()
try:
old_cursor = self.cursor
if not old_cursor: # ALLOW NON-TRANSACTIONAL READS
self.cursor = self.db.cursor()
if param:
sql = expand_template(sql, self.quote_param(param))
sql = self.preamble + outdent(sql)
if self.debug:
Log.note("Execute SQL:\n{{sql}}", sql= indent(sql))
self.cursor.execute(sql)
columns = tuple([utf8_to_unicode(d[0]) for d in self.cursor.description])
for r in self.cursor:
num += 1
_execute(wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))
if not old_cursor: # CLEANUP AFTER NON-TRANSACTIONAL READS
self.cursor.close()
self.cursor = None
except Exception, e:
Log.error("Problem executing SQL:\n{{sql|indent}}", sql= sql, cause=e, stack_depth=1)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:28,代码来源:mysql.py
示例9: json2value
def json2value(json_string, params={}, flexible=False, leaves=False):
"""
:param json_string: THE JSON
:param params: STANDARD JSON PARAMS
:param flexible: REMOVE COMMENTS
:param leaves: ASSUME JSON KEYS ARE DOT-DELIMITED
:return: Python value
"""
if isinstance(json_string, str):
Log.error("only unicode json accepted")
try:
if flexible:
# REMOVE """COMMENTS""", # COMMENTS, //COMMENTS, AND \n \r
# DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py# L58
json_string = re.sub(r"\"\"\".*?\"\"\"", r"\n", json_string, flags=re.MULTILINE)
json_string = "\n".join(remove_line_comment(l) for l in json_string.split("\n"))
# ALLOW DICTIONARY'S NAME:VALUE LIST TO END WITH COMMA
json_string = re.sub(r",\s*\}", r"}", json_string)
# ALLOW LISTS TO END WITH COMMA
json_string = re.sub(r",\s*\]", r"]", json_string)
if params:
json_string = expand_template(json_string, params)
# LOOKUP REFERENCES
value = wrap(json_decoder(json_string))
if leaves:
value = wrap_leaves(value)
return value
except Exception, e:
e = Except.wrap(e)
if "Expecting '" in e and "' delimiter: line" in e:
line_index = int(strings.between(e.message, " line ", " column ")) - 1
column = int(strings.between(e.message, " column ", " ")) - 1
line = json_string.split("\n")[line_index].replace("\t", " ")
if column > 20:
sample = "..." + line[column - 20:]
pointer = " " + (" " * 20) + "^"
else:
sample = line
pointer = (" " * column) + "^"
if len(sample) > 43:
sample = sample[:43] + "..."
Log.error("Can not decode JSON at:\n\t" + sample + "\n\t" + pointer + "\n")
base_str = unicode2utf8(strings.limit(json_string, 1000))
hexx_str = bytes2hex(base_str, " ")
try:
char_str = " " + (" ".join(c.decode("latin1") if ord(c) >= 32 else ".") for c in base_str)
except Exception:
char_str = " "
Log.error("Can not decode JSON:\n" + char_str + "\n" + hexx_str + "\n", e)
开发者ID:mozilla,项目名称:ChangeDetector,代码行数:59,代码来源:convert.py
示例10: execute
def execute(self, sql, param=None):
if self.transaction_level == 0:
Log.error("Expecting transaction to be started before issuing queries")
if param:
sql = expand_template(sql, self.quote_param(param))
sql = outdent(sql)
self.backlog.append(sql)
if self.debug or len(self.backlog) >= MAX_BATCH_SIZE:
self._execute_backlog()
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:10,代码来源:mysql.py
示例11: compileString2Term
def compileString2Term(edge):
if edge.esscript:
Log.error("edge script not supported yet")
value = edge.value
if isKeyword(value):
value = strings.expand_template("getDocValue({{path}})", {"path": convert.string2quote(value)})
else:
Log.error("not handled")
def fromTerm(value):
return edge.domain.getPartByKey(value)
return Dict(toTerm={"head": "", "body": value}, fromTerm=fromTerm)
开发者ID:klahnakoski,项目名称:MoDevETL,代码行数:14,代码来源:util.py
示例12: assertAlmostEqualValue
def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
"""
Snagged from unittest/case.py, then modified (Aug2014)
"""
if test == expected:
# shortcut
return
if not Math.is_number(expected):
# SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL
if isinstance(expected, list) and len(expected) == 0 and test == None:
return
if isinstance(expected, Mapping) and not expected.keys() and test == None:
return
if test != expected:
raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
return
num_param = 0
if digits != None:
num_param += 1
if places != None:
num_param += 1
if delta != None:
num_param += 1
if num_param > 1:
raise TypeError("specify only one of digits, places or delta")
if digits is not None:
try:
diff = Math.log10(abs(test - expected))
if diff < digits:
return
except Exception, e:
pass
standardMsg = expand_template("{{test}} != {{expected}} within {{digits}} decimal places", locals())
开发者ID:mozilla,项目名称:ChangeDetector,代码行数:37,代码来源:fuzzytestcase.py
示例13: get_job_classification
def get_job_classification(self, branch, revision):
results = http.get_json(expand_template(RESULT_SET_URL, {"branch": branch, "revision": revision[0:12:]}))
for r in results.results:
jobs = http.get_json(expand_template(JOBS_URL, {"branch": branch, "result_set_id": r.id}))
for j in jobs:
notes = http.get_json(expand_template(NOTES_URL, {"branch": branch, "job_id": j.id}))
for n in notes:
if not n.note:
continue
Log.note(
"{{note|json}}",
note={
"job_id": j.id,
"result_set_id": r.id,
"branch": branch,
"revision": r.revision,
"failure_classification_id": j.failure_classification_id,
"result": j.result,
"note_timestamp": n.timestamp,
"note": n.note
}
)
开发者ID:klahnakoski,项目名称:Activedata-ETL,代码行数:24,代码来源:treeherder.py
示例14: assertAlmostEqualValue
def assertAlmostEqualValue(first, second, digits=None, places=None, msg=None, delta=None):
"""
Snagged from unittest/case.py, then modified (Aug2014)
"""
if first == second:
# shortcut
return
places = places if places is not None else digits
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
if delta is not None:
if abs(first - second) <= delta:
return
standardMsg = expand_template("{{first}} != {{second}} within {{delta}} delta", {
"first": first,
"second": second,
"delta": delta
})
else:
if places is None:
places = 18
diff = log10(abs(first-second))
if diff < Math.ceiling(log10(abs(first)))-places:
return
standardMsg = expand_template("{{first}} != {{second}} within {{places}} places", {
"first": first,
"second": second,
"": places
})
raise AssertionError(nvl(msg, "") + ": (" + standardMsg + ")")
开发者ID:klahnakoski,项目名称:Datazilla2ElasticSearch,代码行数:36,代码来源:fuzzytestcase.py
示例15: add
def add(self, data):
data = wrap(data)
uid, count = self.uid.advance()
link = expand_template(
LINK_PATTERN,
{
"region": self.bucket.settings.region,
"bucket": self.bucket.settings.bucket,
"uid": uid
}
)
data.etl.id = count
data.etl.source.href = link
data[UID_PATH] = uid
self.temp_queue.add(data)
return link, count
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:16,代码来源:storage.py
示例16: __str__
def __str__(self):
output = self.type + ": " + self.template + "\n"
if self.params:
output = expand_template(output, self.params)
if self.trace:
output += indent(format_trace(self.trace))
if self.cause:
cause_strings = []
for c in listwrap(self.cause):
try:
cause_strings.append(unicode(c))
except Exception, e:
pass
output += "caused by\n\t" + "and caused by\n\t".join(cause_strings)
开发者ID:klahnakoski,项目名称:intermittents,代码行数:17,代码来源:logs.py
示例17: quote_sql
def quote_sql(self, value, param=None):
"""
USED TO EXPAND THE PARAMETERS TO THE SQL() OBJECT
"""
try:
if isinstance(value, SQL):
if not param:
return value
param = {k: self.quote_sql(v) for k, v in param.items()}
return expand_template(value, param)
elif isinstance(value, basestring):
return value
elif isinstance(value, Mapping):
return self.db.literal(json_encode(value))
elif hasattr(value, '__iter__'):
return "(" + ",".join([self.quote_sql(vv) for vv in value]) + ")"
else:
return unicode(value)
except Exception, e:
Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:20,代码来源:mysql.py
示例18: execute_sql
def execute_sql(
host,
username,
password,
sql,
schema=None,
param=None,
settings=None
):
"""EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?"""
settings.schema = coalesce(settings.schema, settings.database)
if param:
with MySQL(settings) as temp:
sql = expand_template(sql, temp.quote_param(param))
# MWe have no way to execute an entire SQL file in bulk, so we
# have to shell out to the commandline client.
args = [
"mysql",
"-h{0}".format(settings.host),
"-u{0}".format(settings.username),
"-p{0}".format(settings.password)
]
if settings.schema:
args.append("{0}".format(settings.schema))
try:
proc = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=-1
)
if isinstance(sql, unicode):
sql = sql.encode("utf8")
(output, _) = proc.communicate(sql)
except Exception, e:
Log.error("Can not call \"mysql\"", e)
开发者ID:klahnakoski,项目名称:MoTreeherder,代码行数:40,代码来源:mysql.py
示例19: time_delta_pusher
def time_delta_pusher(please_stop, appender, queue, interval):
"""
appender - THE FUNCTION THAT ACCEPTS A STRING
queue - FILLED WITH LOG ENTRIES {"template":template, "params":params} TO WRITE
interval - timedelta
USE IN A THREAD TO BATCH LOGS BY TIME INTERVAL
"""
if not isinstance(interval, timedelta):
Log.error("Expecting interval to be a timedelta")
next_run = datetime.utcnow() + interval
while not please_stop:
Thread.sleep(till=next_run)
next_run = datetime.utcnow() + interval
logs = queue.pop_all()
if logs:
lines = []
for log in logs:
try:
if log is Thread.STOP:
please_stop.go()
next_run = datetime.utcnow()
else:
expanded = expand_template(log.get("template"), log.get("params"))
lines.append(expanded)
except Exception, e:
Log.warning("Trouble formatting logs", e)
# SWALLOW ERROR, GOT TO KEEP RUNNING
try:
if DEBUG_LOGGING and please_stop:
sys.stdout.write("Call to appender with " + str(len(lines)) + " lines\n")
appender(u"\n".join(lines) + u"\n")
if DEBUG_LOGGING and please_stop:
sys.stdout.write("Done call to appender with " + str(len(lines)) + " lines\n")
except Exception, e:
sys.stderr.write("Trouble with appender: " + str(e.message) + "\n")
开发者ID:klahnakoski,项目名称:intermittents,代码行数:38,代码来源:log_usingThreadedStream.py
示例20: write
def write(self, template, params):
with self.file_lock:
self.file.append(expand_template(template, params))
开发者ID:klahnakoski,项目名称:MoDataSubmission,代码行数:3,代码来源:text_logs.py
注:本文中的pyLibrary.strings.expand_template函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论