#prefect-community
How can I leverage tags for better organization (and search from the UI) of metadata related to my flow runs?

merlin

10/30/2022, 9:59 PM
Hello, I'm struggling with the entrypoint in deployments, I think because of the way I've called the flow when running it as a script. I'm calling the flow and assigning a dynamic tag at runtime, after instantiating an object to pass as a parameter to the flow:
# i've left out all the imports and task definitions

# trino_flows.py
@flow(name="extract write")
def extract_write(config):
    logger = get_run_logger()
    logger.info(f"extract file: {config.filepath}")

    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)


# file: extract_write.py
filepath = Path(sys.argv[1])
extract_config = ExtractConfig(filepath=filepath)

with tags(extract_config.dataset_name, "extract"):
    extract_write(extract_config)
In development I'm calling the script with:
python src/extract_write.py src/extracts/weekly_date.sql
So the ExtractConfig object creates dataset_name, rundate, and filepath fields used by the flow code. How do I build/apply a deployment when I'm passing an object to the flow function in my script?

Anna Geller

10/30/2022, 10:43 PM
You can leverage Prefect parametrization instead of sys.argv - it requires a little rewrite, but I'd recommend it.
The entrypoint is always flow_script:flow_function_name
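
A minimal sketch of that rewrite, assuming the ExtractConfig class and task definitions from the snippet above: the file path becomes a plain string flow parameter instead of coming from sys.argv, so the entrypoint would be trino_flows.py:extract_write (prefixed with whatever path the file lives at).

# trino_flows.py, hypothetical parametrized version
from pathlib import Path

from prefect import flow, get_run_logger

# ExtractConfig and the tasks (load_sqlfile, trino_connect, send_query,
# write_output) are assumed to be importable, as in the snippet above

@flow(name="extract write")
def extract_write(filepath: str):
    logger = get_run_logger()
    config = ExtractConfig(filepath=Path(filepath))  # build the config inside the flow
    logger.info(f"extract file: {config.filepath}")

    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)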

merlin

10/30/2022, 11:43 PM
Awesome, so the deployment is set with parameters defined at runtime. Can the parameter be an object? Or perhaps I need a task to instantiate the object from a string path? The idea is that a flow takes a path to a SQL script as a parameter, along with a date and maybe others.
Wait -- how does the deployment call 'with tags(...)' to assign a runtime tag? The entrypoint only starts with the call to the flow.

Anna Geller

10/31/2022, 2:56 AM
There is really no such thing as runtime tags in Prefect 😅 not sure what you are referring to. There are two types of tags:
• task tags, used for task run concurrency limits
• deployment tags, used for deployment organization (e.g. you could have tags corresponding to projects or teams); their only use is filtering and organization in the UI
Those are statically defined "labels"; they are not dynamic. What are you trying to solve with tags here?
Parameters can be any JSON-serializable data objects; we see most users use them for strings, ints, floats, dicts, lists, etc.
the idea is a flow takes a path to sql script as parameter, along with a date and maybe others
sounds like a perfect use case for parameters
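
A hedged sketch of how a deployment could supply those values as parameters, assuming the Prefect 2 Deployment.build_from_flow API and the parametrized flow sketched earlier (the deployment name and file path are illustrative):

# hypothetical: deployment parameters are plain JSON-serializable KV pairs
from prefect.deployments import Deployment

from trino_flows import extract_write

deployment = Deployment.build_from_flow(
    flow=extract_write,
    name="weekly-date-extract",
    parameters={"filepath": "src/extracts/weekly_date.sql"},
)
deployment.apply()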

merlin

10/31/2022, 2:59 AM
Argh, sorry to be pestering you on a weekend, I just realized. The tags attached to the flow let me use the UI to quickly filter for flows tagged with the name of the dataset being created. The UI has only limited search, and I have only a couple of distinct flows that take one of many SQL scripts, so searching the UI by flow name isn't helpful in my case. Tagging the flow with the name of the dataset makes the UI more convenient.
So after reading the deployments: parameters section of the docs carefully, it looks like the entrypoint parameters cannot be objects passed from another process, only key-value pairs. That's reasonable. I can call the entrypoint with a string parameter and initialize the object as part of a task. To solve the dynamic tagging, I think I can just use a nested flow:
@flow(name="extract write")
def extract_write(**kwargs):
    extract_config = ExtractConfig(filepath=kwargs["filename"])

    # tag the nested flow run with the dataset name from the config
    with tags(extract_config.dataset_name, "extract"):
        inner_extract_write(extract_config)
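
For the tags to land on a nested flow run, inner_extract_write would itself need to be a flow (a subflow), not a task. A hedged sketch, reusing the task calls from trino_flows.py above:

# hypothetical subflow: the original flow body moved into a nested flow
from prefect import flow, get_run_logger

@flow(name="inner extract write")
def inner_extract_write(config):
    logger = get_run_logger()
    logger.info(f"extract file: {config.filepath}")

    sql = load_sqlfile(config.filepath)
    trino_cnxn = trino_connect()
    data = send_query(trino_cnxn, sql)
    write_output(data, config.outfile_path)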

Anna Geller

10/31/2022, 3:00 AM
KV pairs, correct

merlin

10/31/2022, 3:00 AM
By 'dynamic' tag I just mean the tag is decided when the flow is called. It's obviously a static bit of metadata after that.

Anna Geller

10/31/2022, 3:02 AM
I believe the docs here are a bit outdated (https://docs.prefect.io/api-ref/prefect/context/?h=tags#prefect.context.tags), sorry for that. AFAIK the tags are not intended to be used that way, at least not anymore. Instead, you would typically assign the tags you want on the relevant deployment, so that you don't need to hardcode them in your flow and can change them anytime on your deployment object, even from the UI.
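
Extending the earlier deployment sketch, static tags could be set at build time (the tag values are illustrative) and then edited on the deployment at any time, even from the UI:

# hypothetical: static tags assigned on the deployment instead of in flow code
from prefect.deployments import Deployment

from trino_flows import extract_write

Deployment.build_from_flow(
    flow=extract_write,
    name="weekly-date-extract",
    parameters={"filepath": "src/extracts/weekly_date.sql"},
    tags=["extract", "weekly_date"],
).apply()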
So searching the UI for flow name isn't helpful in my case. Tagging the flow with the name of the dataset makes the UI more convenient.
Fully understand the problem, and I'm 100% with you on that. We would like to allow more ways to attach metadata to flow runs and deployments, but for now the deployment object is the best place to do that.
I will keep this thread in mind when discussing this problem with the product team. If you want to, feel free to share more context on where/how you would like to organize your flow runs for better search in the UI -- this will be helpful to figure out how we can best support it. This tagging functionality is still quite early and I expect it to be way nicer in the near future; your input to improve it will be valuable, thanks in advance.

merlin

10/31/2022, 3:12 AM
I think I organized myself around a manual development process and need to get deeper into the deployment paradigm. Seems like my single deployment extract_write can be scheduled over many parameters (SQL files). I'm able to tag flow runs based on that input parameter at the top, but it may come out differently as a subflow.
Ohhh, now I see -- deployments are not going to receive parameters from some other active process. The deployment is setting the job 'in stone', as it were.
• I have a single flow
• every SQL job gets its own deployment and params (src/script1.sql, etc.)
Each deployment has fixed parameters, and the tags would be set at the deployment build stage. You can change the parameters for an ad-hoc run to override the defaults, though in my design this wouldn't be needed.
Another way: I could have a control flow, daily_jobs, that just loops through all the SQL files and calls the extract_write flow on each of them. In this case my with tags(...) approach would help again.
I like Prefect because of the flexibility in how to design a system. Once I learn this limited set of constraints, I will be able to just think about which design approach to take.
Just following up with how it worked out for me with tagging:
• Using with tags(...) to tag the flow run according to the name of the dataset being processed.
• The entrypoint takes a string: the file path of a folder full of SQL files.
• A for loop iterates through each file and sends it for execution on the cloud DB.
• This 'extract' flow is deployed for folders with daily/weekly jobs.
Here's the entrypoint flow code:
# bulk_extract.py
import sys
from pathlib import Path

from prefect import flow, get_run_logger, tags

from trino_flows import extract_write
# import of ExtractConfig omitted (defined elsewhere in the project)


@flow(name="bulk-extracts")
def bulk_extracts(directory_name):
    logger = get_run_logger()
    extract_folder_items = Path(directory_name).iterdir()

    for item in extract_folder_items:
        logger.info(f"daily file: {item}")
        extract_config = ExtractConfig(filepath=item)
        with tags(extract_config.dataset_name, "extract"):
            extract_write(extract_config)


if __name__ == "__main__":
    bulk_extracts(Path(sys.argv[1]))
The tags are really useful in the UI: I can gather up jobs by dataset_name, distinguish between DDL and extract jobs, etc.
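
As a closing sketch, the daily/weekly deployments described above could be declared per folder, again assuming the Deployment.build_from_flow API (folder paths and names are illustrative):

# hypothetical: one deployment per extract folder, with fixed parameters and tags
from prefect.deployments import Deployment

from bulk_extract import bulk_extracts

for cadence, folder in [("daily", "src/extracts/daily"), ("weekly", "src/extracts/weekly")]:
    Deployment.build_from_flow(
        flow=bulk_extracts,
        name=f"bulk-extracts-{cadence}",
        parameters={"directory_name": folder},
        tags=["extract", cadence],
    ).apply()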