Hi folks, I have a task decorated with GCSResult `...
# prefect-server
p
Hi folks, I have a task decorated with GCSResult
Copy code
from prefect.engine.results import GCSResult
from prefect import task, Flow

@task(log_stdout=True, result=GCSResult(bucket='prefect-cloud-results', location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"))
def compute_benchmark() -> float:
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

x=flow.run()
When running in Prefect Cloud, the result is written to the specified GCS location and logs show as much, but when running the same flow on my local machine, the flow runs successfully but no attempt is made to write to GCS. It's not an access issue as I can manually call GCSResult.write(...) with success. Is the GCSResult decorator only expected to work from Prefect Cloud runs or am I missing some setting?
k
Hey @Pawel , do you mean using
flow.run()
when you say running the same flow on your local machine?
p
Hi @kevin yes
k
You may have tagged the wrong kevin there. Checkpointing is turned on by default when using Prefect Cloud or Server, but off when running from local. You can set the PREFECT__FLOWS__CHECKPOINTING env variable to true to persist. Doc here .
p
Thanks @Kevin Kho I must be missing something obvious here but if I verify that checkpointing is turned on
Copy code
from prefect.engine.results import GCSResult
from prefect import config
from prefect import task, Flow
@task(log_stdout=True, result=GCSResult(bucket='prefect-cloud-results', location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"))
def compute_benchmark() -> float:
    print(config.flows.checkpointing)
    return 0.753
with Flow("test_gcs") as flow:
    r = compute_benchmark()
x=flow.run()
the logs are
Copy code
[2021-06-14 12:40:34-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'test_gcs'
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Starting task run...
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | True
[2021-06-14 12:40:36-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Finished task run for task with final state: 'Success'
[2021-06-14 12:40:36-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Also
Copy code
echo PREFECT__FLOWS__CHECKPOINTING
PREFECT__FLOWS__CHECKPOINTING
k
Let me test. What version of Prefect are you on?
Copy code
from prefect.engine.results.s3_result import S3Result
import prefect
from prefect import task, Flow

@task(log_stdout=True, result=S3Result(bucket='omlds-prefect', location="test.pkl"))
def compute_benchmark() -> float:
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(prefect.context.checkpointing)
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

flow.run()
This worked for me. You should see True from that logging statement
p
Thank you @Kevin Kho. You're absolutely right. It was a combination of two things: I had to set the checkpointing flag AND set logging to debug to see the same logs I had in Prefect Cloud. Thanks for you kind help!