Hi (prefect) - running flows without cloud beta (o...
# prefect-community
m
Hi (prefect) - running flows without cloud beta (orion) works fine, but if I login - all works as expected until end of flow, I then receive an error consistenly:
14:17:55.129 | ERROR   | Flow run 'arrogant-yak' - Crash detected! Execution was interrupted by an unexpected exception.
followed by
prefect.exceptions.Abort: This run has already terminated.
. This happens regardless of task success/failure. Again, running without cloud this works fine
a
looks like some issue in your execution layer - can you share more info so that we can try to reproduce your issue? sharing the output of
prefect version
and your flow code/DeploymentSpec may help
m
Sure
Copy code
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@task
def hello(x: int) -> int:
    return x**2


@task
def summer(xs: list[int]) -> int:
    return sum(xs)


@flow
def myflow():
    tmp_storage = []

    for x in range(10):
        tmp_storage.append(hello(x))

    y = summer(tmp_storage, wait_for=tmp_storage)

    return y


def runner():
    myflow.task_runner = SequentialTaskRunner()
    return myflow()
Copy code
Version:             2.0b2
API version:         0.3.0
Python version:      3.9.5
Git commit:          b2a048c2
Built:               Thu, Mar 17, 2022 2:24 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted
This is just a minimal example to reproduce, it does not reflect production code.
a
thanks, and your deployment spec? is your agent process healthy?
m
No agent - this is just from a local run
a
ok gotcha, will reproduce and get back
m
Copy code
In [2]: runner()
14:30:38.573 | INFO    | prefect.engine - Created flow run 'amorphous-carp' for flow 'myflow'
14:30:38.573 | INFO    | Flow run 'amorphous-carp' - Using task runner 'SequentialTaskRunner'
14:30:39.075 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-0' for task 'hello'
14:30:40.225 | INFO    | Task run 'hello-c9dc7265-0' - Finished in state Completed(None)
14:30:40.507 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-1' for task 'hello'
14:30:41.213 | INFO    | Task run 'hello-c9dc7265-1' - Finished in state Completed(None)
14:30:41.374 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-2' for task 'hello'
14:30:42.446 | INFO    | Task run 'hello-c9dc7265-2' - Finished in state Completed(None)
14:30:42.619 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-3' for task 'hello'
14:30:43.268 | INFO    | Task run 'hello-c9dc7265-3' - Finished in state Completed(None)
14:30:43.466 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-4' for task 'hello'
14:30:44.490 | INFO    | Task run 'hello-c9dc7265-4' - Finished in state Completed(None)
14:30:44.696 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-5' for task 'hello'
14:30:45.371 | INFO    | Task run 'hello-c9dc7265-5' - Finished in state Completed(None)
14:30:45.528 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-6' for task 'hello'
14:30:46.454 | INFO    | Task run 'hello-c9dc7265-6' - Finished in state Completed(None)
14:30:46.642 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-7' for task 'hello'
14:30:47.461 | INFO    | Task run 'hello-c9dc7265-7' - Finished in state Completed(None)
14:30:47.686 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-8' for task 'hello'
14:30:48.684 | INFO    | Task run 'hello-c9dc7265-8' - Finished in state Completed(None)
14:30:48.904 | INFO    | Flow run 'amorphous-carp' - Created task run 'hello-c9dc7265-9' for task 'hello'
14:30:49.639 | INFO    | Task run 'hello-c9dc7265-9' - Finished in state Completed(None)
14:30:49.920 | INFO    | Flow run 'amorphous-carp' - Created task run 'summer-65ebfa52-0' for task 'summer'
14:30:50.840 | INFO    | Task run 'summer-65ebfa52-0' - Finished in state Completed(None)
14:30:51.664 | ERROR   | Flow run 'amorphous-carp' - Crash detected! Execution was interrupted by an unexpected exception.
---------------------------------------------------------------------------
Abort                                     Traceback (most recent call last)
Input In [2], in <cell line: 1>()
----> 1 runner()

Input In [1], in runner()
     27 def runner():
     28     myflow.task_runner = SequentialTaskRunner()
---> 29     return myflow()

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/flows.py:314, in Flow.__call__(self, *args, **kwargs)
    311 # Convert the call args/kwargs to a parameter dict
    312 parameters = get_call_parameters(self.fn, args, kwargs)
--> 314 return enter_flow_run_engine_from_flow_call(self, parameters)

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/engine.py:110, in enter_flow_run_engine_from_flow_call(flow, parameters)
    107             return portal.call(begin_run)
    108     else:
    109         # An event loop is not running so we will create one
--> 110         return anyio.run(begin_run)
    112 # Sync subflow run
    113 if not parent_flow_run_context.flow.isasync:

File ~/user/project/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py:56, in run(func, backend, backend_options, *args)
     54 try:
     55     backend_options = backend_options or {}
---> 56     return asynclib.run(func, *args, **backend_options)
     57 finally:
     58     if token:

File ~/user/project/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:233, in run(func, debug, use_uvloop, policy, *args)
    230         del _task_states[task]
    232 _maybe_set_event_loop_policy(policy, use_uvloop)
--> 233 return native_run(wrapper(), debug=debug)

File /usr/lib/python3.9/asyncio/runners.py:44, in run(main, debug)
     42     if debug is not None:
     43         loop.set_debug(debug)
---> 44     return loop.run_until_complete(main)
     45 finally:
     46     try:

File /usr/lib/python3.9/asyncio/base_events.py:642, in BaseEventLoop.run_until_complete(self, future)
    639 if not future.done():
    640     raise RuntimeError('Event loop stopped before Future completed.')
--> 642 return future.result()

File ~/user/project/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:228, in run.<locals>.wrapper()
    225     task.set_name(task_state.name)
    227 try:
--> 228     return await func(*args)
    229 finally:
    230     del _task_states[task]

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/client.py:82, in i
     80 async with client_context as client:
     81     kwargs.setdefault("client", client)
---> 82     return await fn(*args, **kwargs)

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/engine.py:175, in
    170     <http://engine_logger.info|engine_logger.info>(
    171         f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed
    172     )
    173     return state
