本文整理汇总了Python中mo_logs.strings.expand_template函数的典型用法代码示例。如果您正苦于以下问题：Python expand_template函数的具体用法？Python expand_template怎么用？Python expand_template使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了expand_template函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: assertAlmostEqualValue
def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, delta=None):
    """
    Assert that `test` matches `expected`, with an optional numeric tolerance.

    Snagged from unittest/case.py, then modified (Aug2014).
    Returns None when the values match, raises AssertionError otherwise.

    :param test: the value being checked
    :param expected: the value to compare against; when it equals None
        there is no expectation, when it is NULL `test` must equal None
    :param digits: passes when log10(abs(test - expected)) < digits
    :param places: passes when log10(abs(test - expected)) is below
        ceiling(log10(abs(test))) - places; 15 is used when no tolerance is given
    :param msg: optional prefix for the failure message
    :param delta: passes when abs(test - expected) <= delta
    :raises TypeError: when more than one of digits, places, delta is given
    """
    # NOTE: the templates are expanded against locals(), so the names
    # test, expected, digits, places and delta must not be renamed.
    # NOTE(review): `== None` is kept as written rather than `is None`;
    # presumably null-like wrapper objects must match too - confirm.
    if expected is NULL:
        # pandas dataframes reject any comparision with an exception!
        if test == None:
            return
        raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))

    if expected == None:
        # None has no expectations
        return
    if test == expected:
        # shortcut
        return

    if not is_number(expected):
        # SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL
        if is_list(expected) and len(expected) == 0 and test == None:
            return
        if is_data(expected) and not expected.keys() and test == None:
            return
        if test != expected:
            raise AssertionError(expand_template("{{test}} != {{expected}}", locals()))
        return

    # at most one kind of tolerance may be requested
    num_param = sum(1 for option in (digits, places, delta) if option != None)
    if num_param > 1:
        raise TypeError("specify only one of digits, places or delta")

    if digits is not None:
        # assuming suppress_exception swallows errors as its name suggests,
        # a failed computation falls through to the failure message
        with suppress_exception:
            diff = log10(abs(test - expected))
            if diff < digits:
                return

        standardMsg = expand_template(
            "{{test}} != {{expected}} within {{digits}} decimal places",
            locals()
        )
    elif delta is not None:
        if abs(test - expected) <= delta:
            return

        standardMsg = expand_template(
            "{{test}} != {{expected}} within {{delta}} delta",
            locals()
        )
    else:
        if places is None:
            places = 15

        with suppress_exception:
            diff = mo_math.log10(abs(test - expected))
            if diff < mo_math.ceiling(mo_math.log10(abs(test))) - places:
                return

        standardMsg = expand_template(
            "{{test|json}} != {{expected|json}} within {{places}} places",
            locals()
        )

    raise AssertionError(coalesce(msg, "") + ": (" + standardMsg + ")")
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:60,代码来源:fuzzytestcase.py
示例2: _setup_grcov
def _setup_grcov(self):
    """
    Install gcc, then fetch and unpack the latest grcov release.

    Every asset of the latest GitHub release whose download URL contains
    `self.settings.grcov.platform` is downloaded into ~/ActiveData-ETL,
    extracted, and the archive removed.
    """
    sudo("apt-get install -y gcc")

    latest = http.get_json("https://api.github.com/repos/marco-c/grcov/releases/latest")
    with cd("~/ActiveData-ETL"):
        for asset in latest.assets:
            platform = self.settings.grcov.platform
            download_url = asset.browser_download_url
            if platform not in download_url:
                continue

            run("wget " + download_url)
            # unpack the archive, then delete it
            for command in (
                "tar xf grcov-{{platform}}.tar.bz2",
                "rm grcov-{{platform}}.tar.bz2",
            ):
                run(expand_template(command, self.settings.grcov))
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:10,代码来源:etl.py
示例3: _aggop
def _aggop(self, query):
    """
    SINGLE ROW RETURNED WITH AGGREGATES

    Build the aggregate SQL for `query` and pair it with a function that
    pulls the answer out of the database.

    :param query: query whose `select` is either a list of columns or a
        single column; every column must name a known aggregate
    :return: (sql, post) where post(sql) runs the statement and returns
        the first column() entry (list select) or the single value
        (scalar select)
    """
    if isinstance(query.select, list):
        # RETURN SINGLE OBJECT WITH AGGREGATES
        for column in query.select:
            if column.aggregate not in aggregates:
                Log.error("Expecting all columns to have an aggregate: {{select}}", select=column)

        selects = FlatList()
        for column in query.select:
            code = aggregates[column.aggregate].replace("{{code}}", column.value)
            selects.append(sql_alias(code, quote_column(column.name)))

        # NOTE(review): this branch filters on query.filter while the
        # single-value branch uses query.where - confirm which is intended
        # the template text is kept flush-left so the SQL string is unchanged
        sql = expand_template("""
SELECT
{{selects}}
FROM
{{table}}
{{where}}
""", {
            "selects": SQL(",\n".join(selects)),
            "table": self._subquery(query["from"])[0],
            "where": self._where2sql(query.filter)
        })

        def first_row(sql):
            # RETURNING SINGLE OBJECT WITH AGGREGATE VALUES
            return self.db.column(sql)[0]

        return sql, first_row

    # RETURN SINGLE VALUE
    single = query.select
    if single.aggregate not in aggregates:
        Log.error("Expecting all columns to have an aggregate: {{select}}", select=single)

    aliased = sql_alias(
        aggregates[single.aggregate].replace("{{code}}", single.value),
        quote_column(single.name)
    )

    sql = expand_template("""
SELECT
{{selects}}
FROM
{{table}}
{{where}}
""", {
        "selects": SQL(aliased),
        "table": self._subquery(query["from"])[0],
        "where": self._where2sql(query.where)
    })

    def post(sql):
        # the answer is the first cell of the column-major result grid
        grid = self.db.column_query(sql)
        return grid[0][0]

    return sql, post  # RETURN SINGLE VALUE
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:52,代码来源:__init__.py
示例4: forall
def forall(self, sql, param=None, _execute=None):
    """
    Run `sql` and call `_execute` once for each row returned.

    :param sql: SQL text; treated as a template when `param` is given
    :param param: optional template parameters, quoted before expansion
    :param _execute: callable given each row as a wrapped dict keyed by
        column name (required)
    :return: the number of rows handed to `_execute`
    """
    assert _execute
    num = 0

    self._execute_backlog()
    try:
        outer_cursor = self.cursor
        if not outer_cursor:
            # ALLOW NON-TRANSACTIONAL READS
            # NOTE(review): a failure below leaves this temporary cursor
            # open and assigned to self.cursor - confirm that is intended
            self.cursor = self.db.cursor()

        if param:
            sql = expand_template(sql, quote_param(param))
        sql = self.preamble + outdent(sql)
        if self.debug:
            Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))
        self.cursor.execute(sql)

        columns = tuple(utf8_to_unicode(d[0]) for d in self.cursor.description)
        for num, row in enumerate(self.cursor, 1):
            cells = [utf8_to_unicode(c) for c in row]
            _execute(wrap(dict(zip(columns, cells))))

        if not outer_cursor:
            # CLEANUP AFTER NON-TRANSACTIONAL READS
            self.cursor.close()
            self.cursor = None
    except Exception as e:
        Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=e, stack_depth=1)

    return num
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:29,代码来源:mysql.py
示例5: execute
def execute(
    self,
    command,
    param=None,
    retry=True  # WHEN False, A FAILED command IS REPORTED WITH Log.error INSTEAD OF BEING RETRIED
):
    """
    Run `command` on the connection, commit, and return any fetched rows.

    :param command: SQL text; treated as a template when `param` is given
    :param param: optional template parameters, quoted with self.quote_param
    :param retry: when truthy the command is attempted again after every
        failure, with no limit on attempts visible in this block; when
        falsy the failure is passed to Log.error
    :return: curs.fetchall() when curs.rowcount >= 0, otherwise None
    """
    if param:
        command = expand_template(command, self.quote_param(param))

    output = None
    done = False
    while not done:
        try:
            with self.locker:
                # connect lazily; the failure path below clears self.connection
                if not self.connection:
                    self._connect()

                with Closer(self.connection.cursor()) as curs:
                    curs.execute(command)
                    if curs.rowcount >= 0:
                        output = curs.fetchall()
                self.connection.commit()
                done = True
        except Exception as e:
            # best-effort cleanup of the failed connection
            # (assumes suppress_exception swallows errors - TODO confirm)
            with suppress_exception:
                self.connection.rollback()
                # TODO: FIGURE OUT WHY rollback() DOES NOT HELP
                self.connection.close()
            # start over with a fresh connection before retrying or reporting
            self.connection = None
            self._connect()
            if not retry:
                Log.error("Problem with command:\n{{command|indent}}", command= command, cause=e)
    return output
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:33,代码来源:redshift.py
示例6: column_query
def column_query(self, sql, param=None):
    """
    RETURN RESULTS IN [column][row_num] GRID

    :param sql: SQL text; treated as a template when `param` is given
    :param param: optional template parameters, quoted before expansion
    :return: the rows of the result set transposed, so the first index
        selects a column and the second a row
    """
    self._execute_backlog()
    try:
        old_cursor = self.cursor
        if not old_cursor:  # ALLOW NON-TRANSACTIONAL READS
            # set the session time zone on a throw-away cursor, then open
            # the cursor that performs the read
            self.cursor = self.db.cursor()
            self.cursor.execute("SET TIME_ZONE='+00:00'")
            self.cursor.close()
            self.cursor = self.db.cursor()

        if param:
            sql = expand_template(sql, quote_param(param))
        sql = self.preamble + outdent(sql)
        self.debug and Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))

        self.cursor.execute(sql)
        grid = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
        # columns = [utf8_to_unicode(d[0]) for d in coalesce(self.cursor.description, [])]
        result = transpose(*grid)

        if not old_cursor:  # CLEANUP AFTER NON-TRANSACTIONAL READS
            self.cursor.close()
            self.cursor = None

        return result
    except Exception as e:
        # Exceptions have no `.message` attribute on Python 3; reading it
        # directly made this handler raise AttributeError and hide the
        # real failure.  Fall back to the exception text instead.
        message = getattr(e, "message", None) or str(e)
        if isinstance(e, InterfaceError) or "InterfaceError" in message:
            Log.error("Did you close the db connection?", e)
        Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=e, stack_depth=1)
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:32,代码来源:mysql.py
示例7: write
def write(self, template, params):
    """
    Expand `template` with `params` and append the text to the log file.

    A failure is reported through Log.warning and followed by a five
    second pause.

    :param template: template text for the log line
    :param params: values substituted into `template`
    """
    try:
        with self.file_lock:
            self.file.append(expand_template(template, params))
    except Exception as e:
        # was `file.name`: `file` is not defined in this scope (and is not
        # a builtin on Python 3), so the handler itself failed; the file
        # being written is self.file
        # NOTE(review): assumes self.file exposes `.name` - confirm
        Log.warning("Problem writing to file {{file}}, waiting...", file=self.file.name, cause=e)
        time.sleep(5)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:7,代码来源:log_usingFile.py
