Admittedly, the docs are currently somewhat light on detail. Thanks for bringing this issue to my attention. Please let me know if this answer helps, and I will contribute an updated version of the docs to dask.
To your question: for a single return value, the different steps of the aggregation are equivalent to:
res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
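To make these steps concrete, here is a rough pandas-only sketch for a plain sum. The two slices in `partitions` are only a stand-in for dask partitions, and the way the partial results are concatenated is my assumption for illustration, not dask's actual internals.

```python
import pandas as pd

df = pd.DataFrame({"g": [0, 0, 1, 1, 1, 0], "col": [1, 2, 3, 4, 5, 6]})

# Hypothetical stand-in for dask partitions: two slices of the same frame.
partitions = [df.iloc[:3], df.iloc[3:]]


def chunk(grouped):
    # grouped is a grouped series; returns one partial sum per group
    return grouped.sum()


def agg(grouped):
    # grouped holds the concatenated partial sums, grouped on the index
    return grouped.sum()


def finalize(s):
    # nothing left to do for a sum
    return s


# chunk runs once per partition, the partial results are concatenated
partials = pd.concat([chunk(p.groupby("g")["col"]) for p in partitions])
res = finalize(agg(partials.groupby(level=[0])))
print(res)
```

The result should agree with `df.groupby("g")["col"].sum()` computed on the full frame.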
In these terms, the mode function could be implemented as follows:
def chunk(s):
    # For the comments, assume only a single grouping column; the
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-index series like
    # (group, value): count
    return s.value_counts()
def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will be
    # passed, multi-index and all. Group on the value level and sum the counts.
    # The result of the lambda function is a series. Therefore, the result of
    # the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # Faster alternative using pandas internals; use it in place of the
    # return statement above:
    # s = s._selected_obj
    # return s.groupby(level=list(range(s.index.nlevels))).sum()
def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index label with the maximum count, i.e., the
    # mode. Use idxmax here: in current pandas, argmax returns the integer
    # position instead of the label.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )
mode = dd.Aggregation('mode', chunk, agg, finalize)
Note that this implementation does not match the dataframe .mode function in case of ties: it returns only one of the tied values instead of all of them.
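A small pandas-only sketch of the difference, assuming the single value is picked by taking the label of the largest count (here via `idxmax`, which is my choice for the illustration):

```python
import pandas as pd

s = pd.Series([1, 1, 2, 2, 3])

# pandas .mode returns every tied value
all_modes = s.mode().tolist()

# taking the label of the largest count yields only one of them
single = s.value_counts().idxmax()

print(all_modes, single)
```

Which of the tied values comes back depends on the order of the counts, so it is best not to rely on it.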
The mode aggregation can now be used as follows:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)
res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
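As a sanity check, the same quantity can be computed with pandas alone on this example frame. This is only a sketch for comparison and does not go through dask; the name `expected` is mine.

```python
import pandas as pd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})

# count every (g0, g1, col) combination
counts = df.groupby(['g0', 'g1', 'col']).size()

# per (g0, g1) group, keep the col value with the highest count
expected = counts.groupby(level=['g0', 'g1']).apply(
    lambda s: s.reset_index(level=['g0', 'g1'], drop=True).idxmax()
)
print(expected)
```

For the group (0, 0) the mode is 1. In the group (1, 1) the values 2 and 3 are tied, so the dask result may legitimately show either of them.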