Vadym Dytyniak
03/09/2022, 11:28 AM
```python
@task(result=S3Result(bucket='bucket-name'))
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)
    return ddf


@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
    raise ValueError("Checkpoint testing...")
```
Flow:
```python
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
```
How can I be sure that on restart after the failure (the ValueError I added) the accept_checkpointed_data task loads data from S3 and does not use cached_inputs?
Thanks

Anna Geller
03/09/2022, 12:00 PM
```python
@task(result=S3Result(bucket='bucket-name'), checkpoint=True)
```
then Prefect should be able to use that to restart your flow run from a failed task, and the pickled dataframe will be retrieved from S3 and used as input to the accept_checkpointed_data task.
If you are doing this locally and wonder why it doesn't work: checkpointing is only turned on by default when running on Prefect Cloud or Server. To enable checkpointing for local testing, set the PREFECT__FLOWS__CHECKPOINTING environment variable to true.
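A minimal sketch of that suggestion applied to the original example, assuming Prefect 1.x and a placeholder bucket name:
```python
import dask.dataframe as dd
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import S3Result


@task(result=S3Result(bucket='bucket-name'), checkpoint=True)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
    # fails on the first run; on a restart its input should be re-read from
    # the persisted S3Result rather than from in-memory cached_inputs
    raise ValueError("Checkpoint testing...")


with Flow('test_sample') as flow:
    accept_checkpointed_data(checkpoint_data())
```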
Vadym Dytyniak
03/09/2022, 12:14 PM

Anna Geller
03/09/2022, 12:17 PM

Vadym Dytyniak
03/09/2022, 12:19 PM
```python
@task(
    checkpoint=True,
    result=LocalResult(
        dir='mydir',
    )
)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)
    return ddf


with Flow(
    name='test_sample',
) as flow:
    ddf = checkpoint_data()

os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'
flow.run()
```
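One hedged caveat on the snippet above (an assumption about Prefect 1.x, not something stated in the thread): the configuration, including PREFECT__FLOWS__CHECKPOINTING, is read when prefect is imported, so setting the variable just before flow.run() may be too late. A sketch of the same local test with the flag set before any Prefect import:
```python
import os

# assumption: the flag must be in the environment before `prefect` is
# imported, because Prefect loads its configuration at import time
os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'

import dask.dataframe as dd
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult


@task(checkpoint=True, result=LocalResult(dir='mydir'))
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


with Flow(name='test_sample') as flow:
    ddf = checkpoint_data()

flow.run()
```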
Anna Geller
03/09/2022, 12:23 PM

Vadym Dytyniak
03/09/2022, 12:23 PM

Anna Geller
03/09/2022, 12:24 PM

Vadym Dytyniak
03/09/2022, 12:24 PM

Anna Geller
03/09/2022, 12:24 PM

Vadym Dytyniak
03/09/2022, 12:25 PM

Anna Geller
03/09/2022, 12:25 PM

Vadym Dytyniak
03/09/2022, 12:26 PM

Vadym Dytyniak
03/09/2022, 12:53 PM

Anna Geller
03/09/2022, 1:04 PM
```
prefect agent local start --log-level DEBUG
```
but you should also be able to set it on the run config:
```python
from prefect.run_configs import UniversalRun

flow.run_config = UniversalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
```
Vadym Dytyniak
03/09/2022, 1:21 PM

Kevin Kho
03/09/2022, 3:03 PM
to_csv
Vadym Dytyniak
03/09/2022, 3:08 PM

Kevin Kho
03/09/2022, 3:08 PM
```python
ddf.to_csv("s3://...")
```
if you can, because this will persist directly to S3 distributedly without collecting to the driver as a Pandas DataFrame.
You also should use the worker_client,
I think, to actually submit work to the cluster, as seen here.
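A rough sketch of that pattern, assuming a Prefect 1.x task running on a Dask cluster; the S3 path is a placeholder, and s3fs would need to be installed for the write:
```python
import dask.dataframe as dd
from dask.distributed import worker_client
from prefect import task


@task
def write_to_s3(ddf: dd.DataFrame) -> None:
    # compute=False returns the delayed per-partition CSV writes instead of
    # running them immediately
    delayed_writes = ddf.to_csv(
        's3://bucket-name/output/part-*.csv', index=False, compute=False
    )
    # hand the writes back to the cluster from inside the task instead of
    # computing them in the task's own thread
    with worker_client() as client:
        client.gather(client.compute(delayed_writes))
```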
Vadym Dytyniak
03/09/2022, 3:11 PM