示例8: _send_email
def _send_email(self):
    """
    Email the accumulated log lines, grouped by recipient.

    Every line goes to `self.settings.to_address`; a line also goes to a
    `self.cc` entry when any of that entry's `contains` values is found
    in the line's `params.params.error`.  Failures are reported with
    Log.warning, and `self.next_send` is rescheduled on every exit path.
    """
    try:
        if not self.accumulation:
            return

        with Emailer(self.settings) as emailer:
            # WHO ARE WE SENDING TO
            emails = Data()
            for template, params in self.accumulation:
                message = expand_template(template, params)
                emails[literal_field(self.settings.to_address)] += [message]
                for recipient in self.cc:
                    if any(keyword in params.params.error for keyword in recipient.contains):
                        emails[literal_field(recipient.to_address)] += [message]

            # SEND TO EACH
            for to_address, messages in emails.items():
                emailer.send_email(
                    from_address=self.settings.from_address,
                    to_address=listwrap(to_address),
                    subject=self.settings.subject,
                    text_data="\n\n".join(messages)
                )

        self.accumulation = []
    except Exception as e:
        Log.warning("Could not send", e)
    finally:
        # randomised delay: between zero and twice the average interval
        jitter = 2 * Random.float()
        self.next_send = Date.now() + self.settings.average_interval * jitter
