Marwan Sarieddine
08/20/2020, 2:23 PMPREFECT_FLOWS_CHECKPOINTING
and I use a pd.DataFrame
as a parameter - I am using prefect v0.13.3
export PREFECT_FLOWS_CHECKPOINTING=true
In [1]: import pandas as pd
In [2]: from prefect import Parameter, task, Flow
In [3]: @task
...: def simple(df):
...: return df
...:
In [4]: with Flow("test") as flow:
...: df = Parameter(name="df")
...: simple(df)
...: flow.run(df=pd.DataFrame())
[2020-08-20 14:20:38] ERROR - prefect.TaskRunner | Unexpected error: TypeError('Object of type DataFrame is not JSON serializable')
Traceback (most recent call last):
File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 838, in get_task_run_state
result = self.result.write(value, **formatting_kwargs,)
File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/results/prefect_result.py", line 60, in write
new.location = self.serializer.serialize(new.value).decode("utf-8")
File "~/.pyenv/versions/3.7.7/envs/infima/lib/python3.7/site-packages/prefect/engine/serializers.py", line 100, in serialize
return json.dumps(value).encode()
File "~/.pyenv/versions/3.7.7/lib/python3.7/json/__init__.py", line 231, in dumps
return _default_encoder.encode(obj)
File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "~/.pyenv/versions/3.7.7/lib/python3.7/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type DataFrame is not JSON serializable
Out[4]: <Failed: "Some reference tasks failed.">
when I disable PREFECT_FLOWS_CHECKPOINTING
flow runs successfully
export PREFECT_FLOWS_CHECKPOINTING=false
In [1]: import pandas as pd
In [2]: from prefect import Parameter, task, Flow
In [3]: @task
...: def simple(df):
...: return df
...:
In [4]: with Flow("test") as flow:
...: df = Parameter(name="df")
...: simple(df)
...: flow.run(df=pd.DataFrame())
Out[4]: <Success: "All reference tasks succeeded.">
Dylan
08/20/2020, 2:26 PMMarwan Sarieddine
08/20/2020, 2:26 PMDylan
08/20/2020, 2:26 PMMarwan Sarieddine
08/20/2020, 2:29 PM__repr__
in case it is not serializable itself ...Dylan
08/20/2020, 2:30 PMMarwan Sarieddine
08/20/2020, 2:31 PMDylan
08/20/2020, 2:31 PMJSON_EXTRACT
queries weren’t working with a certain BigQuery table that was created from pandas uploads)Marwan Sarieddine
08/20/2020, 2:33 PMDylan
08/20/2020, 2:35 PMMarwan Sarieddine
08/20/2020, 2:40 PMDylan
08/20/2020, 2:42 PM