merlin
10/30/2022, 9:59 PM
# i've left out all the imports and task definitions
# trino_flows.py
@flow(name="extract write")
def extract_write(config):
    logger = get_run_logger()
    logger.info(f"extract file: {config.filepath}")
    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)
# file: extract_write.py
filepath = Path(sys.argv[1])
extract_config = ExtractConfig(filepath=filepath)
with tags(extract_config.dataset_name, "extract"):
    extract_write(extract_config)
In development I'm calling the script with:
python src/extract_write.py src/extracts/weekly_date.sql
So the ExtractConfig object provides the dataset_name, rundate, and filepath fields used by the flow code.
How do I build/apply a deployment when I'm passing an object to the flow function in my script?
merlin
10/30/2022, 11:43 PM
the idea is a flow takes a path to sql script as parameter, along with a date and maybe others
Anna Geller
sounds like a perfect use case for parameters
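A minimal sketch of that, assuming Prefect 2.x and reusing the ExtractConfig and task functions omitted above: the flow takes a plain string path (something a deployment can store as a default parameter) and builds the config object inside the flow.
# sketch: parametrized version of extract_write (imports/tasks omitted above are assumed)
from pathlib import Path
from prefect import flow, get_run_logger

@flow(name="extract write")
def extract_write(filepath: str):
    logger = get_run_logger()
    config = ExtractConfig(filepath=Path(filepath))  # config built from the plain parameter
    logger.info(f"extract file: {config.filepath}")
    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)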
merlin
10/31/2022, 2:59 AM
@flow(name="extract write")
def extract_write(**kwargs):
    extract_config = ExtractConfig(filepath=kwargs['filename'])
    with tags(extract_config.dataset_name, "extract"):
        inner_extract_write(extract_config)
merlin
10/31/2022, 3:00 AM
So searching the UI for flow name isn't helpful in my case. Tagging the flow with the name of the dataset makes the UI more convenient.
Anna Geller
fully understand the problem, and I'm 100% with you on that, we would like to allow more ways to attach metadata to flow runs and deployments but for now deployment object is the best place to do that
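For example, assuming Prefect 2.x and the parametrized extract_write sketched above, the tags and default parameters can be attached when building the deployment (the deployment name and SQL path here are just illustrative):
# sketch: build and apply a deployment that carries the metadata
from prefect.deployments import Deployment
from trino_flows import extract_write  # assumes the parametrized flow lives here

deployment = Deployment.build_from_flow(
    flow=extract_write,
    name="weekly-date-extract",                               # illustrative name
    parameters={"filepath": "src/extracts/weekly_date.sql"},  # default parameter
    tags=["weekly_date", "extract"],                          # dataset name + job type
)
deployment.apply()  # register the deployment with the API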
merlin
10/31/2022, 3:12 AM
extract_write can be scheduled over many parameters (sql files). I'm able to tag flow runs based on that input parameter at the top level, but it may come out differently as a subflow. One option is a deployment per sql file (src/script1.sql, etc).
Each deployment has fixed parameters, and the tags would be set at the deployment build stage. You can change the parameters for an ad-hoc run to override the defaults; in my design this wouldn't be needed.
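That could look roughly like the loop below, one deployment per sql file with fixed parameters and tags set at build time (a sketch; the folder and naming scheme are assumptions):
# sketch: one deployment per sql file
from pathlib import Path
from prefect.deployments import Deployment
from trino_flows import extract_write

for sql_file in Path("src/extracts").glob("*.sql"):
    dataset_name = sql_file.stem  # e.g. weekly_date for weekly_date.sql
    deployment = Deployment.build_from_flow(
        flow=extract_write,
        name=f"extract-{dataset_name}",
        parameters={"filepath": str(sql_file)},  # fixed per deployment
        tags=[dataset_name, "extract"],          # tags set at the build stage
    )
    deployment.apply()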
Another way: I could have a control flow, daily_jobs, that just loops through all the sql files and calls the extract_write flow on each of them. In this case my with tags(...) approach would help again.
I like Prefect because of the flexibility in how to design a system. Once I learn this limited set of constraints, I will be able to just think about what design approach to take. I use with tags(...) to tag the flow run according to the name of the dataset being processed.
• entrypoint takes a string, the file path of a folder full of SQL files.
• a for loop iterates through each file and sends it for execution on the cloud DB
• this 'extract' flow is deployed for folders with daily/weekly jobs
here's the entrypoint flow code:
# bulk_extract.py
@flow(name="bulk-extracts")
def bulk_extracts(directory_name):
    logger = get_run_logger()
    extract_folder_items = Path(directory_name).iterdir()
    for item in extract_folder_items:
        logger.info(f"daily file: {item}")
        extract_config = ExtractConfig(filepath=item)
        with tags(extract_config.dataset_name, "extract"):
            extract_write(extract_config)

if __name__ == "__main__":
    bulk_extracts(Path(sys.argv[1]))
The tags are really useful in the UI, I can gather up jobs by dataset_name, distinguish between DDL/extract jobs, etc.
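For the daily/weekly folder deployments of the entrypoint flow, a sketch along the same lines (folder paths are assumptions):
# sketch: one bulk-extracts deployment per folder
from prefect.deployments import Deployment
from bulk_extract import bulk_extracts

for folder, cadence in [("src/extracts/daily", "daily"), ("src/extracts/weekly", "weekly")]:
    deployment = Deployment.build_from_flow(
        flow=bulk_extracts,
        name=f"bulk-extracts-{cadence}",
        parameters={"directory_name": folder},  # matches the flow signature above
        tags=[cadence, "extract"],
    )
    deployment.apply()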