开发者ID:rv404674,项目名称:TUID,代码行数:28,代码来源:log_usingEmail.py
示例9: query
def query(self, sql, param=None, stream=False, row_tuples=False):
    """
    RETURN LIST OF dicts

    :param sql: SQL text; treated as a template when `param` is given
    :param param: optional template parameters, quoted before expansion
    :param stream: when truthy rows are produced lazily from the cursor
        instead of being collected first
    :param row_tuples: when truthy rows are returned as the cursor
        produced them, instead of dicts keyed by column name
    :return: the rows, shaped according to `stream` and `row_tuples`
    """
    if not self.cursor:
        # queries are only accepted while a cursor (transaction) is open
        Log.error("must perform all queries inside a transaction")
    self._execute_backlog()

    try:
        if param:
            sql = expand_template(sql, quote_param(param))
        sql = self.preamble + outdent(sql)
        if self.debug:
            Log.note("Execute SQL:\n{{sql}}", sql=indent(sql))

        self.cursor.execute(sql)

        if row_tuples:
            if stream:
                return self.cursor
            return wrap(list(self.cursor))

        columns = [utf8_to_unicode(d[0]) for d in coalesce(self.cursor.description, [])]
        if stream:
            return (
                wrap({c: utf8_to_unicode(v) for c, v in zip(columns, row)})
                for row in self.cursor
            )
        return wrap([
            {c: utf8_to_unicode(v) for c, v in zip(columns, row)}
            for row in self.cursor
        ])
    except Exception as cause:
        error = Except.wrap(cause)
        if "InterfaceError" in error:
            Log.error("Did you close the db connection?", error)
        Log.error("Problem executing SQL:\n{{sql|indent}}", sql=sql, cause=error, stack_depth=1)
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:33,代码来源:mysql.py
示例10: quote_value
def quote_value(self, value):
    """
    Convert `value` to the MySQL code for the same value.

    Mostly delegates to the mysql library's literal(), with special
    handling for nulls, SQL objects, mappings, numbers, dates and
    iterables.

    :param value: the Python value to quote
    :return: SQL for the value
    """
    try:
        if value == None:
            return SQL("NULL")

        if isinstance(value, SQL):
            if not value.param:
                # value.template CAN BE MORE THAN A TEMPLATE STRING
                return self.quote_sql(value.template)
            quoted = {k: self.quote_sql(v) for k, v in value.param.items()}
            return SQL(expand_template(value.template, quoted))

        if isinstance(value, basestring):
            return SQL(self.db.literal(value))

        if isinstance(value, Mapping):
            return SQL(self.db.literal(json_encode(value)))

        if Math.is_number(value):
            return SQL(unicode(value))

        if isinstance(value, datetime):
            stamp = value.strftime("%Y%m%d%H%M%S.%f")
            return SQL("str_to_date('" + stamp + "', '%Y%m%d%H%i%s.%f')")

        if isinstance(value, Date):
            stamp = value.format("%Y%m%d%H%M%S.%f")
            return SQL("str_to_date('" + stamp + "', '%Y%m%d%H%i%s.%f')")

        if hasattr(value, '__iter__'):
            return SQL(self.db.literal(json_encode(value)))

        # NOTE(review): unlike every other branch this result is not
        # wrapped in SQL() - confirm that is intended
        return self.db.literal(value)
    except Exception as e:
        Log.error("problem quoting SQL", e)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:30,代码来源:mysql.py
