# ask-community
t
Hi, I am trying to set up a flow of flows with parameters being passed from the parent flow to the child flow, but am having some issues. With hard-coded values for the parameters it works, so it seems to be related to how I am trying to set up the parameter passing.
a
Hi @Thomas Furmston, I will try to reproduce the issue. Overall, I think it's best to set default values for parameters and override them dynamically when needed. In your flow, you specify required parameters without setting default values for them. As a result, if you run this flow independently, e.g. on a schedule, it will fail. It can only run successfully when triggered from the UI, the API, or another flow that passes the parameters.
```python
num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)
```
Do you have some sensible defaults for those parameters?
t
I could come up with some sensible defaults for `num_days_parameter` and `num_back_fill_days_parameter`, but I am not so sure about `end_date_parameter`.
I am running it through a schedule, and this is the point at which it fails.
Do you think it will work from a schedule if I provide some defaults?
a
Yes, exactly @Thomas Furmston. This flow cannot run on a schedule, because it doesn't know what values to use for the parameters: they are required and have no default values. You can attach default parameter values to your schedule, though. You can do this either via code:
```python
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock


clock_1 = CronClock(
    '41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London'),
    parameter_defaults={
        'num_days': 1,
        'num_back_fill_days': 1,
        'end_date': '',
    },
)

schedule = Schedule(clocks=[clock_1])
```
or from the UI.
Then, in your task, you can implement custom business logic based on this parameter value, e.g.
```python
if end_date_parameter:
    end_date = end_date_parameter
else:
    end_date = pendulum.now(tz='Europe/London').to_date_string()
```
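The same fallback logic can be sketched as a small, testable function. `resolve_end_date` is a hypothetical name. Passing `today` in explicitly, rather than calling `pendulum.now(tz='Europe/London')` inside it, is our choice so the function stays deterministic.

```python
from datetime import date
from typing import Optional


def resolve_end_date(end_date: Optional[str], today: date) -> str:
    """Return the explicit end date if one was given, else `today` as YYYY-MM-DD.

    Both None and the empty string (the schedule default used above) count
    as "not set", because both are falsy.
    """
    if end_date:
        return end_date
    return today.isoformat()
```

Inside a task, `today` could come from something like `pendulum.now(tz='Europe/London').date()` (a `date` subclass). `resolve_end_date("", date(2021, 10, 5))` returns `"2021-10-05"`, and an explicit `"2021-10-01"` is passed through unchanged.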
t
So this would be for the child flow?
I have defaults for the parent flow, which is the one running on a schedule:
```python
num_days_parameter = Parameter('num_days', default=1)
num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
end_date_parameter = Parameter('end_date', default=None)
```
I then set the date from the schedule, if appropriate, in the following task:
```python
import prefect
from prefect import task


@task
def calculate_flow_end_date(end_date: str):
    if end_date is not None:
        return end_date
    return prefect.context.get('scheduled_start_time').to_date_string()
```
I am not planning to run the child flow on a schedule, but to pass the parameters in from the parent flow.
However, when I pass the arguments in, it still fails.
```python
common_flow_result = common_flow(parameters={
    'num_days': num_days_parameter,
    'num_back_fill_days': num_back_fill_days_parameter,
    'end_date': task_end_date,
})
```
Sorry, I think my initial response was a bit misleading.
What is confusing me is that the child flow doesn't seem to be picking up the parameters from the parent flow.
a
Got it, will look into this and get back to you
@Thomas Furmston I tried to reproduce the error, and somehow it worked for me. Here is a child flow: https://gist.github.com/fd8d850f643b59b87c063581926d26db And a parent flow: https://gist.github.com/eda4b42517793db583a1c0c8b625d6e5 Can you confirm that you registered your child flow before using it in the parent flow? If you did, then the problem could be the ShellTask in the child flow.
t
Hi
Yes, I have my makefile set up to ensure the child is registered first.
I don't think it even gets to the shell task in the child flow.
That was my impression, anyway. It is quite hard to tell from the logs.
Is there a way to work out which line of a flow throws the error?
I'll try removing the ShellTask and see if that removes the error.
a
This is the reason why: "Tasks were created but not added to the flow: {<Parameter: script>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows."
Since you want to log parameter values anyway, I’d recommend doing it as part of a separate task. This way, you are passing the parameter value to a task as data dependency, thus implicitly adding the Parameter task to the flow. From then on, you can pass this parameter value to other tasks such as the shell task:
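```python
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask


@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)


@task
def build_command(user_input: str) -> str:
    # build_command is an illustrative helper. Building the command inside a
    # task means the Parameter's run-time value is used; an f-string evaluated
    # at flow-build time would embed the Parameter object's repr instead.
    return f"echo {user_input}"


shell = ShellTask(return_all=True)

with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
    shell_task = shell(command=build_command(param))

if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
```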
@Thomas Furmston additionally, you need to initialize the `ShellTask` before you call it within the `with Flow(...)` block, as the code snippet above shows. Overall, you were right when you said that your child flow previously didn't even get to the ShellTask: the Parameter was instantiated in the flow but was never called or used as an input to another task. Now that we add this Parameter to the Flow through the `log_param_value` task (as a data dependency), it can be used in downstream tasks (your shell tasks and `StartFlowRun`). The same happened with the `ShellTask`: it was instantiated within the Flow but was not called or explicitly added to the flow, neither via a data dependency nor via upstream/downstream dependencies. Now that we instantiate it beforehand and call it within the Flow block, it works as expected. Does that make sense?
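A toy model of the bookkeeping behind that warning may make the rule concrete. It assumes, without reproducing Prefect's source, that the flow remembers every task created inside its `with` block and compares that set against the tasks actually wired into the graph. `ToyTask` and `ToyFlow` are hypothetical classes.

```python
from typing import List, Optional, Set

# The flow whose `with` block is currently open (None outside any block).
_active_flow: Optional["ToyFlow"] = None


class ToyTask:
    """Hypothetical task: creating it inside a flow block only registers it."""

    def __init__(self, name: str) -> None:
        self.name = name
        if _active_flow is not None:
            _active_flow.created.add(self)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {self.name}>"

    def __call__(self, *upstream: "ToyTask") -> "ToyTask":
        # Calling the task adds it, plus any tasks passed in as inputs, to the graph.
        if _active_flow is None:
            raise RuntimeError("call tasks inside a `with ToyFlow()` block")
        _active_flow.tasks.update((self, *upstream))
        return self


class ToyFlow:
    """Hypothetical flow that warns about tasks created but never wired in."""

    def __init__(self) -> None:
        self.created: Set[ToyTask] = set()
        self.tasks: Set[ToyTask] = set()
        self.warnings: List[str] = []

    def __enter__(self) -> "ToyFlow":
        global _active_flow
        _active_flow = self
        return self

    def __exit__(self, *exc_info: object) -> None:
        global _active_flow
        _active_flow = None
        unused = self.created - self.tasks
        if unused:
            names = ", ".join(sorted(repr(t) for t in unused))
            self.warnings.append(
                f"Tasks were created but not added to the flow: {{{names}}}"
            )


with ToyFlow() as flow:
    script = ToyTask("script")            # created, never used as an input
    user_input = ToyTask("user_input")
    log_value = ToyTask("log_param_value")
    log_value(user_input)                 # user_input enters the graph as an input

print(flow.warnings)
```

Passing `user_input` into `log_value` is what moves it from "created" to "in the graph"; `script` is never used, so it is the one reported.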
t
I see. Let me try that out then.
I was actually setting the dependencies of the shell tasks explicitly in my actual example, but I forgot to copy it into my example that I posted in slack
I was not doing the same with the parameters, though.
Let me make the changes and see if it works
Thanks for helping!
Um... I still seem to be getting the same error 😐
```python
import prefect
from prefect import Flow, Parameter, task
from prefect.storage import Docker
from prefect.tasks.shell import ShellTask


@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    logger = prefect.context.get('logger')
    logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)


num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)

shell_command = ShellTask(
    name='served_advert_task',
    stream_output=True,
)

with Flow(
    'my_flow',
    storage=Docker(
        base_image='my-docker-image:latest',
        local_image=True,
    )) as flow:

    log_parameter_value('num_days', num_days_parameter)
    log_parameter_value('num_back_fill_days', num_back_fill_days_parameter)
    log_parameter_value('end_date', end_date_parameter)

    # construct_etl_command() is defined elsewhere (not shown in the thread)
    shell_command(
        command=construct_etl_command(),
    )
```
This is the new code, but I still get the issue
/tmp/prefect-qc0rm5qd: line 1: Parameter:: No such file or directory
I'm really confused what is going on now
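One plausible reading of this error (an interpretation, not something confirmed at this point in the thread): an undecorated helper that formats a Parameter into a string runs while the flow is being built, so the command ends up containing the Parameter's repr, `<Parameter: num_days>`, instead of its value. In a shell, `<` and `>` are redirection operators, so `<Parameter:` asks the shell to read input from a file named `Parameter:`, which matches the message above. `FakeParameter` is a hypothetical stand-in whose repr mimics the `<Parameter: ...>` form seen in the earlier warning.

```python
class FakeParameter:
    """Hypothetical stand-in for a Parameter object at flow-build time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


def construct_etl_command(num_days) -> str:
    # Undecorated: this runs immediately, so `num_days` is the Parameter
    # object itself, and str.format falls back to its repr.
    return "echo {0}".format(num_days)


command = construct_etl_command(FakeParameter("num_days"))
print(command)  # echo <Parameter: num_days>
```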
k
Hey @Thomas Furmston, what RunConfiguration are you using?
a
@Thomas Furmston you should probably add a DockerRun run configuration. When I run the same logic as your flow on a local agent with local storage, it works. So @Kevin Kho is right that there must be some issue in your DockerRun and Docker storage configuration.
```python
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

@task
def construct_etl_command():
    return "ls"


@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)


shell = ShellTask(stream_output=True)


with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
    shell = shell(command=construct_etl_command())

if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
```
t
I had a DockerRun config.
Removing it and copying your example works for me.
I am going to work backwards, adding my previous code back in to see what breaks it.
a
Nice work! Yeah, definitely take it one step at a time, and let us know what the issue is once you've analyzed it.
t
So adding in the `DockerRun` doesn't break anything, and the flow still runs. However, when I remove the `task` decorator from the `construct_etl_command` function, the error returns.
So I would guess that putting a task (in this case a Parameter) through an undecorated function causes the link to the parameter to be lost?
Complete guess 🙂
Thanks a lot for helping me debug this issue
I do have one follow-up question. Previously I was setting the dependencies between two different shell tasks like so:
`shell_task2.set_dependencies(upstream_tasks=[shell_task1])`
Is it still possible to set dependencies like this when I have called the instance of the `ShellTask` class in the context of the `flow`, i.e., after the above suggested changes?
a
You absolutely can. When you call these lines
```python
shell_1 = shell(command=construct_etl_command())
shell_2 = shell(command=construct_etl_command())
```
each call creates a copy of the task, and you can refer to each copy by name, e.g. (inside the same `with Flow(...)` block):
```python
shell_1.set_downstream(shell_2)
```
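A `set_downstream` or `set_dependencies` call adds an ordering edge without passing any data. As an analogy only (this is not how Prefect schedules tasks), the standard library's `graphlib` shows how such edges fix the execution order; the task names below are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names. Each key lists the tasks it must wait for.
# "shell_2 waits for shell_1" is a pure ordering edge, like
# shell_task2.set_dependencies(upstream_tasks=[shell_task1]);
# no data flows along it.
upstream = {
    "construct_etl_command": set(),
    "shell_1": {"construct_etl_command"},
    "shell_2": {"construct_etl_command", "shell_1"},
}

order = list(TopologicalSorter(upstream).static_order())
print(order)  # ['construct_etl_command', 'shell_1', 'shell_2']
```

Without the `shell_1` entry in `shell_2`'s set, the two shell tasks would only share an upstream and could run in either order.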
t
amazing, thanks!
Let me try that out
k
I think I know what you are saying. You have a Parameter that you want to use in `ShellTask()`, but the `Parameter` only exists in the `Flow` context manager. For example:
```python
mytask = MyTask(x)  # x does not exist yet at this point
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask()
```
Is that right? I think you need to make `MyTask()` configurable at runtime, such that
```python
mytask = MyTask()
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask(x)
```
would work, because the `__init__` method is evaluated at build time, when the Parameter has no value yet, whereas the `run` method is deferred. I believe the `ShellTask` can be configured at runtime, so you want to push more of the parameters to the `run` method, where the `Parameter` will have a value. You can also do
```python
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(x)
```
The first `()` is the `__init__` and the second `()` is the `run`.
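The two pairs of parentheses can be modelled with a toy class. `ToyShellTask` and `Placeholder` are hypothetical; the only assumption carried over from the thread is that values given to `__init__` are stored as-is at build time, while inputs bound by the second call are resolved when the flow runs.

```python
from typing import Any, Dict, Optional


class Placeholder:
    """Hypothetical build-time stand-in for a Parameter (no value yet)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


class ToyShellTask:
    """Toy task: __init__ is build-time config, __call__ binds run-time inputs."""

    def __init__(self, command: Optional[Any] = None) -> None:
        # First (): stored exactly as given and never resolved later.
        self.command = command
        self.inputs: Dict[str, Any] = {}

    def __call__(self, command: Optional[Any] = None) -> "ToyShellTask":
        # Second (): bind inputs on a copy; nothing executes yet.
        bound = ToyShellTask(self.command)
        if command is not None:
            bound.inputs["command"] = command
        return bound

    def run(self, params: Dict[str, Any]) -> str:
        # Run time: only inputs bound through __call__ are swapped for values.
        if "command" in self.inputs:
            value = self.inputs["command"]
            if isinstance(value, Placeholder):
                value = params[value.name]
            return str(value)
        return str(self.command)


x = Placeholder("x")
via_init = ToyShellTask(command=x)()   # like MyTask(x) ... mytask()
via_call = ToyShellTask()(command=x)   # like MyTask()(x)

print(via_init.run({"x": "echo hi"}))  # <Parameter: x>
print(via_call.run({"x": "echo hi"}))  # echo hi
```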
t
I mean that I have something like
```python
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x))
```
in which the function `f` just constructs the command to be run in the shell task.
The error seems to come when `f` is not decorated as a task.
I am guessing that, because it is not decorated, the ShellTask's dependency on the Parameter is lost.
I currently don't know enough about Prefect to make that statement more precise. 🙂
But hopefully you get my general gist.
Does it make sense?
k
Ah ok, yes, I think it needs to be a task, because there is some “magic” where Task results are passed on to each other. The Parameter is just a special task. If you pass a Task to a plain Python function, you are passing the Task object itself. If you pass a Task to another task, that task receives its result.
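That difference can be sketched in plain Python. `Param`, `Node`, and `toy_task` are hypothetical stand-ins, not Prefect classes. Calling a decorated function records a deferred node whose arguments are resolved at run time, while an undecorated function runs immediately and only ever sees the placeholder object.

```python
from typing import Any, Callable, Dict


class Param:
    """Hypothetical build-time placeholder for a flow parameter."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


class Node:
    """A deferred call: the function plus its (possibly placeholder) arguments."""

    def __init__(self, fn: Callable[..., Any], args: tuple) -> None:
        self.fn = fn
        self.args = args

    def run(self, params: Dict[str, Any]) -> Any:
        # Resolve upstream placeholders and nodes first, then call the function.
        return self.fn(*(resolve(a, params) for a in self.args))


def resolve(value: Any, params: Dict[str, Any]) -> Any:
    if isinstance(value, Param):
        return params[value.name]
    if isinstance(value, Node):
        return value.run(params)
    return value


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Toy decorator: calling the result records a Node instead of running fn."""
    def build(*args: Any) -> Node:
        return Node(fn, args)
    return build


def make_command(num_days: Any) -> str:
    return "echo {0}".format(num_days)


num_days = Param("num_days")
eager = make_command(num_days)               # runs now: placeholder baked in
deferred = toy_task(make_command)(num_days)  # runs later with the real value

print(eager)                          # echo <Parameter: num_days>
print(deferred.run({"num_days": 3}))  # echo 3
```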
This might be solvable with:
```python
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x()))
```
because that `()` after `x` will call the run of the task. Not 100% sure it will work.
This is working for me though:
```python
from prefect import Flow, task, Parameter, Task
import prefect

class TestTask(Task):

    def run(self, x):
        logger = prefect.context.get("logger")
        logger.info(x)
        return "test_" + x

test = TestTask()

def mycallable(x):
    return "call_" + x

with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    test(mycallable(x))

flow.run()
```
This also works, similarly:
```python
with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    TestTask()(mycallable(x))

flow.run()
```
So I’m not quite sure what is up yet 😅
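One plausible explanation for why this version works while the `.format` one fails: to our understanding, Prefect 1.x `Task` overloads arithmetic operators (for example via `__radd__`), so `"call_" + x` builds a new task that depends on `x`. By contrast, `"echo {0}".format(x)` has no such hook and simply embeds `x`'s repr. The sketch below models that with hypothetical `Deferred` and `DeferredAdd` classes, not Prefect's code.

```python
from typing import Any, Dict


class Deferred:
    """Hypothetical build-time stand-in for a task such as a Parameter."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"

    def __radd__(self, other: Any) -> "DeferredAdd":
        # `"call_" + x`: str cannot add a non-str, so Python falls back to
        # this method, and the result stays a deferred graph node.
        return DeferredAdd(other, self)

    def run(self, params: Dict[str, Any]) -> Any:
        return params[self.name]


class DeferredAdd:
    """A deferred `left + right`, evaluated only when run."""

    def __init__(self, left: Any, right: Deferred) -> None:
        self.left = left
        self.right = right

    def run(self, params: Dict[str, Any]) -> Any:
        return self.left + self.right.run(params)


x = Deferred("x")
added = "call_" + x              # intercepted by __radd__
formatted = "call_{}".format(x)  # str.format just uses str(x): no hook

print(type(added).__name__)   # DeferredAdd
print(added.run({"x": "x"}))  # call_x
print(formatted)              # call_<Parameter: x>
```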
t
yeah, it kind of frazzled my brain, to be honest. 😅
I'd love to understand the issue better though
k
Could you give me a minimal example of the broken code?
t
sure, no problem
it will probably be tomorrow now. 🙂
```python
import prefect
from prefect import (
    Flow,
    Parameter,
    task,
)
from prefect.storage import Docker
from prefect.tasks.shell import ShellTask


# @task(log_stdout=True)
def construct_etl_command(num_days: str = None) -> str:
    return 'echo {0}'.format(num_days)


@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    """
    Log the given parameter to the Prefect logger.
    :param: parameter_name: The name of the parameter.
    :param: parameter_value: The value of the parameter.
    """
    prefect_logger = prefect.context.get('logger')
    prefect_logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)


num_days_parameter = Parameter('num_days', required=True)

shell_task = ShellTask(name='shell_task', stream_output=True)

with Flow(
        'minimal_broken_docker_flow',
        storage=Docker(
            base_image='python:3.9',
        )) as flow:

    log_parameter_value('num_days', num_days_parameter)

    shell_command = construct_etl_command(
        num_days=num_days_parameter,
    )
    shell_task = shell_task(command=shell_command)
```
So here is an example that is broken, though annoyingly it gives a slightly different error from yesterday...
Uncommenting `# @task(log_stdout=True)` fixes the issue, as expected.
I will try to replicate the exact error message. I am not sure why it is currently different from the one I got yesterday.
k
Oh I think the Parameter needs to be inside the Flow since it’s a task and the Flow object is the one that connects it to other tasks
t
Even if `log_parameter_value` is a task? I thought putting the parameter through a task added it to the dependency graph.
k
Ah I see what you are saying. You might be right there. Let me dig a bit.
Ah ok, I think I understand it now.
`@task` gives deferred execution, but that (undecorated) function call is evaluated. It is evaluated when the flow is serialized. By default, Prefect uses pickle-based storage, but you can opt to use script-based storage. See this for more info. When you use pickle-based storage, code outside of tasks is evaluated as the flow is registered. When you use script-based storage, the function is deferred until runtime.
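A toy illustration of the point that whatever was evaluated when the flow was built is what gets stored: the eager result has already become a plain string before serialization, so a pickle round-trip faithfully preserves the placeholder text. The dictionaries below are hypothetical stand-ins, not Prefect's storage format.

```python
import pickle
from typing import Any, Dict


class Param:
    """Hypothetical build-time placeholder whose repr mimics a Parameter."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


def make_command(num_days: Any) -> str:
    # Hypothetical stand-in for the undecorated construct_etl_command.
    return "echo {0}".format(num_days)


# Eager: the call runs while the "flow" is built, so what gets stored is
# already a plain string containing the placeholder's repr.
eager_flow = {"command": make_command(Param("num_days"))}

# Deferred: store only a recipe (template + parameter name), filled in later.
deferred_flow = {"template": "echo {0}", "param": "num_days"}


def run_deferred(flow: Dict[str, Any], params: Dict[str, Any]) -> str:
    return flow["template"].format(params[flow["param"]])


# Round-trip both through pickle, loosely mimicking pickle-based storage.
eager_loaded = pickle.loads(pickle.dumps(eager_flow))
deferred_loaded = pickle.loads(pickle.dumps(deferred_flow))

print(eager_loaded["command"])                         # echo <Parameter: num_days>
print(run_deferred(deferred_loaded, {"num_days": 3}))  # echo 3
```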
t
I see. That makes sense.
The difference between evaluation during serialisation and at runtime is a little confusing at first.
Thanks for the help!
k
We agree and are moving away from serialization for Prefect 2.0
And of course!