Robert Kowalski
03/15/2023, 11:33 AMtags
or something similar in @flow
decorator. On the other hand flow decorator has version parameter but it is not available in REST API endpoint. Is it possible to add tags or version in flow decorator and get access for it by REST API ?redsquare
03/15/2023, 11:35 AMRobert Kowalski
03/15/2023, 11:51 AMfrom prefect import flow, task
from prefect.deployments import Deployment
@task
def tmp_task():
print('Task')
@flow(name='tmp_flow', log_prints=True)
def tmp_flow():
print('flow start')
tmp_task()
print('flow end')
if __name__ == '__main__':
environment = 'production'
tmp_flow()
Deployment.build_from_flow(
flow=tmp_flow,
output='tmp_flow.yml',
name=environment,
path=f'.',
infra_overrides={"env": {
"PREFECT_LOGGING_LEVEL": "INFO",
}},
work_queue_name=environment,
apply=True,
is_schedule_active=False,
work_pool_name=environment,
tags=[environment],
)
and output from calling endpoints:
flow by name :
{
"id": "397bdbc2-cc4a-497b-a456-b885c7035b18",
"created": "2023-03-07T10:14:58.549838+00:00",
"updated": "2023-03-07T10:14:58.549855+00:00",
"name": "tmp_flow",
"tags": []
}
}deployment by name
{
"id":"391259ec-ce11-45e7-a5c1-ce69d9492106",
"created":"2023-03-15T11:42:29.227680+00:00",
"updated":"2023-03-15T11:42:29.226010+00:00",
"name":"production",
"version":"cc88b092bfcc6cf80ddcc310906f151c",
"description":"None",
"flow_id":"397bdbc2-cc4a-497b-a456-b885c7035b18",
"schedule":"None",
"is_schedule_active":false,
"infra_overrides":{
"env":{
"PREFECT_LOGGING_LEVEL":"INFO"
}
},
"parameters":{
},
"tags":[
"production"
],
"work_queue_name":"production",
"parameter_openapi_schema":{
"type":"object",
"title":"Parameters",
"properties":{
}
},
"path":".",
"entrypoint":"tmp.py:tmp_flow",
"manifest_path":"None",
"storage_document_id":"None",
"infrastructure_document_id":"9299ad3a-31b4-4de0-92c2-b24a32d58490",
"created_by":{
"id":"c4db463f-8cf4-4d76-8079-d0d144064e6a",
"type":"USER",
"display_value":"devalgopolisai"
},
"updated_by":{
"id":"c4db463f-8cf4-4d76-8079-d0d144064e6a",
"type":"USER",
"display_value":"devalgopolisai"
},
"work_pool_name":"production"
}
so tags in deployments are added but i want setup tags in flow directly
and retrieve this tags in flow by name endpoint ( or version of flow)
Your solution setup only tags in deployment. not in flow directlyredsquare
03/15/2023, 11:59 AM@task
async def set_flow_run_tags(tags: list[str]):
task_run_context = prefect.context.get_run_context()
flow_run_id = task_run_context.task_run.flow_run_id
async with prefect.client.server.get_client() as client:
response = await client.update_flow_run(flow_run_id=flow_run_id, tags=tags)
@flow
def main():
set_flow_run_tags(["tags", "at", "runtime"])
Robert Kowalski
03/15/2023, 12:13 PMasync with prefect.get_client() as client:
but this still setup tags only for flow runs not for flow directly
https://github.com/PrefectHQ/prefect/blob/fc433e3488d695ed614ad526ff870dab1e0391b4/src/prefect/deployments.py#L652
Here prefect create flow by name, And other flow parameters are skipped ( like flow version or flow_tags )
so i think getting flow tags or version is not possible and flow version is useless tooredsquare
03/15/2023, 12:34 PMRobert Kowalski
03/15/2023, 12:55 PMflow_name:flow_version
one flow can be run on few environments eg.:
• production
◦ flow_1
◦ flow_2
• other_production
◦ flow_1
i want create flow_watchdog to dynamically create deployments (without installing all libs for each flows) using rest api ( or in another way)
But a cant create infrastructure in deployment without flow version/ tags base only on flow_name.
I can redeploy manually, but i don't want 🙂
so , in conclusion, there is no way to receive flow version or flow tag from rest api base only on flow_name?redsquare
03/15/2023, 1:00 PM