示例11: _send_email
def _send_email(self):
    """
    Send the accumulated log lines through an AWS region connection.

    Every line goes to `self.settings.to_address`; a line also goes to a
    `self.cc` entry when any of that entry's `contains` values is found
    in the line's `params.params.error`.  After a send, or a failed
    send, `self.next_send` moves forward by `self.settings.max_interval`;
    nothing changes when there is nothing to send.
    """
    try:
        if not self.accumulation:
            return

        connection = connect_to_region(
            self.settings.region,
            aws_access_key_id=unwrap(self.settings.aws_access_key_id),
            aws_secret_access_key=unwrap(self.settings.aws_secret_access_key)
        )
        with Closer(connection) as conn:
            # WHO ARE WE SENDING TO
            emails = Data()
            for template, params in self.accumulation:
                message = expand_template(template, params)
                emails[literal_field(self.settings.to_address)] += [message]
                for recipient in self.cc:
                    # the inner name is distinct from the loop variable
                    if any(keyword in params.params.error for keyword in recipient.contains):
                        emails[literal_field(recipient.to_address)] += [message]

            # SEND TO EACH
            for to_address, messages in emails.items():
                conn.send_email(
                    source=self.settings.from_address,
                    to_addresses=listwrap(to_address),
                    subject=self.settings.subject,
                    body="\n\n".join(messages),
                    format="text"
                )

        self.next_send = Date.now() + self.settings.max_interval
        self.accumulation = []
    except Exception as e:
        self.next_send = Date.now() + self.settings.max_interval
        Log.warning("Could not send", e)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:34,代码来源:log_usingSES.py
