Pawel
06/14/2021, 7:17 PM
from prefect.engine.results import GCSResult
from prefect import task, Flow

@task(
    log_stdout=True,
    result=GCSResult(
        bucket='prefect-cloud-results',
        location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl",
    ),
)
def compute_benchmark() -> float:
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

x = flow.run()
When running in Prefect Cloud, the result is written to the specified GCS location and the logs confirm it. When I run the same flow on my local machine, it completes successfully but makes no attempt to write to GCS. It's not an access issue, since I can call GCSResult.write(...) manually with success. Is a GCSResult passed to the task decorator only expected to work in Prefect Cloud runs, or am I missing some setting?
Kevin Kho
06/14/2021, 7:24 PM
Do you mean flow.run() when you say running the same flow on your local machine?
Pawel
06/14/2021, 7:25 PM
Kevin Kho
06/14/2021, 7:27 PM
Pawel
06/14/2021, 7:43 PM
from prefect.engine.results import GCSResult
from prefect import config
from prefect import task, Flow

@task(
    log_stdout=True,
    result=GCSResult(
        bucket='prefect-cloud-results',
        location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl",
    ),
)
def compute_benchmark() -> float:
    # log_stdout=True routes this print into the task run logs
    print(config.flows.checkpointing)
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

x = flow.run()
The logs are:
[2021-06-14 12:40:34-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'test_gcs'
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Starting task run...
[2021-06-14 12:40:34-0700] INFO - prefect.TaskRunner | True
[2021-06-14 12:40:36-0700] INFO - prefect.TaskRunner | Task 'compute_benchmark': Finished task run for task with final state: 'Success'
[2021-06-14 12:40:36-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Also
echo PREFECT__FLOWS__CHECKPOINTING
PREFECT__FLOWS__CHECKPOINTING
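A note on the check above: `echo PREFECT__FLOWS__CHECKPOINTING` without a leading `$` prints the variable's name rather than its value, which matches the output shown, so it does not reveal whether the variable is set. A minimal sketch of the same check in Python follows. The helper names are hypothetical, and treating the string "true" as the enabling value is an assumption about how the setting is interpreted.

```python
import os

ENV_VAR = "PREFECT__FLOWS__CHECKPOINTING"


def checkpointing_env_value(environ=os.environ):
    """Return the raw value of the variable, or None if it is unset."""
    return environ.get(ENV_VAR)


def checkpointing_env_enabled(environ=os.environ):
    """Assumption: the setting counts as enabled when the value is 'true' (any case)."""
    value = checkpointing_env_value(environ)
    return value is not None and value.strip().lower() == "true"


if __name__ == "__main__":
    print(f"{ENV_VAR}={checkpointing_env_value()!r}")
    print("enabled via environment:", checkpointing_env_enabled())
```

This only inspects the environment of the current process. The value printed by `config.flows.checkpointing` inside the task can also come from a config file, so the two checks are complementary.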
Kevin Kho
06/14/2021, 7:51 PM
from prefect.engine.results.s3_result import S3Result
import prefect
from prefect import task, Flow

@task(log_stdout=True, result=S3Result(bucket='omlds-prefect', location="test.pkl"))
def compute_benchmark() -> float:
    logger = prefect.context.get("logger")
    logger.info(prefect.context.checkpointing)
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

flow.run()
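The two snippets in this thread differ in their `location` argument: the first uses a template with `{flow_name}`-style placeholders, while this one uses a fixed key. As a rough illustration of how such a template expands, here is a minimal sketch using plain `str.format`. The `render_location` helper and all context values are hypothetical, and this is not a claim about Prefect's internal formatting code.

```python
TEMPLATE = "{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl"


def render_location(template: str, **context: str) -> str:
    """Fill the placeholders in a location template from keyword values.

    Raises KeyError if the template names a placeholder that is not supplied.
    """
    return template.format(**context)


# Hypothetical context values, for illustration only.
example = render_location(
    TEMPLATE,
    flow_name="test_gcs",
    flow_run_id="run-123",
    task_slug="compute_benchmark-1",
    task_run_id="task-456",
)

if __name__ == "__main__":
    print(example)
```

A fixed location such as "test.pkl" has no placeholders, so it renders unchanged and each run would target the same key.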
Pawel
06/15/2021, 4:22 AM