Joshua Greenhalgh 06/24/2022, 11:29 AM
I have a flow that loads a file from storage into a pandas dataframe, does some cleaning of that dataframe, and inserts it into a database:
```
from prefect import Flow, Parameter

with Flow(...) as flow:
    gcs_uri = Parameter(name="gcs_uri", required=True)
    cleaned = clean(gcs_uri=gcs_uri)
    removal = remove_existing(cleaned)
    insertion = insert(cleaned)
    # insert() must also wait for remove_existing() to finish
    flow.set_dependencies(insertion, upstream_tasks=[removal])
```
The cleaning task succeeds, and its result is passed to the removal task, which looks like this:
```
import prefect
from prefect import task


@task
def remove_existing(cleaned):
    logger = prefect.context.get("logger")
    min = cleaned.some_col.min()  # note: shadows the built-in min()
    ...
```
When this task runs I occasionally get the following (if I rerun the flow with the same input, it doesn't happen again):
```
    min = cleaned.some_col.min()
AttributeError: 'NoneType' object has no attribute 'some_col'
```
How can the argument `cleaned` be None when the task preceding it succeeded and returned a value?
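One way to make this failure easier to diagnose is to check the upstream value at the top of the task, so a missing result shows up as an explicit error instead of `AttributeError: 'NoneType' object has no attribute 'some_col'`. This is a minimal plain-Python sketch: `require_upstream` is a hypothetical helper (not part of Prefect), and a dict stands in for the DataFrame so it runs without dependencies. Whether it composes cleanly underneath Prefect's `@task` decorator is an assumption worth verifying.

```python
import functools
import inspect


def require_upstream(*arg_names):
    """Hypothetical decorator: raise a clear error if any named argument
    arrives as None, instead of failing later inside the task body."""

    def decorator(fn):
        sig = inspect.signature(fn)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Map positional and keyword arguments onto parameter names.
            bound = sig.bind(*args, **kwargs)
            for name in arg_names:
                # Not passed at all, or passed as None: both count as "no upstream value".
                if bound.arguments.get(name) is None:
                    raise ValueError(
                        f"{fn.__name__}() received None for {name!r}; "
                        "the upstream result was not available"
                    )
            return fn(*args, **kwargs)

        return wrapper

    return decorator


@require_upstream("cleaned")
def remove_existing(cleaned):
    # Stand-in body: a dict replaces the pandas DataFrame used in the real task.
    return min(cleaned["some_col"])


print(remove_existing({"some_col": [3, 1, 2]}))  # prints 1
try:
    remove_existing(None)
except ValueError as err:
    print(err)
```

The check does not prevent the None from arriving; it only makes the log say which upstream value was missing, which helps when the problem is intermittent.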
Kevin Kho 06/24/2022, 2:04 PM
…? You could DM me the code of that task.
Joshua Greenhalgh 06/24/2022, 2:25 PM
Kevin Kho 06/24/2022, 2:26 PM
Joshua Greenhalgh 06/24/2022, 2:28 PM
Kevin Kho 06/24/2022, 2:28 PM
Joshua Greenhalgh 07/04/2022, 1:36 PM
Kevin Kho 07/04/2022, 5:53 PM
…so that it’s persisted something. I think Kubernetes is just retrying directly? There really is no magic if you are not using Dask. If I had to guess, I think the process is actually failing and potentially being restarted by Kubernetes?
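The restart hypothesis can be illustrated with a toy model (not Prefect's actual implementation): if an upstream result only lives in process memory and the process is restarted, for example by Kubernetes, a downstream task that runs again would have nothing to read in this model, whereas a result persisted to storage survives the restart. `InMemoryResults`, `FileResults` and `simulate_restart` are hypothetical names used only for illustration.

```python
import os
import pickle
import tempfile


class InMemoryResults:
    """Toy store: results exist only in this Python object (i.e. this process)."""

    def __init__(self):
        self._store = {}

    def write(self, key, value):
        self._store[key] = value

    def read(self, key):
        # Returns None when nothing was stored, like a lost upstream result.
        return self._store.get(key)


class FileResults:
    """Toy store: results are pickled to a directory, so a new process can reload them."""

    def __init__(self, directory):
        self.directory = directory

    def _path(self, key):
        return os.path.join(self.directory, f"{key}.pkl")

    def write(self, key, value):
        with open(self._path(key), "wb") as f:
            pickle.dump(value, f)

    def read(self, key):
        path = self._path(key)
        if not os.path.exists(path):
            return None
        with open(path, "rb") as f:
            return pickle.load(f)


def simulate_restart(make_store):
    """The 'clean' step writes its output, then a restarted run reads it from a fresh store."""
    before_restart = make_store()
    before_restart.write("clean", {"some_col": [3, 1, 2]})
    after_restart = make_store()  # new process: nothing is carried over in memory
    return after_restart.read("clean")


with tempfile.TemporaryDirectory() as tmp:
    print(simulate_restart(InMemoryResults))           # prints None
    print(simulate_restart(lambda: FileResults(tmp)))  # prints {'some_col': [3, 1, 2]}
```

In Prefect itself the counterpart would be configuring result persistence (checkpointing) for the upstream task; the exact options are described in Prefect's Results documentation.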
Joshua Greenhalgh 07/04/2022, 8:03 PM