
Robert Kowalski

03/15/2023, 11:33 AM
Hi, I'm a little confused. In the REST API, Prefect has an endpoint that gets a flow by name and returns the flow's id, name, and tags, but when I want to define flow tags in code I don't see a parameter like tags (or anything similar) in the @flow decorator. On the other hand, the @flow decorator has a version parameter, but it is not returned by the REST API endpoint. Is it possible to set tags or a version in the @flow decorator and access them through the REST API?

redsquare

03/15/2023, 11:35 AM
It's on the deployment: build_from_flow **kwargs

Robert Kowalski

03/15/2023, 11:51 AM
from prefect import flow, task
from prefect.deployments import Deployment


@task
def tmp_task():
    print('Task')


@flow(name='tmp_flow', log_prints=True)
def tmp_flow():
    print('flow start')
    tmp_task()
    print('flow end')


if __name__ == '__main__':
    environment = 'production'
    tmp_flow()
    Deployment.build_from_flow(
        flow=tmp_flow,
        output='tmp_flow.yml',
        name=environment,
        path='.',
        infra_overrides={"env": {
            "PREFECT_LOGGING_LEVEL": "INFO",
        }},
        work_queue_name=environment,
        apply=True,
        is_schedule_active=False,
        work_pool_name=environment,
        tags=[environment],
    )
And the output from calling the endpoints. Flow by name:
{
  "id": "397bdbc2-cc4a-497b-a456-b885c7035b18",
  "created": "2023-03-07T10:14:58.549838+00:00",
  "updated": "2023-03-07T10:14:58.549855+00:00",
  "name": "tmp_flow",
  "tags": []
}
Deployment by name:
{
   "id":"391259ec-ce11-45e7-a5c1-ce69d9492106",
   "created":"2023-03-15T11:42:29.227680+00:00",
   "updated":"2023-03-15T11:42:29.226010+00:00",
   "name":"production",
   "version":"cc88b092bfcc6cf80ddcc310906f151c",
   "description":"None",
   "flow_id":"397bdbc2-cc4a-497b-a456-b885c7035b18",
   "schedule":"None",
   "is_schedule_active":false,
   "infra_overrides":{
      "env":{
         "PREFECT_LOGGING_LEVEL":"INFO"
      }
   },
   "parameters":{
      
   },
   "tags":[
      "production"
   ],
   "work_queue_name":"production",
   "parameter_openapi_schema":{
      "type":"object",
      "title":"Parameters",
      "properties":{
         
      }
   },
   "path":".",
   "entrypoint":"tmp.py:tmp_flow",
   "manifest_path":"None",
   "storage_document_id":"None",
   "infrastructure_document_id":"9299ad3a-31b4-4de0-92c2-b24a32d58490",
   "created_by":{
      "id":"c4db463f-8cf4-4d76-8079-d0d144064e6a",
      "type":"USER",
      "display_value":"devalgopolisai"
   },
   "updated_by":{
      "id":"c4db463f-8cf4-4d76-8079-d0d144064e6a",
      "type":"USER",
      "display_value":"devalgopolisai"
   },
   "work_pool_name":"production"
}
So tags are added to the deployment, but I want to set tags on the flow directly and retrieve them from the flow-by-name endpoint (or the flow's version). Your solution only sets tags on the deployment, not on the flow itself.
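For anyone reproducing the two lookups above, here is a minimal sketch using only the Python standard library. The base URL and the endpoint paths (/flows/name/{name} and /deployments/name/{flow_name}/{deployment_name}) are assumptions based on the Prefect 2 REST API, and the helper names are made up for illustration. A Prefect Cloud workspace would also need an API key header, which is not shown.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: a local Prefect 2 server; replace with your own PREFECT_API_URL.
API_URL = "http://127.0.0.1:4200/api"


def flow_by_name_url(flow_name, api_url=API_URL):
    """Build the URL for the flow-by-name lookup (path is an assumption)."""
    return f"{api_url}/flows/name/{quote(flow_name, safe='')}"


def deployment_by_name_url(flow_name, deployment_name, api_url=API_URL):
    """Build the URL for the deployment-by-name lookup (path is an assumption)."""
    flow_part = quote(flow_name, safe="")
    deployment_part = quote(deployment_name, safe="")
    return f"{api_url}/deployments/name/{flow_part}/{deployment_part}"


def fetch_json(url):
    """GET a URL and decode the JSON body (no authentication)."""
    with urlopen(url) as response:
        return json.load(response)


def summarize(record):
    """Keep only the fields discussed in this thread."""
    return {
        "name": record.get("name"),
        "tags": record.get("tags", []),
        "version": record.get("version"),
    }
```

Calling summarize(fetch_json(flow_by_name_url("tmp_flow"))) against the flow record shown above would give empty tags and no version, while the same call on the deployment record would return the "production" tag and the deployment's version hash.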

redsquare

03/15/2023, 11:59 AM
import prefect
from prefect import flow, task


@task
async def set_flow_run_tags(tags: list[str]):
    # Find the flow run that this task run belongs to.
    task_run_context = prefect.context.get_run_context()
    flow_run_id = task_run_context.task_run.flow_run_id

    # Update the tags on that flow run through the API client.
    async with prefect.client.server.get_client() as client:
        await client.update_flow_run(flow_run_id=flow_run_id, tags=tags)


@flow
def main():
    set_flow_run_tags(["tags", "at", "runtime"])

Robert Kowalski

03/15/2023, 12:13 PM
I use 2.8.4, so I changed it to
async with prefect.get_client() as client:
but this still sets tags only on flow runs, not on the flow itself. https://github.com/PrefectHQ/prefect/blob/fc433e3488d695ed614ad526ff870dab1e0391b4/src/prefect/deployments.py#L652 Here Prefect creates the flow by name only, and the other flow parameters (like the flow version or flow tags) are skipped. So I think getting flow tags or the flow version is not possible, and the flow version is useless too.
Or maybe I'm wrong and don't understand the main idea.

redsquare

03/15/2023, 12:34 PM
What exactly do you want to do?
What is stopping you from redeploying with the version + tags?

Robert Kowalski

03/15/2023, 12:55 PM
My flows use Docker infrastructure, and each flow has its own Docker image (different libs are required). The Docker image has the tag
flow_name:flow_version
One flow can run in several environments, e.g.:
• production
  ◦ flow_1
  ◦ flow_2
• other_production
  ◦ flow_1
I want to create a flow_watchdog that dynamically creates deployments (without installing all the libs for each flow) using the REST API (or some other way). But I can't create the infrastructure for a deployment without the flow version/tags, based only on flow_name. I can redeploy manually, but I don't want to 🙂 So, in conclusion, there is no way to get the flow version or flow tags from the REST API based only on flow_name?
Regardless, thank you very much for your time 🙂
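Since deployment tags do come back from the REST API (as the deployment output earlier in the thread shows), one possible workaround is to record the flow version as a deployment tag and let the watchdog rebuild the flow_name:flow_version image reference from it. This is only a sketch: the "version:" tag prefix, the helper names, and the sample deployment record are hypothetical conventions for illustration, not Prefect features.

```python
# Hypothetical convention: the flow version is stored as a "version:<x>" tag.
VERSION_TAG_PREFIX = "version:"


def version_from_tags(tags):
    """Return the version encoded in a 'version:<x>' tag, or None if absent."""
    for tag in tags:
        if tag.startswith(VERSION_TAG_PREFIX):
            return tag[len(VERSION_TAG_PREFIX):]
    return None


def image_for(flow_name, deployment):
    """Build the 'flow_name:flow_version' image reference from a deployment record."""
    version = version_from_tags(deployment.get("tags", []))
    if version is None:
        raise ValueError(f"no version tag on deployment {deployment.get('name')!r}")
    return f"{flow_name}:{version}"


# Made-up deployment record, shaped like the REST API output shown earlier.
deployment = {"name": "production", "tags": ["production", "version:1.4.0"]}
print(image_for("flow_1", deployment))  # prints "flow_1:1.4.0"
```

The trade-off is that the version lives on each deployment rather than on the flow, so at least one deployment per flow has to be created by a process that knows the version.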

redsquare

03/15/2023, 1:00 PM
Aha, OK. I just use GitHub Actions to deploy and install the requirements.