[Prefect 1.0 -> 2.0 Migration Question] Hi! I h...
# prefect-community
n
[Prefect 1.0 -> 2.0 Migration Question] Hi! I have a flow that accepts lists of params as input. The flow is composed of many tasks that run both sequentially and in parallel. One of the initial tasks fails for a particular input param, so the subsequent tasks are generally marked `Not Ready`, with `future.result()` resolving to `None`. The issue is that two of the final tasks of my flow accept futures of previous tasks as input and also list them in `wait_for`. These final two tasks are instead stuck in a `Pending` state, which causes the flow to hang instead of running to completion. Any advice on how to properly implement the ideal behavior (tasks that depend on futures from previous tasks that failed or didn't run should also fail or not run, and the flow should run to completion) would be greatly appreciated, thank you! In the logs, I see the flow encountered an `UpstreamTaskError`.
m
Hey @Neil Natarajan, what version of Prefect are you running? For 2.0 specifically, you should be able to implement a simple if check to ensure the data fits the format you want to pass.
n
I am using 2.02 atm and this is a visual of what my flow looks like
The last two tasks on the right are `Pending`, and I see the following in my logs:
raise UpstreamTaskError(
prefect.exceptions.UpstreamTaskError: Upstream task run '5ed91f00-90d8-4cd4-9e9c-2a6fcd3805ec' did not reach a 'COMPLETED' state.
The flow doesn't actually complete; I have to terminate the process, and then the flow is marked `Crashed`.
m
Well, for starters, you shouldn't normally need to call .result() on a task; it should just return the output of the task. Do you have a minimal example of how your flow/tasks are set up that reproduces this?
Do you see any of the logs for the flow run specifically around the failed tasks?
n
2022-08-17 11:47:00,630 - [ERROR] - base_events:default_exception_handler:1738 - Exception in callback Worker._handle_stimulus_from_task(<Task finishe...ED' state.")>>) at /Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py:1844
handle: <Handle Worker._handle_stimulus_from_task(<Task finishe...ED' state.")>>) at /Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py:1844>
Traceback (most recent call last):
  File "/Users/neil.natarajan/.pyenv/versions/3.9.10/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 193, in wrapper
    return method(self, *args, **kwargs)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 1854, in _handle_stimulus_from_task
    super()._handle_stimulus_from_task(task)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker_state_machine.py", line 3337, in _handle_stimulus_from_task
    stim = task.result()
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 180, in wrapper
    return await method(self, *args, **kwargs)  # type: ignore
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 2186, in execute
    result = await apply_function_async(
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/distributed/worker.py", line 2938, in apply_function_async
    result = await function(*args, **kwargs)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 946, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 1007, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/engine.py", line 1230, in resolve_inputs
    return await visit_collection(
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/collections.py", line 288, in visit_collection
    values = await gather(*[visit_nested(v) for v in values])
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 341, in gather
    keys.append(tg.start_soon(call))
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 317, in __aexit__
    retval = await self._task_group.__aexit__(*tb)
  File "/Users/neil.natarajan/PycharmProjects/ssr-manager/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 660, in __aexit__
    raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
m
Do you have an example of your flow code you'd be able to share?
n
@flow(name="Create FD Instances Workflow", task_runner=DaskTaskRunner(), validate_parameters=False)
def create_fd_instances_workflow(
    repo_dict: Dict[str, GitRepo],
    create_params_list: List[CreateFDInstParams],
    create_fi_name: str,
    create_workflow_uuids: List[str],
):

    """
    Beginning of Create Logic
    """
    # Preprocess Create Requests
    valid_requests_future = preprocess_duplicate_requests.submit(
        params_list=create_params_list, workflow_uuids=create_workflow_uuids
    )
    valid_requests = valid_requests_future.result(raise_on_failure=False)

    # get fd type objects
    fd_type_objects_futures = [
        get_fd_type_def.submit(
            git_repo=repo_dict[ServiceRegistryRepo.FUNCTIONAL_DOMAIN_DEFINITION.value],
            params=create_params,
        )
        for create_params in create_params_list
    ]

    # validate that FI and FD Instance params are valid
    is_fi_fd_valid_futures = [
        validate_fi_fd_inputs.submit(
            wait_for=fd_type_objects_futures,
            fi_name=create_fi_name,
            workflow_uuid=create_uuid,
            valid_request=valid_req,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=create_params,
            fd_type=fd_type_obj,
        )
        for fd_type_obj, create_params, create_uuid, valid_req in zip(
            fd_type_objects_futures, create_params_list, create_workflow_uuids, valid_requests
        )
    ]

    env_name_futures = [
        get_env_name.submit(
            workflow_uuid=c_uuid,
            fi_name=create_fi_name,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=c_params,
        ) for c_params, c_uuid in zip(create_params_list, create_workflow_uuids)
    ]

    # determine if requested FI is stage/prod
    req_prod_fi_futures = [
        requested_prod_fi.submit(
            wait_for=env_name_futures,
            workflow_uuid=create_workflow_uuids,
            env_name=env_name,
            env_git_repo=repo_dict[ServiceRegistryRepo.FALCON_ENVIRONMENT_DEFINITION.value],
            params=create_params,
        ) for env_name, create_params in zip(env_name_futures, create_params_list)
    ]

    # create client to query gus
    simple_sf_client_future = create_simple_sf_client.submit()

    # check for approval
    approvals_futures = [
        evaluate_approval.submit(
            wait_for=([simple_sf_client_future] + req_prod_fi_futures),
            workflow_uuid=c_uuid,
            fi_name=create_fi_name,
            params=c_params,
            client=simple_sf_client_future,
            req_prod_fi=req_prod_fi,
        ) for c_params, c_uuid, req_prod_fi in zip(create_params_list, create_workflow_uuids, req_prod_fi_futures)
    ]

    # create base FD inst objects
    new_created_fd_inst_futures = [
        create_fd_inst_obj.submit(
            wait_for=fd_type_objects_futures,
            params=c_params,
            obj_uuid=c_uuid,
            fd_type_obj=fd_obj,
        ) for c_params, c_uuid, fd_obj in zip(create_params_list, create_workflow_uuids, fd_type_objects_futures)
    ]

    valid_to_write_new_fd_futures = [
        determine_if_new_fd_valid.submit(
            wait_for=([valid_requests_future] + approvals_futures + is_fi_fd_valid_futures + env_name_futures + fd_type_objects_futures),
            valid_request=valid_r,
            approval=appr,
            fi_fd_valid=v_fi_fd
        ) for appr, v_fi_fd, valid_r in zip(approvals_futures, is_fi_fd_valid_futures, valid_requests)
    ]

    # write objects locally for valid requests
    create_success_futures = [
        write_object.submit(
            wait_for=([valid_requests_future] + new_created_fd_inst_futures + valid_to_write_new_fd_futures + fd_type_objects_futures + env_name_futures),
            workflow_uuid=c_uuid,
            valid_req=valid_to_write_new_fd,
            fi_name=create_fi_name,
            falcon_inst_repo=repo_dict[ServiceRegistryRepo.FALCON_INSTANCE_DEFINITION.value],
            params=c_params,
            fd_inst=new_created_fd_inst,
            workflow_type=WorkflowTypesConfig.CREATE_FD_NAME
        ) for c_params, c_uuid, valid_to_write_new_fd, new_created_fd_inst in zip(
            create_params_list,
            create_workflow_uuids,
            valid_to_write_new_fd_futures,
            new_created_fd_inst_futures
        )
    ]
    create_results = [future.result(raise_on_failure=False) for future in create_success_futures]
m
At a glance, it looks like you're calling .submit() on most of your tasks. You shouldn't need to do this anymore in recent versions of Prefect, and the same applies to .result(). Have you tried running this without those on your tasks or task results? Recent versions of Prefect just return the resulting output of the task, so you may need to adjust some of your checks to look at the output rather than the future.
n
Just to clarify, what should I be calling instead of .submit() to execute the same task for many inputs in parallel?
@Mason Menges
Also, in what version of Prefect were these changes made? I am following the Prefect 2.0 docs.
m
Ah nope, that's correct, this was my bad. I knew we had updated tasks to return a Python object instead of a future, but I didn't know we had altered the task behavior to return a future when calling .submit(). That said, you should still be able to implement an if check on the result of the failing tasks to only run the downstream tasks if the result is not None.
this is a simple example that might be helpful
from prefect import flow, task

@task
def taska(num):
    if num > 5:
        return num + 1
    else:
        return None

@task
def taskb(some_num):
    return some_num + 1

@flow
def flowa():

    num_inputs = [1, 2, 3, 7, 8, 9]

    test_futures = [taska.submit(num=num_val) for num_val in num_inputs]

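    # Only submit taskb for upstream results that are not None.
    taskb_test = [
        taskb.submit(test_future.result())
        for test_future in test_futures
        if test_future.result() is not None
    ]

    # Resolve and print each downstream result.
    for taskb_future in taskb_test:
        print(taskb_future.result())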

flowa()
n
Fair enough, I'll try that
thanks
m
No problem 😄
n
So in my case I shouldn't pass the future and should pass the result directly?
m
I think that should work, yes, at least for validating the value itself. Otherwise, passing the future is still fine, I would think.
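For anyone reading later, here is a rough sketch of the "resolve the upstream result first, then decide whether to submit the downstream task" pattern discussed above. It deliberately uses only the standard library's `concurrent.futures` as a stand-in, so it is not Prefect code; in the thread the analogous calls are `.submit()` and `future.result(raise_on_failure=False)`. The names `upstream`, `downstream`, and `run` are hypothetical and exist only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def upstream(num):
    # Hypothetical upstream task: fails for small inputs.
    if num <= 5:
        raise ValueError(f"{num} is too small")
    return num + 1


def downstream(value):
    # Hypothetical downstream task: needs a real value, not a failed future.
    return value + 1


def run(inputs):
    results = {}
    with ThreadPoolExecutor() as pool:
        # Fan out the upstream work in parallel.
        upstream_futures = {num: pool.submit(upstream, num) for num in inputs}

        downstream_futures = {}
        for num, future in upstream_futures.items():
            # exception() waits for the future and returns its error, or None.
            if future.exception() is not None:
                # Upstream failed: record it and never submit the downstream task.
                results[num] = None
                continue
            downstream_futures[num] = pool.submit(downstream, future.result())

        # Collect downstream results; nothing is left waiting on a failed upstream.
        for num, future in downstream_futures.items():
            results[num] = future.result()
    return results


print(run([1, 7, 9]))  # {1: None, 7: 9, 9: 11}
```

The point of the design is that a downstream task is only ever created for an upstream result that is known to be good, so no task ends up waiting on a dependency that will never complete.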