https://prefect.io logo
Title
p

Pawel

06/14/2021, 7:17 PM
Hi folks, I have a task decorated with GCSResult
from prefect.engine.results import GCSResult
from prefect import task, Flow

@task(log_stdout=True, result=GCSResult(bucket='prefect-cloud-results', location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"))
def compute_benchmark() -> float:
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

x=flow.run()
When running in Prefect Cloud, the result is written to the specified GCS location and logs show as much, but when running the same flow on my local machine, the flow runs successfully but no attempt is made to write to GCS. It's not an access issue as I can manually call GCSResult.write(...) with success. Is the GCSResult decorator only expected to work from Prefect Cloud runs or am I missing some setting?
k

Kevin Kho

06/14/2021, 7:24 PM
Hey @Pawel , do you mean using
flow.run()
when you say running the same flow on your local machine?
p

Pawel

06/14/2021, 7:25 PM
Hi @kevin yes
k

Kevin Kho

06/14/2021, 7:27 PM
You may have tagged the wrong kevin there. Checkpointing is turned on by default when using Prefect Cloud or Server, but off when running from local. You can set the PREFECT__FLOWS__CHECKPOINTING env variable to true to persist. Doc here .
p

Pawel

06/14/2021, 7:43 PM
Thanks @Kevin Kho I must be missing something obvious here but if I verify that checkpointing is turned on
from prefect.engine.results import GCSResult
from prefect import config
from prefect import task, Flow
@task(log_stdout=True, result=GCSResult(bucket='prefect-cloud-results', location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"))
def compute_benchmark() -> float:
    print(config.flows.checkpointing)
    return 0.753
with Flow("test_gcs") as flow:
    r = compute_benchmark()
x=flow.run()
the logs are
[2021-06-14 12:40:34-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'test_gcs'
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Starting task run...
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | True
[2021-06-14 12:40:36-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Finished task run for task with final state: 'Success'
[2021-06-14 12:40:36-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Also
echo PREFECT__FLOWS__CHECKPOINTING
PREFECT__FLOWS__CHECKPOINTING
k

Kevin Kho

06/14/2021, 7:51 PM
Let me test. What version of Prefect are you on?
from prefect.engine.results.s3_result import S3Result
import prefect
from prefect import task, Flow

@task(log_stdout=True, result=S3Result(bucket='omlds-prefect', location="test.pkl"))
def compute_benchmark() -> float:
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(prefect.context.checkpointing)
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

flow.run()
This worked for me. You should see True from that logging statement
p

Pawel

06/15/2021, 4:22 AM
Thank you @Kevin Kho. You're absolutely right. It was a combination of two things: I had to set the checkpointing flag AND set logging to debug to see the same logs I had in Prefect Cloud. Thanks for you kind help!