#prefect-community
Joshua Grant

01/13/2023, 9:00 PM
Couple of questions:
1. Can run_deployment be used inside a task, specifically a mapped task?
2. If so, how can I wait until all the flow runs are successful?
Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'is_completed'
03:47:25 PM
Extract file groupings from RGI-75235cd6-0
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flow_file.py", line 229, in preliminary_flow
    file_groupings = wrapped_extract_file_groupings_from_rgi(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/bitnami/python/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/task_runners.py", line 204, in submit
    result = await call()
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 1108, in begin_task_run
    return await orchestrate_task_run(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 1174, in orchestrate_task_run
    await resolve_inputs(wait_for, return_data=False)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 1463, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/bitnami/python/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/opt/bitnami/python/lib/python3.9/site-packages/prefect/engine.py", line 1453, in resolve_input
    if not state.is_completed() and not (
AttributeError: 'coroutine' object has no attribute 'is_completed'
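Editor's note: the last line of the traceback says a coroutine object turned up where a state object was expected. The thread does not establish whether that came from user code or from Prefect itself. The sketch below only illustrates the general Python behavior behind this kind of message: calling an async function without awaiting it yields a coroutine object, which has none of the methods of the eventual result. `FakeState` and `get_state` are hypothetical stand-ins, not Prefect APIs.

```python
import asyncio


class FakeState:
    """Hypothetical stand-in for a state object; not Prefect's State class."""

    def is_completed(self) -> bool:
        return True


async def get_state() -> FakeState:
    # Hypothetical async helper that eventually yields a state.
    return FakeState()


def call_without_await() -> str:
    # Calling an async function without awaiting it returns a coroutine object.
    coro = get_state()
    try:
        coro.is_completed()  # coroutine objects have no such method
    except AttributeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid the "coroutine was never awaited" warning
    return "no error"


async def call_with_await() -> bool:
    # Awaiting the coroutine produces the actual FakeState instance.
    state = await get_state()
    return state.is_completed()


if __name__ == "__main__":
    print(call_without_await())
    print(asyncio.run(call_with_await()))
```

The first call reports the missing attribute on the coroutine; the second succeeds because the coroutine is awaited before its result is used.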
Zanie

01/13/2023, 9:04 PM
Yes, it should be usable in a task. That's a weird error; what did you pass into your task?
Joshua Grant

01/13/2023, 9:04 PM
def route_to_file_flow(
        file: dict,
        file_group_id: str,
        import_folder_id: str,
        context: dict,
        generate_review: bool = True,
):
Zanie

01/13/2023, 9:06 PM
What version are you on?
Joshua Grant

01/13/2023, 9:06 PM
2.6.9
Zanie

I would suggest upgrading and opening an issue with a reproducible example if that still happens; that's a weird error. run_deployment should be usable wherever.
Joshua Grant

01/13/2023, 9:09 PM
Unable to upgrade for a week. Would something like this work?
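from prefect.deployments import run_deployment


async def run_deployment_wrapper(flow_deployment, parameters):
    # Await the call so the caller gets the flow run back, not a coroutine object.
    # Pass parameters by keyword: in Prefect 2.x the second positional argument is `client`.
    return await run_deployment(flow_deployment, parameters=parameters)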
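Editor's note: on the second question (waiting until every triggered run has finished), one common asyncio pattern is to start all the calls and await them together with `asyncio.gather`. The sketch below is a minimal illustration under stated assumptions, not Prefect's recommended approach: `fake_run_deployment` is a hypothetical stand-in that returns a plain dict, and the deployment name and parameters are made up. With the real `run_deployment`, the returned object would be a flow run whose state you would inspect instead of the `"state"` key used here.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Run = Dict[str, Any]
Trigger = Callable[[str, Dict[str, Any]], Awaitable[Run]]


async def fake_run_deployment(name: str, parameters: Dict[str, Any]) -> Run:
    """Hypothetical stand-in for run_deployment; returns a dict, not a FlowRun."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return {"deployment": name, "parameters": parameters, "state": "COMPLETED"}


async def run_all(
    trigger: Trigger, name: str, parameter_sets: List[Dict[str, Any]]
) -> List[Run]:
    # Start one run per parameter set and wait for all of them to finish.
    runs = await asyncio.gather(*(trigger(name, p) for p in parameter_sets))
    # Fail loudly if any run ended in a state other than COMPLETED.
    failed = [r for r in runs if r["state"] != "COMPLETED"]
    if failed:
        raise RuntimeError(f"{len(failed)} of {len(runs)} runs did not complete")
    return list(runs)


if __name__ == "__main__":
    results = asyncio.run(
        run_all(
            fake_run_deployment,
            "my-flow/my-deployment",  # hypothetical deployment name
            [{"file": "a.csv"}, {"file": "b.csv"}],
        )
    )
    print([r["state"] for r in results])
```

`asyncio.gather` returns results in the same order as the inputs, so each result can be matched back to the parameter set that produced it.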