David Ojeda
12/02/2019, 11:52 AM
.run method is called with the same instance but different input values. In other words, if I change a member variable inside .run, this change is available to the next run call.
Perhaps a minimal example can explain the situation a bit better:
from prefect import task, Task, Flow


@task
def generate_numbers():
    return list(range(10))


class MyTask(Task):
    def __init__(self, *, value=None, **kwargs):
        super().__init__(**kwargs)
        # Store the constructor argument (defaults to None).
        self.value = value

    def run(self, *, number):
        print(f'Hello I am {hex(id(self))} number is {number}')
        if self.value is None:
            # First call on this instance: remember the input.
            self.value = number
        else:
            print(f'What? Who set this value={self.value}?')
        return number + 1


instance = MyTask()
with Flow("My First Flow") as flow:
    n = generate_numbers()
    n1 = instance.map(number=n)
flow.run()
This outputs:
[2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Beginning Flow run for 'My First Flow'
[2019-12-02 11:46:43,162] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': Starting task run...
[2019-12-02 11:46:43,166] INFO - prefect.TaskRunner | Task 'generate_numbers': finished task run for task with final state: 'Success'
[2019-12-02 11:46:43,167] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2019-12-02 11:46:43,168] INFO - prefect.TaskRunner | Task 'MyTask[8]': Starting task run...
Hello I am 0x11ddaef50 number is 8
[2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[8]': finished task run for task with final state: 'Success'
[2019-12-02 11:46:43,169] INFO - prefect.TaskRunner | Task 'MyTask[1]': Starting task run...
Hello I am 0x11ddaef50 number is 1
What? Who set this value=8?
...
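The same effect can be reproduced without Prefect. Below is a minimal plain-Python sketch, assuming that mapping boils down to calling run once per input on one shared object. StatefulTask and map_calls are hypothetical names used for illustration only, not Prefect APIs.

```python
class StatefulTask:
    """Hypothetical stand-in for a Task subclass; not a Prefect class."""

    def __init__(self):
        self.value = None

    def run(self, *, number):
        # Remember the first input this instance ever sees.
        if self.value is None:
            self.value = number
        return number + 1


def map_calls(task, numbers):
    """Hypothetical helper: call task.run once per input on the SAME object."""
    return [task.run(number=n) for n in numbers]


instance = StatefulTask()
results = map_calls(instance, [8, 1, 3])
print(results)         # [9, 2, 4]
print(instance.value)  # 8, set by the first call and visible to later ones
```

If run only reads its arguments and returns a value, without writing to self, the calls stay independent regardless of how the instance is shared.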
I assume that this is specific to the local executor.
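One way to see why this might depend on the executor: if the task object were copied or serialized before each call (as would typically happen when work is sent to another process), each call would mutate its own copy. The sketch below only simulates that with copy.deepcopy; whether any particular Prefect executor behaves this way is an assumption, and SharedState is a hypothetical class.

```python
import copy


class SharedState:
    """Hypothetical task-like object; not part of Prefect."""

    def __init__(self):
        self.value = None

    def run(self, number):
        # Keep the first input seen by this particular object.
        if self.value is None:
            self.value = number
        return self.value


inputs = (8, 1, 3)

# Same object reused for every call: the first input sticks.
shared = SharedState()
in_process = [shared.run(n) for n in inputs]

# A fresh copy per call, standing in for a serialization boundary.
template = SharedState()
via_copies = [copy.deepcopy(template).run(n) for n in inputs]

print(in_process)      # [8, 8, 8]
print(via_copies)      # [8, 1, 3]
print(template.value)  # None, the original object was never mutated
```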
In general, I don’t think this is a problem, but I was wondering whether there is any documentation or warning in the docs that I may have missed?
Chris White
David Ojeda
12/02/2019, 2:35 PM