I have a plain CSV dataset of several GB and want to filter it on my local machine, which doesn't have enough memory to simply load it with Pandas.
So I'm trying Dask, but I'm having trouble using it correctly; in this particular case I end up with some secondary process that eats up my memory.
That is, I get the following warning (and the actual process memory number grows over time):
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.88 GB -- Worker memory limit: 2.00 GB
Example code
The data is a simple CSV with few but long columns, adding up to a few tens of GB.
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

def todate(time_in_seconds, relative_to: str = "2019-10-31T12") -> pd.Timestamp:
    """Return time relative to the specified date."""
    return pd.Timestamp(relative_to) + pd.to_timedelta(time_in_seconds, "s")

client = Client(
    n_workers=1,
    threads_per_worker=4,
    processes=False,
    memory_limit="2GB",
)

# path_to_dataset points at the CSV file on disk
ddf = dd.read_csv(
    path_to_dataset,
    names=["id", "time"],
    usecols=[0, 5],
    dtype={"id": np.uint32, "time": np.int32},
)

# Convert the integer seconds column to timestamps, partition by partition
date_col = ddf.map_partitions(lambda x: todate(x.time.values))
ddf = ddf.assign(time=date_col)
ddf.compute()
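To rule out the conversion itself, I checked that the same logic behaves as expected on a tiny frame with plain pandas (small made-up values, not my real data):

```python
import numpy as np
import pandas as pd

def todate(time_in_seconds, relative_to: str = "2019-10-31T12") -> pd.Timestamp:
    """Return time relative to the specified date."""
    return pd.Timestamp(relative_to) + pd.to_timedelta(time_in_seconds, "s")

# Sanity check on a two-row frame with the same dtypes as the real data
df = pd.DataFrame({
    "id": np.array([1, 2], dtype=np.uint32),
    "time": np.array([0, 3600], dtype=np.int32),
})
df = df.assign(time=todate(df.time.values))
print(df.time.tolist())
# [Timestamp('2019-10-31 12:00:00'), Timestamp('2019-10-31 13:00:00')]
```

So the per-partition transformation seems fine; the problem only shows up at scale.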
Question
So I don't really understand why Dask is not sticking to the 2 GB memory limit.
I also don't seem to be able to provide a chunk size for the read_csv function.
I had assumed that, since I can't set one, Dask would choose the chunking by itself based on the memory allowed for the Client?