示例12: write
def write(self, template, params):
    """
    Expand `template` with `params` and pass the line to the writer.

    The text is expanded before the lock is taken; the lock is held only
    for the writer call and is always released.

    :param template: template text for the log line
    :param params: values substituted into `template`
    """
    line = expand_template(template, params)
    lock = self.locker
    lock.acquire()
    try:
        self.writer(line + CR)
    finally:
        lock.release()
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:7,代码来源:log_usingStream.py
示例13: to_es_script
def to_es_script(self, schema, not_null=False, boolean=False, many=True):
    """
    Build the ES script that renders this operator's term as a string.

    :param schema: schema handed to the term's own to_es_script
    :param not_null: not used in this block
    :param boolean: not used in this block
    :param many: not used in this block
    :return: an EsScript of type STRING, the term's own script when it is
        already a STRING, or empty_string_script when the term is missing
    """
    term = FirstOp(self.term).partial_eval()
    value = term.to_es_script(schema)

    if is_op(value.frum, CoalesceOp_):
        # convert each alternative to a string, then coalesce those
        as_strings = [StringOp(t).partial_eval() for t in value.frum.terms]
        return CoalesceOp(as_strings).to_es_script(schema)

    if value.miss is TRUE or value.type is IS_NULL:
        return empty_string_script

    if value.type == BOOLEAN:
        expr = value.expr + ' ? "T" : "F"'
    elif value.type == INTEGER:
        expr = "String.valueOf(" + value.expr + ")"
    elif value.type == NUMBER:
        expr = expand_template(NUMBER_TO_STRING, {"expr": value.expr})
    elif value.type == STRING:
        return value
    else:
        # any other type is formatted the same way as a NUMBER
        expr = expand_template(NUMBER_TO_STRING, {"expr": value.expr})

    return EsScript(
        miss=self.term.missing().partial_eval(),
        type=STRING,
        expr=expr,
        frum=self,
        schema=schema,
    )
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:45,代码来源:painless.py
示例14: execute
def execute(self, sql, param=None):
    """
    Queue `sql` on the backlog, flushing the backlog when it is full.

    :param sql: SQL text; treated as a template when `param` is given
    :param param: optional template parameters, quoted before expansion
    """
    if self.transaction_level == 0:
        Log.error("Expecting transaction to be started before issuing queries")

    statement = expand_template(sql, quote_param(param)) if param else sql
    self.backlog.append(outdent(statement))

    # flush immediately when debugging, otherwise once the batch is full
    flush_now = self.debug or len(self.backlog) >= MAX_BATCH_SIZE
    if flush_now:
        self._execute_backlog()
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:10,代码来源:mysql.py
示例15: append_query
def append_query(self, es_query, start):
    """
    Wrap `es_query` in a scripted terms aggregation named "_match".

    :param es_query: the query to nest under the aggregation
    :param start: recorded on `self.start`
    :return: the wrapped query containing the "_match" aggregation
    """
    self.start = start

    # the first leaf column found for this decoder's variable
    es_field = self.query.frum.schema.leaves(self.var)[0].es_column
    script = expand_template(
        LIST_TO_PIPE,
        {"expr": 'doc[' + quote(es_field) + '].values'}
    )
    match = set_default({"terms": {"script": script}}, es_query)
    return wrap({"aggs": {"_match": match}})
