Rob Fowler
01/05/2021, 12:25 AM
from time import sleep
from prefect.engine import state
from prefect import task, Task, Flow, context
import prefect
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor


def ignore_timeout_handler(task, old_state, new_state):
    print(f"{prefect.context.parameters} old {old_state} new {new_state}")
    if new_state.is_failed() and isinstance(new_state, state.TimedOut):
        return_state = state.Success(result={"state": "forced ok"})
    else:
        return_state = new_state
    return return_state


@task
def produce_range():
    return range(5, 10)


class SlowTask(Task):
    def run(self, item, sleep_time=9, **kwopts):
        sleep(sleep_time)
        # doing stuff with a host called 'item'
        return item


with Flow("Slow flow") as flow:
    slow_task = SlowTask(timeout=6, max_retries=2, retry_delay=2, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    result = slow_task.map(item=nrange,
                           sleep_time=nrange)

# executor = LocalDaskExecutor(scheduler="threads", num_workers=10)
executor = LocalExecutor()
for ii in flow.run(executor=executor).result[result].result:
    print(ii)
Kyle Moon-Wright
01/05/2021, 12:58 AM
item value to the State Handler or the logger/context? Certainly your proposed idea is viable, perhaps a task that collects these values upon mapped failure for later iteration.
Rob Fowler
01/05/2021, 1:19 AM
Kyle Moon-Wright
01/05/2021, 1:30 AM
from prefect import context


def capture_error_params(task, old_state, new_state):
    if new_state.is_failed():
        # Prefect's run logger, taken from the run context
        logger = context.get("logger")
        logger.info(
            f"\nFailed child task parameter value: {context.parameters.get('name')}\n")
        # ^ you can 'get' any arg you passed to the task
    return new_state
Rob Fowler
01/05/2021, 1:43 AM
context.parameters is always empty
[2021-01-05 01:44:41] INFO - prefect.FlowRunner | Beginning Flow run for 'Slow flow'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask': Starting task run...
{} old <Pending> new <Mapped: "Ready to proceed with mapping.">
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask': finished task run for task with final state: 'Mapped'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask[0]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <Success: "Task run succeeded.">
[2021-01-05 01:44:46] INFO - prefect.TaskRunner | Task 'SlowTask[0]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:46] INFO - prefect.TaskRunner | Task 'SlowTask[1]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:44:52] INFO - prefect.TaskRunner | Task 'SlowTask[1]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:52] INFO - prefect.TaskRunner | Task 'SlowTask[2]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:44:59] INFO - prefect.TaskRunner | Task 'SlowTask[2]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:59] INFO - prefect.TaskRunner | Task 'SlowTask[3]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:45:05] INFO - prefect.TaskRunner | Task 'SlowTask[3]': finished task run for task with final state: 'Success'
[2021-01-05 01:45:05] INFO - prefect.TaskRunner | Task 'SlowTask[4]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:45:11] INFO - prefect.TaskRunner | Task 'SlowTask[4]': finished task run for task with final state: 'Success'
[2021-01-05 01:45:11] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
5
{'state': 'forced ok'}
{'state': 'forced ok'}
{'state': 'forced ok'}
{'state': 'forced ok'}
Kyle Moon-Wright
01/05/2021, 1:54 AM
Rob Fowler
01/05/2021, 2:00 AM
Kyle Moon-Wright
01/05/2021, 2:04 AM
Rob Fowler
01/05/2021, 2:18 AM
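
The log above prints `{}` for `context.parameters` on every state change, which fits the flow as posted: `context.parameters` holds flow-level `Parameter` values, and this flow defines none, so the mapped `item` never shows up there. One possible way to make the forced `Success` say which child timed out is to key it on the child's map index. The sketch below is only an illustration under assumptions: the `State`, `Success`, and `TimedOut` classes are hypothetical stand-ins so it runs without Prefect, `make_timeout_handler` and `get_context` are made-up names, and it assumes that `map_index` is visible in the context when the handler fires and that the mapped items are known up front (not true for `produce_range()` as written, whose result only exists at run time).

```python
from dataclasses import dataclass
from typing import Any, Optional


# Hypothetical stand-ins for the prefect.engine.state classes, so this sketch
# runs without Prefect installed. They are NOT the real Prefect classes.
@dataclass
class State:
    message: str = ""
    result: Any = None

    def is_failed(self) -> bool:
        return False


class Success(State):
    pass


class TimedOut(State):
    def is_failed(self) -> bool:
        return True


def make_timeout_handler(items, get_context):
    """Build a state handler that tags forced successes with the mapped item.

    items: the sequence being mapped over (assumed to be known up front).
    get_context: callable returning a dict-like run context; with Prefect this
    might be `lambda: prefect.context` (an assumption, not verified here).
    """
    items = list(items)

    def handler(task, old_state, new_state):
        if new_state.is_failed() and isinstance(new_state, TimedOut):
            index: Optional[int] = get_context().get("map_index")
            item = items[index] if index is not None else None
            # Same "forced ok" idea as ignore_timeout_handler, plus identity.
            return Success(
                message="forced ok",
                result={"state": "forced ok", "map_index": index, "item": item},
            )
        return new_state

    return handler


# Usage with a fake context standing in for the third mapped child.
handler = make_timeout_handler(range(5, 10), lambda: {"map_index": 2})
forced = handler(None, State("Running"), TimedOut("Task timed out during execution."))
passed = handler(None, State("Running"), Success("Task run succeeded.", result=5))
print(forced.result)
print(passed.result)
```

If only the index is needed, the `items` argument can be dropped and the index alone returned in the result; the final loop over the mapped results can then match it back to the input.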