python - simple dask map_partitions example

I read the following SO thread and am now trying to understand it. Here is my example:

import dask.dataframe as dd
import pandas as pd
from dask.multiprocessing import get
import random

df = pd.DataFrame({'col_1':random.sample(range(10000), 10000), 'col_2': random.sample(range(10000), 10000) })

def test_f(col_1, col_2):
    return col_1*col_2

ddf = dd.from_pandas(df, npartitions=8)

ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

It generates the error below. What am I doing wrong? Also, it is not clear to me how to pass additional parameters to the function in map_partitions.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    136     try:
--> 137         yield
    138     except Exception as e:

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 

TypeError: test_f() got an unexpected keyword argument 'columns'

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-9-913789c7326c> in <module>()
----> 1 ddf['result'] = ddf.map_partitions(test_f, columns=['col_1', 'col_2']).compute(get=get)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(self, func, *args, **kwargs)
    469         >>> ddf.map_partitions(func).clear_divisions()  # doctest: +SKIP
    470         """
--> 471         return map_partitions(func, self, *args, **kwargs)
    472 
    473     @insert_meta_param_description(pad=12)

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in map_partitions(func, *args, **kwargs)
   3163 
   3164     if meta is no_default:
-> 3165         meta = _emulate(func, *args, **kwargs)
   3166 
   3167     if all(isinstance(arg, Scalar) for arg in args):

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py in _emulate(func, *args, **kwargs)
   3129     """
   3130     with raise_on_meta_error(funcname(func)):
-> 3131         return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
   3132 
   3133 

~\AppData\Local\conda\conda\envs\tensorflow\lib\contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78             except StopIteration as exc:
     79                 # Suppress StopIteration *unless* it's the same exception that

~\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py in raise_on_meta_error(funcname)
    148                ).format(" in `{0}`".format(funcname) if funcname else "",
    149                         repr(e), tb)
--> 150         raise ValueError(msg)
    151 
    152 

ValueError: Metadata inference failed in `test_f`.

Original error is below:
------------------------
TypeError("test_f() got an unexpected keyword argument 'columns'",)

Traceback:
---------
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\utils.py", line 137, in raise_on_meta_error
    yield
  File "C:\Users\some_user\AppData\Local\conda\conda\envs\tensorflow\lib\site-packages\dask\dataframe\core.py", line 3131, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))

1 Answer

There is an example in the map_partitions docs that achieves exactly what you are trying to do:

ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))

When you call map_partitions, the function that you map is given a dataframe as its first argument, much as the function you pass to .apply() on a pandas.DataFrame is given a piece of that dataframe as its first argument.

In the case of dask.dataframe.map_partitions, this first argument is one partition of the dask dataframe, which is itself a pandas DataFrame. In the case of pandas.DataFrame.apply, it is a single column or row, passed as a Series.

This means that your function has to accept a dataframe (the partition) as its first argument, and in your case it could look like this:

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])

Note that assignment of a new column in this case happens (i.e. gets scheduled to happen) BEFORE you call .compute().

In your example you assign the column AFTER you call .compute(), which largely defeats the purpose of using dask: once you call .compute(), the results of that operation are loaded into memory if there is enough space for them (if not, you just get a MemoryError).

So for your example to work you could:

1) Use a function (with column names as arguments):

def test_f(df, col_1, col_2):
    return df.assign(result=df[col_1] * df[col_2])


ddf_out = ddf.map_partitions(test_f, 'col_1', 'col_2')

# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

2) Use a lambda (with column names hardcoded in the function):

ddf_out = ddf.map_partitions(lambda df: df.assign(result=df.col_1 * df.col_2))

# Here is a good place to do something with the BIG ddf_out dataframe before calling .compute()

result = ddf_out.compute(get=get)  # Will load the whole dataframe into memory

Update:

To apply a function on a row-by-row basis, here is a quote from the post you linked:

map / apply

You can map a function row-wise across a series with map

df.mycolumn.map(func)

You can map a function row-wise across a dataframe with apply

df.apply(func, axis=1)

I.e. for the example function in your question, it might look like this:

def test_f(dds, col_1, col_2):
    return dds[col_1] * dds[col_2]

Since you will be applying it on a row-by-row basis, the function's first argument will be a series (each row of a dataframe is passed as a series).

To apply this function then you might call it like this:

dds_out = ddf.apply(
    test_f, 
    args=('col_1', 'col_2'), 
    axis=1, 
    meta=('result', int)
).compute(get=get)

This will return a series named 'result'.

I guess you could also call .apply on each partition with a function, but it does not look to be any more efficient than calling .apply on the dataframe directly. But maybe your tests will prove otherwise.

