Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
425 views
in Technique[技术] by (71.8m points)

python - Constructing Mode and Corresponding Count Functions Using Custom Aggregation Functions for GroupBy in Dask

So dask has now been updated to support custom aggregation functions for groupby. (Thanks to the dev team and @chmp for working on this!). I am currently trying to construct a mode function and corresponding count function. Basically what I envision is that mode returns a list, for each grouping, of the most common values for a specific column (ie. [4, 1, 2]). Additionally, there is a corresponding count function that returns the number of instances of those values, ie. 3.

Now I am currently trying to implement this in code. As per the groupby.py file, the parameters for custom aggregations are as follows:

Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        result will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

Here is the provided code for mean:

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    df.groupby('g').agg(custom_mean)

I am trying to think of the best way to do this. Currently I have the following functions:

def custom_count(x):
    count = Counter(x)
    freq_list = count.values()
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)
    return count.most_common(total)

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)

However I am getting stuck on understanding how exactly the agg part should be working. Any help on this problem would be appreciated.

Thanks!

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Admittedly, the docs are currently somewhat light on detail. Thanks for bringing this issue to my attention. Please let me now if this answer helps and I will contribute an updated version of the docs to dask.

To your question: for a single return value, the different steps of the aggregation are equivalent to:

res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)

In these terms, the mode function could be implemented as follows:

def chunk(s):
    # for the comments, assume only a single grouping column, the 
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like 
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will passed
    # multi-index and all. Group on the value level and sum the counts. The
    # result of the lambda function is a series. Therefore, the result of the 
    # apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # faster version using pandas internals
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).argmax())
    )

mode = dd.Aggregation('mode', chunk, agg, finalize)

Note, that this implementation does not match the dataframe .mode function in case of ties. This version will return one of the values in case of a tie, instead of all values.

The mode aggregation can now be used as in

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...