Neil Natarajan
08/17/2022, 3:26 PM
[…] `Not Ready` with `future.result()` resolving to `None`. The issue is that two of the final tasks of my flow accept futures of previous tasks as input and are also listed in their `wait_for` argument. Instead of running, these final two tasks are stuck in a `Pending` state, which causes the flow to hang rather than run to completion. Any advice on how to properly implement the ideal behavior would be greatly appreciated. That behavior is: tasks that depend on futures from previous tasks that failed or didn't run should also fail or not run, and the flow should run to completion. Thank you!
In the logs, I see that the flow encountered an `UpstreamTaskError`.
Mason Menges
08/17/2022, 4:04 PM
Neil Natarajan
08/17/2022, 4:05 PM
[…] `Pending`, and I see the following in my logs:
raise UpstreamTaskError(
prefect.exceptions.UpstreamTaskError: Upstream task run '5ed91f00-90d8-4cd4-9e9c-2a6fcd3805ec' did not reach a 'COMPLETED' state.
Crashed
Mason Menges
08/17/2022, 4:09 PM
Neil Natarajan
08/17/2022, 4:12 PM
2022-08-17 11:47:00,630 - [ERROR] - base_events:default_exception_handler:1738 - Exception in callback Worker._handle_stimulus_from_task(<Task finishe...ED' state.")>>) at /Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py:1844
handle: <Handle Worker._handle_stimulus_from_task(<Task finishe...ED' state.")>>) at /Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py:1844>
Traceback (most recent call last):
File "/Users/neil.natarajan/.pyenv/versions/3.9.10/lib/python3.9/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 193, in wrapper
return method(self, *args, **kwargs)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 1854, in _handle_stimulus_from_task
super()._handle_stimulus_from_task(task)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker_state_machine.py", line 3337, in _handle_stimulus_from_task
stim = task.result()
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 180, in wrapper
return await method(self, *args, **kwargs) # type: ignore
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 2186, in execute
result = await apply_function_async(
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 2938, in apply_function_async
result = await function(*args, **kwargs)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 946, in begin_task_run
return await orchestrate_task_run(
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 1007, in orchestrate_task_run
resolved_parameters = await resolve_inputs(parameters)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 1230, in resolve_inputs
return await visit_collection(
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/collections.py", line 288, in visit_collection
values = await gather(*[visit_nested(v) for v in values])
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 341, in gather
keys.append(tg.start_soon(call))
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 317, in __aexit__
retval = await self._task_group.__aexit__(*tb)
File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 660, in __aexit__
raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
Mason Menges
08/17/2022, 4:40 PM
Neil Natarajan
# 08/17/2022, 4:50 PM
@flow(name="Create FD Instances Workflow", task_runner=DaskTaskRunner(), validate_parameters=False)
def create_fd_instances_workflow(
    repo_dict: Dict[str, GitRepo],
    create_params_list: List[CreateFDInstParams],
    create_fi_name: str,
    create_workflow_uuids: List[str],
):
    """Create FD instances for a batch of create requests (beginning of Create Logic).

    For each request the flow:
    - fetches the FD type definition;
    - validates the FI/FD inputs;
    - looks up the environment name;
    - determines whether the requested FI is stage/prod;
    - evaluates approval;
    - builds the base FD instance object;
    - decides whether the new FD may be written;
    - finally writes it locally.

    ``create_params_list``, ``create_workflow_uuids`` and the preprocessed
    ``valid_requests`` are paired positionally with ``zip`` (which stops at the
    shortest input), so their orders must correspond.

    NOTE(review): each per-request task lists the upstream futures of *all*
    requests in ``wait_for``, so a failure in one request can keep every other
    request's tasks from running. If requests are meant to be independent,
    narrow ``wait_for`` to the futures belonging to the same request.

    Raises:
        BaseException: the exception produced by ``preprocess_duplicate_requests``
            when that task did not succeed.
    """
    # Preprocess create requests. With raise_on_failure=False a failed task
    # hands back its exception instead of raising at this point.
    valid_requests_future = preprocess_duplicate_requests.submit(
        params_list=create_params_list, workflow_uuids=create_workflow_uuids
    )
    valid_requests = valid_requests_future.result(raise_on_failure=False)
    if isinstance(valid_requests, BaseException):
        # The zip() calls below would otherwise fail with an opaque
        # "object is not iterable" TypeError; surface the real cause instead.
        raise valid_requests

    # Get FD type objects, one per request.
    fd_type_objects_futures = [
        get_fd_type_def.submit(
            git_repo=repo_dict[ServiceRegistryRepo.FUNCTIONAL_DOMAIN_DEFINITION.value],
            params=create_params,
        )
        for create_params in create_params_list
    ]

    # Validate that FI and FD instance params are valid.
    is_fi_fd_valid_futures = [
        validate_fi_fd_inputs.submit(
            wait_for=fd_type_objects_futures,
            fi_name=create_fi_name,
            workflow_uuid=create_uuid,
            valid_request=valid_req,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=create_params,
            fd_type=fd_type_obj,
        )
        for fd_type_obj, create_params, create_uuid, valid_req in zip(
            fd_type_objects_futures, create_params_list, create_workflow_uuids, valid_requests
        )
    ]

    env_name_futures = [
        get_env_name.submit(
            workflow_uuid=c_uuid,
            fi_name=create_fi_name,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=c_params,
        )
        for c_params, c_uuid in zip(create_params_list, create_workflow_uuids)
    ]

    # Determine whether the requested FI is stage/prod.
    req_prod_fi_futures = [
        requested_prod_fi.submit(
            wait_for=env_name_futures,
            # This request's own UUID, matching every other per-request task
            # (previously the entire UUID list was passed here).
            workflow_uuid=c_uuid,
            env_name=env_name,
            env_git_repo=repo_dict[ServiceRegistryRepo.FALCON_ENVIRONMENT_DEFINITION.value],
            params=create_params,
        )
        for env_name, create_params, c_uuid in zip(
            env_name_futures, create_params_list, create_workflow_uuids
        )
    ]

    # Create the client used to query GUS.
    simple_sf_client_future = create_simple_sf_client.submit()

    # Check for approval.
    approvals_futures = [
        evaluate_approval.submit(
            wait_for=([simple_sf_client_future] + req_prod_fi_futures),
            workflow_uuid=c_uuid,
            fi_name=create_fi_name,
            params=c_params,
            client=simple_sf_client_future,
            req_prod_fi=req_prod_fi,
        )
        for c_params, c_uuid, req_prod_fi in zip(
            create_params_list, create_workflow_uuids, req_prod_fi_futures
        )
    ]

    # Create base FD instance objects.
    new_created_fd_inst_futures = [
        create_fd_inst_obj.submit(
            wait_for=fd_type_objects_futures,
            params=c_params,
            obj_uuid=c_uuid,
            fd_type_obj=fd_obj,
        )
        for c_params, c_uuid, fd_obj in zip(
            create_params_list, create_workflow_uuids, fd_type_objects_futures
        )
    ]

    valid_to_write_new_fd_futures = [
        determine_if_new_fd_valid.submit(
            wait_for=(
                [valid_requests_future]
                + approvals_futures
                + is_fi_fd_valid_futures
                + env_name_futures
                + fd_type_objects_futures
            ),
            valid_request=valid_r,
            approval=appr,
            fi_fd_valid=v_fi_fd,
        )
        for appr, v_fi_fd, valid_r in zip(approvals_futures, is_fi_fd_valid_futures, valid_requests)
    ]

    # Write objects locally for valid requests.
    create_success_futures = [
        write_object.submit(
            wait_for=(
                [valid_requests_future]
                + new_created_fd_inst_futures
                + valid_to_write_new_fd_futures
                + fd_type_objects_futures
                + env_name_futures
            ),
            workflow_uuid=c_uuid,
            valid_req=valid_to_write_new_fd,
            fi_name=create_fi_name,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=c_params,
            fd_inst=new_created_fd_inst,
            workflow_type=WorkflowTypesConfig.CREATE_FD_NAME,
        )
        for c_params, c_uuid, valid_to_write_new_fd, new_created_fd_inst in zip(
            create_params_list,
            create_workflow_uuids,
            valid_to_write_new_fd_futures,
            new_created_fd_inst_futures,
        )
    ]

    # Block until every write finishes; raise_on_failure=False collects a failed
    # write's exception instead of aborting collection of the rest.
    # NOTE(review): if a write task never leaves Pending (the hang described in
    # this thread), the flow blocks here. create_results is not returned or
    # inspected in this snippet.
    create_results = [future.result(raise_on_failure=False) for future in create_success_futures]
Mason Menges
08/17/2022, 5:01 PM
Neil Natarajan
08/17/2022, 5:03 PM
Mason Menges
# 08/17/2022, 5:24 PM
from prefect import flow, task


@task
def taska(num):
    """Return ``num + 1`` when ``num`` is greater than 5, otherwise ``None``."""
    if num > 5:
        return num + 1
    return None


@task
def taskb(some_num):
    """Return ``some_num + 1``."""
    return some_num + 1


@flow
def flowa():
    """Run ``taska`` on sample inputs and ``taskb`` only on its non-None results.

    Each upstream future is resolved in the flow first, so ``taskb`` is never
    submitted with an input it cannot handle. This avoids a downstream task
    that waits on an unusable upstream value.
    """
    num_inputs = [1, 2, 3, 7, 8, 9]
    test_futures = [taska.submit(num=num_val) for num_val in num_inputs]
    # Resolve each future exactly once (the original called .result() twice).
    upstream_values = [test_future.result() for test_future in test_futures]
    taskb_test = [taskb.submit(value) for value in upstream_values if value is not None]
    # Plain loop: the work here is the print side effect, not building a list.
    for taskb_future in taskb_test:
        print(taskb_future.result())


if __name__ == "__main__":
    flowa()
Neil Natarajan
08/17/2022, 5:38 PM
Mason Menges
08/17/2022, 5:38 PM
Neil Natarajan
08/17/2022, 5:38 PM
Mason Menges
08/17/2022, 5:43 PM