# prefect-community
j
I have a flow that looks a bit like this:
```python
from prefect import Flow, Parameter

with Flow(...) as flow:  # flow name truncated in the original snippet

    gcs_uri = Parameter(name="gcs_uri", required=True)

    cleaned = clean(gcs_uri=gcs_uri)

    removal = remove_existing(cleaned)
    insertion = insert(cleaned)

    flow.set_dependencies(insertion, upstream_tasks=[removal])
```
It loads a file from storage into a pandas dataframe, does some cleaning of that dataframe, and inserts it into a database. The `clean` task succeeds, and `cleaned` is passed to `remove_existing`, which looks like:
```python
import prefect
from prefect import task

@task
def remove_existing(cleaned):

    logger = prefect.context.get("logger")

    # "min" shadows the builtin; kept as-is to match the traceback below
    min = cleaned.some_col.min()

    ...
```
When this task runs I occasionally get the following (and if I rerun the flow with the same input, it doesn't happen again):
```
min = cleaned.some_col.min()
AttributeError: 'NoneType' object has no attribute 'some_col'
```
How can that argument `cleaned` be None when the task preceding it succeeded and returned a value?
Is there some Prefect magic behind the scenes around the argument passing that could go wrong?
k
There have been some people who mentioned this, but no one has ever gotten back to me. What is the return of `cleaned`? You could DM me the code of that task
j
It's a pandas dataframe. I can't really share the code, but it reads a file from GCS, parses it into a dataframe, does some simple cleaning, and returns the dataframe. I have added logging to the `clean` function, and at the end of that function the return value is not None
but then it is None in the task the dataframe is passed to
k
Ah ok no worries, is the execution on something like Dask? or Local?
j
it's happening on k8s
k
`LocalExecutor` on `KubernetesRun`?
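i.e. something like this on the flow? (a sketch, assuming Prefect 1.x flow-level config):
```python
from prefect.executors import LocalExecutor
from prefect.run_configs import KubernetesRun

# The flow runs as a Kubernetes job, while tasks execute in the local process
flow.run_config = KubernetesRun()
flow.executor = LocalExecutor()
```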
j
Hey, so I am seeing this occur quite frequently now and really have no idea how to debug it... In answer to @Kevin Kho's question (sorry I didn't reply sooner), I am using `KubernetesRun`
the TLDR of this is that the previous task succeeds, but the value of the argument to the following task (the return value of the preceding one) is None, even though the previous task cannot return None
k
If I have to guess, this is something like Kubernetes resubmitting a failed job, but the object is no longer in memory? Would you have a clue if the original process is running out of memory? You can also try configuring `Results` so that the output is persisted somewhere. I think Kubernetes is just retrying directly? There really is no magic if you are not using Dask. If I have to guess, I think the process is actually failing and potentially restarting through Kubernetes restarts?
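As a sketch, configuring a result could look something like this (I'm assuming `GCSResult` since your data is already in GCS; the bucket name is a placeholder):
```python
from prefect import task
from prefect.engine.results import GCSResult

# Persist the task's return value so a retried/restarted run can reload it
# rather than seeing None; "my-result-bucket" is a placeholder bucket name.
@task(result=GCSResult(bucket="my-result-bucket"), checkpoint=True)
def clean(gcs_uri):
    ...

# Note: outside Prefect Cloud/Server, checkpointing also needs
# PREFECT__FLOWS__CHECKPOINTING=true set in the run environment.
```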
I think the pod logs might be helpful to look at
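e.g. `kubectl get pods` to spot restart counts, `kubectl describe pod <pod-name>` to check whether the last state was OOMKilled, and `kubectl logs <pod-name> --previous` for the logs of the container before it restarted (`<pod-name>` being whatever your flow-run pod is called)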
j
@Kevin Kho thanks very much - I shall try to see if there are any correlations with pod failures!