Vlad Koshelev
12/16/2020, 8:46 AM
references_task = StartFlowRun(flow_name='references-etl', project_name='etl', wait=True)
facts_task = StartFlowRun(flow_name='facts-etl', project_name='etl', wait=True)
with Flow('main-etl') as main_flow:
    files = get_files()
    references_task.map(files)
    facts_task.map(files)
@task
def get_group(file):
    # get the file's "group" name from the file name (20200101-facts.csv -> group=20200101)
    ...

@task
def check_references_done(group, file):
    # get the reference tasks for the group from the Prefect DB and check they are done,
    # or check whether the wait timeout has been reached (e.g. get the file's creation
    # time and check if now - created_at > timeout)
    ...

@task
def check_no_other_facts_running(group, file):
    # check that no "do_etl" tasks in a "running" state exist in the Prefect DB
    ...

@task
def do_etl(group, file):
    ...

with Flow('facts-etl') as facts_flow:
    file = Parameter('file')
    group = get_group(file)
    refs_done = check_references_done(group, file)
    no_other_running = check_no_other_facts_running(group, file)
    do_etl(group, file, upstream_tasks=[refs_done, no_other_running])
josh
12/16/2020, 2:17 PM
get_task_run_info.
Tasks do have a tags attribute, but you would need to write a GraphQL query to retrieve them by tag.
It might also be worth looking into Targets for some sort of shared state between your flow runs:
https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
https://docs.prefect.io/core/idioms/targets.html
Targets would let you set something up where flow run 1 writes some data to a known location (perhaps partitioned by the current date) and flow run 2 uses the same target location to verify the data exists before it proceeds.
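The target idea described above boils down to a sentinel written at a location both runs can derive from shared keys. A minimal stdlib-only sketch of that handshake (the function names and file layout are illustrative, not Prefect API):

```python
from pathlib import Path
import tempfile

def mark_done(base: Path, flow_name: str, partition_key: str) -> None:
    """Flow run 1: write a sentinel file at a location derived from shared keys."""
    (base / f"{flow_name}-{partition_key}").write_text("done")

def is_done(base: Path, flow_name: str, partition_key: str) -> bool:
    """Flow run 2: check the same derived location before proceeding."""
    return (base / f"{flow_name}-{partition_key}").exists()

# Demo: the fact flow waits until the reference flow has marked its partition.
base = Path(tempfile.mkdtemp())
assert not is_done(base, "ref-flow", "20200101")   # refs not loaded yet
mark_done(base, "ref-flow", "20200101")            # ref flow finishes its partition
assert is_done(base, "ref-flow", "20200101")       # fact flow may now proceed
```

Prefect's Result/target machinery does the same thing, with the bonus that a task whose target already exists is skipped on re-run.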
Outside of that you may need to write your own way of accessing that shared persistence.
Vlad Koshelev
12/16/2020, 7:28 PM
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.engine.signals import FAIL

RESULT = LocalResult()

@task(
    result=RESULT,
    target=lambda **kwargs: f"{kwargs['flow_name']}-{kwargs['partition_key']}",
)
def done(partition_key):
    return True

@task
def check_done(flow_name, partition_key):
    if not RESULT.exists(f'{flow_name}-{partition_key}'):
        raise FAIL(f'{flow_name} is not done for partition {partition_key}')

with Flow('ref-flow') as ref_flow:
    key = Parameter('partition_key')
    etl_done = do_ref_etl_stuff()
    done(key, upstream_tasks=[etl_done])

with Flow('fact-flow') as fact_flow:
    key = Parameter('partition_key')
    ref_done = check_done('ref-flow', key)
    do_fact_etl_stuff(upstream_tasks=[ref_done])