This page collects typical usage examples of the Python toolz.merge function. If you are wondering what merge does, how it is called, or what real-world code that uses it looks like, the curated examples below should help.
Twenty code examples of the merge function are shown, ordered roughly by popularity.
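Before diving into the extracted examples, here is a minimal, self-contained sketch of what toolz.merge does (the variable names are illustrative only): it combines any number of dicts, passed either as separate arguments or as a single iterable of dicts, into a new dict, with later dicts winning on duplicate keys.

from toolz import merge

defaults = {'mode': 'a', 'append': False, 'complevel': 0}
overrides = {'append': True, 'complevel': 9}

# Later arguments take precedence on duplicate keys.
assert merge(defaults, overrides) == {'mode': 'a', 'append': True, 'complevel': 9}

# A single iterable of dicts is also accepted, a form several
# of the dask examples below rely on.
parts = [{'a': 1}, {'b': 2}, {'c': 3}]
assert merge(parts) == {'a': 1, 'b': 2, 'c': 3}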
Example 1: elemwise

def elemwise(op, *args, **kwargs):
    """ Elementwise operation for dask.Dataframes """
    columns = kwargs.get('columns', None)
    name = kwargs.get('name', None)

    _name = 'elemwise' + next(tokens)

    dfs = [arg for arg in args if isinstance(arg, _Frame)]
    other = [(i, arg) for i, arg in enumerate(args)
             if not isinstance(arg, _Frame)]

    if other:
        op2 = partial_by_order(op, other)
    else:
        op2 = op

    if not all(df.divisions == dfs[0].divisions for df in dfs):
        msg = 'All dask.Dataframe and dask.Series must have same divisions'
        raise ValueError(msg)
    if not all(df.npartitions == dfs[0].npartitions for df in dfs):
        msg = 'All dask.Dataframe and dask.Series must have same npartitions'
        raise ValueError(msg)

    # One new task per partition, applying op across the aligned partitions.
    dsk = dict(((_name, i), (op2,) + frs)
               for i, frs in enumerate(zip(*[df._keys() for df in dfs])))

    # merge unions the new tasks with every input frame's existing graph.
    if columns is not None:
        return DataFrame(merge(dsk, *[df.dask for df in dfs]),
                         _name, columns, dfs[0].divisions)
    else:
        column_name = name or consistent_name(n for df in dfs
                                              for n in df.columns)
        return Series(merge(dsk, *[df.dask for df in dfs]),
                      _name, column_name, dfs[0].divisions)

Developer: BabeNovelty, Project: dask, Lines of code: 33, Source file: core.py
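Most of the dask examples on this page follow the same pattern as elemwise above: build a dict of new tasks keyed by (name, partition number), then use toolz.merge to union it with the graphs of the input collections. Below is a stripped-down sketch of that pattern, with hypothetical names and a toy operation, not taken from dask itself.

from operator import add
from toolz import merge

# Existing graphs of two hypothetical one-dimensional collections 'x' and 'y'.
x_graph = {('x', 0): 1, ('x', 1): 2}
y_graph = {('y', 0): 10, ('y', 1): 20}

# New tasks: apply `add` partition-wise, one task per partition.
dsk = {('z', i): (add, ('x', i), ('y', i)) for i in range(2)}

# The graph handed to a scheduler is simply the union of all three dicts.
graph = merge(dsk, x_graph, y_graph)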
Example 2: from_imperative

def from_imperative(dfs, columns, divisions=None):
    """ Create DataFrame from many imperative objects

    Parameters
    ----------
    dfs: list of Values
        An iterable of dask.imperative.Value objects, such as come from dask.do
        These comprise the individual partitions of the resulting dataframe
    columns: list or string
        The list of column names if the result is a DataFrame
        Or the single column name if the result is a Series
    divisions: list or None
    """
    from dask.imperative import Value

    if isinstance(dfs, Value):
        dfs = [dfs]

    # Union of the graphs backing each Value.
    dsk = merge(df.dask for df in dfs)

    name = 'from-imperative-' + tokenize(*dfs)
    names = [(name, i) for i in range(len(dfs))]
    values = [df.key for df in dfs]
    dsk2 = dict(zip(names, values))

    if divisions is None:
        divisions = [None] * (len(dfs) + 1)

    if isinstance(columns, str):
        return Series(merge(dsk, dsk2), name, columns, divisions)
    else:
        return DataFrame(merge(dsk, dsk2), name, columns, divisions)

Developer: gochoam, Project: dask, Lines of code: 30, Source file: io.py
Example 3: to_hdf

def to_hdf(df, path_or_buf, key, mode='a', append=False, complevel=0,
           complib=None, fletcher32=False, get=get_sync, dask_kwargs=None,
           name_function=None, compute=True, **kwargs):
    name = 'to-hdf-' + uuid.uuid1().hex

    pd_to_hdf = getattr(df._partition_type, 'to_hdf')

    # if path_or_buf is string, format using i_name
    if isinstance(path_or_buf, str):
        if path_or_buf.count('*') + key.count('*') > 1:
            raise ValueError("A maximum of one asterisk is accepted in file path and dataset key")

        fmt_obj = lambda path_or_buf, i_name: path_or_buf.replace('*', i_name)
    else:
        if key.count('*') > 1:
            raise ValueError("A maximum of one asterisk is accepted in dataset key")

        fmt_obj = lambda path_or_buf, _: path_or_buf

    if name_function is None:
        name_function = build_name_function(df.npartitions - 1)

    # we guarantee partition order is preserved when it's saved and read
    # so we enforce name_function to maintain the order of its input.
    if '*' in key or (isinstance(path_or_buf, str) and '*' in path_or_buf):
        formatted_names = [name_function(i) for i in range(df.npartitions)]
        if formatted_names != sorted(formatted_names):
            warn("In order to preserve order between partitions "
                 "name_function must preserve the order of its input")

    dsk = dict()
    i_name = name_function(0)
    dsk[(name, 0)] = (_link, None,
                      (apply, pd_to_hdf,
                       (tuple, [(df._name, 0), fmt_obj(path_or_buf, i_name),
                                key.replace('*', i_name)]),
                       merge(kwargs,
                             {'mode': mode, 'format': 'table', 'append': append,
                              'complevel': complevel, 'complib': complib,
                              'fletcher32': fletcher32})))

    for i in range(1, df.npartitions):
        i_name = name_function(i)
        dsk[(name, i)] = (_link, (name, i - 1),
                          (apply, pd_to_hdf,
                           (tuple, [(df._name, i), fmt_obj(path_or_buf, i_name),
                                    key.replace('*', i_name)]),
                           merge(kwargs,
                                 {'mode': 'a', 'format': 'table', 'append': True,
                                  'complevel': complevel, 'complib': complib,
                                  'fletcher32': fletcher32})))

    dask_kwargs = dask_kwargs or {}

    dsk = merge(df.dask, dsk)
    key = (name, df.npartitions - 1)

    if compute:
        return DataFrame._get(dsk, key, get=get, **dask_kwargs)
    else:
        return Delayed(key, [dsk])

Developer: lematmat, Project: dask, Lines of code: 60, Source file: io.py
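Note how to_hdf uses merge(kwargs, {...}) to combine user-supplied keyword arguments with per-partition settings; because toolz.merge lets the right-most dict win, values such as append=True for the second and later partitions override anything the caller passed. A small illustration with made-up keyword values:

from toolz import merge

user_kwargs = {'complevel': 9, 'append': False}

# The right-most dict wins, so 'append' is forced to True here.
call_kwargs = merge(user_kwargs, {'mode': 'a', 'format': 'table', 'append': True})
assert call_kwargs == {'complevel': 9, 'mode': 'a', 'format': 'table',
                       'append': True}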
Example 4: set_partition

def set_partition(f, index, divisions, get=threaded.get, **kwargs):
    """ Set new partitioning along index given divisions """
    divisions = unique(divisions)
    name = next(names)
    if isinstance(index, Series):
        assert index.divisions == f.divisions
        dsk = dict(((name, i), (f._partition_type.set_index, block, ind))
                   for i, (block, ind) in enumerate(zip(f._keys(), index._keys())))
        f2 = type(f)(merge(f.dask, index.dask, dsk), name,
                     f.column_info, f.divisions)
    else:
        dsk = dict(((name, i), (f._partition_type.set_index, block, index))
                   for i, block in enumerate(f._keys()))
        f2 = type(f)(merge(f.dask, dsk), name, f.column_info, f.divisions)

    head = f2.head()
    pf = pframe(like=head, divisions=divisions, **kwargs)

    def append(block):
        pf.append(block)
        return 0

    f2.map_blocks(append).compute(get=get)
    pf.flush()

    return from_pframe(pf)

Developer: flrgsr, Project: dask, Lines of code: 26, Source file: shuffle.py
Example 5: _loc

def _loc(self, ind):
    """ Helper function for the .loc accessor """
    if not self.known_divisions:
        raise ValueError(
            "Can not use loc on DataFrame without known divisions")
    name = next(names)
    if not isinstance(ind, slice):
        part = self._partition_of_index_value(ind)
        dsk = {(name, 0): (lambda df: df.loc[ind], (self._name, part))}
        return type(self)(merge(self.dask, dsk), name,
                          self.column_info, [])
    else:
        assert ind.step in (None, 1)
        if ind.start:
            start = self._partition_of_index_value(ind.start)
        else:
            start = 0
        if ind.stop is not None:
            stop = self._partition_of_index_value(ind.stop)
        else:
            stop = self.npartitions - 1
        if stop == start:
            dsk = {(name, 0): (_loc, (self._name, start), ind.start, ind.stop)}
        else:
            dsk = merge(
                {(name, 0): (_loc, (self._name, start), ind.start, None)},
                dict(((name, i), (self._name, start + i))
                     for i in range(1, stop - start)),
                {(name, stop - start): (_loc, (self._name, stop), None, ind.stop)})

        return type(self)(merge(self.dask, dsk), name, self.column_info,
                          self.divisions[start:stop])

Developer: flrgsr, Project: dask, Lines of code: 32, Source file: core.py
Example 6: compute

def compute(*args, **kwargs):
    """Compute several dask collections at once.

    Examples
    --------
    >>> import dask.array as da
    >>> a = da.arange(10, chunks=2).sum()
    >>> b = da.arange(10, chunks=2).mean()
    >>> compute(a, b)
    (45, 4.5)
    """
    groups = groupby(attrgetter('_optimize'), args)
    get = kwargs.pop('get', None) or _globals['get']

    if not get:
        get = args[0]._default_get
        if not all(a._default_get == get for a in args):
            raise ValueError("Compute called on multiple collections with "
                             "differing default schedulers. Please specify a "
                             "scheduler `get` function using either "
                             "the `get` kwarg or globally with `set_options`.")

    dsk = merge([opt(merge([v.dask for v in val]), [v._keys() for v in val])
                 for opt, val in groups.items()])
    keys = [arg._keys() for arg in args]
    results = get(dsk, keys, **kwargs)

    return tuple(a._finalize(a, r) for a, r in zip(args, results))

Developer: hgz2373294, Project: dask, Lines of code: 27, Source file: base.py
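compute (like the Executor.persist and Executor.compute examples later on this page) passes a single list of dicts to merge rather than separate positional arguments; toolz.merge accepts either calling convention. A minimal illustration with hypothetical per-group graphs:

from toolz import merge

group_graphs = [{('a', 0): 1, ('a', 1): 2}, {('b', 0): 3}]

# Passing the list directly is equivalent to unpacking it.
assert merge(group_graphs) == merge(*group_graphs) == {('a', 0): 1,
                                                       ('a', 1): 2,
                                                       ('b', 0): 3}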
Example 7: from_imperative

def from_imperative(dfs, metadata=None, divisions=None, columns=None):
    """ Create DataFrame from many imperative objects

    Parameters
    ----------
    dfs: list of Values
        An iterable of dask.imperative.Value objects, such as come from dask.do
        These comprise the individual partitions of the resulting dataframe
    metadata: list or string of column names or empty dataframe
    divisions: list or None
    """
    if columns is not None:
        print("Deprecation warning: Use metadata argument, not columns")
        metadata = columns
    from dask.imperative import Value

    if isinstance(dfs, Value):
        dfs = [dfs]

    dsk = merge(df.dask for df in dfs)

    name = 'from-imperative-' + tokenize(*dfs)
    names = [(name, i) for i in range(len(dfs))]
    values = [df.key for df in dfs]
    dsk2 = dict(zip(names, values))

    if divisions is None:
        divisions = [None] * (len(dfs) + 1)

    if isinstance(metadata, str):
        return Series(merge(dsk, dsk2), name, metadata, divisions)
    else:
        return DataFrame(merge(dsk, dsk2), name, metadata, divisions)

Developer: jhyun0919, Project: ML-DL_jhyun, Lines of code: 31, Source file: io.py
Example 8: elemwise

def elemwise(op, *args, **kwargs):
    """ Elementwise operation for dask.Dataframes """
    columns = kwargs.get('columns', None)
    name = kwargs.get('name', None)

    _name = next(names)

    frames = [arg for arg in args if isinstance(arg, _Frame)]
    other = [(i, arg) for i, arg in enumerate(args)
             if not isinstance(arg, _Frame)]

    if other:
        op2 = partial_by_order(op, other)
    else:
        op2 = op

    assert all(f.divisions == frames[0].divisions for f in frames)
    assert all(f.npartitions == frames[0].npartitions for f in frames)

    dsk = dict(((_name, i), (op2,) + frs)
               for i, frs in enumerate(zip(*[f._keys() for f in frames])))

    if columns is not None:
        return DataFrame(merge(dsk, *[f.dask for f in frames]),
                         _name, columns, frames[0].divisions)
    else:
        column_name = name or consistent_name(n for f in frames
                                              for n in f.columns)
        return Series(merge(dsk, *[f.dask for f in frames]),
                      _name, column_name, frames[0].divisions)

Developer: flrgsr, Project: dask, Lines of code: 30, Source file: core.py
Example 9: to_hdf

def to_hdf(df, path_or_buf, key, mode='a', append=False, complevel=0,
           complib=None, fletcher32=False, get=get_sync, dask_kwargs=None,
           **kwargs):
    name = 'to-hdf-' + uuid.uuid1().hex

    pd_to_hdf = getattr(df._partition_type, 'to_hdf')

    dsk = dict()
    dsk[(name, 0)] = (_link, None,
                      (apply, pd_to_hdf,
                       (tuple, [(df._name, 0), path_or_buf, key]),
                       merge(kwargs,
                             {'mode': mode, 'format': 'table', 'append': append,
                              'complevel': complevel, 'complib': complib,
                              'fletcher32': fletcher32})))

    for i in range(1, df.npartitions):
        dsk[(name, i)] = (_link, (name, i - 1),
                          (apply, pd_to_hdf,
                           (tuple, [(df._name, i), path_or_buf, key]),
                           merge(kwargs,
                                 {'mode': 'a', 'format': 'table', 'append': True,
                                  'complevel': complevel, 'complib': complib,
                                  'fletcher32': fletcher32})))

    dask_kwargs = dask_kwargs or {}

    DataFrame._get(merge(df.dask, dsk), (name, df.npartitions - 1),
                   get=get, **dask_kwargs)

Developer: jhyun0919, Project: ML-DL_jhyun, Lines of code: 28, Source file: io.py
Example 10: f

def f(c, a, b):
    keys = yield _scatter((c.ip, c.port), [1, 2, 3])
    assert merge(a.data, b.data) == \
        {k: i for k, i in zip(keys, [1, 2, 3])}
    assert set(c.who_has) == set(keys)
    assert all(len(v) == 1 for v in c.who_has.values())

    keys2, who_has, nbytes = yield scatter_to_workers([a.address, b.address],
                                                      [4, 5, 6])

    m = merge(a.data, b.data)

    for k, v in zip(keys2, [4, 5, 6]):
        assert m[k] == v

    assert isinstance(who_has, dict)
    assert set(concat(who_has.values())) == {a.address, b.address}
    assert len(who_has) == len(keys2)

    assert isinstance(nbytes, dict)
    assert set(nbytes) == set(who_has)
    assert all(isinstance(v, int) for v in nbytes.values())

    result = yield _gather((c.ip, c.port), keys2)
    assert result == [4, 5, 6]

Developer: coobas, Project: distributed, Lines of code: 27, Source file: test_client.py
Example 11: apply

def apply(self, latitude, longitude, latitude_mask, **kwargs):
    latitude = (latitude.T - data.train_gps_mean[0]) / data.train_gps_std[0]
    longitude = (longitude.T - data.train_gps_mean[1]) / data.train_gps_std[1]
    latitude_mask = latitude_mask.T

    rec_in = tensor.concatenate((latitude[:, :, None], longitude[:, :, None]),
                                axis=2)
    path = self.rec.apply(merge(self.fwd_fork.apply(rec_in, as_dict=True),
                                {'mask': latitude_mask}),
                          merge(self.bkwd_fork.apply(rec_in, as_dict=True),
                                {'mask': latitude_mask}))[0]

    last_id = tensor.cast(latitude_mask.sum(axis=0) - 1, dtype='int64')

    path_representation = (path[0][:, -self.config.rec_state_dim:],
                           path[last_id - 1, tensor.arange(last_id.shape[0])]
                               [:, :self.config.rec_state_dim])

    embeddings = tuple(self.context_embedder.apply(
        **{k: kwargs[k] for k in self.context_embedder.inputs}))

    inputs = tensor.concatenate(path_representation + embeddings, axis=1)
    outputs = self.rec_to_output.apply(inputs)

    return outputs

Developer: DragonCircle, Project: taxi, Lines of code: 25, Source file: memory_network_bidir.py
Example 12: from_imperative

def from_imperative(values):
    """ Create bag from many imperative objects

    Parameters
    ----------
    values: list of Values
        An iterable of dask.imperative.Value objects, such as come from dask.do
        These comprise the individual partitions of the resulting bag

    Returns
    -------
    Bag

    Examples
    --------
    >>> b = from_imperative([x, y, z])  # doctest: +SKIP
    """
    from dask.imperative import Value
    if isinstance(values, Value):
        values = [values]

    dsk = merge(v.dask for v in values)

    name = 'bag-from-imperative-' + tokenize(*values)
    names = [(name, i) for i in range(len(values))]
    values = [v.key for v in values]
    dsk2 = dict(zip(names, values))

    return Bag(merge(dsk, dsk2), name, len(values))

Developer: jcorbin, Project: dask, Lines of code: 28, Source file: core.py
Example 13: apply

def apply(self, source_sentence, source_sentence_mask):
    """Produces source annotations, either non-recurrently or with
    a bidirectional RNN architecture.
    """
    # Time as first dimension
    source_sentence = source_sentence.T
    source_sentence_mask = source_sentence_mask.T

    embeddings = self.lookup.apply(source_sentence)

    representation = self.bidirs[0].apply(
        merge(self.fwd_forks[0].apply(embeddings, as_dict=True),
              {'mask': source_sentence_mask}),
        merge(self.back_forks[0].apply(embeddings, as_dict=True),
              {'mask': source_sentence_mask}))
    for i in xrange(1, self.n_layers):
        if self.skip_connections:
            inp = tensor.concatenate([representation, embeddings],
                                     axis=2)
        else:
            inp = representation
        representation = self.bidirs[i].apply(
            merge(self.fwd_forks[i].apply(inp, as_dict=True),
                  {'mask': source_sentence_mask}),
            merge(self.back_forks[i].apply(inp, as_dict=True),
                  {'mask': source_sentence_mask})
        )
    return representation, source_sentence_mask

Developer: ucam-smt, Project: sgnmt, Lines of code: 26, Source file: encoder.py
Example 14: _loc_slice

def _loc_slice(self, ind):
    name = 'loc-slice' + next(tokens)
    assert ind.step in (None, 1)
    if ind.start:
        start = _partition_of_index_value(self.divisions, ind.start)
    else:
        start = 0
    if ind.stop is not None:
        stop = _partition_of_index_value(self.divisions, ind.stop)
    else:
        stop = self.npartitions - 1

    istart = _coerce_loc_index(self.divisions, ind.start)
    istop = _coerce_loc_index(self.divisions, ind.stop)

    if stop == start:
        dsk = {(name, 0): (_loc, (self._name, start), ind.start, ind.stop)}
        divisions = [istart, istop]
    else:
        dsk = merge(
            {(name, 0): (_loc, (self._name, start), ind.start, None)},
            dict(((name, i), (self._name, start + i))
                 for i in range(1, stop - start)),
            {(name, stop - start): (_loc, (self._name, stop), None, ind.stop)})
        divisions = ((max(istart, self.divisions[start])
                      if ind.start is not None
                      else self.divisions[0],) +
                     self.divisions[start+1:stop+1] +
                     (min(istop, self.divisions[stop+1])
                      if ind.stop is not None
                      else self.divisions[-1],))

    assert len(divisions) == len(dsk) + 1

    return type(self)(merge(self.dask, dsk),
                      name, self.column_info,
                      divisions)

Developer: BabeNovelty, Project: dask, Lines of code: 35, Source file: core.py
Example 15: f

def f(c, a, b):
    data = yield _scatter((c.ip, c.port), [1, 2, 3])
    assert c.ip in str(data[0])
    assert c.ip in repr(data[0])

    assert merge(a.data, b.data) == \
        {d.key: i for d, i in zip(data, [1, 2, 3])}
    assert set(c.who_has) == {d.key for d in data}
    assert all(len(v) == 1 for v in c.who_has.values())

    result = yield [d._get() for d in data]
    assert result == [1, 2, 3]

    yield data[0]._delete()

    assert merge(a.data, b.data) == \
        {d.key: i for d, i in zip(data[1:], [2, 3])}
    assert data[0].key not in c.who_has

    data = yield scatter_to_workers((c.ip, c.port), [a.address, b.address],
                                    [4, 5, 6])

    m = merge(a.data, b.data)

    for d, v in zip(data, [4, 5, 6]):
        assert m[d.key] == v

    result = yield _gather((c.ip, c.port), data)
    assert result == [4, 5, 6]

Developer: thrasibule, Project: distributed, Lines of code: 32, Source file: test_client.py
Example 16: read_csv

def read_csv(fn, *args, **kwargs):
    chunksize = kwargs.pop('chunksize', 2**16)
    categorize = kwargs.pop('categorize', None)
    index = kwargs.pop('index', None)
    if index and categorize is None:
        categorize = True

    header = kwargs.get('header', 1)
    nlines = linecount(fn) - header
    nchunks = int(ceil(1.0 * nlines / chunksize))

    read = next(read_csv_names)

    blockdivs = tuple(range(chunksize, nlines, chunksize))

    one_chunk = pd.read_csv(fn, *args, nrows=100, **kwargs)

    cols = []
    if categorize or index:
        if categorize:
            category_columns = [c for c in one_chunk.dtypes.index
                                if one_chunk.dtypes[c] == 'O']
        else:
            category_columns = []
        cols = category_columns + ([index] if index else [])
        d = read_csv(fn, *args, **merge(kwargs,
                                        dict(chunksize=chunksize,
                                             usecols=cols,
                                             categorize=False,
                                             parse_dates=None)))
        categories = [d[c].drop_duplicates() for c in category_columns]

        if index:
            quantiles = d[index].quantiles(np.linspace(0, 100, nchunks + 1)[1:-1])
            result = compute(quantiles, *categories)
            quantiles, categories = result[0], result[1:]
        else:
            categories = compute(*categories)

        categories = dict(zip(category_columns, categories))

    kwargs['chunksize'] = chunksize
    load = {(read, -1): (partial(pd.read_csv, *args, **kwargs), fn)}
    load.update(dict(((read, i), (get_chunk, (read, i-1), chunksize*i))
                     for i in range(nchunks)))

    name = next(names)
    dsk = dict(((name, i), (getitem, (read, i), 0))
               for i in range(nchunks))

    result = DataFrame(merge(dsk, load), name, one_chunk.columns, blockdivs)

    if categorize:
        func = partial(categorize_block, categories=categories)
        result = result.map_blocks(func, columns=result.columns)

    if index:
        result = set_partition(result, index, quantiles)

    return result

Developer: kastnerkyle, Project: dask, Lines of code: 60, Source file: core.py
Example 17: test_to_task_dask

def test_to_task_dask():
    a = delayed(1, name='a')
    b = delayed(2, name='b')
    task, dask = to_task_dask([a, b, 3])
    assert task == ['a', 'b', 3]

    task, dask = to_task_dask((a, b, 3))
    assert task == (tuple, ['a', 'b', 3])
    assert dict(dask) == merge(a.dask, b.dask)

    task, dask = to_task_dask({a: 1, b: 2})
    assert (task == (dict, [['b', 2], ['a', 1]]) or
            task == (dict, [['a', 1], ['b', 2]]))
    assert dict(dask) == merge(a.dask, b.dask)

    f = namedtuple('f', ['x', 'y'])
    x = f(1, 2)
    task, dask = to_task_dask(x)
    assert task == x
    assert dict(dask) == {}

    # Issue https://github.com/dask/dask/issues/2107
    class MyClass(dict):
        pass

    task, dask = to_task_dask(MyClass())
    assert type(task) is MyClass
    assert dict(dask) == {}

Developer: wikiped, Project: dask, Lines of code: 28, Source file: test_delayed.py
Example 18: __getitem__

def __getitem__(self, key):
    if isinstance(key, (str, unicode)):
        name = self._name + '.' + key
        if key in self.columns:
            dsk = dict(((name, i), (operator.getitem, (self._name, i), key))
                       for i in range(self.npartitions))
            return Series(merge(self.dask, dsk), name,
                          key, self.divisions)
    if isinstance(key, list):
        name = '%s[%s]' % (self._name, str(key))
        if all(k in self.columns for k in key):
            dsk = dict(((name, i), (operator.getitem,
                                    (self._name, i),
                                    (list, key)))
                       for i in range(self.npartitions))
            return DataFrame(merge(self.dask, dsk), name,
                             key, self.divisions)
    if isinstance(key, Series) and self.divisions == key.divisions:
        name = next(names)
        dsk = dict(((name, i), (operator.getitem, (self._name, i),
                                (key._name, i)))
                   for i in range(self.npartitions))
        return DataFrame(merge(self.dask, key.dask, dsk), name,
                         self.columns, self.divisions)
    raise NotImplementedError()

Developer: flrgsr, Project: dask, Lines of code: 25, Source file: core.py
Example 19: persist

def persist(self, collections):
    """ Persist dask collections on cluster

    Starts computation of the collection on the cluster in the background.
    Provides a new dask collection that is semantically identical to the
    previous one, but now based off of futures currently in execution.

    Parameters
    ----------
    collections: sequence or single dask object
        Collections like dask.array or dataframe or dask.value objects

    Returns
    -------
    List of collections, or single collection, depending on type of input.

    Examples
    --------
    >>> xx = executor.persist(x)  # doctest: +SKIP
    >>> xx, yy = executor.persist([x, y])  # doctest: +SKIP

    See Also
    --------
    Executor.compute
    """
    if isinstance(collections, (tuple, list, set, frozenset)):
        singleton = False
    else:
        singleton = True
        collections = [collections]

    assert all(isinstance(c, Base) for c in collections)

    groups = groupby(lambda x: x._optimize, collections)
    dsk = merge([opt(merge([v.dask for v in val]),
                     [v._keys() for v in val])
                 for opt, val in groups.items()])

    d = {k: unpack_remotedata(v) for k, v in dsk.items()}
    dsk2 = {k: v[0] for k, v in d.items()}
    dependencies = {k: v[1] for k, v in d.items()}

    for k, v in dsk2.items():
        dependencies[k] |= set(_deps(dsk, v))

    names = list({k for c in collections for k in flatten(c._keys())})

    self._send_to_scheduler({'op': 'update-graph',
                             'tasks': valmap(dumps_task, dsk2),
                             'dependencies': dependencies,
                             'keys': names,
                             'client': self.id})
    result = [redict_collection(c, {k: Future(k, self)
                                    for k in flatten(c._keys())})
              for c in collections]
    if singleton:
        return first(result)
    else:
        return result

Developer: canavandl, Project: distributed, Lines of code: 59, Source file: executor.py
Example 20: compute

def compute(self, *args, **kwargs):
    """ Compute dask collections on cluster

    Parameters
    ----------
    args: iterable of dask objects
        Collections like dask.array or dataframe or dask.value objects
    sync: bool (optional)
        Returns Futures if False (default) or concrete values if True

    Returns
    -------
    Tuple of Futures or concrete values

    Examples
    --------
    >>> from dask import do, value
    >>> from operator import add
    >>> x = dask.do(add)(1, 2)
    >>> y = dask.do(add)(x, x)
    >>> xx, yy = executor.compute(x, y)  # doctest: +SKIP
    >>> xx  # doctest: +SKIP
    <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
    >>> xx.result()  # doctest: +SKIP
    3
    >>> yy.result()  # doctest: +SKIP
    6
    """
    sync = kwargs.pop('sync', False)
    assert not kwargs
    if sync:
        return dask.compute(*args, get=self.get)

    variables = [a for a in args if isinstance(a, Base)]

    groups = groupby(lambda x: x._optimize, variables)
    dsk = merge([opt(merge([v.dask for v in val]),
                     [v._keys() for v in val])
                 for opt, val in groups.items()])
    names = ['finalize-%s' % tokenize(v) for v in variables]
    dsk2 = {name: (v._finalize, v, v._keys()) for name, v in zip(names, variables)}

    self.loop.add_callback(self.scheduler_queue.put_nowait,
                           {'op': 'update-graph',
                            'dsk': merge(dsk, dsk2),
                            'keys': names})

    i = 0
    futures = []
    for arg in args:
        if isinstance(arg, Base):
            futures.append(Future(names[i], self))
            i += 1
        else:
            futures.append(arg)

    return futures

Developer: aterrel, Project: distributed, Lines of code: 58, Source file: executor.py
Note: The toolz.merge examples in this article were compiled by 纯净天空 from GitHub, MSDocs, and other source-code and documentation platforms. The snippets are drawn from open-source projects contributed by their respective developers; copyright remains with the original authors, and redistribution or use should follow the license of the corresponding project. Do not reproduce without permission.