• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python toolz.groupby函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中toolz.groupby函数的典型用法代码示例。如果您想了解Python groupby函数的具体用法、使用方法或使用示例,这里精选的代码示例或许能为您提供帮助。



下文共展示了groupby函数的20个代码示例,默认按受欢迎程度排序。您可以为喜欢或觉得有用的代码点赞,您的评价将帮助我们的系统推荐更优质的Python代码示例。

示例1: setup

    def setup(self):
        """Wait for the tracked keys to reach the scheduler, then build progress state.

        This is a generator (it yields ``gen.sleep``); presumably it is driven
        as a Tornado coroutine -- TODO confirm against the decorator/caller.

        Visible side effects: registers ``self`` as a scheduler plugin, replaces
        ``self.keys`` / ``self.all_keys`` with dicts mapping ``self.func(key)``
        to sets of keys, and transitions every key in ``errors`` to 'erred'.
        """
        keys = self.keys

        # Poll until every requested key is known to the scheduler.
        while not keys.issubset(self.scheduler.tasks):
            yield gen.sleep(0.05)

        tasks = [self.scheduler.tasks[k] for k in keys]

        self.keys = None

        self.scheduler.add_plugin(self)  # subtle race condition here
        # all_keys: presumably the keys the requested tasks depend on (honouring
        # self.complete); errors: keys later transitioned to 'erred' below.
        self.all_keys, errors = dependent_keys(tasks, complete=self.complete)
        if not self.complete:
            self.keys = self.all_keys.copy()
        else:
            self.keys, _ = dependent_keys(tasks, complete=False)
        self.all_keys.update(keys)
        # Erred keys are always tracked, provided they belong to all_keys.
        self.keys |= errors & self.all_keys

        if not self.keys:
            # NOTE(review): execution continues after stop(); confirm intended.
            self.stop(exception=None, key=None)

        # Group keys by func name
        self.keys = valmap(set, groupby(self.func, self.keys))
        self.all_keys = valmap(set, groupby(self.func, self.all_keys))
        # Every group present in all_keys also gets a (possibly empty) set in keys.
        for k in self.all_keys:
            if k not in self.keys:
                self.keys[k] = set()

        for k in errors:
            self.transition(k, None, 'erred', exception=True)
        logger.debug("Set up Progress keys")
开发者ID:tomMoral,项目名称:distributed,代码行数:32,代码来源:progress.py


示例2: scatter_to_workers