--> 175 return await begin_flow_run(
    176     flow=flow, flow_run=flow_run, parameters=parameters, client=client
    177 )

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/engine.py:283, in
    272     task_runner = await stack.enter_async_context(flow.task_runner.start())
    274     terminal_state = await orchestrate_flow_run(
    275         flow,
    276         flow_run=flow_run,
   (...)
    280         sync_portal=sync_portal,
    281     )
--> 283     await client.propose_state(
    284         state=terminal_state,
    285         flow_run_id=flow_run.id,
    286     )
    288 # If debugging, use the more complete `repr` than the usual `str` description
    289 display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)

File ~/user/project/.venv/lib/python3.9/site-packages/prefect/client.py:1512, in
   1509     return state
   1511 elif response.status == schemas.responses.SetStateStatus.ABORT:
-> 1512     raise prefect.exceptions.Abort(response.details.reason)
   1514 elif response.status == schemas.responses.SetStateStatus.WAIT:
   1515     self.logger.debug(
   1516         f"Received wait instruction for {response.details.delay_seconds}s: "
   1517         f"{response.details.reason}"
   1518     )

Abort: This run has already terminated.
Happens consistently both using nvim-dap, python3 and ipython runtimes calling
runner()
a
Thanks for reporting that! This seems to be some larger issue not related to your flow, even running this hello world flow gave me the same error:
Copy code
from prefect import task, flow
from prefect import get_run_logger


@task
def get_name():
    return "Marvin"


@task
def say_hi(user_name: str):
    logger = get_run_logger()
    <http://logger.info|logger.info>("Hello %s!", user_name)


@flow
def hw():
    user = get_name()
    say_hi(user)


if __name__ == "__main__":
    hw()
error:
Copy code
15:05:55.599 | INFO    | prefect.engine - Created flow run 'miniature-alpaca' for flow 'hw'
15:05:55.600 | INFO    | Flow run 'miniature-alpaca' - Using task runner 'ConcurrentTaskRunner'
15:05:55.937 | INFO    | Flow run 'miniature-alpaca' - Created task run 'get_name-2744ebee-0' for task 'get_name'
15:05:56.176 | INFO    | Flow run 'miniature-alpaca' - Created task run 'say_hi-b457a9f2-0' for task 'say_hi'
15:05:57.514 | INFO    | Task run 'get_name-2744ebee-0' - Finished in state Completed(None)
15:05:57.681 | INFO    | Task run 'say_hi-b457a9f2-0' - Hello Marvin!
15:05:58.658 | INFO    | Task run 'say_hi-b457a9f2-0' - Finished in state Completed(None)
15:06:00.632 | ERROR   | Flow run 'miniature-alpaca' - Crash detected! Execution was interrupted by an unexpected exception.
Traceback (most recent call last):
  File "/Users/anna/repos/gitops-orion-flows/flows/getting_started_with_cloud/hello_world_no_deployment.py", line 23, in <module>
    hw()
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/flows.py", line 314, in __call__
    return enter_flow_run_engine_from_flow_call(self, parameters)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/engine.py", line 110, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
    return await func(*args)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/client.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/engine.py", line 175, in create_then_begin_flow_run
    return await begin_flow_run(
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/engine.py", line 283, in begin_flow_run
    await client.propose_state(
  File "/Users/anna/opt/anaconda3/envs/orion39/lib/python3.9/site-packages/prefect/client.py", line 1512, in propose_state
    raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run has already terminated.
@Marvin open "Prefect 2.0: Abort exception is raised with any local flow run connected to Cloud 2.0 API"
m
Ok 🙂
a
@Malthe Karbo, to keep you up to date, the issue is resolved in the latest release - LMK if you still have any questions