
Vadym Dytyniak

03/09/2022, 11:28 AM
Hi. I am trying to add checkpointing to my flows so that a Dask DataFrame I pass between tasks is checkpointed. Tasks:
@task(result=S3Result(bucket='bucket-name'))
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)

    return ddf


@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
    raise ValueError("Checkpoint testing...")
Flow:
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
How can I be sure that on restart after a failure (the ValueError I added), the accept_checkpointed_data task loads its data from S3 rather than using cached_inputs? Thanks

Anna Geller

03/09/2022, 12:00 PM
It depends on the checkpoint configuration. If you set checkpoint=True (which should be the default with S3Result):
@task(result=S3Result(bucket='bucket-name'), checkpoint=True)
then Prefect should be able to use that to restart your flow run from a failed task, and the pickled dataframe will be retrieved from S3 and used as input to the accept_checkpointed_data task. If you are doing this locally and wonder why it doesn't work, checkpointing is only turned on by default when running on Prefect Cloud or Server. To enable checkpointing for local testing, set the PREFECT__FLOWS__CHECKPOINTING environment variable to true.
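
For a local run, the variable has to be visible to the process before Prefect reads its configuration. A minimal sketch, assuming Prefect 1.x loads its configuration when the package is first imported: start the flow script in a child process whose environment already contains the variable. The helper name run_with_checkpointing and the probe command are illustrative only; the probe stands in for a real flow script and simply echoes the variable back.

```python
import os
import subprocess
import sys


def run_with_checkpointing(argv):
    """Run a command with PREFECT__FLOWS__CHECKPOINTING=true in its environment."""
    # Copy the current environment and add the variable, so the child process
    # sees it from its very first line (i.e. before any `import prefect`).
    env = dict(os.environ, PREFECT__FLOWS__CHECKPOINTING="true")
    return subprocess.run(argv, env=env, capture_output=True, text=True, check=True)


# Stand-in for a real flow script: the child only prints the variable it received.
probe = [
    sys.executable,
    "-c",
    "import os; print(os.environ['PREFECT__FLOWS__CHECKPOINTING'])",
]
result = run_with_checkpointing(probe)
print(result.stdout.strip())
```

In practice the probe would be replaced by something like [sys.executable, "my_flow.py"], where my_flow.py is a hypothetical script that builds the flow and calls flow.run().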

Vadym Dytyniak

03/09/2022, 12:14 PM
I am setting this property and using LocalResult, but the dir is still empty. I am talking about local mode.

Anna Geller

03/09/2022, 12:17 PM
I'm a bit confused - you set S3Result but also local execution. Does it mean you use:
1. S3Result with local flow run
2. S3Result with local agent but a backend flow run
3. LocalResult with a local flow run
4. LocalResult with a local agent but backend flow run?

Vadym Dytyniak

03/09/2022, 12:19 PM
I selected the simplest one - 3
to debug and see how it works
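import os

import dask.dataframe as dd
import pandas as pd
from prefect import Flow, task
from prefect.engine.results import LocalResult


@task(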
    checkpoint=True,
    result=LocalResult(
        dir='mydir',
    )
)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
    ddf = dd.from_pandas(df, npartitions=1)

    return ddf


with Flow(
    name='test_sample',
) as flow:
    ddf = checkpoint_data()

os.environ['PREFECT__FLOWS__CHECKPOINTING'] = 'true'
flow.run()
prefect version 1.0.0

Anna Geller

03/09/2022, 12:23 PM
Are you on Discourse? https://discourse.prefect.io

Vadym Dytyniak

03/09/2022, 12:23 PM
not yet
is the answer there?

Anna Geller

03/09/2022, 12:24 PM
I was trying to add my answer with images but it failed because Slack is partly down

Vadym Dytyniak

03/09/2022, 12:24 PM
Should I create a question there?

Anna Geller

03/09/2022, 12:24 PM
I would like to continue the discussion there, if you would be OK with that, so that I can add images when reproducing the issue 😄 Fine with you? You would need to sign up.

Vadym Dytyniak

03/09/2022, 12:25 PM
I am there
👍 1

Anna Geller

03/09/2022, 12:25 PM
I will reproduce and add my answer there - feel free to also respond directly there

Vadym Dytyniak

03/09/2022, 12:26 PM
thanks
is it shared with this thread?
no, I see 🙂
if you are fine with that, let's continue the discussion there - I already reproduced with S3Result and backend execution - i.e. option #2, now I'll try to do #3 as you requested

Vadym Dytyniak

03/09/2022, 12:53 PM
Thank you for your help!
👍 1
Last thing - how do I set debug logging in Prefect Cloud?

Anna Geller

03/09/2022, 1:04 PM
The best way is to set it on the agent, e.g.:
prefect agent local start --log-level DEBUG
but you should also be able to set it on the run config:
from prefect.run_configs import UniversalRun

flow.run_config = UniversalRun(env={"PREFECT__LOGGING__LEVEL": "DEBUG"})

Vadym Dytyniak

03/09/2022, 1:21 PM
Yes, it helped. Now I see in the logs that the data for failed tasks is downloaded from S3
thanks a lot
👍 1

Kevin Kho

03/09/2022, 3:03 PM
I don't think this should be doable, because the default serializer for results is the PickleSerializer, which uses cloudpickle to serialize the data before persisting. You can't cloudpickle Dask DataFrames though, so you either need to convert to pandas (and use either the default serializer or the Pandas serializer) or you need to create a distributed Dask DataFrame serializer that uses Dask to_csv

Vadym Dytyniak

03/09/2022, 3:08 PM
Yes, we are going to implement a custom S3Result to store and load the ddf as Parquet

Kevin Kho

03/09/2022, 3:08 PM
I would honestly suggest turning checkpointing off and then saving it manually using s3fs like this:
ddf.to_csv("s3://...")
if you can, because this will persist directly to S3 in a distributed way without collecting to the driver as a Pandas DataFrame. I think you should also use the worker_client to actually submit work to the cluster, as seen here
Oh yeah that’s right. Don’t even use CSV lol

Vadym Dytyniak

03/09/2022, 3:11 PM
We have a custom storage layer, so we can just use it to override the read/write methods
the only thing that is not clear is the exists method
We also have to override it
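
The read/write/exists override discussed above can be sketched without Prefect or Dask. The class below is a hypothetical stand-in, not Prefect's Result API: it writes each partition to its own file inside a dataset directory (similar to how a partitioned Parquet dataset is laid out), and exists checks for a marker file that is written last, so a half-written dataset is not mistaken for a finished checkpoint. The names PartitionedStore and _SUCCESS, and the use of JSON files in place of Parquet, are assumptions made for illustration.

```python
import json
import tempfile
from pathlib import Path


class PartitionedStore:
    """Hypothetical stand-in for a custom result with read/write/exists."""

    MARKER = "_SUCCESS"  # written last; its presence means the dataset is complete

    def __init__(self, root):
        self.root = Path(root)

    def write(self, location, partitions):
        """Write each partition (a list of row dicts) to its own file."""
        target = self.root / location
        target.mkdir(parents=True, exist_ok=True)
        for i, rows in enumerate(partitions):
            (target / f"part.{i}.json").write_text(json.dumps(rows))
        # Only mark the dataset as complete after every partition is on disk.
        (target / self.MARKER).touch()
        return location

    def exists(self, location):
        """True only if the dataset directory was written completely."""
        return (self.root / location / self.MARKER).is_file()

    def read(self, location):
        """Load all partitions back, in partition order."""
        target = self.root / location
        files = sorted(
            target.glob("part.*.json"),
            key=lambda p: int(p.name.split(".")[1]),
        )
        return [json.loads(f.read_text()) for f in files]


with tempfile.TemporaryDirectory() as tmp:
    store = PartitionedStore(tmp)
    partitions = [[{"col_1": "1", "col_2": 1}], [{"col_1": "2", "col_2": 2}]]
    before = store.exists("checkpoint_data")
    store.write("checkpoint_data", partitions)
    after = store.exists("checkpoint_data")
    restored = store.read("checkpoint_data")
```

The point of the marker is that a directory-style dataset has no single file whose presence proves the write finished, so exists needs some explicit completion signal; a real implementation on the custom storage layer would pick whatever signal that layer supports.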