Antonio Manuel BR
02/08/2022, 12:58 PMsubflow_res.result[deployed_model].result
). When I wrote the code, I read the docs, knowing this way was not valid for remote executions. I would like to know the proper way to gather results when working in a remote distributed environment.
Can anyone help me?Kevin Kho
Antonio Manuel BR
02/09/2022, 6:40 AMKevin Kho
Antonio Manuel BR
02/09/2022, 6:49 AM@task
def processMiniBatch(
online_data_dir: str,
d_model: PrefectClassifier,
c_models: List[PrefectClassifier],
n_mini_batches: int,
label_encoder: LabelEncoder,
predictions_dir: str,
)]:
loop_payload = prefect.context.get("task_loop_result", {})
loop_j = loop_payload.get("loop_j", 1)
loop_d_model = loop_payload.get("loop_d_model", d_model)
loop_c_models = loop_payload.get("loop_c_models", c_models)
with Flow(
"Process mini-batch",
executor = DaskExecutor(f"{cluster_ip}:8786")
) as flow:
deployed_model = Parameter("deployed_model")
contestant_models = Parameter("contestant_models")
(data,filename) = ingestData(online_data_dir)
is_data_tagged = checkDataTagged(data)
with case(is_data_tagged,False):
predictions = predictData(deployed_model,data)
writePredictions(predictions, predictions_dir, filename)
with case(is_data_tagged,True):
data = dataPreparation(data, label_encoder)
contestant_models = testModel.map(unmapped(data), contestant_models)
deployed_model = testModel(data, deployed_model)
contestant_models = trainModel.map(
unmapped(data), contestant_models
)
flow_res = flow.run(
deployed_model=loop_d_model, contestant_models=loop_c_models
)
is_data_tagged = flow_res.result[is_data_tagged].result
if is_data_tagged:
loop_d_model = flow_res.result[deployed_model].result
loop_c_models = flow_res.result[contestant_models].result
if loop_j == n_mini_batches:
loop_c_models.append(loop_d_model)
return loop_c_models
loop_j += 1
raise signals.LOOP(
result=dict(
loop_j=loop_j, loop_d_model=loop_d_model, loop_c_models=loop_c_models
)
)
Kevin Kho
create_flow_run
, wait_for_flow_run
, get_task_run_result
as outlined in this blogAntonio Manuel BR
02/09/2022, 7:09 AM