odd, I have had a prefect flow running for months,...
# prefect-community
r
Odd: I have had a Prefect flow running for months, but now I am getting: Unexpected error occured in FlowRunner: ValueError('Could not infer an active Flow context.')
Copy code
from time import sleep

from prefect.engine import state
from prefect import task, Task, Flow, context
import prefect

from prefect.engine.executors import LocalDaskExecutor, LocalExecutor

@task
def produce_range():
    """Return the integers 1 through 9 as a ``range`` object."""
    first, stop = 1, 10
    return range(first, stop)


class SlowTask(Task):
    """Task that sleeps for a while and then hands its input back."""

    def run(self, item, sleep_time=9, **kwopts):
        """Sleep for ``sleep_time``, then return ``item`` unchanged.

        Any extra keyword arguments are accepted and ignored.
        """
        # placeholder for the real work done with a host called 'item'
        sleep(sleep_time)
        return item


class CaughtTimeout:
    """Marker result used in place of a real value when a task timed out."""

def ignore_timeout_handler(task, old_state, new_state):
    """State handler that turns a timeout failure into a successful state.

    When ``new_state`` is a failed ``state.TimedOut``, a ``state.Success``
    whose result is a ``CaughtTimeout`` marker is returned instead; every
    other state is passed through untouched. ``task`` and ``old_state`` are
    part of the handler signature but are not used here.
    """
    timed_out = new_state.is_failed() and isinstance(new_state, state.TimedOut)
    if not timed_out:
        return new_state
    return state.Success(result=CaughtTimeout())


@prefect.task
def combine_failed(item_list, results):
    """Pair each item with a status derived from its corresponding result.

    Returns a list of ``{'item': ..., 'status': ...}`` dicts, where the status
    is ``'timeout'`` if the result is a ``CaughtTimeout`` marker and ``'OK'``
    otherwise. Pairing uses ``zip``, so it stops at the shorter input.
    """
    return [
        {
            'item': entry,
            'status': 'timeout' if isinstance(outcome, CaughtTimeout) else 'OK',
        }
        for entry, outcome in zip(item_list, results)
    ]


# Build the "Slow flow" graph; the task calls below are made inside the
# Flow context manager.
with Flow("Slow flow") as flow:
    # timeout=5 (presumably seconds, per Prefect's Task API -- confirm); the
    # state handler swaps a TimedOut failure for a Success that carries a
    # CaughtTimeout marker.
    slow_task = SlowTask(timeout=5, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    # The same range feeds both arguments, so each mapped run sleeps for as
    # long as its own item value.
    t_result = slow_task.map(item=nrange,
                           sleep_time=nrange)
    # Collate items and their mapped results into per-item status dicts.
    result = combine_failed(nrange, t_result)

if __name__ == '__main__':
    # Run the flow with a process-based LocalDaskExecutor using 10 workers.
    executor = LocalDaskExecutor(scheduler="processes", num_workers=10)
    #executor = LocalExecutor()
    # NOTE: when the flow run itself errors out, `.result` holds the raised
    # exception instead of a mapping keyed by task, so the subscript below
    # raises TypeError (this is the second error in the traceback quoted
    # further down in this thread).
    for ii in flow.run(executor=executor).result[result].result:
        print(ii)
~
standalone app
fails:
python stest.py
[2021-02-08 114436] INFO - prefect.FlowRunner | Beginning Flow run for 'Slow flow'
[2021-02-08 114436] ERROR - prefect.FlowRunner | Unexpected error: ValueError('Could not infer an active Flow context.')
Traceback (most recent call last):
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 494, in get_flow_run_state
    upstream_states = executor.wait(
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 549, in wait
    return dask.compute(
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/base.py", line 563, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/multiprocessing.py", line 202, in get
    dsk2, dependencies = cull(dsk, keys)
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/optimization.py", line 51, in cull
    dependencies_k = get_dependencies(dsk, k, as_list=True)  # fuse needs lists
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/core.py", line 258, in get_dependencies
    return keys_in_tasks(dsk, [arg], as_list=as_list)
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/core.py", line 186, in keys_in_tasks
    if w in keys:
  File "/usr/lib/python3.8/_collections_abc.py", line 666, in __contains__
    self[key]
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/dask/highlevelgraph.py", line 509, in __getitem__
    return self.layers[key[0]][key]
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/prefect/core/task.py", line 804, in __getitem__
    return prefect.tasks.core.operators.GetItem().bind(self, key)
  File "/home/rfo/wa/jet-automation/venv/lib/python3.8/site-packages/prefect/core/task.py", line 523, in bind
    raise ValueError("Could not infer an active Flow context.")
ValueError: Could not infer an active Flow context.
[2021-02-08 114436] ERROR - prefect.Slow flow | Unexpected error occured in FlowRunner: ValueError('Could not infer an active Flow context.')
Traceback (most recent call last):
  File "stest.py", line 52, in <module>
    for ii in flow.run(executor=executor).result[result].result:
TypeError: 'ValueError' object is not subscriptable
works in 13!
so LocalDaskExecutor is not supported anymore?
z
Hi @Rob Fowler -- this looks worth opening an issue in the Prefect repo on GitHub; the executor should still be supported.
r
Just for the record, it's a dask issue that comes up with the latest release: https://github.com/dask/dask/issues/7190
thanks to @Jim Crist-Harif
j
I'll also push a patch upstream, but that may take longer to propagate out. We're releasing Prefect tomorrow anyway, so you can get this fairly quickly