# ask-community
v
Hi. I am trying to add checkpointing to my flows to checkpoint a Dask DataFrame that I pass between tasks. Tasks:
Copy code
import dask.dataframe as dd
import pandas as pd
from prefect import task
from prefect.engine.results import S3Result


@task(result=S3Result(bucket='bucket-name'))
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)

    return ddf


@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
    raise ValueError("Checkpoint testing...")
Flow:
Copy code
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
How can I be sure that on restart after the failure (the ValueError I added), the accept_checkpointed_data task loads its data from S3 rather than using cached_inputs? Thanks
a
It depends on the checkpoint configuration. If you set the following (checkpoint should already be True by default when a result such as S3Result is configured):
Copy code
@task(result=S3Result(bucket='bucket-name'), checkpoint=True)
then Prefect should be able to use that to restart your flow run from the failed task, and the pickled dataframe will be retrieved from S3 and used as input to the accept_checkpointed_data task. If you are doing this locally and wonder why it doesn't work: checkpointing is only turned on by default when running on Prefect Cloud or Server. To enable checkpointing for local testing, set the PREFECT__FLOWS__CHECKPOINTING environment variable to true.
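For a purely local run, a minimal sketch of doing that from Python (assuming, as is typical for Prefect 1.x, that the variable has to be visible before prefect loads its configuration at import time; exporting it in your shell before starting the script works too):
Copy code
import os

# set the flag before importing prefect so it is picked up when the config is loaded
os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'

import prefect  # noqa: E402 - deliberately imported after the env var is set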
v
I am setting this property and using LocalResult, but the dir is still empty. I am talking about local mode.
a
I'm a bit confused - you set S3Result but also local execution. Does it mean you use:
1. S3Result with a local flow run
2. S3Result with a local agent but a backend flow run
3. LocalResult with a local flow run
4. LocalResult with a local agent but a backend flow run?
v
I selected the simplest one - 3
to debug and see how it works
Copy code
import os

import dask.dataframe as dd
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult


@task(
    checkpoint=True,
    result=LocalResult(
        dir='mydir',
    )
)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)

    return ddf


with Flow(
    name='test_sample',
) as flow:
    ddf = checkpoint_data()

os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'
flow.run()
prefect version 1.0.0
a
Are you on Discourse? https://discourse.prefect.io
v
not yet
is the answer there?
a
I was trying to add my answer with images but it failed because Slack is partly down
v
Should I create a question there?
a
I would like to continue the discussion there, if you're OK with that, so that I can add images when reproducing the issue 😄 Fine with you? You would need to sign up
v
I am there
👍 1
a
I will reproduce and add my answer there - feel free to also respond directly there
v
thanks
is it shared with this thread?
no, I see 🙂
a
if you are fine with that, let's continue the discussion there - I already reproduced with S3Result and backend execution (i.e., option #2); now I'll try #3 as you requested
v
Thanks for your help!
👍 1
Last thing - how do I set debug logging in Prefect Cloud?
a
The best way is to set it on the agent, e.g.:
Copy code
prefect agent local start --log-level DEBUG
but you should also be able to set it on the run config:
Copy code
from prefect.run_configs import UniversalRun

flow.run_config = UniversalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})
v
Yes, it helped - now I see in the logs that data for failed tasks is downloaded from S3
thanks a lot
👍 1
k
I don't think this should be doable, because the default serializer for results is the PickleSerializer, which uses cloudpickle to serialize the data before persisting. You can't cloudpickle Dask DataFrames though, so you either need to convert to pandas (and use either the default serializer or the PandasSerializer), or you need to create a distributed Dask DataFrame serializer that uses Dask to_csv
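For reference, a rough sketch of the "convert to pandas" route, assuming Prefect 1.x's PandasSerializer; the bucket name and the example task are illustrative:
Copy code
import dask.dataframe as dd
import pandas as pd
from prefect import task
from prefect.engine.results import S3Result
from prefect.engine.serializers import PandasSerializer


@task(
    checkpoint=True,
    result=S3Result(
        bucket='bucket-name',
        # persist the checkpoint as Parquet instead of a cloudpickle blob
        serializer=PandasSerializer(file_type='parquet'),
    ),
)
def checkpoint_data_as_pandas() -> pd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)
    # collect to the driver as a plain pandas DataFrame so the result can serialize it
    return ddf.compute()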
v
Yes, we are going to implement a custom S3Result to store and load the ddf as Parquet
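Roughly something like this - a minimal sketch of a hypothetical DaskParquetS3Result, assuming Prefect 1.x's Result interface (read/write/exists) and that s3fs is installed; the class name, location template, and paths are illustrative:
Copy code
from typing import Any

import dask.dataframe as dd
import s3fs
from prefect.engine.result import Result


class DaskParquetS3Result(Result):
    """Hypothetical Result that persists a Dask DataFrame as a Parquet dataset on S3."""

    def __init__(self, bucket: str, location: str = None, **kwargs: Any) -> None:
        self.bucket = bucket
        # fall back to a simple templated location built from standard context keys
        location = location or "{flow_name}/{task_name}/{task_run_id}.parquet"
        super().__init__(location=location, **kwargs)

    def write(self, value_: Any, **kwargs: Any) -> Result:
        # render the templated location for this run, then write each partition
        # straight to S3 - nothing is collected on the driver
        new = self.format(**kwargs)
        new.value = value_
        value_.to_parquet(f"s3://{self.bucket}/{new.location}")
        return new

    def read(self, location: str) -> Result:
        new = self.copy()
        new.location = location
        new.value = dd.read_parquet(f"s3://{self.bucket}/{location}")
        return new

    def exists(self, location: str, **kwargs: Any) -> bool:
        # a Parquet "file" is really a directory of part files, so check the prefix
        fs = s3fs.S3FileSystem()
        return fs.exists(f"{self.bucket}/{location.format(**kwargs)}")
Usage would then look like @task(checkpoint=True, result=DaskParquetS3Result(bucket='bucket-name')) on the task that returns the ddf.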
k
I would honestly suggest turning checkpointing off and then saving it manually using s3fs, like this:
Copy code
ddf.to_csv("s3://...")
if you can, because this will persist directly to S3 in a distributed fashion without collecting to the driver as a pandas DataFrame. You should also, I think, use the worker_client to actually submit work to the cluster, as seen here
Oh yeah that’s right. Don’t even use CSV lol
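Putting those suggestions together, a hedged sketch of saving manually from inside a task via dask.distributed's worker_client (the bucket, path, and task body are assumptions, and Parquet is used instead of CSV per the note above):
Copy code
import dask.dataframe as dd
from dask.distributed import worker_client
from prefect import task


@task(checkpoint=False)
def save_ddf(ddf: dd.DataFrame) -> str:
    path = "s3://my-bucket/checkpoints/ddf.parquet"  # hypothetical destination
    # inside a task already running on a Dask worker, worker_client yields a client
    # that can submit further work to the same cluster without deadlocking the worker
    with worker_client() as client:
        ddf = client.persist(ddf)
        # each partition is written straight to S3 (s3fs is used under the hood)
        ddf.to_parquet(path)
    return path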
v
We have a custom storage layer, so we can just use it to override the read/write methods
The only thing that is not clear is the exists method
We also have to override it
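In case it helps, a small hedged sketch of what that exists override might boil down to for a Parquet dataset on S3 (the bucket and rendered location are hypothetical) - since the "result" is a directory of part files, checking the rendered prefix itself is usually enough:
Copy code
import s3fs


def parquet_checkpoint_exists(bucket: str, location: str, **kwargs) -> bool:
    # render the templated location with the run's context, then test the S3 prefix
    fs = s3fs.S3FileSystem()
    return fs.exists(f"{bucket}/{location.format(**kwargs)}")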