# prefect-community
d
Hey, I have a flow that generates a big pandas DataFrame (about 19M rows and 13 cols) and then saves it to Google BigQuery/Google Cloud Storage, etc. (There are multiple tasks, but for this question I’ll simplify the flow.) task_a returns a df - I log its shape on the last line of task_a and it’s the 19M-row, 13-col df - then this output gets passed to the next task, task_b - I log the type and shape on the first line of that task, and it’s a NoneType. Anyone ever had such an issue? (I debugged it locally and I can’t reproduce it - I only get it when processing a lot of data - maybe that’s the issue here?) Example -
import pandas as pd
from prefect import task, Flow
from prefect.storage import Docker
from prefect.executors import LocalDaskExecutor

@task()
def task_a():
    df = pd.DataFrame()
    ......
    logger.info(df.shape)  # 19M rows and 13 cols
    return df

@task()
def task_b(df):
    logger.info(type(df))  # NoneType

with Flow('****',
          storage=Docker(registry_url="us-central1-docker.pkg.dev****/",
                         dockerfile="./Dockerfile"),
          executor=LocalDaskExecutor(scheduler="processes")) as flow:
    df = task_a()
    task_b(df)
k
I have not seen something like that happen. Does it happen on smaller data too?
d
Never saw it either - and it works just fine on smaller data (thousands of rows and more). That’s why I can’t reproduce it locally, and I didn’t notice the issue until the flow processed larger amounts of data… I also can’t see any logs (running with LocalDaskExecutor and both the logzio logger and the Prefect logger - neither works with this executor - please see my other message in the channel from yesterday).
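For context, this is roughly how I’m getting the logger inside the tasks (the standard prefect.context pattern, simplified here - the real tasks are bigger):

import prefect
from prefect import task

@task()
def task_a():
    # grab the Prefect run logger from context at runtime
    logger = prefect.context.get("logger")
    logger.info("task_a started")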
k
Why are you tied to processes? Is threads an option for LocalDaskExecutor?
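If it is, switching should just be something along these lines (num_workers is optional, just an example):

from prefect.executors import LocalDaskExecutor

# threads-based local Dask scheduler instead of processes
executor = LocalDaskExecutor(scheduler="threads", num_workers=8)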
d
I’m extracting data from millions of HTML files, so I think processes are the right choice here. For now this setup (running with VertexRun and LocalDaskExecutor) is good enough, but it still takes hours - so moving to threads isn’t an option.
I’m trying to save the df in the same task where it’s still not a NoneType - I’ll update how it goes.
Update - I saved the df as part of task_a and it works fine - a parquet file of about 300MB. But when I pass the same frame to the next task, it’s a NoneType… Any idea what’s wrong here? (I’m also considering just passing a path between tasks instead of the frame itself - rough sketch below.)
I also tried reading this object, dumping it with cloudpickle and then loading it with pickle, and that works fine too.
import pickle
import cloudpickle
import pandas as pd

df = pd.read_parquet(....)
pickled_df = cloudpickle.dumps(df)
depickled_df = pickle.loads(pickled_df)
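And the path-passing workaround I mentioned would look roughly like this (the bucket path is a placeholder, and it assumes gcsfs/pyarrow are available so pandas can write parquet to gs:// directly):

@task()
def task_a():
    df = ...  # build the big DataFrame as before
    path = "gs://my-bucket/extracts/df.parquet"  # placeholder path
    df.to_parquet(path)
    return path  # pass the path downstream, not the 300MB frame

@task()
def task_b(path):
    df = pd.read_parquet(path)
    ...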
k
Will try to replicate this
d
So just so you have all the details - I’m running on Prefect Cloud, VertexRun and LocalDaskExecutor. The object I’m passing is a pandas DataFrame of about 19M rows and 13 cols - mostly strings. When I save it as parquet it’s about 300MB of data. When I read this object and pass it between tasks in a local run (without LocalDaskExecutor) it works fine and doesn’t turn into a NoneType. Thx
k
Ah, I need the dask version
d
How do I check the Dask version? I never configured it
If that helps - I’m running Prefect 0.15.10 (core version of the agents)
k
pip show dask
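or from Python:

import dask
print(dask.__version__)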
d
Checking it (waiting to get permissions for Prefect’s machine). In the meantime I think I found the issue - when looking at the Vertex AI custom jobs I see an OOM log. In Prefect I just get “NoneType object” - so it might just be a bad reflection of the Vertex AI custom training process… Checking it now with a bigger machine - but either way I think the Prefect log should reflect the state of the Vertex AI custom job. I will update. Thx
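For reference, bumping the machine is just the machine_type on the run config - something like this (the machine type here is only an example):

from prefect.run_configs import VertexRun

# request a higher-memory Vertex AI machine for the flow run
flow.run_config = VertexRun(machine_type="e2-highmem-16")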
k
Thanks for the update. I didn’t get a chance to dig into this one yet.