开发者ID:rv404674,项目名称:TUID,代码行数:11,代码来源:decoders.py
示例16: table2csv
def table2csv(table_data):
    """
    Render a table as comma-separated text with left-aligned columns.

    :param table_data: expecting a list of tuples
    :return: text in nice formatted csv
    """
    # every cell as pretty JSON text
    text_rows = [
        tuple(value2json(cell, pretty=True) for cell in row)
        for row in table_data
    ]

    # each column is as wide as its longest cell
    widths = [
        max(len(cell) for cell in column)
        for column in zip(*text_rows)
    ]

    # one placeholder per column, addressed by position
    placeholders = [
        "{{" + unicode(index) + "|left_align(" + unicode(width) + ")}}"
        for index, width in enumerate(widths)
    ]
    template = ", ".join(placeholders)

    return "\n".join(expand_template(template, row) for row in text_rows)
开发者ID:klahnakoski,项目名称:SpotManager,代码行数:14,代码来源:convert.py
示例17: execute_sql
def execute_sql(
    host,
    username,
    password,
    sql,
    schema=None,
    param=None,
    kwargs=None
):
    """
    EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?)

    Pipes `sql` into the "mysql" command line client and reports a
    non-zero return code through Log.error.

    :param host: server host, passed as -h
    :param username: user name, passed as -u
    :param password: password, passed as -p
    :param sql: SQL text; treated as a template when `param` is given
    :param schema: optional schema, appended as the last argument
    :param param: optional template parameters, quoted before expansion
    :param kwargs: settings object; its `schema` falls back to `database`
    """
    # NOTE(review): kwargs defaults to None yet is dereferenced here;
    # presumably a decorator outside this view always supplies it - confirm
    kwargs.schema = coalesce(kwargs.schema, kwargs.database)

    if param:
        with MySQL(kwargs) as temp:
            sql = expand_template(sql, quote_param(param))

    # We have no way to execute an entire SQL file in bulk, so we
    # have to shell out to the commandline client.
    # NOTE(review): the password is passed on the command line
    args = ["mysql"]
    args.append("-h{0}".format(host))
    args.append("-u{0}".format(username))
    args.append("-p{0}".format(password))
    if schema:
        args.append("{0}".format(schema))

    try:
        client = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1
        )
        # the client reads bytes from stdin
        if is_text(sql):
            sql = sql.encode("utf8")
        output = client.communicate(sql)[0]
    except Exception as e:
        raise Log.error("Can not call \"mysql\"", e)

    if client.returncode:
        # keep the error message readable for very large scripts
        if len(sql) > 10000:
            sql = "<" + text_type(len(sql)) + " bytes of sql>"
        Log.error(
            "Unable to execute sql: return code {{return_code}}, {{output}}:\n {{sql}}\n",
            sql=indent(sql),
            return_code=client.returncode,
            output=output
        )
