Joshua Greenhalgh
06/24/2022, 11:29 AM
as flow:
    gcs_uri = Parameter(name="gcs_uri", required=True)
    cleaned = clean(gcs_uri=gcs_uri)
    removal = remove_existing(cleaned)
    insertion = insert(cleaned)
    flow.set_dependencies(insertion, upstream_tasks=[removal])
loads a file from storage into a pandas dataframe - does some cleaning of that data frame - inserts to a database - the clean task succeeds - and cleaned is passed to remove_existing which looks like;
@task
def remove_existing(cleaned):
    logger = prefect.context.get("logger")
    min = cleaned.some_col.min()
    ...
When this task runs I get (occasionally - and if I rerun the flow with same input it doesn't happen again);
min = cleaned.some_col.min()
AttributeError: 'NoneType' object has no attribute 'some_col'
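One way to make this kind of failure easier to diagnose is to check the input at the top of the task and fail with an explicit message. A minimal sketch in plain Python, assuming nothing about Prefect itself; require_upstream is a hypothetical helper, not part of any library:

```python
def require_upstream(value, name):
    """Return value unchanged, or raise a descriptive error if it is None."""
    if value is None:
        # Name the missing input so the log points at the upstream task.
        raise ValueError(f"expected upstream result {name!r}, got None")
    return value
```

Calling cleaned = require_upstream(cleaned, "cleaned") as the first line of remove_existing would not fix the underlying problem, but it turns the AttributeError into a message that names the missing input.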
How can that argument cleaned be None when the task preceding it succeeded and returned a value?
Kevin Kho
cleaned? You could DM me the code of that task
Joshua Greenhalgh
06/24/2022, 2:25 PM
Kevin Kho
Joshua Greenhalgh
06/24/2022, 2:28 PM
Kevin Kho
Joshua Greenhalgh
07/04/2022, 1:36 PM
KubernetesRun
Kevin Kho
Results so that it’s persisted something. I think Kubernetes is just retrying directly?
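To illustrate what persisting a result buys you when a process gets restarted, here is a minimal sketch in plain Python. It is not Prefect's Result API (in Prefect that behaviour is configured through a Result on the task or flow); run_with_checkpoint, the stand-in clean function and the file name cleaned.pkl are all assumptions made for the example:

```python
import os
import pickle
import tempfile


def run_with_checkpoint(fn, path):
    """Run fn() once and pickle its return value to path.

    If path already exists (for example after the process was restarted),
    load the stored value instead of calling fn() again.
    """
    if os.path.exists(path):
        with open(path, "rb") as fh:
            return pickle.load(fh)
    value = fn()
    with open(path, "wb") as fh:
        pickle.dump(value, fh)
    return value


def clean():
    # Stand-in for the real cleaning task; returns a small list of records.
    return [{"some_col": 3}, {"some_col": 1}]


checkpoint_path = os.path.join(tempfile.mkdtemp(), "cleaned.pkl")

first = run_with_checkpoint(clean, checkpoint_path)
# Simulate a restart: the stored value is loaded, the function is not called.
second = run_with_checkpoint(lambda: None, checkpoint_path)
```

With the value written to storage, a restarted run can reload the upstream output rather than handing None to the downstream task.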
There really is no magic if you are not using Dask. If I have to guess, I think the process is actually failing and potentially restarting through Kubernetes restarts?
Joshua Greenhalgh
07/04/2022, 8:03 PM