Rob Fowler

01/05/2021, 12:25 AM
I think this has been asked multiple times but maybe there is not an answer. Is there any way, hack or otherwise to get task parameters in a state? When a mapped task fails I need to be able to catch the failure so I can put the 'id' into the list so I can come back later and actually do something. For example, in the following working example, I would like the 'item' to be available. Otherwise maybe the way is to re-iterate through the results in the linear list in a new unmapped task and fix up the failed errors?
from time import sleep

from prefect.engine import state
from prefect import task, Task, Flow, context
import prefect

from prefect.engine.executors import LocalDaskExecutor, LocalExecutor


def ignore_timeout_handler(task, old_state, new_state):
    print(f"{prefect.context.parameters} old {old_state} new {new_state}")
    if new_state.is_failed() and isinstance(new_state, state.TimedOut):
        return_state = state.Success(result={"state": "forced ok"})
    else:
        return_state = new_state
    return return_state


@task
def produce_range():
    return range(5, 10)


class SlowTask(Task):
    def run(self, item, sleep_time=9, **kwopts):
        sleep(sleep_time)
        # doing stuff with a host called 'item'
        return item


with Flow("Slow flow") as flow:
    slow_task = SlowTask(timeout=6, max_retries=2, retry_delay=2, state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    result = slow_task.map(item=nrange,
                           sleep_time=nrange)

# executor = LocalDaskExecutor(scheduler="threads", num_workers=10)
executor = LocalExecutor()
for ii in flow.run(executor=executor).result[result].result:
    print(ii)

Kyle Moon-Wright

01/05/2021, 12:58 AM
Hey @Rob Fowler, apologies, what would your ideal solution be here? To expose the item value to the State Handler or the logger/context? Certainly your proposed idea is viable, perhaps a task that collects these values upon mapped failure for later iteration.

Rob Fowler

01/05/2021, 1:19 AM
yes, the idea of iterating in a second task will work but ideally I'd like to mutate the list so the error gets {'itemid': 123, 'error': state.Timeout(.. }
if the state handler runs in the same space as the process surely the task itself is present somewhere. I notice I am not the only one with this issue so far.
ideal solution: prefect.context.parameters have {'itemid': 123} or old_state have the params, with it being OK for new_state to have mashed them
actually ideal would be 'new_state.task_params' having {'itemid': 123} Sorry for typing and thinking on the run.
big picture is I have a list of tasks (winrm remote calls; being Windows, it's all as flaky as anything else MS), and quite a few of them time out. I just want to record the hosts that time out and not fail the flow, but I do want a record of the failed hosts. Unlike ansible, all my code at scale must deal with the bad as much as the good.
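
A minimal sketch of the "second task" idea discussed above, written as plain Python so it runs without Prefect. It assumes the mapped results come back in the same order as the inputs, and that the state handler replaces a timed-out child's result with the {"state": "forced ok"} placeholder from the example. The name collect_timed_out and the record layout are hypothetical, not part of Prefect; in a flow, the function body would go in an unmapped task that receives both the input list and the mapped results.

```python
from typing import Any, Dict, Iterable, List

# Placeholder result the example's state handler substitutes for a timed-out child.
FORCED_OK = {"state": "forced ok"}


def collect_timed_out(items: Iterable[Any], results: Iterable[Any]) -> List[Dict[str, Any]]:
    """Pair each mapped input with its result and record the inputs that timed out.

    Hypothetical helper: assumes results are in the same order as items.
    """
    failed = []
    for item, result in zip(items, results):
        if result == FORCED_OK:
            failed.append({"itemid": item, "error": "TimedOut"})
    return failed


# Inputs and results shaped like the example flow's run.
items = list(range(5, 10))
results = [5, FORCED_OK, FORCED_OK, FORCED_OK, FORCED_OK]

for record in collect_timed_out(items, results):
    print(record)
```

This keeps the flow from failing while still leaving a list of hosts to revisit later, at the cost of matching on a placeholder value rather than on the state itself.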

Kyle Moon-Wright

01/05/2021, 1:30 AM
Do you mean logging the argument you passed to the child task in a state handler? Maybe something like this:
from prefect import context

def capture_error_params(task, old_state, new_state):
    # The run logger is available from the Prefect context
    logger = context.get("logger")
    if new_state.is_failed():
        logger.info(
            f"\nFailed child task parameter value: {context.parameters.get('name')}\n")
    # ^ you can 'get' any arg you passed to the task

    return new_state

Rob Fowler

01/05/2021, 1:43 AM
context.parameters is always empty
in my example above, that would be 100% cool if it worked
[2021-01-05 01:44:41] INFO - prefect.FlowRunner | Beginning Flow run for 'Slow flow'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'produce_range': Starting task run...
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'produce_range': finished task run for task with final state: 'Success'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask': Starting task run...
{} old <Pending> new <Mapped: "Ready to proceed with mapping.">
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask': finished task run for task with final state: 'Mapped'
[2021-01-05 01:44:41] INFO - prefect.TaskRunner | Task 'SlowTask[0]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <Success: "Task run succeeded.">
[2021-01-05 01:44:46] INFO - prefect.TaskRunner | Task 'SlowTask[0]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:46] INFO - prefect.TaskRunner | Task 'SlowTask[1]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:44:52] INFO - prefect.TaskRunner | Task 'SlowTask[1]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:52] INFO - prefect.TaskRunner | Task 'SlowTask[2]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:44:59] INFO - prefect.TaskRunner | Task 'SlowTask[2]': finished task run for task with final state: 'Success'
[2021-01-05 01:44:59] INFO - prefect.TaskRunner | Task 'SlowTask[3]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:45:05] INFO - prefect.TaskRunner | Task 'SlowTask[3]': finished task run for task with final state: 'Success'
[2021-01-05 01:45:05] INFO - prefect.TaskRunner | Task 'SlowTask[4]': Starting task run...
{} old <Pending> new <Running: "Starting task run.">
{} old <Running: "Starting task run."> new <TimedOut: "Task timed out during execution.">
[2021-01-05 01:45:11] INFO - prefect.TaskRunner | Task 'SlowTask[4]': finished task run for task with final state: 'Success'
[2021-01-05 01:45:11] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
5
{'state': 'forced ok'}
{'state': 'forced ok'}
{'state': 'forced ok'}
{'state': 'forced ok'}
sorry for the large paste. not used to slack after using the dumpster fire that is teams for the last year

Kyle Moon-Wright

01/05/2021, 1:54 AM
Ha, no problem! Hmm, interesting…

Rob Fowler

01/05/2021, 2:00 AM
so params are meant to be in there? If this is a bug that would be great

Kyle Moon-Wright

01/05/2021, 2:04 AM
This may be a bug, so I may have to get back to you on this. Thanks for bringing this to our attention! Will let you know when I have more answers.
OK I believe we found a bug! 🐛 Please feel free to open an issue on this, otherwise I will open one first thing in the morning. Thanks again!

Rob Fowler

01/05/2021, 2:18 AM
OK, I'll open an issue with my test program after lunch (down here in Sydney), thanks again. (I'm on holidays so I am working on this for an idea at work LOL)