
alexandre kempf

01/07/2020, 1:39 PM
Hello guys! I have a weird bug when using Prefect. I reduced the example to the smallest one I could find. This works:
from prefect import task, Parameter, Flow

@task
def load_data(c, b):
    return c

@task
def bugtask(c):
    return c

with Flow("training") as flowModel:
    init = Parameter("c")
    data = load_data(init, b={"f": 4})
    # data = load_data(init, b={"f": bugtask})

state_model = flowModel.run(c=5)
Now if you use the commented-out line instead of the load_data call (basically, if you have a task in your arguments, even nested in another structure and never executed), there is an error. Is this expected? I have the feeling that it tries to run all the tasks, even if they are not called! @josh @Dylan, this is a problem for subflows, since I must pass the subflow's configuration as an argument to my run_flow task :s

Jeremiah

01/07/2020, 2:22 PM
Hi Alexandre - I don’t get an error when I run your code. What version of Prefect do you have installed?
[2020-01-07 14:21:20,207] INFO - prefect.FlowRunner | Beginning Flow run for 'training'
[2020-01-07 14:21:20,211] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-07 14:21:20,224] INFO - prefect.TaskRunner | Task 'c': Starting task run...
[2020-01-07 14:21:20,226] INFO - prefect.TaskRunner | Task 'c': finished task run for task with final state: 'Success'
[2020-01-07 14:21:20,230] INFO - prefect.TaskRunner | Task 'load_data': Starting task run...
[2020-01-07 14:21:20,232] INFO - prefect.TaskRunner | Task 'load_data': finished task run for task with final state: 'Success'
[2020-01-07 14:21:20,233] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

alexandre kempf

01/07/2020, 2:29 PM
Thanks for looking at it, Jeremiah! Did you uncomment the line?
I'm running 0.8.1 🙂

Jeremiah

01/07/2020, 3:51 PM
Ah my apologies, I didn’t see the commented line - let me try again
Ah, ok
Here's what's happening: Prefect looks through your arguments, sees that you are passing a task around, and adds it to the graph. If you run flowModel.visualize(), you'll see it in the graph.
To avoid this and get the behavior you want, wrap bugtask in a Constant, like this:
from prefect import task, Parameter, Flow
from prefect.tasks.core.constants import Constant

@task
def load_data(c, b):
    return c

@task
def bugtask(c):
    return c

with Flow("training") as flowModel:
    init = Parameter("c")
    # data = load_data(init, b={"f": 4})
    data = load_data(init, b={"f": Constant(bugtask)})

state_model = flowModel.run(c=5)
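For readers following along, here is a toy sketch of the idea described above: a scanner that walks nested arguments looking for task objects, and a wrapper that keeps a task out of its sight. It is only an illustration and not Prefect's actual implementation; the Task and Constant classes and the collect_tasks helper below are hypothetical stand-ins written for this example.

```python
# Toy model only: hypothetical stand-ins, not Prefect's real classes.
class Task:
    def __init__(self, name):
        self.name = name


class Constant(Task):
    """A task that simply holds a value; its payload is never scanned."""

    def __init__(self, value):
        super().__init__("Constant")
        self.value = value


def collect_tasks(arg):
    """Return the names of tasks found in arg, descending into containers."""
    if isinstance(arg, Task):
        # A Constant is reported itself, but the object it wraps stays hidden.
        return [arg.name]
    if isinstance(arg, dict):
        return [n for v in arg.values() for n in collect_tasks(v)]
    if isinstance(arg, (list, tuple)):
        return [n for v in arg for n in collect_tasks(v)]
    return []


bugtask = Task("bugtask")

plain = collect_tasks({"f": 4})                    # no task in the arguments
nested = collect_tasks({"f": bugtask})             # the nested task is found
wrapped = collect_tasks({"f": Constant(bugtask)})  # only the wrapper is found
```

In this toy version, the nested task is discovered even though nothing calls it, while wrapping it means only the wrapper is seen and the task inside is passed along as plain data.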

alexandre kempf

01/07/2020, 4:18 PM
Can I still execute it afterwards?

Jeremiah

01/07/2020, 4:18 PM
That tells Prefect not to treat it as a runnable task, but just to leave it as-is
Yes, you should receive it as a Task object.

alexandre kempf

01/07/2020, 4:18 PM
great 🙂 I'll try 😄
thank you 🙂

Jeremiah

01/07/2020, 4:19 PM
Yup, just confirmed: if you put a line in load_data that calls (for example) b['f'].run(c=c), it should work.
Sorry for the confusion!
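A minimal sketch of that pattern in plain Python, for illustration only: the Task class and task decorator below are hypothetical stand-ins rather than Prefect's own, and they assume that calling run() on a task object simply executes the wrapped function.

```python
# Toy stand-in for a decorated task; hypothetical, not Prefect's Task class.
class Task:
    def __init__(self, fn):
        self.fn = fn

    def run(self, **kwargs):
        # Calling run() directly just executes the wrapped function.
        return self.fn(**kwargs)


def task(fn):
    """Hypothetical decorator that turns a function into a Task object."""
    return Task(fn)


@task
def bugtask(c):
    return c


def load_data(c, b):
    # b["f"] arrives as a task object, so it is invoked explicitly via run().
    return b["f"].run(c=c)


result = load_data(5, {"f": bugtask})
```

Here the task object is treated as ordinary data until load_data chooses to invoke it, which mirrors the b['f'].run(c=c) call suggested above.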

alexandre kempf

01/07/2020, 4:22 PM
Yes, I confirm it works 😄 Amazing! Thank you so much! I really love your product; it has been my new drug for the last few weeks 😄

Jeremiah

01/07/2020, 4:22 PM
Awesome! Glad to hear it 🙂