Rob Fowler
02/08/2021, 11:24 AM
from time import sleep
from prefect.engine import state
from prefect import task, Task, Flow, context
import prefect
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor
@task
def produce_range():
    """Return the items to map over: the integers 1 through 9."""
    return range(1, 10)
class SlowTask(Task):
    """Task that simulates slow per-item work by sleeping before returning."""

    def run(self, item, sleep_time=9, **kwopts):
        """Sleep for ``sleep_time`` seconds, then return ``item`` unchanged.

        Args:
            item: The value being processed; it is handed back as the result.
            sleep_time: How long to block, in seconds, before returning.
            **kwopts: Extra keyword arguments; accepted and ignored.

        Returns:
            The ``item`` that was passed in.
        """
        sleep(sleep_time)
        # doing stuff with a host called 'item'
        return item
class CaughtTimeout:
    """Marker result standing in for a task run whose timeout was swallowed.

    ``ignore_timeout_handler`` stores an instance of this class as the result
    of a timed-out run, and ``combine_failed`` detects it with ``isinstance``.
    """
def ignore_timeout_handler(task, old_state, new_state):
    """State handler that converts a timed-out state into a success.

    Args:
        task: The task whose state is changing (unused).
        old_state: The state being left (unused).
        new_state: The state being entered.

    Returns:
        A ``state.Success`` whose result is a ``CaughtTimeout`` instance when
        ``new_state`` is a failed ``state.TimedOut``; otherwise ``new_state``
        itself, unchanged.
    """
    timed_out = new_state.is_failed() and isinstance(new_state, state.TimedOut)
    if not timed_out:
        return new_state
    # Swap the failure for a success carrying a marker object so that
    # downstream tasks still run and can tell which items timed out.
    return state.Success(result=CaughtTimeout())
@prefect.task
def combine_failed(item_list, results):
    """Pair each item with its result and label the outcome.

    Args:
        item_list: The items that were processed.
        results: One result per item, in the same order as ``item_list``.

    Returns:
        A list of ``{'item': ..., 'status': ...}`` dicts. ``status`` is
        ``'timeout'`` when the matching result is a ``CaughtTimeout``
        instance and ``'OK'`` otherwise.
    """
    # zip() stops at the shorter input, so unmatched trailing entries are dropped.
    return [
        {
            'item': item,
            'status': 'timeout' if isinstance(outcome, CaughtTimeout) else 'OK',
        }
        for item, outcome in zip(item_list, results)
    ]
with Flow("Slow flow") as flow:
    # Timeouts are routed through ignore_timeout_handler, which replaces a
    # TimedOut state with a Success carrying a CaughtTimeout marker.
    slow_task = SlowTask(timeout=5, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    # The same range feeds both arguments, so each mapped run sleeps for as
    # many seconds as its own item value.
    t_result = slow_task.map(item=nrange, sleep_time=nrange)
    # Summarize which items finished and which hit the timeout.
    result = combine_failed(nrange, t_result)
if __name__ == '__main__':
    # Run the mapped tasks in separate worker processes.
    # NOTE: LocalExecutor() is the alternative executor that was tried here.
    executor = LocalDaskExecutor(scheduler="processes", num_workers=10)
    flow_state = flow.run(executor=executor)
    # Look up the state of the combine_failed task and print each summary row.
    for entry in flow_state.result[result].result:
        print(entry)
~
Zanie
Rob Fowler
02/08/2021, 8:52 PM
Jim Crist-Harif
02/08/2021, 8:54 PM