Manuel Mourato
05/07/2020, 4:00 PM
task_sequence = [load_data, task1, task2]
test_flow1.chain(*task_sequence)
which I saved to a file locally, and then loaded via the test_flow2 = Flow.load(path) method.
Now, I want to add a new task3 to this flow, but I want to make load_data an upstream dependency of this new task, like this:
test_flow2.set_dependencies(
    task=task3,
    upstream_tasks=[load_data])
But I get the error: A task with the slug "Task1" already exists in this flow
It seems to complain about load_data already being defined in the flow, which it is. But what I want is to say that load_data is a dependency of task3.
What am I doing wrong?
Laura Lorenz (she/her)
05/07/2020, 4:11 PM
When you load test_flow2, is that in a new python interpreter or the same one that you saved the flow in?
Manuel Mourato
05/07/2020, 4:12 PM
from datetime import timedelta
import time

from prefect import Flow, task
from prefect.environments import RemoteEnvironment

# CREATE EXECUTOR
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "local_processes": True
    }
)
# CREATE TASKS
@task(name='Task3', slug='Task3', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300,
      state_handlers=[tasks_notifications_handler])
def task3(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)

@task(name='Task2', slug='Task2', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task2(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)

@task(name='Task4', slug='Task4', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task4(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)

@task(name='Task1', slug='Task1', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task1(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)
# CREATE FLOW
test_flow1 = Flow(name="Test-Flow-1", environment=env)
# Bind initial tasks
task1.bind(in_path="test1", out_path="test2", flow=test_flow1)
task2.bind(in_path="test1", out_path="test2", flow=test_flow1)
task3.bind(in_path="test1", out_path="test2", flow=test_flow1)
task_sequence = [task1, task2, task3]
test_flow1.chain(*task_sequence)
test_flow1.save("/home/manuel.mourato/prefect-flows/flow1")
test_flow2 = Flow.load("/home/manuel.mourato/prefect-flows/flow1")
# Bind parallel task
task4.bind(in_path="test1", out_path="test2", flow=test_flow2)
test_flow2.set_dependencies(
    task=task4,
    upstream_tasks=[task1])
test_flow2.visualize()
f_state = test_flow2.run()
Laura Lorenz (she/her)
05/07/2020, 4:34 PM
When set_dependencies is called, it tries to call `add_task` for the task and its upstream tasks, and it does an object comparison to determine whether that task is already in the flow (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/core/flow.py#L433). Since the memory addresses of your local task and the loaded flow's task are different, that check fails to match them, and when it then goes on to add the task anyway it dies on the slug duplication.
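The identity mismatch Laura describes isn't Prefect-specific; it falls out of any serialize/rebuild round trip. A minimal stdlib sketch, with plain classes as hypothetical stand-ins for Prefect's Task and Flow, and copy.deepcopy standing in for the pickle round trip that Flow.save / Flow.load performs:

```python
import copy

class Task:
    def __init__(self, slug):
        self.slug = slug

class Flow:
    def __init__(self, tasks):
        self.tasks = list(tasks)

load_data = Task("Task1")
flow = Flow([load_data])

# Stand-in for the Flow.save / Flow.load round trip, which serializes
# and rebuilds every object contained in the flow:
flow2 = copy.deepcopy(flow)
loaded = flow2.tasks[0]

print(loaded.slug == load_data.slug)  # True: the slug survives the round trip
print(loaded is load_data)            # False: a brand-new object in memory
```

So a membership check based on object identity can't match your local `load_data` against the copy living inside the loaded flow, even though their slugs collide.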
Instead, I can achieve what you want by extracting the load_data task reference directly out of the second flow and passing that to set_dependencies:
In [7]: ts = test_flow2.get_tasks()
In [8]: ts
Out[8]: [<Task: load_data>, <Task: task3>, <Task: task2>, <Task: task1>]
In [9]: second_load_data = ts[0]
In [10]: test_flow2.set_dependencies(task=task3, upstream_tasks=[second_load_data])
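The general pattern behind Laura's fix: after loading, operate on the task objects that live inside the loaded flow, not on your original locals. A stdlib sketch of looking the task up by slug, again with hypothetical stand-in classes (not the real Prefect API) and deepcopy standing in for save/load:

```python
import copy

class Task:
    def __init__(self, slug):
        self.slug = slug

class Flow:
    def __init__(self, tasks):
        self.tasks = list(tasks)

    def get_task(self, slug):
        # Return the object that actually lives inside this flow.
        return next(t for t in self.tasks if t.slug == slug)

flow = Flow([Task("load_data"), Task("Task1"), Task("Task2")])
# Stand-in for the Flow.save / Flow.load round trip:
flow2 = copy.deepcopy(flow)

# This reference has the right identity for further edits to flow2:
second_load_data = flow2.get_task("load_data")
print(second_load_data in flow2.tasks)  # True
```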
Manuel Mourato
05/07/2020, 4:43 PM
Laura Lorenz (she/her)
05/07/2020, 5:02 PM
test_flow2 in a separate process, but you probably would feel forced to do that anyways, since load_data isn't lying around in your locals anymore, so it does kind of 'go away'. Hopefully this little example clarifies that more too:
In [5]: %paste
@task(slug="Task1")
def load_data():
    pass
## -- End pasted text --
In [6]: id(load_data)
Out[6]: 4634262352
In [8]: load_data.slug
Out[8]: 'Task1'
In [9]: id(test_flow2.get_tasks(slug='Task1'))
Out[9]: 4634674176
In [10]: load_data is test_flow2.get_tasks(slug='Task1')
Out[10]: False
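One nit on the transcript above: the earlier Out[8] shows get_tasks returning a list, so the `is` comparison in In [10] is really comparing a task against the list that holds it and would be False even for the very same object; you'd index into the result first. The identity-vs-equality distinction itself is plain Python:

```python
class Task:
    def __init__(self, slug):
        self.slug = slug

load_data = Task("Task1")
tasks = [load_data]  # what a get_tasks-style lookup returns

print(load_data is tasks)     # False: a task is never the list containing it
print(load_data is tasks[0])  # True: the same object once you index in
```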
Manuel Mourato
05/07/2020, 6:00 PM