    David Ojeda

    2 years ago
    Hello there! I was surprised to find that task instances are reused when executed; that is, the .run method is called on the same instance but with different input values. In other words, if I change a member variable inside .run, this change is visible to the next run call. Perhaps a minimal example can explain the situation a bit better:
    from prefect import task, Task, Flow
    
    
    @task
    def generate_numbers():
        return list(range(10))
    
    
    class MyTask(Task):
    
        def __init__(self, *, value=None, **kwargs):
            super().__init__(**kwargs)
            self.value = value
    
        def run(self, *, number):
            print(f'Hello I am {hex(id(self))} number is {number}')
            if self.value is None:
                self.value = number
            else:
                print(f'What? Who set this value={self.value}?')
            return number + 1
    
    
    instance = MyTask()
    
    with Flow("My First Flow") as flow:
        n = generate_numbers()
        n1 = instance.map(number=n)
    
    flow.run()
    This outputs:
    [2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
    [2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Starting flow run.
    [2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': Starting task run...
    [2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': finished task run for task with final state: 'Success'
    [2019-12-02 11:46:43,167] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
    [2019-12-02 11:46:43,168] INFO - prefect.TaskRunner | Task 'MyTask[8]': Starting task run...
    Hello I am 0x11ddaef50 number is 8
    [2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[8]': finished task run for task with final state: 'Success'
    [2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[1]': Starting task run...
    Hello I am 0x11ddaef50 number is 1
    What? Who set this value=8?
    ...
    I assume that this is specific to the local executor. In general, I don’t think this is a problem, but I was wondering whether there is a note or warning somewhere in the docs that I may have missed?
    Chris White

    2 years ago
    Hey @David Ojeda! While this isn’t explicitly stated anywhere, I think it falls under the category of “don’t rely on statefulness within your flow” as described further here: https://docs.prefect.io/core/tutorials/task-guide.html#best-practices-in-writing-tasks
    David Ojeda

    2 years ago
    Ah, yes… That’s what I was looking for!
    Thanks a lot Chris
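
    For anyone landing on this thread later, here is a minimal sketch of the behaviour discussed above, written in plain Python so it runs without Prefect. StatefulTask, StatelessTask and run_mapped are hypothetical stand-ins, not Prefect classes or functions; run_mapped only assumes what the log output shows, namely that one instance receives several run calls. The stateless variant is one possible way to follow the "don't rely on statefulness" advice: everything the task needs arrives as an argument and leaves as a return value.

```python
class StatefulTask:
    """Hypothetical stand-in for a Task subclass that mutates itself in run()."""

    def __init__(self):
        self.value = None

    def run(self, *, number):
        # Remember what an earlier call left behind on this instance.
        leaked = self.value
        if self.value is None:
            self.value = number
        return number + 1, leaked


class StatelessTask:
    """Hypothetical stand-in that keeps no per-call state on the instance."""

    def run(self, *, number):
        return number + 1


def run_mapped(task, numbers):
    """Assumed model of a local mapped run: one instance, many run() calls."""
    return [task.run(number=n) for n in numbers]


stateful_results = run_mapped(StatefulTask(), [8, 1, 2])
stateless_results = run_mapped(StatelessTask(), [8, 1, 2])

print(stateful_results)   # [(9, None), (2, 8), (3, 8)]
print(stateless_results)  # [9, 2, 3]
```

    In the stateful case the second and third calls still see the 8 stored by the first call, which matches the "Who set this value=8?" line in the log. The stateless version gives the same results whatever order the calls run in.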