Fina Silva-Santisteban
08/25/2021, 11:26 PM
a_task = ATaskClass()
with Flow("Foo") as flow:
    name = Parameter('name')
    a_task = a_task(name=name)
Imperative api flow (so far):
a_task = ATaskClass()
name = Parameter('name')
flow = Flow("Foo")
flow.set_dependencies(
    task=a_task()
)
a_task.bind(name=name)
The error complains about the line where I’m trying to bind the flow param to the task:
raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: ATaskClass>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `ATaskClass(...).run(...)`
What’s the right way to go about this? 🤔
Some background info: our flows are getting quite large, so we’re looking for ways to make them easier to work with and more reusable. We considered using the flow-of-flows strategy, but we don’t need to run any single part as an independent flow, so splitting things up into X flows just means slowing down our development cycle, since we would need to register/re-register all those additional flows instead of the one big one. The imperative api seemed like a better choice: being so explicit actually makes it easier to read, especially when the flow is large, and we keep the registration to just that one flow.
Many thanks!
Chris White
Calling the task (a_task()) is actually a part of the functional API and attempts to bind the task to a flow.
I recommend updating your code to this:
flow.add_edge(name, a_task, key="name")
and note that I didn't call a_task
Fina Silva-Santisteban
08/25/2021, 11:51 PM
a_task = ATaskClass()
b_task = BTaskClass()
c_task = CTaskClass()
with Flow("Foo") as flow:
    name = Parameter('name')
    email = Parameter('email')
    a_task = a_task(name=name, email=email)
    b_task = b_task(a_task_result=a_task)
    c_task = c_task(a_task_result=a_task, b_task_result=b_task)
How would I model that with the imperative api?
I see that it needs to start like this:
a_task = ATaskClass()
b_task = BTaskClass()
c_task = CTaskClass()
name = Parameter('name')
flow = Flow("Foo")
flow.add_edge(name, a_task, key="name")
But how do I work in all the other dependencies? I can’t seem to find examples in the docs that would help me better understand. (Pls do post a link if there’s a helpful article!)
Chris White
• add_edge -- this adds a single dependency between two tasks
• set_dependencies -- this allows for multiple dependencies to be set in one call
So for example, you can replace a_task = a_task(name=name, email=email)
with
flow.add_edge(name, a_task, key="name")
flow.add_edge(email, a_task, key="email")
or with
flow.set_dependencies(a_task, keyword_tasks={"name": name, "email": email})
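To make the equivalence of those two options concrete, here is a minimal toy model in plain Python. It is not Prefect code: `ToyFlow` and `Edge` are hypothetical stand-ins, and tasks are represented by plain strings. It only sketches the idea that each entry in `keyword_tasks` behaves like one keyed `add_edge` call.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass(frozen=True)
class Edge:
    """Toy edge: the result of `upstream` is passed to `downstream` as argument `key`."""
    upstream: str
    downstream: str
    key: Optional[str] = None


@dataclass
class ToyFlow:
    """Hypothetical stand-in for a flow; it only stores a set of edges."""
    edges: Set[Edge] = field(default_factory=set)

    def add_edge(self, upstream, downstream, key=None):
        # One call records exactly one dependency.
        self.edges.add(Edge(upstream, downstream, key))

    def set_dependencies(self, task, upstream_tasks=(), keyword_tasks=None):
        # Many dependencies in one call: each one becomes a single edge.
        for up in upstream_tasks:
            self.add_edge(up, task)
        for key, up in (keyword_tasks or {}).items():
            self.add_edge(up, task, key=key)


# Option 1: one add_edge call per dependency.
one_by_one = ToyFlow()
one_by_one.add_edge("name", "a_task", key="name")
one_by_one.add_edge("email", "a_task", key="email")

# Option 2: all dependencies of a_task in a single call.
in_one_call = ToyFlow()
in_one_call.set_dependencies("a_task", keyword_tasks={"name": "name", "email": "email"})

# Both options describe the same graph.
same_graph = one_by_one.edges == in_one_call.edges
print(same_graph)
```

In this sketch neither method ever calls the task itself; the task object (here just a string) is only passed in as an argument, which mirrors the earlier note about not calling a_task.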
Fina Silva-Santisteban
08/26/2021, 2:04 PM
The difference between add_edge and set_dependencies wasn’t quite clear to me, and I thought bind was part of the imperative api when it’s part of the functional, so overall a lot of confusion. I think using set_dependencies is the easiest way to go! Here is the imperative api version of my example from my previous post, in case anybody else stumbles upon this thread:
a_task = ATaskClass()
b_task = BTaskClass()
c_task = CTaskClass()
name = Parameter('name')
email = Parameter('email')
flow = Flow("Foo")
flow.set_dependencies(task=a_task, keyword_tasks=dict(name=name, email=email))
flow.set_dependencies(task=b_task, keyword_tasks=dict(a_task_result=a_task))
flow.set_dependencies(task=c_task, keyword_tasks=dict(a_task_result=a_task, b_task_result=b_task))
Chris White
Marvin
08/26/2021, 3:11 PM