Vlad Koshelev

12/16/2020, 8:46 AM
Hi, everyone. I'm trying to build an ETL flow for a star-schema DB with reference and fact tables. They are loaded daily from CSV files. The facts must be loaded after the references, so the facts ETL flow depends on that day's references already being loaded. The problem is that the files arrive in an unpredictable order, e.g. facts.csv first and references.csv next. Moreover, references.csv may be missing on some days; in that case the facts flow should wait for the references for some time. The cases are:
1. references.csv uploaded first, facts.csv uploaded last: load references -> load facts
2. facts.csv first, references.csv last: wait for references -> load references -> load facts
3. facts.csv and no references.csv: wait for references with timeout -> load facts
Another problem is that I need to forbid parallel execution of facts loading if several fact files are uploaded. (The facts data come in sliding time ranges without primary keys, so before loading them into the DB I need to delete the previous data within that time range.) I guess I need a shared state that tells the facts flow whether the references are already loaded and whether some other facts flow task is running. That state is already present in the Prefect DB as task states. Is there an API to get Prefect tasks from the DB, and which criteria (e.g. some custom tags, set dynamically) can I use to filter them? Something like this:
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun


@task
def get_files():
    # list the CSV files uploaded for the day
    ...


@task
def get_group(file):
    # get the file's "group" name (20200101-facts.csv -> group=20200101)
    ...


@task
def check_references_done(group, file):
    # get references tasks from the Prefect DB for the group and check they are done
    # or check if wait timeout reached (e.g. get time of the file creation and check if now - created_at > timeout)
    ...


@task
def check_no_another_facts_running(group, file):
    # check that no "do_etl" tasks in "Running" state exist in the Prefect DB
    ...


@task
def do_etl(group, file):
    ...


with Flow('facts-etl') as facts_flow:
    file = Parameter('file')
    group = get_group(file)
    # keep references to the task copies bound in this flow and use them as upstreams
    refs_done = check_references_done(group, file)
    no_other_facts = check_no_another_facts_running(group, file)
    do_etl(group, file, upstream_tasks=[refs_done, no_other_facts])


@task
def to_flow_params(file):
    # StartFlowRun takes the child flow's parameters as a dict
    return {'file': file}


references_task = StartFlowRun(flow_name='references-etl', project_name='etl', wait=True)
facts_task = StartFlowRun(flow_name='facts-etl', project_name='etl', wait=True)

with Flow('main-etl') as main_flow:
    files = get_files()
    params = to_flow_params.map(files)
    references_task.map(parameters=params)
    facts_task.map(parameters=params)
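The decision check_references_done has to make for the three cases listed in the question can be written as a plain function, independent of how the "references loaded" flag is obtained. This is a minimal sketch, assuming file names shaped like 20200101-facts.csv and a caller that already knows whether the references for the group are loaded and when the facts file was created; get_group and references_decision here are hypothetical helpers, not Prefect APIs.

```python
import os
from datetime import datetime, timedelta


def get_group(path):
    # hypothetical naming scheme from the question: "data/20200101-facts.csv" -> "20200101"
    return os.path.basename(path).split('-', 1)[0]


def references_decision(refs_loaded, facts_created_at, now, timeout):
    """Decide what the facts flow should do for one group of files.

    Returns:
        'load'    - references for the group are loaded (cases 1 and 2)
        'timeout' - no references arrived in time; load facts anyway (case 3)
        'wait'    - references not loaded yet and the timeout has not passed
    """
    if refs_loaded:
        return 'load'
    if now - facts_created_at > timeout:
        return 'timeout'
    return 'wait'


# usage: facts file created 30 minutes ago, one-hour timeout, no references yet
created = datetime(2020, 12, 16, 8, 0)
print(references_decision(False, created, created + timedelta(minutes=30), timedelta(hours=1)))
# prints: wait
```

Inside a Prefect task, the 'wait' outcome could be turned into a failure (e.g. raising the FAIL signal) so that the task's max_retries / retry_delay settings re-run the check until the references arrive or the timeout passes.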

josh

12/16/2020, 2:17 PM
Hey @Vlad Koshelev, I'm not sure this will accomplish all of your goals, but take a look at the Client methods available to you, most notably get_task_run_info. Tasks do have a tags attribute, but you would need to write a GraphQL query to retrieve them by tag. It might also be worth looking into Targets for some sort of shared state between your flow runs:
https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
https://docs.prefect.io/core/idioms/targets.html
Targets would let flow run 1 write some data to a known location (perhaps partitioned by the current date or something), and then flow run 2 can check that same target location for the data before it proceeds. Outside of that, you may need to write your own way of accessing that shared persistence.
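Stripped of Prefect, the target idea is a marker at a known, partitioned location: the references run writes it when it finishes, and the facts run checks for it before proceeding. A minimal standard-library sketch, assuming both flow runs can see the same directory; mark_done, is_done and the "<flow>-<partition>" naming are hypothetical choices for illustration (in Prefect, a Result such as LocalResult plays the role of the shared directory).

```python
from pathlib import Path


def marker_path(base_dir, flow_name, partition_key):
    # one marker per flow and partition, e.g. "references-etl-20200101"
    return Path(base_dir) / f'{flow_name}-{partition_key}'


def mark_done(base_dir, flow_name, partition_key):
    # called by the upstream flow run (flow run 1) as its last step
    path = marker_path(base_dir, flow_name, partition_key)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text('done')
    return path


def is_done(base_dir, flow_name, partition_key):
    # called by the downstream flow run (flow run 2) before it proceeds
    return marker_path(base_dir, flow_name, partition_key).exists()
```

Partitioning the marker name by date keeps each day's check independent: a marker left over from yesterday does not satisfy today's facts run.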

Vlad Koshelev

12/16/2020, 7:28 PM
Thanks a lot @josh! Targets are exactly what I need for sharing state. For those who are looking for a solution, here's an example:
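from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import FAIL

# results (and therefore targets) are only written when checkpointing is enabled;
# it is on for Server/Cloud runs, for a local flow.run() set PREFECT__FLOWS__CHECKPOINTING=true
RESULT = LocalResult()


@task(
    result=RESULT,
    # formatted at runtime from prefect.context (flow_name) and the task's inputs (partition_key)
    target=lambda **kwargs: f"{kwargs['flow_name']}-{kwargs['partition_key']}",
)
def done(partition_key):
    # writing this result creates the marker that other flows check for
    return True


@task
def check_done(flow_name, partition_key):
    # fail if the marker written by `done` in the other flow does not exist yet
    if not RESULT.exists(f'{flow_name}-{partition_key}'):
        raise FAIL(f'{flow_name} has not finished for {partition_key}')


@task
def do_ref_etl_stuff():
    ...  # load references.csv


@task
def do_fact_etl_stuff():
    ...  # load facts.csv


with Flow('ref-flow') as ref_flow:
    key = Parameter('partition_key')
    refs_loaded = do_ref_etl_stuff()
    done(key, upstream_tasks=[refs_loaded])


with Flow('fact-flow') as fact_flow:
    key = Parameter('partition_key')
    refs_ready = check_done('ref-flow', key)
    do_fact_etl_stuff(upstream_tasks=[refs_ready])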
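The thread does not settle the second requirement from the question: keeping two facts loads from running at once. One option, not discussed in the thread, is to extend the same shared-location idea to an exclusive lock file. os.open with O_CREAT | O_EXCL creates the file atomically or fails if it already exists, so only one facts run can hold the lock. A minimal sketch, assuming all facts runs share one local filesystem (O_EXCL may not be reliable on some network filesystems) and that a stale lock left by a crashed process is removed by hand; facts_lock is a hypothetical helper.

```python
import os
from contextlib import contextmanager


@contextmanager
def facts_lock(lock_path):
    """Hold an exclusive lock file for the duration of a facts load.

    Raises FileExistsError if another run already holds the lock.
    """
    # O_CREAT | O_EXCL: create the file atomically, fail if it already exists
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        # record the owner's PID to help when cleaning up a stale lock
        os.write(fd, str(os.getpid()).encode())
        yield
    finally:
        os.close(fd)
        os.remove(lock_path)
```

Inside a Prefect task, the FileExistsError could be converted into a FAIL so that the task's retry settings make a second facts run wait its turn instead of deleting and reloading the same time range concurrently.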