# prefect-community
m
Hello again! Quick question: I have a flow1 with 3 sequential tasks,
task_sequence = [load_data, task1, task2]
test_flow1.chain(*task_sequence)
which I saved to a file locally, and then loaded it via the test_flow2 = Flow.load(path) method. Now, I want to add a new task3 to this flow, but I want to make load_data an upstream dependency of this new task, like this:
test_flow2.set_dependencies(
    task=task3,
    upstream_tasks=[load_data])
But I get the error:
A task with the slug "Task1" already exists in this flow
It seems to complain about load_data already being defined in the flow, which it is. But what I want is to say that load_data is an upstream dependency of task3. What am I doing wrong?
l
Hi! I’m going to see if I can reproduce on my side so I can investigate more. Are you setting the slugs for these tasks directly? When you load the flow in as test_flow2, is that in a new python interpreter or the same one that you saved the flow in?
m
It is the same python interpreter, and I am setting the slugs directly, although the issue still happens even if I don't. Thank you!
# IMPORTS
import time
from datetime import timedelta

from prefect import Flow, task
from prefect.environments import RemoteEnvironment

# tasks_notifications_handler is a custom state handler defined elsewhere (not shown)

# CREATE EXECUTOR
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "local_processes": True
    }
)

# CREATE TASKS

@task(name='Task3',slug='Task3', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300,
      state_handlers=[tasks_notifications_handler])
def task3(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)


@task(name='Task2',slug='Task2', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task2(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)


@task(name='Task4',slug='Task4', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task4(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)

@task(name='Task1',slug='Task1', max_retries=3, retry_delay=timedelta(minutes=1), timeout=300, log_stdout=True,
      state_handlers=[tasks_notifications_handler])
def task1(in_path, out_path):
    print("Fake Implementation")
    time.sleep(3)

# CREATE FLOW
test_flow1 = Flow(name="Test-Flow-1", environment=env)

# Bind initial tasks
task1.bind(in_path="test1", out_path="test2", flow=test_flow1)
task2.bind(in_path="test1", out_path="test2", flow=test_flow1)
task3.bind(in_path="test1", out_path="test2", flow=test_flow1)

task_sequence = [task1,task2,task3]
test_flow1.chain(*task_sequence)
test_flow1.save("/home/manuel.mourato/prefect-flows/flow1")

test_flow2 = Flow.load("/home/manuel.mourato/prefect-flows/flow1")
# Bind parallel task
task4.bind(in_path="test1", out_path="test2", flow=test_flow2)


test_flow2.set_dependencies(
    task=task4,
    upstream_tasks=[task1])
test_flow2.visualize()
f_state = test_flow2.run()
in case it helps
l
Thanks! I could reproduce your issue. So it looks like the situation is that when you load the flow in from disk, the task references aren't at the same place in memory as when they were first defined on flow1. When set_dependencies is called, it tries to call `add_task` for the task and each upstream task, and it does an object comparison to determine whether that task is already in the flow (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/core/flow.py#L433). Since their memory addresses are different, that check fails, and when it then goes on to add the task anyway it errors out on the duplicate slug. Instead, I can achieve what you want if I extract the load_data task reference out of the second flow directly and use that as what I pass to set_dependencies:
In [7]: ts = test_flow2.get_tasks()                                

In [8]: ts                       
Out[8]: [<Task: load_data>, <Task: task3>, <Task: task2>, <Task: task1>]

In [9]: second_load_data = ts[0] 

In [10]: test_flow2.set_dependencies(task=task3, upstream_tasks=[second_load_data])
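Putting that together, here is a minimal end-to-end sketch of the fix against the Prefect 0.x core API used above (the slug, path, and the new Task4 mirror the script shared earlier, so treat them as placeholders):

from prefect import Flow, task

@task(name="Task4", slug="Task4")
def task4(in_path, out_path):
    print("Fake Implementation")

# Load the flow that was saved earlier
test_flow2 = Flow.load("/home/manuel.mourato/prefect-flows/flow1")

# Look up the upstream task by slug from the deserialized flow itself,
# instead of reusing the local reference that was bound into test_flow1.
# get_tasks returns a list, so take the first match.
upstream = test_flow2.get_tasks(slug="Task1")[0]

# Bind the new task's arguments into the loaded flow, then wire the dependency
task4.bind(in_path="test1", out_path="test2", flow=test_flow2)
test_flow2.set_dependencies(task=task4, upstream_tasks=[upstream])

f_state = test_flow2.run()

The key difference from the failing version is that the upstream reference comes from test_flow2 itself, so add_task's membership check passes instead of tripping the duplicate-slug error.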
m
Got it. I assume that this issue goes away if I call the load method in a different python process. Thank you for the help 🙂
l
No problem! Just to follow up: you'll still need to grab the upstream task reference from the deserialized flow itself even when manipulating test_flow2 in a separate process, but you would probably feel forced to do that anyway, since load_data isn't lying around in your locals anymore, so the issue does kind of 'go away'. Hopefully this little example clarifies that more too:
In [5]: %paste                   
@task(slug="Task1")
def load_data():
    pass

## -- End pasted text --

In [6]: id(load_data)            
Out[6]: 4634262352

In [8]: load_data.slug           
Out[8]: 'Task1'

In [9]: id(test_flow2.get_tasks(slug='Task1'))                     
Out[9]: 4634674176

In [10]: load_data is test_flow2.get_tasks(slug='Task1')           
Out[10]: False
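One small note on reading that example: get_tasks returns a list in Prefect 0.x core, so In [9] and In [10] above are looking at the list object rather than the task inside it. Indexing into the result makes the comparison more direct, and the conclusion is the same because deserialization produces a brand new task object. A minimal sketch, using the same slug as above:

# Pull the actual task object out of the list returned by get_tasks
second_load_data = test_flow2.get_tasks(slug="Task1")[0]

second_load_data.slug            # 'Task1' -- the slug survives the save/load round trip
second_load_data is load_data    # False  -- the deserialized task is a different object in memory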
upvote 1
m
Thank you very much @Laura Lorenz (she/her), it's working now 🙂
🎉 1