开发者ID:klahnakoski,项目名称:pyLibrary,代码行数:50,代码来源:mysql.py
示例18: inner
def inner(changeset_id):
    """
    Return the JSON diff for `changeset_id`, trying ES before the URL.

    :param changeset_id: changeset id prefix to look up
    :return: the JSON diff; None for merge changesets, for an empty diff,
        or when the download fails (which is reported with Log.warning)
    """
    # both ES query dialects use the same two clauses
    clauses = [
        {"prefix": {"changeset.id": changeset_id}},
        {"range": {"etl.timestamp": {"gt": MIN_ETL_AGE}}}
    ]
    if self.es.cluster.version.startswith("1.7."):
        query = {
            "query": {"filtered": {
                "query": {"match_all": {}},
                "filter": {"and": clauses}
            }},
            "size": 1
        }
    else:
        query = {
            "query": {"bool": {"must": clauses}},
            "size": 1
        }

    try:
        # ALWAYS TRY ES FIRST
        with self.es_locker:
            found = self.es.search(query)
            stored_diff = found.hits.hits[0]._source.changeset.diff
        if stored_diff:
            return stored_diff
    except Exception:
        # any ES failure falls through to the download below
        pass

    url = expand_template(DIFF_URL, {"location": revision.branch.url, "rev": changeset_id})
    if DEBUG:
        Log.note("get unified diff from {{url}}", url=url)

    try:
        response = http.get(url)
        diff = response.content.decode("utf8")
        json_diff = diff_to_json(diff)
        num_changes = _count(c for f in json_diff for c in f.changes)

        if not json_diff:
            return None
        if revision.changeset.description.startswith("merge "):
            return None  # IGNORE THE MERGE CHANGESETS
        if num_changes < MAX_DIFF_SIZE:
            return json_diff

        Log.warning("Revision at {{url}} has a diff with {{num}} changes, ignored", url=url, num=num_changes)
        # keep the file entries, but drop their changes
        for file_diff in json_diff:
            file_diff.changes = None
        return json_diff
    except Exception as e:
        Log.warning("could not get unified diff from {{url}}", url=url, cause=e)
开发者ID:rv404674,项目名称:TUID,代码行数:50,代码来源:hg_mozilla_org.py
示例19: to_es_script
def to_es_script(self, schema):
    """
    Build the ES script that renders this operator's term as a string.

    :param schema: schema handed to the term's own to_es_script
    :return: an EsScript of type STRING, or the term's own script when it
        is already a STRING
    """
    term = FirstOp("first", self.term).partial_eval()
    value = term.to_es_script(schema)

    if isinstance(value.frum, CoalesceOp):
        # convert each alternative to a string, then coalesce those
        as_strings = [StringOp("string", t).partial_eval() for t in value.frum.terms]
        return CoalesceOp("coalesce", as_strings).to_es_script(schema)

    if value.type == BOOLEAN:
        expr = value.expr + ' ? "T" : "F"'
    elif value.type == INTEGER:
        expr = "String.valueOf(" + value.expr + ")"
    elif value.type == NUMBER:
        expr = expand_template(TO_STRING, {"expr":value.expr})
    elif value.type == STRING:
        return value
    else:
        # any other type is formatted the same way as a NUMBER
        expr = expand_template(TO_STRING, {"expr":value.expr})

    return EsScript(
        miss=self.term.missing().partial_eval(),
        type=STRING,
        expr=expr,
        frum=self
    )
开发者ID:rv404674,项目名称:TUID,代码行数:37,代码来源:expressions.py
示例20: compileString2Term
def compileString2Term(edge):
    """
    Build the term converters for an edge whose value is a variable name.

    :param edge: the edge; edge scripts and values that are not variable
        names are reported with Log.error
    :return: Data with `toTerm` (script head and body) and `fromTerm`
        (looks a term up in the edge's domain)
    """
    if edge.esscript:
        Log.error("edge script not supported yet")

    body = edge.value
    if not is_variable_name(body):
        Log.error("not handled")
    else:
        body = strings.expand_template("getDocValue({{path}})", {"path": quote(body)})

    def fromTerm(value):
        return edge.domain.getPartByKey(value)

    return Data(
        toTerm={"head": "", "body": body},
        fromTerm=fromTerm
    )
开发者ID:rv404674,项目名称:TUID,代码行数:17,代码来源:util.py
注：本文中的mo_logs.strings.expand_template函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论