# prefect-community
d
Hey guys, looking for some advice on the best way to do the following. We’re emulating subflows until they’re available in Prefect. We have a bunch of data partitions that we map over, using the Prefect client to trigger a run of another flow with each partition as input. Depending on the size of our input data set, this results in up to 100 subflows.

Using the standard map functionality, we can trigger the flows and return their flow run IDs to the parent flow. We then want to check the status of those flows, so we know when the entire data set has been processed. We have a task that takes a sub_flow_id and queries the client for that flow run's state until it has completed.

The problem is that the second map, which checks the state of each subflow, seems to be synchronous: whether I have 10, 20 or however many subflows, the mapped state-check task only executes sequentially. So the question is: is that expected (a sequential/blocking run), or am I doing something wrong? Sample code:

```python
import time

import prefect
from prefect import Flow, task, triggers, unmapped
from prefect.client import Client
from prefect.tasks.prefect import FlowRunTask


@task(trigger=triggers.all_successful, log_stdout=True)
def run_collector(partition, bucket_name, collector_name):
    """Trigger one collector flow run for a single data partition."""
    logger = prefect.context.get("logger")
    logger.info("Run CollectorSubFlows")
    flow_run_task = FlowRunTask()
    sub_flow_id = flow_run_task.run(
        flow_name=collector_name,
        parameters={
            "bucket_name": bucket_name,
            "input_location": partition["input"],
            "output_location": partition["output"],
        },
    )
    logger.info("Started Collector Flow ID: %s", sub_flow_id)
    return sub_flow_id


@task(trigger=triggers.all_successful, log_stdout=True)
def get_state(flow_run_id):
    """Poll the API until the given flow run reaches Success or Failed."""
    logger = prefect.context.get("logger")
    logger.info("Checking state for subFlow: %s", flow_run_id)
    client = Client()
    flow_in_progress = True
    flow_status = ""
    while flow_in_progress:
        query = ('{flow_run(where: {id: {_eq: "' + flow_run_id + '"}}, limit: 1) '
                 '{id current_state {message state}}}')
        flow_run = client.graphql(query).data.flow_run
        state = flow_run[0].current_state.state
        if state == "Success" or state == "Failed":
            flow_status = state
            flow_in_progress = False
        else:
            time.sleep(2.4)  # wait before polling again
    return flow_status


def main():
    # partition_input_data and the parameters below are defined elsewhere.
    with Flow("Partition & Collect") as flow:
        partitions = partition_input_data(bucket_name, input_location, output_location, partition_count)
        sub_flow_ids = run_collector.map(partitions, unmapped(bucket_name), unmapped(collector_flow))
        final_states = get_state.map(sub_flow_ids)
```
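Separately from how Prefect schedules the mapped tasks, the polling itself can be made to overlap. The following is a minimal sketch in plain Python (not a Prefect API), assuming a hypothetical `fetch_state(flow_run_id)` callable that would wrap the GraphQL query from `get_state` and return a state string such as "Running", "Success" or "Failed". Each flow run is polled in its own worker thread, so when there are at least as many workers as flow runs, the total wait is roughly that of the slowest subflow rather than the sum of all of them. Whether Prefect runs mapped children concurrently also depends on the executor the flow runs with.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, Iterable, Optional

# The same two terminal states the original get_state task checks for.
TERMINAL_STATES = frozenset({"Success", "Failed"})


def wait_for_terminal_state(
    flow_run_id: str,
    fetch_state: Callable[[str], str],
    poll_interval: float = 2.4,
    timeout: Optional[float] = None,
) -> str:
    """Poll fetch_state(flow_run_id) until it returns a terminal state."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = fetch_state(flow_run_id)
        if state in TERMINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"flow run {flow_run_id} still {state!r} after {timeout}s")
        time.sleep(poll_interval)


def wait_for_all(
    flow_run_ids: Iterable[str],
    fetch_state: Callable[[str], str],
    max_workers: int = 20,
    poll_interval: float = 2.4,
    timeout: Optional[float] = None,
) -> Dict[str, str]:
    """Poll every flow run concurrently and return {flow_run_id: final_state}."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            fid: pool.submit(wait_for_terminal_state, fid, fetch_state, poll_interval, timeout)
            for fid in flow_run_ids
        }
        # .result() re-raises any exception (e.g. TimeoutError) from a worker.
        return {fid: fut.result() for fid, fut in futures.items()}


if __name__ == "__main__":
    # Hypothetical stand-in for the GraphQL lookup: each run reports
    # "Running" a few times before reaching its final state.
    polls_needed = {"run-a": 2, "run-b": 0, "run-c": 3}
    calls: Dict[str, int] = {}

    def fake_fetch_state(fid: str) -> str:
        calls[fid] = calls.get(fid, 0) + 1
        if calls[fid] <= polls_needed[fid]:
            return "Running"
        return "Failed" if fid == "run-c" else "Success"

    print(wait_for_all(polls_needed, fake_fetch_state, poll_interval=0.05))
    # {'run-a': 'Success', 'run-b': 'Success', 'run-c': 'Failed'}
```

The optional `timeout` is a design choice the original loop lacks: without it, a subflow that never reaches Success or Failed would keep its poller spinning forever.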
j
Hi @Darragh, I believe what would help you here is Depth First Execution (DFE), which is coming out in the next release! (It's also on the master branch now if you want to play around with it.) In the version you are using, the first map has to complete before the next one begins; with DFE this won’t be the case 🙂
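To illustrate the difference described above, here is a toy, sequential simulation in plain Python (not Prefect's scheduler) of two chained maps, such as `run_collector` followed by `get_state`. It only records the order in which steps start. Under breadth-first execution every first-stage item finishes before any second-stage item begins; under depth-first execution each item's second stage can start as soon as its own first stage is done. The function names are hypothetical.

```python
from typing import Callable, List, Sequence, Tuple

Event = Tuple[str, int]  # (stage name, item index)


def breadth_first(
    items: Sequence[int], stage1: Callable[[int], int], stage2: Callable[[int], int]
) -> Tuple[List[int], List[Event]]:
    """Run stage1 over every item, and only then stage2 over every result."""
    log: List[Event] = []
    intermediate: List[int] = []
    for i, item in enumerate(items):
        log.append(("stage1", i))
        intermediate.append(stage1(item))
    results: List[int] = []
    for i, value in enumerate(intermediate):
        log.append(("stage2", i))
        results.append(stage2(value))
    return results, log


def depth_first(
    items: Sequence[int], stage1: Callable[[int], int], stage2: Callable[[int], int]
) -> Tuple[List[int], List[Event]]:
    """Push each item through both stages before moving on to the next item."""
    log: List[Event] = []
    results: List[int] = []
    for i, item in enumerate(items):
        log.append(("stage1", i))
        value = stage1(item)
        log.append(("stage2", i))
        results.append(stage2(value))
    return results, log


results_bfe, log_bfe = breadth_first([1, 2], lambda x: x * 10, lambda y: y + 1)
results_dfe, log_dfe = depth_first([1, 2], lambda x: x * 10, lambda y: y + 1)
print(log_bfe)  # [('stage1', 0), ('stage1', 1), ('stage2', 0), ('stage2', 1)]
print(log_dfe)  # [('stage1', 0), ('stage2', 0), ('stage1', 1), ('stage2', 1)]
```

Both orderings produce the same results (`[11, 21]` here); only the scheduling differs. With a parallel executor, the depth-first branches would also proceed independently of each other, which is what would let state checks for early subflows begin while later subflows are still being triggered.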
d
Perfect! Thanks for that, Josh. I'll try it out now. When is the next version due out?
j
I think sometime in the next day or two
d
Hey @josh, back again on this one: I’m still hitting the non-DFE behaviour with mapped tasks. I’ve updated the base image I build my flows into to use the build from master:
```
FROM prefecthq/prefect:master
```
I’ve also updated the version on AWS to use master, but there’s no change in behaviour. One thing I noticed: the UI still lists the flow’s Prefect Core Version as 0.11.5, despite my changes. I can also see this output when the Docker storage is being built:

```
Step 3/9 : RUN pip show prefect || pip install git+https://github.com/PrefectHQ/prefect.git@0.11.5#egg=prefect[kubernetes]
```

Is there a way to override this?
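On that build step: in the shell, `a || b` runs `b` only when `a` fails, so the pinned 0.11.5 install only happens when `pip show prefect` exits with an error, i.e. when no Prefect distribution is already present in the image. To confirm which Prefect version actually ended up inside the built image, a small check like the following could be run in the container. This is a sketch assuming Python 3.8+ (for `importlib.metadata`); the helper name is hypothetical.

```python
import sys
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent.

    This answers roughly the same question as `pip show <dist_name>`.
    """
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("prefect")
    if found is None:
        # Same situation in which `pip show prefect` fails and the install runs.
        print("prefect is not installed")
        sys.exit(1)
    print(f"prefect {found} is installed")
```

Running it inside the built image shows whether the master build from the base image or the 0.11.5 pin is the one actually installed there.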
Actually I should put this out to the larger group rather than targeting one person :)