Dekel R

    3 months ago
    Hey, I have a flow that generates a big pandas DataFrame (about 19M rows and 13 cols) and then saves it to Google BigQuery, Google Cloud Storage, etc. (There are multiple tasks, but for this question I'm going to simplify the flow.) task_a returns a df - I log its shape on the last line of task_a and get the 19M-row, 13-col df. This output then gets passed to the next task, task_b, where I log the type and shape on the first line - and it's a NoneType. Has anyone ever had such an issue? (I debugged it locally and I can't reproduce it - it only happens when processing a lot of data, so maybe that's the issue?) Example -
    @task()
    def task_a():
        df = pd.DataFrame()
        ......
        logger.info(df.shape)  # 19M rows and 13 cols
        return df

    @task()
    def task_b(df):
        logger.info(type(df))  # NoneType

    with Flow('****',
              storage=Docker(registry_url="us-central1-docker.pkg.dev****/",
                             dockerfile="./Dockerfile"),
              executor=LocalDaskExecutor(scheduler="processes")) as flow:
        df = task_a()
        task_b(df)
    Kevin Kho

    3 months ago
    I have not seen something like that happen. Does it happen on smaller data too?
    Dekel R

    3 months ago
    I've never seen it either, and it works just fine on smaller data (thousands of rows and more). That's why I can't reproduce it locally and didn't notice the issue until the flow processed larger amounts of data. I also can't see any logs (running with LocalDaskExecutor and both the logzio logger and the Prefect logger - neither works with this executor; please see my other message in the channel that I posted yesterday).
    Kevin Kho

    3 months ago
    Why are you tied to processes? Is threads an option for LocalDaskExecutor?
    Dekel R

    3 months ago
    I'm extracting data from millions of HTML files, so I think processes are the solution here. For now this setup (running with VertexRun and LocalDaskExecutor) is good enough but still takes hours - so moving to threads isn't a possibility.
    I'm trying to save the df inside the same task, where it's still not a NoneType - I'll report back on how it goes.
    Update - I saved the df as part of task_a and it works fine: a parquet file of about 300MB. I pass the same frame to the next task, and it's a NoneType… Any idea what's wrong here?
    I also tried reading this object back, dumping it with cloudpickle, and then loading it with pickle - that works fine too.
    df = pd.read_parquet(....)
    pickled_df = cloudpickle.dumps(df)
    depickled_df = pickle.loads(pickled_df)
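A minimal sketch of the workaround implied above - persisting the frame to disk in the first task and handing only the file path to the next one, so the inter-process hand-off pickles a short string instead of the multi-GB object. Plain functions stand in for the Prefect tasks here, and the file name is made up for illustration:

```python
import os
import pickle
import tempfile

def task_a_body(out_dir):
    # Stand-in for the real task: build the large object, persist it,
    # and return only its path.
    big = list(range(100_000))  # placeholder for the 19M-row DataFrame
    path = os.path.join(out_dir, "frame.pkl")  # hypothetical file name
    with open(path, "wb") as f:
        pickle.dump(big, f)
    return path

def task_b_body(path):
    # The downstream task loads the object back from disk itself.
    with open(path, "rb") as f:
        return pickle.load(f)

with tempfile.TemporaryDirectory() as d:
    path = task_a_body(d)
    restored = task_b_body(path)
```

The real flow would use `df.to_parquet(path)` / `pd.read_parquet(path)` (or a GCS URI) in place of pickle; the point is only that the value crossing the task boundary stays tiny.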
    Kevin Kho

    3 months ago
    Will try to replicate this
    Dekel R

    3 months ago
    So just so you have all the details - I'm running on Prefect Cloud with VertexRun and LocalDaskExecutor. The object I'm passing is a pandas DataFrame of about 19M rows and 13 cols, mostly strings. When I save it as parquet it's about 300MB of data. When I read this object back and pass it between tasks on a local run (without LocalDaskExecutor) it works fine and doesn't turn into a NoneType. Thx
    Kevin Kho

    3 months ago
    Ah, I need the Dask version
    Dekel R

    3 months ago
    How do I check the Dask version? I never configured it explicitly.
    If it helps - I'm running Prefect 0.15.10 (the core version of the agents)
    Kevin Kho

    3 months ago
    pip show dask
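If shell access to the machine is awkward, the version can also be read from inside Python with the standard library - a sketch that returns None when the package isn't installed at all:

```python
import importlib.metadata

def installed_version(pkg):
    """Version string of an installed distribution, or None if missing."""
    try:
        return importlib.metadata.version(pkg)
    except importlib.metadata.PackageNotFoundError:
        return None

dask_version = installed_version("dask")  # version string, or None
```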
    Dekel R

    3 months ago
    Checking it (waiting to get permissions for Prefect's machine). In the meantime, I think I found the issue - looking at the Vertex AI custom jobs, I see an OOM log. In Prefect I just get "NoneType object", so it might simply be a poor reflection of the Vertex AI custom training process's state… Checking it now with a bigger machine - but either way, I think the Prefect log should reflect the state of the Vertex AI custom job. I will update. Thx
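For reference, a quick way to see why a 300MB parquet file can still blow past a machine's memory: parquet compresses string columns heavily, while in memory every cell carries a full Python string object. A sketch with a toy frame (the column contents are made up):

```python
import pandas as pd

# Toy frame of repeated strings. deep=True counts the actual string
# payloads; deep=False counts only the 8-byte object pointers.
df = pd.DataFrame({"text": ["some html-derived value"] * 100_000})
shallow = df.memory_usage(deep=False).sum()  # pointer-level size
deep = df.memory_usage(deep=True).sum()      # true in-memory footprint
```

On a frame of 19M rows by 13 mostly-string columns, the deep figure can sit well above the on-disk parquet size, which is consistent with the OOM seen in the Vertex AI logs.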
    Kevin Kho

    3 months ago
    Thanks for the update. I didn't get a chance to dig into this one yet