Hey guys, looking for some advice on the best way to do the following. We're emulating
SubFlows until they're available in Prefect, and we have a bunch of partitions that we map over, using the Prefect client to trigger a run of another flow with each data partition as input. Depending on the input data set size, this results in up to 100 sub-flows. Using the standard map functionality we can trigger the flows and then return the flow run IDs to the parent flow.
We then want to check the status of those flows so we know when the entire data set has been processed. We have a task that takes a sub_flow_id and polls the client for the flow's state until it completes.
The problem is that the second map, which checks the state of each sub-flow, seems to run synchronously: whether I have 10, 20, or more sub-flows, the mapped state-check task only executes sequentially.
So the question is: Is that expected [sequential/blocking run] or am I doing something wrong? Sample code attached:
import time

import prefect
from prefect import Flow, task, triggers, unmapped
from prefect.client import Client
from prefect.tasks.prefect import FlowRunTask


@task(trigger=triggers.all_successful, log_stdout=True)
def run_collector(partition, bucket_name, collector_name):
    logger = prefect.context.get("logger")
    logger.info("Run CollectorSubFlows")
    flow_run_task = FlowRunTask()
    # Trigger a run of the collector flow for this partition and
    # return its flow run ID to the parent flow.
    sub_flow_id = flow_run_task.run(
        flow_name=collector_name,
        parameters={
            "bucket_name": bucket_name,
            "input_location": partition["input"],
            "output_location": partition["output"],
        },
    )
    logger.info("Started Collector Flow ID: %s", sub_flow_id)
    return sub_flow_id


@task(trigger=triggers.all_successful, log_stdout=True)
def get_state(flow_run_id):
    logger = prefect.context.get("logger")
    logger.info("Checking state for subFlow: %s", flow_run_id)
    client = Client()
    # Poll the GraphQL API until the sub-flow reaches a terminal state.
    while True:
        query = (
            '{flow_run(where: {id: {_eq: "' + flow_run_id + '"}}, limit: 1) '
            '{id current_state {message state}}}'
        )
        flow_run = client.graphql(query).data.flow_run
        state = flow_run[0].current_state.state
        if state in ("Success", "Failed"):
            return state
        time.sleep(2.4)


def main():
    with Flow("Partition & Collect") as flow:
        partitions = partition_input_data(bucket_name, input_location,
                                          output_location, partition_count)
        sub_flow_ids = run_collector.map(partitions, unmapped(bucket_name),
                                         unmapped(collector_flow))
        final_states = get_state.map(sub_flow_ids)
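For what it's worth, here is the behaviour we were *expecting* from the second map, sketched in plain Python with a thread pool (no Prefect involved; `poll_flow_state` and the sleep are stand-ins for the real GraphQL polling): each sub-flow is polled concurrently, so the total wait tracks the slowest sub-flow rather than the sum of all polls.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def poll_flow_state(flow_run_id):
    # Stand-in for the real get_state task: pretend each flow needs
    # one 0.2s poll cycle before it reports a terminal state.
    time.sleep(0.2)
    return (flow_run_id, "Success")

flow_run_ids = [f"sub-flow-{i}" for i in range(10)]

start = time.monotonic()
with ThreadPoolExecutor(max_workers=len(flow_run_ids)) as pool:
    # Poll all sub-flows concurrently, preserving input order.
    final_states = list(pool.map(poll_flow_state, flow_run_ids))
elapsed = time.monotonic() - start

# Concurrent polling finishes in roughly one poll cycle (~0.2s),
# versus ~2s if the ten polls ran back to back.
print(f"{len(final_states)} states in {elapsed:.2f}s")
```

With the mapped `get_state` task we instead see the sequential behaviour, i.e. total time grows linearly with the number of sub-flows.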