def scatter_to_workers(center, ncores, data, key=None, report=True):
    """ Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  ncores should be a dictionary mapping worker
    identities to numbers of cores.  Any other iterable of workers gives each
    worker the same number of slots, and always at least one.

    See scatter for parameter docstring

    Returns (through ``Return``) a tuple ``(result, who_has, nbytes)``:
    ``RemoteData`` handles for the data (a dict keyed like ``data`` when
    ``data`` is a dict), a mapping from data name to the workers it was sent
    to, and the merged ``nbytes`` entries of the workers' replies.

    Raises TypeError when ``center`` is not a str, rpc or tuple.
    """
    if isinstance(center, str):
        ip, port = center.split(':')
    elif isinstance(center, rpc):
        ip, port = center.ip, center.port
    elif isinstance(center, tuple):
        ip, port = center
    else:
        raise TypeError("Bad type for center")

    if key is None:
        key = str(uuid.uuid1())

    if isinstance(ncores, Iterable) and not isinstance(ncores, dict):
        # Floor division is 0 when there are fewer items than workers; that
        # would leave ``workers`` empty and the modulo below would raise
        # ZeroDivisionError, so every worker gets at least one slot.
        k = max(1, len(data) // len(ncores))
        ncores = {worker: k for worker in ncores}

    workers = list(concat([w] * nc for w, nc in ncores.items()))
    in_type = type(data)
    if isinstance(data, dict):
        names, data = list(zip(*data.items()))
    else:
        names = ('%s-%d' % (key, i) for i in count(0))

    # Continue the round robin where the previous scatter call stopped.
    worker_iter = drop(_round_robin_counter[0] % len(workers), cycle(workers))
    _round_robin_counter[0] += len(data)

    L = list(zip(worker_iter, names, data))
    d = groupby(0, L)
    d = {worker: {name: value for _, name, value in v}
         for worker, v in d.items()}

    out = yield All([rpc(ip=w_ip, port=w_port).update_data(data=v,
                                             close=True, report=report)
                 for (w_ip, w_port), v in d.items()])
    nbytes = merge([o[1]['nbytes'] for o in out])

    who_has = {k: [w for w, _, _ in v] for k, v in groupby(1, L).items()}

    result = [RemoteData(name, ip, port, result=value)
              for _, name, value in L]
    if in_type is dict:
        result = dict(zip(names, result))

    raise Return((result, who_has, nbytes))
开发者ID:lucashtnguyen,项目名称:distributed,代码行数:53,代码来源:client.py


示例3: setup

    def setup(self, keys, complete):
        """Run the base Progress setup, then bucket keys by ``self.func``.

        Afterwards ``self.keys`` and ``self.all_keys`` are dicts mapping each
        ``self.func(key)`` value to a set of keys.  Any bucket present in
        ``all_keys`` but absent from ``keys`` is given an empty set.  Returns
        whatever ``Progress.setup`` returned.
        """
        errors = Progress.setup(self, keys, complete)

        def bucket(collection):
            # Group keys by func name, as sets
            return valmap(set, groupby(self.func, collection))

        self.keys = bucket(self.keys)
        self.all_keys = bucket(self.all_keys)
        missing = [name for name in self.all_keys if name not in self.keys]
        self.keys.update((name, set()) for name in missing)

        logger.debug("Set up Progress keys")
        return errors
开发者ID:aterrel,项目名称:distributed,代码行数:12,代码来源:diagnostics.py


示例4: _run_cnvkit_shared_orig

def _run_cnvkit_shared_orig(inputs, backgrounds):
    """Original CNVkit implementation with full normalization and segmentation.

    Output paths are computed for every input.  Coverage, background, fix and
    segmentation steps only run when the first input's ``.cns`` file does not
    exist yet.

    :param inputs: sample data dictionaries to evaluate.
    :param backgrounds: sample data dictionaries used to build the background;
        when empty, a "flat" background name is used.
    :returns: list of ``{"cnr": path, "cns": path}`` dicts, one per input.
    """
    work_dir = _sv_workdir(inputs[0])
    raw_work_dir = utils.safe_makedir(os.path.join(work_dir, "raw"))
    background_name = dd.get_sample_name(backgrounds[0]) if backgrounds else "flat"
    background_cnn = os.path.join(raw_work_dir, "%s_background.cnn" % (background_name))
    ckouts = []
    for cur_input in inputs:
        cur_raw_work_dir = utils.safe_makedir(os.path.join(_sv_workdir(cur_input), "raw"))
        out_base, out_base_old = _bam_to_outbase(dd.get_align_bam(cur_input), cur_raw_work_dir, cur_input)
        # Reuse the alternative (presumably older-style) output base when its
        # .cns file already exists.
        if utils.file_exists(out_base_old + ".cns"):
            out_base = out_base_old
        ckouts.append({"cnr": "%s.cnr" % out_base,
                       "cns": "%s.cns" % out_base})
    if not utils.file_exists(ckouts[0]["cns"]):
        cov_interval = dd.get_coverage_interval(inputs[0])
        samples_to_run = list(zip(["background"] * len(backgrounds), backgrounds)) + \
                         list(zip(["evaluate"] * len(inputs), inputs))
        # New style shared SV bins
        if tz.get_in(["depth", "bins", "target"], inputs[0]):
            target_bed = tz.get_in(["depth", "bins", "target"], inputs[0])
            antitarget_bed = tz.get_in(["depth", "bins", "antitarget"], inputs[0])
            raw_coverage_cnns = reduce(operator.add,
                                       [_get_general_coverage(cdata, itype) for itype, cdata in samples_to_run])
        # Back compatible with pre-existing runs
        else:
            target_bed, antitarget_bed = _get_original_targets(inputs[0])
            raw_coverage_cnns = reduce(operator.add,
                                       [_get_original_coverage(cdata, itype) for itype, cdata in samples_to_run])
        # Currently metrics not calculated due to speed and needing re-evaluation
        # We could re-enable with larger truth sets to evaluate background noise
        # But want to reimplement in a more general fashion as part of normalization
        if False:
            coverage_cnns = reduce(operator.add,
                                   [_cnvkit_metrics(cnns, target_bed, antitarget_bed, cov_interval,
                                                    inputs + backgrounds)
                                    for cnns in tz.groupby("bam", raw_coverage_cnns).values()])
            background_cnn = cnvkit_background(_select_background_cnns(coverage_cnns),
                                               background_cnn, inputs, target_bed, antitarget_bed)
        else:
            coverage_cnns = raw_coverage_cnns
            background_cnn = cnvkit_background([x["file"] for x in coverage_cnns if x["itype"] == "background"],
                                               background_cnn, inputs, target_bed, antitarget_bed)
        parallel = {"type": "local", "cores": dd.get_cores(inputs[0]), "progs": ["cnvkit"]}
        fixed_cnrs = run_multicore(_cnvkit_fix,
                                   [(cnns, background_cnn, inputs, ckouts) for cnns in
                                    tz.groupby("bam", [x for x in coverage_cnns
                                                       if x["itype"] == "evaluate"]).values()],
                                   inputs[0]["config"], parallel)
        # Segmentation is run for its side effects; return values are unused,
        # so use a plain loop rather than building a throwaway list.
        for cnr, data in fixed_cnrs:
            _cnvkit_segment(cnr, cov_interval, data, inputs + backgrounds)
    return ckouts
开发者ID:chapmanb,项目名称:bcbio-nextgen,代码行数:52,代码来源:cnvkit.py


示例5: _run_cnvkit_shared

def _run_cnvkit_shared(inputs, backgrounds):
    """Shared functionality to run CNVkit, parallelizing over multiple BAM files.

    Output paths are computed for every input.  The CNVkit steps (coverage,
    metrics, background, fix, segment) only run when the first input's
    ``.cnr`` file does not exist yet.

    :param inputs: sample data dictionaries to evaluate.
    :param backgrounds: sample data dictionaries used for the background
        reference; when empty, a "flat" background name is used.
    :returns: list of ``{"cnr", "cns", "back_cnn"}`` path dicts, one per
        input, or ``{}`` when no target region file could be produced.
    """
    work_dir = _sv_workdir(inputs[0])
    raw_work_dir = utils.safe_makedir(os.path.join(work_dir, "raw"))
    background_name = dd.get_sample_name(backgrounds[0]) if backgrounds else "flat"
    background_cnn = os.path.join(raw_work_dir, "%s_background.cnn" % (background_name))
    ckouts = []
    for cur_input in inputs:
        cur_raw_work_dir = utils.safe_makedir(os.path.join(_sv_workdir(cur_input), "raw"))
        out_base = _bam_to_outbase(dd.get_align_bam(cur_input), cur_raw_work_dir)
        ckouts.append({"cnr": "%s.cnr" % out_base,
                       "cns": "%s.cns" % out_base,
                       "back_cnn": background_cnn})
    if not utils.file_exists(ckouts[0]["cnr"]):
        cov_interval = dd.get_coverage_interval(inputs[0])
        raw_target_bed, access_bed = _get_target_access_files(cov_interval, inputs[0], work_dir)
        # bail out if we ended up with no regions
        if not utils.file_exists(raw_target_bed):
            return {}
        raw_target_bed = annotate.add_genes(raw_target_bed, inputs[0])
        parallel = {"type": "local", "cores": dd.get_cores(inputs[0]), "progs": ["cnvkit"]}
        pct_coverage = (pybedtools.BedTool(raw_target_bed).total_coverage() /
                        float(pybedtools.BedTool(access_bed).total_coverage())) * 100.0
        target_bed, antitarget_bed = _cnvkit_targets(raw_target_bed, access_bed, cov_interval,
                                                     pct_coverage, raw_work_dir, inputs[0])
        split_beds = _split_bed(target_bed, inputs[0]) + _split_bed(antitarget_bed, inputs[0])
        # zip() returns an iterator on Python 3, which does not support "+";
        # materialize both halves before concatenating them.
        samples_to_run = list(zip(["background"] * len(backgrounds), backgrounds)) + \
                         list(zip(["evaluate"] * len(inputs), inputs))
        split_cnns = run_multicore(_cnvkit_coverage,
                                   [(cdata, bed, itype) for itype, cdata in samples_to_run for bed in split_beds],
                                   inputs[0]["config"], parallel)
        raw_coverage_cnns = _merge_coverage(split_cnns, inputs[0])
        coverage_cnns = run_multicore(_cnvkit_metrics,
                                      [(cnns, target_bed, antitarget_bed, cov_interval, inputs + backgrounds)
                                       for cnns in tz.groupby("bam", raw_coverage_cnns).values()],
                                      inputs[0]["config"], parallel)
        background_cnn = _cnvkit_background(_select_background_cnns(coverage_cnns),
                                            background_cnn, target_bed, antitarget_bed, inputs[0])
        fixed_cnrs = run_multicore(_cnvkit_fix,
                                   [(cnns, background_cnn, inputs + backgrounds) for cnns in
                                    tz.groupby("bam", [x for x in coverage_cnns
                                                       if x["itype"] == "evaluate"]).values()],
                                   inputs[0]["config"], parallel)
        run_multicore(_cnvkit_segment,
                      [(cnr, cov_interval, data) for cnr, data in fixed_cnrs],
                      inputs[0]["config"], parallel)
    return ckouts
开发者ID:pansapiens,项目名称:bcbio-nextgen,代码行数:48,代码来源:cnvkit.py


示例6: split_next_and_previous_event_columns

    def split_next_and_previous_event_columns(self, requested_columns):
        """
        Partition requested columns by which known event value they load.

        Parameters
        ----------
        requested_columns : iterable[BoundColumn]

        Returns
        -------
        next_cols, previous_cols : iterable[BoundColumn], iterable[BoundColumn]
            The columns of ``requested_columns`` that should produce values
            from the next event, followed by those that should produce values
            from the previous event.  A group with no columns is returned as
            an empty tuple.

        Raises
        ------
        ValueError
            If a column is in neither ``next_value_columns`` nor
            ``previous_value_columns``.
        """

        def which_event(column):
            if column in self.next_value_columns:
                return "next"
            if column in self.previous_value_columns:
                return "previous"
            raise ValueError(
                "{c} not found in next_value_columns "
                "or previous_value_columns".format(c=column)
            )

        by_event = groupby(which_event, requested_columns)
        next_cols = by_event.get("next", ())
        previous_cols = by_event.get("previous", ())
        return next_cols, previous_cols
开发者ID:quantopian,项目名称:zipline,代码行数:27,代码来源:events.py


示例7: diagnostic_yield

  def diagnostic_yield(self, metric='completeness', cutoff=1,
                       superblock_ids=None, group_id=None, sample_ids=None):
    """Calculate diagnostic yield.

    Yields ``(sample_id, group_id, covered / total)`` per sample.  ``total``
    comes from the count over all blocks and ``covered`` from the count over
    blocks whose ``metric`` column is ``>= cutoff`` (0 when none pass).

    Args:
      metric (str): name of the ``BlockData`` column to filter on.
      cutoff: minimum ``metric`` value for a block to count as passed.
      superblock_ids (list, optional): restrict to blocks in these superblocks.
      group_id (optional): forwarded to ``limit_query`` as ``group``.
      sample_ids (list, optional): forwarded to ``limit_query`` as ``samples``.
    """
    # extract column to filter on
    metric_column = getattr(BlockData, metric)

    # set up the base query for all blocks
    total_query = self.total_count(BlockData)

    if superblock_ids:
      # apply the superblock filter on the Block class level
      total_query = total_query.join(BlockData.parent)\
                               .filter(Block.superblock_id.in_(superblock_ids))

    # extend base query to include only passed blocks
    pass_query = total_query.filter(metric_column >= cutoff)

    # optionally limit query
    queries = [limit_query(query, group=group_id, samples=sample_ids)
               for query in (total_query, pass_query)]

    # group multiple queries by sample ID (first column); within each group
    # the row from the total query precedes the row from the pass query
    metrics = groupby(get(0), concat(queries))

    for rows in itervalues(metrics):
      # Read the two rows explicitly.  De-duplicating the flattened values
      # dropped ``covered`` whenever it equalled ``total`` (or another
      # column), and a sample with no passed blocks has no second row; both
      # broke a 4-way unpack.
      # NOTE(review): assumes rows are (sample_id, group_id, count) -- confirm
      # against total_count/limit_query.
      sample_id, row_group_id, total = rows[0]
      covered = rows[1][-1] if len(rows) > 1 else 0
      # float() keeps true division on Python 2 as well
      yield sample_id, row_group_id, (float(covered) / total)
开发者ID:henrikstranneheim,项目名称:chanjo-report,代码行数:31,代码来源:core.py


示例8: pip_dict

def pip_dict():
    """Return installed distributions plus their keys bucketed by initial.

    Returns a pair: ``pkg_resources.working_set.by_key`` itself, and a dict
    grouping its keys by their upper-cased first character.
    """
    from pkg_resources import working_set
    from toolz import groupby

    def initial(name):
        return name[0].upper()

    by_key = working_set.by_key
    return by_key, groupby(initial, by_key)
开发者ID:KGerring,项目名称:RevealMe,代码行数:7,代码来源:util.py


示例9: format_website

    def format_website(self):
        """Render ``self.issues`` as Markdown release notes for the website.

        Issues are bucketed by JIRA issue type into a 'feature' and a 'bug'
        section.  Each non-empty section gets a ``##`` heading followed by one
        bullet per issue, sorted by issue key and linked to ``self.server``.
        Returns the Markdown text.

        Raises KeyError for an issue type not listed in ``categories``.
        """
        # jira category => website category mapping
        categories = {
            'New Feature': 'feature',
            'Improvement': 'feature',
            'Wish': 'feature',
            'Task': 'feature',
            'Test': 'bug',
            'Bug': 'bug',
            'Sub-task': 'feature'
        }
        # Keys must match the website categories above ('bug', not 'bugfix'),
        # otherwise the title lookup in the loop raises KeyError.
        titles = {
            'feature': 'New Features and Improvements',
            'bug': 'Bug Fixes'
        }

        issues_by_category = toolz.groupby(
            lambda issue: categories[issue.fields.issuetype.name],
            self.issues
        )

        out = StringIO()

        for category in ('feature', 'bug'):
            # A release may have no issues in a category; skip that section
            # instead of failing on the missing group.
            issues = sorted(issues_by_category.get(category, ()),
                            key=lambda x: x.key)
            if not issues:
                continue

            title = titles[category]
            out.write(md('## {}\n\n', title))
            for issue in issues:
                link = md('[{0}]({1}/browse/{0})', issue.key, self.server)
                out.write(md('* {} - {}\n', link, issue.fields.summary))
            out.write('\n')

        return out.getvalue()
开发者ID:pcmoritz,项目名称:arrow,代码行数:35,代码来源:crossbow.py


示例10: compute

def compute(*args, **kwargs):
    """Compute several dask collections at once.

    Collections sharing an ``_optimize`` function are merged and optimized
    together, and the combined graph is run with one scheduler call.  The
    scheduler is the ``get`` keyword, else ``_globals['get']``, else the
    collections' common ``_default_get``.  Remaining keyword arguments are
    forwarded to the scheduler.

    Raises ValueError when no scheduler is configured and the collections
    disagree on their default scheduler.

    Examples
    --------
    >>> import dask.array as da
    >>> a = da.arange(10, chunks=2).sum()
    >>> b = da.arange(10, chunks=2).mean()
    >>> compute(a, b)
    (45, 4.5)
    """
    by_optimizer = groupby(attrgetter('_optimize'), args)
    scheduler = kwargs.pop('get', None) or _globals['get']

    if not scheduler:
        scheduler = args[0]._default_get
        same_default = all(a._default_get == scheduler for a in args)
        if not same_default:
            raise ValueError("Compute called on multiple collections with "
                             "differing default schedulers. Please specify a "
                             "scheduler `get` function using either "
                             "the `get` kwarg or globally with `set_options`.")

    optimized_graphs = []
    for optimize, members in by_optimizer.items():
        merged = merge([m.dask for m in members])
        optimized_graphs.append(optimize(merged, [m._keys() for m in members]))
    dsk = merge(optimized_graphs)

    keys = [arg._keys() for arg in args]
    results = scheduler(dsk, keys, **kwargs)
    return tuple(a._finalize(a, r) for a, r in zip(args, results))
开发者ID:hgz2373294,项目名称:dask,代码行数:27,代码来源:base.py


示例11: _get_subnet_config_w_cidr

    def _get_subnet_config_w_cidr(self, network_config):
        """Yield each subnet config with a ``'cidr'`` allocated from the base network.

        The base network is ``network_cidr_base``/``network_cidr_size`` from
        ``network_config`` (defaults ``172.16.0.0`` and ``20``).  Subnet
        configs from ``_get_subnet_config_w_az`` are grouped by their
        ``'size'`` value and processed in sorted order of that value.  Each
        group is carved from the current ``net`` with ``net.subnet(int(size))``.
        Every yielded value is ``assoc(subnet_config, 'cidr', str(cidr))``.
        """
        network_cidr_base = str(network_config.get('network_cidr_base', '172.16.0.0'))
        network_cidr_size = str(network_config.get('network_cidr_size', '20'))

        base_cidr = network_cidr_base + '/' + network_cidr_size
        net = netaddr.IPNetwork(base_cidr)

        grouped_subnet = groupby('size', self._get_subnet_config_w_az(network_config))
        subnet_groups = sorted(grouped_subnet.items())
        # Subnet iterators of earlier sizes that may still hold unused blocks
        available_cidrs = []

        for subnet_size, subnet_configs in subnet_groups:
            newcidrs = net.subnet(int(subnet_size))

            for subnet_config in subnet_configs:
                # The next() builtin works on Python 2 and 3; the .next()
                # method exists only on Python 2.
                try:
                    cidr = next(newcidrs)
                except StopIteration:
                    # Current block is used up: continue from the most
                    # recently saved iterator of leftover blocks.
                    net = next(chain(*reversed(available_cidrs)))
                    newcidrs = net.subnet(int(subnet_size))
                    cidr = next(newcidrs)

                new_config = assoc(subnet_config, 'cidr', str(cidr))
                yield new_config
            else:
                # Use the next unused block of this size as the base for the
                # following group, and keep the rest for later fallback.
                # NOTE(review): if this size is exhausted, StopIteration escapes
                # here (a RuntimeError inside this generator on Python 3.7+).
                net = next(newcidrs)
                available_cidrs.append(newcidrs)
开发者ID:stocktwits,项目名称:cloudformation-environmentbase,代码行数:29,代码来源:base_network.py


示例12: load_adjusted_array

 def load_adjusted_array(self, columns, dates, assets, mask):
     """Load ``columns`` grouped by dataset, using ``self.pool``.

     Columns are grouped with ``groupby(getdataset, ...)``.  Each group is
     passed to ``self._load_dataset(dates, assets, mask, group)`` through
     ``self.pool.imap_unordered``, and the per-dataset results are combined
     with ``merge``.
     """
     load_group = partial(self._load_dataset, dates, assets, mask)
     column_groups = itervalues(groupby(getdataset, columns))
     per_dataset = self.pool.imap_unordered(load_group, column_groups)
     return merge(per_dataset)
开发者ID:jeyoor,项目名称:zipline,代码行数:7,代码来源:core.py


示例13: load_adjusted_array

 def load_adjusted_array(self, columns, dates, assets, mask):
     """Load ``columns`` one dataset group at a time and collect the results.

     Columns are grouped with ``groupby(getdataset, ...)``.  For each group,
     ``self._load_dataset(dates, assets, mask, group)`` is called, and the
     items every call produces are flattened into a single dict.
     """
     load_group = partial(self._load_dataset, dates, assets, mask)
     column_groups = itervalues(groupby(getdataset, columns))
     loaded = map(load_group, column_groups)
     return dict(concat(loaded))
开发者ID:MattMing,项目名称:zipline,代码行数:7,代码来源:core.py


示例14: scatter_to_workers

def scatter_to_workers(center, ncores, data, key=None):
    """ Scatter data directly to workers

    Data is dealt out round-robin over worker slots, one slot per core as
    given by ``ncores`` (a dict of worker identity -> number of cores).  A
    plain iterable of workers gives every worker a single slot.

    See scatter for parameter docstring

    Returns (through ``Return``) a list with one ``RemoteData`` handle per
    datum, in input order.
    """
    center = coerce_to_rpc(center)
    if key is None:
        key = str(uuid.uuid1())

    if isinstance(ncores, Iterable) and not isinstance(ncores, dict):
        ncores = dict.fromkeys(ncores, 1)

    slots = list(concat([worker] * n for worker, n in ncores.items()))
    if isinstance(data, dict):
        names, data = list(zip(*data.items()))
    else:
        names = ("%s-%d" % (key, i) for i in count(0))

    assignments = list(zip(cycle(slots), names, data))
    by_worker = {}
    for worker, name, value in assignments:
        by_worker.setdefault(worker, {})[name] = value

    yield [rpc(ip=w_ip, port=w_port).update_data(data=payload, close=True)
           for (w_ip, w_port), payload in by_worker.items()]

    remote = [RemoteData(name, center.ip, center.port, result=value)
              for _, name, value in assignments]

    raise Return(remote)
开发者ID:thrasibule,项目名称:distributed,代码行数:31,代码来源:client.py


示例15: set_params

 def set_params(self, **params):
     """Return a new Pipeline with ``step__param`` keyword arguments applied.

     Each key is split at its first ``'__'`` into a step name and the name of
     that step's parameter.  Steps without matching keys are reused unchanged.
     A key without ``'__'`` raises IndexError.
     """
     per_step = {}
     for full_key, value in params.items():
         pieces = full_key.split('__', 1)
         per_step.setdefault(pieces[0], {})[pieces[1]] = value

     new_steps = []
     for name, est in self.steps:
         if name in per_step:
             est = set_params(est, **per_step[name])
         new_steps.append((name, est))
     return Pipeline(new_steps)
开发者ID:konggas,项目名称:dasklearn,代码行数:7,代码来源:pipeline.py


示例16: persist

    def persist(self, collections):
        """ Persist dask collections on cluster

        Sends the collections' graphs to the scheduler so computation starts in
        the background.  Returns collections equivalent to the inputs whose
        keys are now backed by Futures of that in-flight computation.

        Parameters
        ----------
        collections: sequence or single dask object
            Collections like dask.array or dataframe or dask.value objects

        Returns
        -------
        A list of collections for a tuple/list/set/frozenset input, otherwise
        a single collection.

        Examples
        --------
        >>> xx = executor.persist(x)  # doctest: +SKIP
        >>> xx, yy = executor.persist([x, y])  # doctest: +SKIP

        See Also
        --------
        Executor.compute
        """
        singleton = not isinstance(collections, (tuple, list, set, frozenset))
        if singleton:
            collections = [collections]

        assert all(isinstance(c, Base) for c in collections)

        # Optimize collections sharing an optimizer together, then combine.
        optimized = []
        for optimize, members in groupby(lambda x: x._optimize, collections).items():
            optimized.append(optimize(merge([m.dask for m in members]),
                                      [m._keys() for m in members]))
        dsk = merge(optimized)

        # unpack_remotedata returns a pair per task: (task, dependency set).
        unpacked = {k: unpack_remotedata(task) for k, task in dsk.items()}
        dsk2 = {}
        dependencies = {}
        for k, pair in unpacked.items():
            dsk2[k] = pair[0]
            dependencies[k] = pair[1]

        for k, task in dsk2.items():
            dependencies[k] |= set(_deps(dsk, task))

        names = list({k for c in collections for k in flatten(c._keys())})

        self._send_to_scheduler({'op': 'update-graph',
                                 'tasks': valmap(dumps_task, dsk2),
                                 'dependencies': dependencies,
                                 'keys': names,
                                 'client': self.id})

        persisted = []
        for c in collections:
            futures = {k: Future(k, self) for k in flatten(c._keys())}
            persisted.append(redict_collection(c, futures))

        return first(persisted) if singleton else persisted
开发者ID:canavandl,项目名称:distributed,代码行数:59,代码来源:executor.py


示例17: scatter_to_workers

def scatter_to_workers(ncores, data, report=True, serialize=True):
    """ Scatter data directly to workers

    This distributes data in a round-robin fashion to a set of workers based on
    how many cores they have.  ncores should be a dictionary mapping worker
    identities to numbers of cores.  Any other iterable of worker addresses
    gives each worker the same number of slots, and always at least one.

    See scatter for parameter docstring

    Returns (through ``Return``) ``(names, who_has, nbytes)``: the key used
    for each datum, a mapping from key to the workers it was sent to, and the
    merged ``nbytes`` entries of the workers' replies.
    """
    if isinstance(ncores, Iterable) and not isinstance(ncores, dict):
        # Floor division is 0 when there are fewer items than workers; that
        # would leave ``workers`` empty and the modulo below would raise
        # ZeroDivisionError, so every worker gets at least one slot.
        k = max(1, len(data) // len(ncores))
        ncores = {coerce_to_address(worker): k for worker in ncores}

    workers = list(concat([w] * nc for w, nc in ncores.items()))
    if isinstance(data, dict):
        names, data = list(zip(*data.items()))
    else:
        names = []
        for x in data:
            try:
                names.append(tokenize(x))
            except Exception:
                # Fall back to a random key for objects tokenize cannot
                # handle, without swallowing KeyboardInterrupt/SystemExit.
                names.append(str(uuid.uuid1()))

    # Continue the round robin where the previous scatter call stopped.
    worker_iter = drop(_round_robin_counter[0] % len(workers), cycle(workers))
    _round_robin_counter[0] += len(data)

    L = list(zip(worker_iter, names, data))
    d = groupby(0, L)
    d = {worker: {key: dumps(value) if serialize else value
                  for _, key, value in v}
         for worker, v in d.items()}

    rpcs = {addr: rpc(addr) for addr in d}
    try:
        out = yield All([rpcs[address].update_data(data=v,
                                                 close=True, report=report)
                         for address, v in d.items()])
    finally:
        for r in rpcs.values():
            r.close_rpc()
    nbytes = merge(o['nbytes'] for o in out)

    who_has = {k: [w for w, _, _ in v] for k, v in groupby(1, L).items()}

    raise Return((names, who_has, nbytes))
开发者ID:dask,项目名称:distributed,代码行数:46,代码来源:utils_comm.py


示例18: group_by

def group_by(df, s):
  """Yield the sub-table of ``df`` for each distinct key of the columns ``s``.

  ``select(df, s)`` appears to return a mapping of column name -> column
  values.  Rows are grouped by the tuple of their selected values, and for
  each group ``filter_index(df, indices)`` is yielded with that group's row
  indices.
  NOTE(review): assumes filter_index accepts a list of row positions -- confirm.
  """
  keys = zip(*select(df, s).values())
  # enumerate numbers every row, with no fixed cap on the table length
  indexed = ((key, i) for i, key in enumerate(keys))
  # groupby returns {key: [(key, index), ...]}; collect each group's indices
  grouped_keys = groupby(itemgetter(0), indexed)
  for pairs in grouped_keys.values():
    index = [i for _, i in pairs]
    yield filter_index(df, index)
开发者ID:lucusadv,项目名称:pynancier,代码行数:8,代码来源:dict_tidy.py


示例19: compute

    def compute(self, *args, **kwargs):
        """ Compute dask collections on cluster

        Sends the optimized, merged graphs of all dask arguments to the
        scheduler in one ``update-graph`` message and returns immediately.

        Parameters
        ----------
        args: iterable of dask objects
            Collections like dask.array or dataframe or dask.value objects;
            non-dask arguments are passed through unchanged
        sync: bool (optional)
            If True, compute with ``dask.compute`` using ``self.get`` and
            return its result; defaults to False

        Returns
        -------
        A list holding a Future for each dask argument (other arguments as-is),
        or the ``dask.compute`` result when ``sync`` is True

        Examples
        --------

        >>> from dask import do, value
        >>> from operator import add
        >>> x = dask.do(add)(1, 2)
        >>> y = dask.do(add)(x, x)
        >>> xx, yy = executor.compute(x, y)  # doctest: +SKIP
        >>> xx  # doctest: +SKIP
        <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
        >>> xx.result()  # doctest: +SKIP
        3
        >>> yy.result()  # doctest: +SKIP
        6
        """
        sync = kwargs.pop('sync', False)
        assert not kwargs
        if sync:
            return dask.compute(*args, get=self.get)

        variables = [a for a in args if isinstance(a, Base)]

        optimized = []
        for optimize, members in groupby(lambda x: x._optimize, variables).items():
            optimized.append(optimize(merge([m.dask for m in members]),
                                      [m._keys() for m in members]))
        dsk = merge(optimized)

        names = ['finalize-%s' % tokenize(v) for v in variables]
        finalizers = {}
        for name, v in zip(names, variables):
            finalizers[name] = (v._finalize, v, v._keys())

        self.loop.add_callback(self.scheduler_queue.put_nowait,
                               {'op': 'update-graph',
                                'dsk': merge(dsk, finalizers),
                                'keys': names})

        # Dask arguments consume the finalize names in order; others pass through.
        pending = iter(names)
        return [Future(next(pending), self) if isinstance(arg, Base) else arg
                for arg in args]
开发者ID:aterrel,项目名称:distributed,代码行数:58,代码来源:executor.py


示例20: partition

def partition(grouper, sequence, npartitions, p, nelements=2**20):
    """ Partition a bag along a grouper, store partitions on disk

    ``sequence`` is consumed in chunks of ``nelements``.  Within each chunk the
    elements are grouped by ``grouper``, and every group is routed to
    partition ``abs(hash(key)) % npartitions``.  One
    ``{partition_index: [elements]}`` mapping per chunk is appended to ``p``
    (presumably an on-disk store -- TODO confirm), and ``p`` is returned.
    """
    for chunk in partition_all(nelements, sequence):
        grouped = groupby(grouper, chunk)
        buckets = defaultdict(list)
        for group_key, members in grouped.items():
            buckets[abs(hash(group_key)) % npartitions].extend(members)
        p.append(buckets)
    return p
开发者ID:kerrywatson1,项目名称:dask,代码行数:9,代码来源:core.py



注:本文中的toolz.groupby函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python toolz.isdistinct函数代码示例发布时间:2022-05-27
下一篇:
Python toolz.get_in函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap