Slackbot
07/04/2024, 7:01 AMMarvin
07/04/2024, 7:02 AMflow_run
between functions in different scripts without losing information. This could take a few parallel searches.
I'll start by looking for relevant documentation and existing issues:
- Queries related to passing flow_run
between scripts.
- Queries about prefect.runtime.flow_run
.
Give me a moment.
Well, this is delightfully frustrating. It seems I didn't find any relevant issues or precise matches in the documentation with those queries.
However, I can attempt a different approach. Let's see what happens if I focus on best practices for sharing data between flows and handling flow_run
information.
I'll query Prefect documentation again, but this time with slightly altered queries:
- "share data between flows"
- "flow_run reference Prefect 2.0"
Let's give it another shot.
Oh, the cruel irony. The documentation search seems utterly incapable of understanding my brilliance today.
Let's try a different angle, shall we? Prefect often recommends using state and results to persist data between tasks and flows. Perhaps we could explore using the .result()
method or utilizing external storage (like an S3 bucket or a database) to share the necessary information.
In any event, I recommend you consider these ideas:
1. State Results: Use Prefect's way of handling state and results to persist data between flows.
2. External Storage: Utilize a shared external storage to save and retrieve flow_run
information.
Would you like to get some example code on how to share data between flows or use external storage methods?Marvin
07/04/2024, 7:12 AMif i proceed in this way, all works fine""" from datetime import datetime from prefect import flow, get_run_logger from prefect.runtime import flow_run from da_prefect.tasks.data_contract import get_data_contract from tasks.production_process_tasks import copy_file_from_source_to_dockpath def generate_flow_run_name(): asset_id = flow_run.parameters["asset_id"] return f"{asset_id}-production" @flow(flow_run_name=generate_flow_run_name) def cherrybank_production_mappatura_valutaria(asset_id: str, production_time: datetime = datetime.now()): logger = get_run_logger() logger.info(f"Load data contract for asset '{asset_id}'") data_contract = get_data_contract(asset_id) copy_file_from_source_to_dockpath(data_contract, logger) logger.info(f"Produced asset '{asset_id}'") if name == "__main__": asset_id = "cherrybank-mappatura_valutaria" cherrybank_production_mappatura_valutaria(asset_id=asset_id) """ but as that generate_flow_run_name function I use it in many flows I would like to write it once and then reuse it around so i try in that way """ from datetime import datetime from prefect import flow, get_run_logger from prefect.runtime import flow_run from da_prefect.tasks.data_contract import get_data
Marvin
07/04/2024, 7:13 AMif i proceed in this way, all works fine""" from datetime import datetime from prefect import flow, get_run_logger from prefect.runtime import flow_run from da_prefect.tasks.data_contract import get_data_contract from tasks.production_process_tasks import copy_file_from_source_to_dockpath def generate_flow_run_name(): asset_id = flow_run.parameters["asset_id"] return f"{asset_id}-production" @flow(flow_run_name=generate_flow_run_name) def cherrybank_production_mappatura_valutaria(asset_id: str, production_time: datetime = datetime.now()): logger = get_run_logger() logger.info(f"Load data contract for asset '{asset_id}'") data_contract = get_data_contract(asset_id) copy_file_from_source_to_dockpath(data_contract, logger) logger.info(f"Produced asset '{asset_id}'") if name == "__main__": asset_id = "cherrybank-mappatura_valutaria" cherrybank_production_mappatura_valutaria(asset_id=asset_id) """ but as that generate_flow_run_name function I use it in many flows I would like to write it once and then reuse it around, so i try in that way """ from datetime import datetime from prefect import flow, get_run_logger from prefect.runtime import flow_run from da_prefect.tasks.data_contract import get_data