
Antonio Manuel BR

02/08/2022, 12:58 PM
Hello, I am executing a flow on a remote distributed Dask cluster. Previously I ran my code locally and gathered the flow results the easy local way (e.g. subflow_res.result[deployed_model].result). When I wrote the code I read in the docs that this approach is not valid for remote executions. I would like to know the proper way to gather results when working in a remote distributed environment. Can anyone help me?
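[For reference, the "local easy way" being described looks roughly like this — a minimal sketch with made-up task and parameter names:]

import prefect
from prefect import Flow, Parameter, task

@task
def double(x):
    return 2 * x

with Flow("local-example") as flow:
    x = Parameter("x")
    y = double(x)

# Works when the run executes in-process: the returned state
# holds every task's result in local memory.
state = flow.run(x=21)
print(state.result[y].result)  # 42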

Kevin Kho

02/08/2022, 2:55 PM
Hi @Antonio Manuel BR, what are you trying to do with the result? The purpose of remote execution is just to run the Flow.

Antonio Manuel BR

02/09/2022, 6:40 AM
I'm iterating over a task containing that flow using the LOOP signal; in order to start the next iteration, I need to save some results.
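[For reference, the Prefect 1.x LOOP pattern looks roughly like this — a minimal sketch with illustrative names:]

import prefect
from prefect import task
from prefect.engine import signals

@task
def accumulate(n_iterations: int):
    # The payload from the previous iteration is exposed in context.
    payload = prefect.context.get("task_loop_result", {})
    i = payload.get("i", 1)
    total = payload.get("total", 0) + i
    if i >= n_iterations:
        return total  # a normal return ends the loop
    # Raising LOOP re-runs this task with the new payload.
    raise signals.LOOP(result=dict(i=i + 1, total=total))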

Kevin Kho

02/09/2022, 6:43 AM
Could you give me some example code? You may have to persist the data inside the LOOP.
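[One way to persist inside the loop — a sketch; the shared path is a placeholder, and it assumes the task runs somewhere that can reach it:]

import cloudpickle

def save_loop_state(state: dict, path: str = "/shared/loop_state.pkl"):
    # Write the loop payload to shared storage so the next iteration
    # (or a restarted run) can recover it, even across machines.
    with open(path, "wb") as f:
        cloudpickle.dump(state, f)

def load_loop_state(path: str = "/shared/loop_state.pkl") -> dict:
    with open(path, "rb") as f:
        return cloudpickle.load(f)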

Antonio Manuel BR

02/09/2022, 6:49 AM
Here is the task containing the flow:

@task
def processMiniBatch(
    online_data_dir: str,
    d_model: PrefectClassifier,
    c_models: List[PrefectClassifier],
    n_mini_batches: int,
    label_encoder: LabelEncoder,
    predictions_dir: str,
):
    # Recover the state carried over from the previous LOOP iteration.
    loop_payload = prefect.context.get("task_loop_result", {})
    loop_j = loop_payload.get("loop_j", 1)
    loop_d_model = loop_payload.get("loop_d_model", d_model)
    loop_c_models = loop_payload.get("loop_c_models", c_models)

    with Flow(
        "Process mini-batch",
        executor=DaskExecutor(f"{cluster_ip}:8786"),
    ) as flow:
        deployed_model = Parameter("deployed_model")
        contestant_models = Parameter("contestant_models")
        (data, filename) = ingestData(online_data_dir)
        is_data_tagged = checkDataTagged(data)
        with case(is_data_tagged, False):
            # Untagged data: just predict and write out the predictions.
            predictions = predictData(deployed_model, data)
            writePredictions(predictions, predictions_dir, filename)
        with case(is_data_tagged, True):
            # Tagged data: evaluate and retrain the models.
            data = dataPreparation(data, label_encoder)
            contestant_models = testModel.map(unmapped(data), contestant_models)
            deployed_model = testModel(data, deployed_model)
            contestant_models = trainModel.map(
                unmapped(data), contestant_models
            )

    flow_res = flow.run(
        deployed_model=loop_d_model, contestant_models=loop_c_models
    )
    # Gather the inner flow's results to feed the next iteration.
    is_data_tagged = flow_res.result[is_data_tagged].result
    if is_data_tagged:
        loop_d_model = flow_res.result[deployed_model].result
        loop_c_models = flow_res.result[contestant_models].result
        if loop_j == n_mini_batches:
            loop_c_models.append(loop_d_model)
            return loop_c_models
        loop_j += 1
    raise signals.LOOP(
        result=dict(
            loop_j=loop_j, loop_d_model=loop_d_model, loop_c_models=loop_c_models
        )
    )

Kevin Kho

02/09/2022, 6:58 AM
You’ll want to rip the Flow out into its own flow file, register that, and then the LOOP task can use the three tasks from Prefect — create_flow_run, wait_for_flow_run, and get_task_run_result — as outlined in this blog.
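[That orchestrator pattern looks roughly like this — a sketch; the project name, parameters, and task slug are placeholders, and get_task_run_result requires the child flow to persist its results:]

from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)

with Flow("orchestrator") as flow:
    # Start the registered child flow on the backend.
    child_run_id = create_flow_run(
        flow_name="Process mini-batch",
        project_name="my-project",
        parameters=dict(deployed_model=None, contestant_models=None),
    )
    # Block until the child run finishes, raising if it failed.
    done = wait_for_flow_run(child_run_id, raise_final_state=True)
    # Fetch a persisted task result from the child run by task slug.
    tagged = get_task_run_result(
        child_run_id, task_slug="checkDataTagged-1", upstream_tasks=[done]
    )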

Antonio Manuel BR

02/09/2022, 7:09 AM
I'll give it a try, thank you very much. I'll write here if I run into additional problems.