KG
07/25/2025, 2:24 PM

Marvin
07/25/2025, 2:25 PM
First, install the prefect-gcp collection:

```bash
pip install prefect-gcp
```
Here's a complete example:
```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

# Create and save the GCS block
gcs_block = GcsBucket(
    bucket="your-bucket-name",
    gcp_credentials=GcpCredentials(
        service_account_info={
            "type": "service_account",
            # ... your service account key contents here
        }
    ),
)
gcs_block.save("my-gcs-block", overwrite=True)

@flow(
    result_storage=GcsBucket.load("my-gcs-block"),
    result_storage_key=lambda context: f"{context.flow.name}/{context.flow_run.id}.pkl",
)
def my_example_flow():
    return "some result"

# Run the flow
result = my_example_flow()
```
Let's break down the key parts:
1. The `result_storage_key` parameter accepts a callable that determines the storage path for the results
2. Using `context.flow.name` gives us the flow's function name automatically
3. We append the flow run ID to ensure uniqueness
4. The result will be stored in your GCS bucket at `my_example_flow/<flow-run-id>.pkl`
You could also create more complex prefixes, for example:
```python
@flow(
    result_storage=GcsBucket.load("my-gcs-block"),
    result_storage_key=lambda context: (
        f"flow_results/{context.flow.name}/"
        f"{context.flow_run.timestamp.strftime('%Y-%m-%d')}/"
        f"{context.flow_run.id}.pkl"
    ),
)
def another_example_flow():
    return "some result"
```
This would create a path structure like:

```
flow_results/
    another_example_flow/
        2023-10-20/
            <flow-run-id>.pkl
```
Important notes:
1. Make sure your GCS bucket exists and your credentials have write permissions
2. The GCS credentials should be properly configured in your block (see the sketch below)
3. The `.pkl` extension is conventional for pickle files, but you can use any extension that matches your serialization format
For more details on setting up GCS credentials and blocks, you can refer to the prefect-gcp documentation.
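As a minimal sketch of registering a reusable credentials block (assuming a service account key file downloaded locally; the file path and block name here are placeholders):

```python
from prefect_gcp import GcpCredentials

# Assumption: this path points to a locally downloaded service
# account key file; substitute your own path and block name.
GcpCredentials(
    service_account_file="~/keys/service-account.json"
).save("my-gcp-creds", overwrite=True)
```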
Let me know if you need any clarification or have questions about other aspects of result storage with GCS!

KG
07/25/2025, 2:46 PM

KG
07/25/2025, 3:06 PM

Marvin
07/25/2025, 3:07 PM
You're right: the `result_storage_key` parameter is indeed only available for tasks, not flows.
Here's the correct way to handle GCS storage for flow results:
```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

# Create and save the GCS block
gcs_block = GcsBucket(
    bucket="your-bucket-name",
    gcp_credentials=GcpCredentials(
        service_account_info={
            "type": "service_account",
            # ... your service account key contents here
        }
    ),
)
gcs_block.save("my-gcs-block", overwrite=True)

@flow(result_storage=GcsBucket.load("my-gcs-block"))
def my_example_flow():
    return "some result"

# Run the flow
result = my_example_flow()
```
For flows, the storage location is automatically managed by Prefect. The results will be stored in your GCS bucket with a path structure that Prefect determines, which typically includes the flow run ID to ensure uniqueness.
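To see where a given run's result actually landed, you can run the flow and then list the bucket's contents. A minimal sketch, assuming the `my-gcs-block` block from above (flow result persistence may also need to be enabled, e.g. via `persist_result=True` on the flow):

```python
from prefect_gcp.cloud_storage import GcsBucket

# Run the flow and capture its final state instead of the raw return value
state = my_example_flow(return_state=True)
print(state.result())  # "some result"

# List the objects Prefect wrote to the bucket
gcs_block = GcsBucket.load("my-gcs-block")
for blob in gcs_block.list_blobs(""):
    print(blob.name)
```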
If you need custom path handling for specific tasks within the flow, you can use `result_storage_key` at the task level:
```python
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket

@task(
    persist_result=True,
    # result_storage_key takes a template string resolved at runtime
    result_storage_key="{task_run.task_name}/{task_run.id}.pkl",
)
def my_task():
    return "task result"

@flow(result_storage=GcsBucket.load("my-gcs-block"))
def my_example_flow():
    result = my_task()
    return result
```
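The storage key templating also has access to the run's parameters, if you want keys derived from task inputs. A short sketch (here `name` is a hypothetical task parameter, not part of the example above):

```python
from prefect import task

@task(
    persist_result=True,
    # "name" is a hypothetical parameter used to build the storage key
    result_storage_key="my_task/{parameters[name]}.pkl",
)
def my_task(name: str):
    return f"result for {name}"
```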
I apologize for any confusion my previous response may have caused. Let me know if you have any questions about this corrected information!