本文整理汇总了Python中toolz.curried.pipe函数的典型用法代码示例。如果您正苦于以下问题:Python pipe函数的具体用法?Python pipe怎么用?Python pipe使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了pipe函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: compare_streams
def compare_streams(db_engine, date_range, stream_names, allowed_parts_of_speech, max_num_words):
"""Compare tokens from each stream in the stream_names list"""
## Create token count dictionaries for each stream name
count_dicts_dict = {}
for stream_name in stream_names:
count_dicts_dict[stream_name] = tz.pipe(
get_content(
db_engine,
stream_name,
date_range),
parse_content_into_count(max_num_words, allowed_parts_of_speech))
## Create cross-stream count dictionary
all_streams_count_dict = reduce(
lambda x,y: tz.merge_with(sum, x, y),
count_dicts_dict.values())
## Calculate posterior probabilities of the tokens
posterior_probs = {}
for stream_name in stream_names:
posterior_probs[stream_name] = tz.pipe(
get_posterior_probs_freq(
500, # limited to the 500 most frequent words in this stream, at this time
all_streams_count_dict,
count_dicts_dict[stream_name]),
tz.map(lambda x: tz.merge({"stream":stream_name}, x)),
tz.take(max_num_words),
list,
)
return posterior_probs
开发者ID:johngeer,项目名称:social-media-comparison,代码行数:31,代码来源:distinctive_words.py
示例2: send_message_to
def send_message_to(self, recipient: ServerInfo, msg: Message):
# asyncio.sleep syspends the function and allows the event loop to continue processing on the next scheduled
# coroutine in the queue, until this one finishes its sleep
yield from asyncio.sleep(3)
reader, writer = yield from asyncio.open_connection(recipient.ip,
recipient.port,
loop=loop)
pipe(msg,
pickle.dumps,
writer.write)
writer.close()
开发者ID:michelrandahl,项目名称:python-distributed-system,代码行数:11,代码来源:SimpleAsyncServer.py
示例3: save_as_html
def save_as_html(distinct_words, file_name):
"""Generate and save an html display of the distinct words"""
## Wrangle data for presentation
# Convert tokens into a single string
def get_token_string(given_values):
"""Return a token string, if the given values are a list of dictionaries"""
# check if it is a list of token-related information
if (isinstance(given_values, list) and
len(given_values) > 0 and
isinstance(given_values[0], dict)):
return tz.pipe(
given_values,
tz.map(lambda x: x['token']),
tz.map(wrap_in_highlight_link), # wrap in link to highlight words
tz.reduce(lambda x,y: u"{}, {}".format(x, y)))
# return empty string for empty lists
elif isinstance(given_values, list) and len(given_values) == 0:
return ''
# check if it is a date range in need of formating
elif isinstance(given_values, list) and len(given_values) == 2:
return format_date_range(given_values)
else:
return given_values
def format_date_range(given_date_range):
"""Return a pretty version of the given date_range"""
date_range = map(
lambda x: dt.datetime.strptime(x, "%Y-%m-%dT%H:%M:%SZ"),
given_date_range)
return "{} to {} UTC".format(
date_range[0].strftime("%Y-%m-%d %H:%M"),
date_range[1].strftime("%H:%M"))
def wrap_in_highlight_link(given_string):
"""return the given string wrapped in the html code to highlight
other occurances of that same word"""
return u"""<a href="javascript:void($('.distinct_words').removeHighlight().highlight('{string}'));">{string}</a>""".format(string=given_string)
formated_distinct_words = tz.pipe(
distinct_words,
tz.map(
tz.valmap(get_token_string)),
list)
## Send to Template For Display
template_dir = 'templates'
loader = jinja2.FileSystemLoader(template_dir)
environment = jinja2.Environment(loader=loader)
template = environment.get_template('distinct_words.html')
with open(file_name, 'w') as f:
tz.pipe(
template.render(distinct_words = formated_distinct_words),
lambda x: x.encode('utf8'),
lambda x: f.write(x))
开发者ID:johngeer,项目名称:social-media-comparison,代码行数:51,代码来源:distinctive_words.py
示例4: ipython_display
def ipython_display(specs):
"""Run publish_display_data for the JS and HTML
Args:
specs: a list of Vega specs
"""
pipe(
specs,
map(lambda x: (uuid.uuid4(), vega.Vega(x))),
list,
do(html_publish_map),
map(tlam(js_publish)),
list
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:14,代码来源:plot.py
示例5: send_message_to
def send_message_to(self, recipient: ServerInfo, msg: Message):
# asyncio.sleep suspends the function and allows the event loop to continue processing on the next scheduled
# coroutine in the queue, until this one finishes its sleep
yield from asyncio.sleep(2) # simulating send delay of 2 seconds
reader, writer = \
yield from asyncio.open_connection(
recipient.ip, recipient.port, loop=loop)
pipe(
msg,
pickle.dumps,
writer.write
)
# yield from asyncio.sleep(x) # simulate slow transfer (eg. huge file or very low bandwidth)
writer.close()
开发者ID:michelrandahl,项目名称:python-distributed-system,代码行数:14,代码来源:DataRepServer.py
示例6: html_publish_map
def html_publish_map(data):
"""Run IPython's 'publish_display_data' for each spec.
Args:
data: list of (id, spec) pairings
"""
pipe(
data,
map(lambda x: x[0]),
list,
lambda x: publish_display_data(
{'text/html': render_html(x)},
metadata={'jupyter-vega': '#{0}'.format(x[0])})
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:14,代码来源:plot.py
示例7: fancify_summary
def fancify_summary(expr):
""" Separate a complex summary into two pieces
Helps pandas compute_by on summaries
>>> t = symbol('t', 'var * {x: int, y: int}')
>>> one, two, three = fancify_summary(summary(a=t.x.sum(), b=t.x.sum() + t.y.count() - 1))
A simpler summary with only raw reductions
>>> one
summary(x_sum=sum(t.x), y_count=count(t.y))
A mapping of those names to new leaves to use in another compuation
>>> two # doctest: +SKIP
{'x_sum': x_sum, 'y_count': y_count}
A mapping of computations to do for each column
>>> three # doctest: +SKIP
{'a': x_sum, 'b': (x_sum + y_count) - 1}
In this way, ``compute_by`` is able to do simple pandas reductions using
groups.agg(...) and then do columnwise arithmetic afterwards.
"""
seen_names.clear()
name_dict.clear()
exprs = pipe(expr.values, map(Expr._traverse), concat, filter(lambda x: isinstance(x, Reduction)), set)
one = summary(**dict((_name(expr), expr) for expr in exprs))
two = dict((_name(expr), symbol(_name(expr), datashape.var * expr.dshape)) for expr in exprs)
d = dict((expr, two[_name(expr)]) for expr in exprs)
three = dict((name, value._subs(d)) for name, value in zip(expr.names, expr.values))
return one, two, three
开发者ID:arvindchari88,项目名称:newGitTest,代码行数:34,代码来源:pandas.py
示例8: vega_plot_treants
def vega_plot_treants(treants):
"""Make a vega plot with side-by-side plots
Args:
treants: a list of treants
Returns
a MultiVega instance
>>> from click.testing import CliRunner
>>> from extremefill2D.fextreme import init_sim
>>> from extremefill2D.fextreme.tools import base_path
>>> with CliRunner().isolated_filesystem() as dir_:
... assert pipe(
... os.path.join(base_path(), 'scripts', 'params.json'),
... init_sim(data_path=dir_),
... lambda x: [x, x],
... vega_plot_treants,
... lambda x: type(x) is MultiVega)
"""
return pipe(
treants,
map(lambda x: render_spec([x])),
list,
MultiVega
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:26,代码来源:plot.py
示例9: compare_streams_across_time
def compare_streams_across_time(db_engine, configuration):
"""Return distinct words for each considered stream at each time step in
the given date range."""
def date_range_iterator(overall_date_range, time_step):
"""Returns an iterator of the time ranges being considered.
time_step is assumed to be in minutes"""
def get_time(overall_start, time_step, step):
"""Return the timestamp that is step time_step's beyond overall_start"""
return (overall_start + (time_step*(step-1))).strftime("%Y-%m-%dT%H:%M:%SZ")
overall_start = dt.datetime.strptime(overall_date_range[0], "%Y-%m-%dT%H:%M:%SZ")
overall_end = dt.datetime.strptime(overall_date_range[1], "%Y-%m-%dT%H:%M:%SZ")
time_step = dt.timedelta(minutes=time_step)
return tz.pipe(
# Number of steps to take
(overall_end - overall_start).total_seconds() / time_step.total_seconds(),
int,
# Build range
lambda x: range(1,x+2),
# Convert to timestamps
tz.map(lambda x: [
get_time(overall_start, time_step, x-1),
get_time(overall_start, time_step, x)]))
result = []
for date_range in date_range_iterator(configuration['overall_date_range'], configuration['time_step']):
result.append(
tz.pipe( # Stream comparison for a particular time period
compare_streams(
db_engine,
date_range,
configuration['stream_names'],
configuration['allowed_parts_of_speech'],
configuration['max_num_words']),
lambda x: tz.merge(x, {'date_range': date_range}))) # add in date_range entry
return result
开发者ID:johngeer,项目名称:social-media-comparison,代码行数:34,代码来源:distinctive_words.py
示例10: load_errors_definitions
def load_errors_definitions():
"""Return `definition_by_error_name` dict from the file `errH.json`."""
return pipe(
load_json(os.path.join(json_dir_path, 'ast', 'errH.json')),
map(lambda d: (d['name'], d)), # Index by name and keep singleton value (groupby creates list values)
dict,
)
开发者ID:fkobon,项目名称:calculette-impots-python,代码行数:7,代码来源:loaders.py
示例11: contours
def contours(data):
"""Get zero contours from x, y, z data
Args:
data: dictionary with (x, y, z, dx) keys
Returns:
a list of (N, 2) numpy arrays representing the contours
"""
def linspace_(arr, spacing):
"""Calcuate the linspace based on a spacing
"""
return pipe(
arr,
juxt(min, max),
tlam(lambda x_, y_: np.linspace(x_, y_, (y_ - x_) / spacing))
)
return pipe(
data,
lambda x: dict(xi=linspace_(x['x'], x['dx']),
yi=linspace_(x['y'], x['dx']),
**x),
lambda x: griddata((x['y'], x['x']),
x['z'],
(x['yi'][None, :], x['xi'][:, None]),
method='cubic'),
lambda x: measure.find_contours(x, 0.0),
map(lambda x: float(data['dx']) * x)
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:30,代码来源:plot.py
示例12: rolling_fit_opt_weights
def rolling_fit_opt_weights(df, opt_weights_func, look_ahead_per):
"""applies opt_weights_func to rolling window on pandas df"""
num_rows = df.shape[0]
p = pipe(xrange(num_rows),
filter(lambda x: x + look_ahead_per < num_rows),
map(lambda x: {df.index[x]: opt_weights_func(df.iloc[x:x+look_ahead_per+1])}))
return pd.DataFrame(merge(p)).T
开发者ID:rhouck,项目名称:nn_port,代码行数:7,代码来源:opt_weights.py
示例13: get_treant_df
def get_treant_df(tags, path='.'):
"""Get treants as a Pandas DataFrame
Args:
tags: treant tags to identify the treants
path: the path to search for treants
Returns:
a Pandas DataFrame with the treant name, tags and categories
>>> from click.testing import CliRunner
>>> from toolz.curried import do
>>> with CliRunner().isolated_filesystem() as dir_:
... assert pipe(
... dir_,
... dtr.Treant,
... do(lambda x: x.__setattr__('tags', ['atag'])),
... lambda x: x.uuid[:8],
... lambda x: x == get_treant_df(['atag'], path=dir_).uuid[0]
... )
"""
return pipe(
tags,
get_by_tags(path=path),
lambda x: x.map(get_treant_data),
pandas.DataFrame,
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:28,代码来源:tools.py
示例14: map
def map(self, func, data): # pylint: disable=no-self-use
return pipe(
data,
map(func),
map(DummyResult),
list
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:7,代码来源:tools.py
示例15: outer_dict
def outer_dict(dict_in):
"""Outer product of dictionary values
Args:
dict_in: a dictionary with iterable values
Returns:
a list of dictionaries
>>> assert pipe(
... dict(a=[1], b=[2, 3]),
... curry(outer_dict),
... lambda x: x == [dict(a=1, b=2), dict(a=1, b=3)]
... )
"""
return pipe(
dict_in.items(),
lambda x: zip(*x),
list,
lambda x: (x[0], product(*x[1])),
tlam(lambda x, y: zip(repeat(x), y)),
map(lambda x: zip(*x)),
map(dict),
list
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:25,代码来源:tools.py
示例16: destruct
def destruct(x):
"""
Deconstructs a data structure into a 1-D np.ndarray (via multiple dispatch)
Converts a list of numpy arrays to a single array
"""
# unravel each array, c
return pipe(x, map(destruct), concat, list, np.array)
开发者ID:sundeepteki,项目名称:descent,代码行数:8,代码来源:utils.py
示例17: visit_formula
def visit_formula(node):
formula_name = node['name']
dependencies = pipe(
visit_node(node['expression']),
unique,
list,
)
return (formula_name, dependencies)
开发者ID:openfisca,项目名称:calculette-impots-m-language-parser,代码行数:8,代码来源:dependencies_visitors.py
示例18: linspace_
def linspace_(arr, spacing):
"""Calcuate the linspace based on a spacing
"""
return pipe(
arr,
juxt(min, max),
tlam(lambda x_, y_: np.linspace(x_, y_, (y_ - x_) / spacing))
)
开发者ID:wd15,项目名称:extremefill2D,代码行数:8,代码来源:plot.py
示例19: discover_jsonlines
def discover_jsonlines(j, n=10, encoding='utf-8', **kwargs):
with json_lines(j.path, encoding=encoding) as lines:
data = pipe(lines, filter(nonempty), map(json.loads), take(n), list)
if len(data) < n:
ds = discover(data)
else:
ds = var * discover(data).subshape[0]
return date_to_datetime_dshape(ds)
开发者ID:mrocklin,项目名称:into,代码行数:9,代码来源:json.py
示例20: test_get_by_uuid
def test_get_by_uuid():
"""Test get_by_uuid
"""
from click.testing import CliRunner
with CliRunner().isolated_filesystem() as dir_:
assert pipe(
dir_,
dtr.Treant,
lambda x: x.uuid == get_by_uuid(x.uuid[:8]).uuid)
开发者ID:wd15,项目名称:extremefill2D,代码行数:9,代码来源:tools.py
注:本文中的toolz.curried.pipe函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论