Thomas Furmston
10/18/2021, 9:49 AM
Anna Geller
num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)
Do you have some sensible defaults for those parameters?
Thomas Furmston
10/18/2021, 10:59 AM
num_days_parameter and num_back_fill_days_parameter, but I am not so sure about end_date_parameter
Thomas Furmston
10/18/2021, 10:59 AM
Thomas Furmston
10/18/2021, 11:00 AM
Anna Geller
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
clock_1 = CronClock(
    '41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London'),
    parameter_defaults={
        'num_days': 1,
        'num_back_fill_days': 1,
        'end_date': '',
    },
)
schedule = Schedule(clocks=[clock_1])
or from UI:
Anna Geller
if end_date_parameter:
    end_date = end_date_parameter
else:
    end_date = pendulum.now(tz='Europe/London').to_date_string()
Thomas Furmston
10/18/2021, 11:56 AM
Thomas Furmston
10/18/2021, 11:58 AM
Thomas Furmston
10/18/2021, 11:58 AM
num_days_parameter = Parameter('num_days', default=1)
num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
end_date_parameter = Parameter('end_date', default=None)
Thomas Furmston
10/18/2021, 11:58 AM
Thomas Furmston
10/18/2021, 11:59 AM
@task
def calculate_flow_end_date(end_date: str):
    if end_date is not None:
        return end_date
    return prefect.context.get('scheduled_start_time').to_date_string()
Thomas Furmston
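A minimal sketch of the fallback logic in the two snippets above, written with only the standard library so it runs without Prefect. The function name `resolve_end_date` and its `scheduled_start_time` argument are hypothetical; in a flow, that value would presumably come from `prefect.context`, as in the task above. The sketch treats both `None` and the empty string as "not provided", because the clock defaults earlier in the thread pass `'end_date': ''`, which an `is not None` check alone would let through as a real value.

```python
from datetime import datetime
from typing import Optional


def resolve_end_date(end_date: Optional[str], scheduled_start_time: datetime) -> str:
    """Return the supplied end date, or fall back to the scheduled start date.

    Hypothetical helper for illustration; not part of Prefect.
    """
    # Both None and '' count as "no end date supplied".
    if end_date:
        return end_date
    # Same YYYY-MM-DD shape as pendulum's to_date_string().
    return scheduled_start_time.strftime("%Y-%m-%d")


scheduled = datetime(2021, 10, 18, 10, 41)
from_default = resolve_end_date(None, scheduled)
from_empty = resolve_end_date("", scheduled)
from_user = resolve_end_date("2021-10-01", scheduled)
```

Whether an empty string should count as missing is a design choice; it only matters if the schedule or the UI supplies `''` instead of leaving the parameter unset.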
10/18/2021, 11:59 AM
Thomas Furmston
10/18/2021, 12:00 PM
Thomas Furmston
10/18/2021, 12:00 PM
common_flow_result = common_flow(parameters={
    'num_days': num_days_parameter,
    'num_back_fill_days': num_back_fill_days_parameter,
    'end_date': task_end_date,
})
Thomas Furmston
10/18/2021, 12:01 PM
Thomas Furmston
10/18/2021, 12:04 PM
Anna Geller
Anna Geller
Thomas Furmston
10/18/2021, 1:33 PM
Thomas Furmston
10/18/2021, 1:33 PM
Thomas Furmston
10/18/2021, 1:34 PM
Thomas Furmston
10/18/2021, 1:36 PM
Thomas Furmston
10/18/2021, 1:36 PM
Thomas Furmston
10/18/2021, 1:37 PM
Anna Geller
Task classes, including Parameters, are instantiated inside a with flow: block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask
@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)
shell = ShellTask(return_all=True)
with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
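    # Caution: the f-string is formatted while the flow is being built, so
    # `param` is still a Parameter object here, not its runtime value.
    shell_task = shell(command=f"echo {param}")
if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
Anna Geller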
ShellTask before you call it within the Flow constructor, as the code snippet above shows.
Overall, you were right when you said that your child flow previously didn’t even get to the ShellTask: the Parameter was instantiated in the flow but was never called. Now that we add this Parameter to the Flow through the log_param_value task (as a data dependency), it can be used in downstream tasks (your shell tasks and StartFlowRun).
The same happened with the ShellTask: it was instantiated within the Flow, but it was neither called nor explicitly added to the flow via a data dependency or upstream/downstream dependencies. Now that we instantiate it beforehand and call it within the Flow constructor, it works as expected. Does that make sense to you?
Thomas Furmston
10/18/2021, 1:55 PM
Thomas Furmston
10/18/2021, 1:56 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 3:41 PM
Thomas Furmston
10/18/2021, 3:42 PM
@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)
num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)
shell_command = ShellTask(
    name='served_advert_task',
    stream_output=True,
)
with Flow(
    'my_flow',
    storage=Docker(
        base_image='my-docker-image:latest',
        local_image=True,
    )) as flow:
    log_parameter_value('num_days', num_days_parameter)
    log_parameter_value('num_back_fill_days', num_back_fill_days_parameter)
    log_parameter_value('end_date', end_date_parameter)
    shell_command(
        command=construct_etl_command(),
    )
Thomas Furmston
10/18/2021, 3:43 PM
Thomas Furmston
10/18/2021, 3:43 PM
/tmp/prefect-qc0rm5qd: line 1: Parameter:: No such file or directory
Thomas Furmston
10/18/2021, 3:44 PM
Kevin Kho
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask
@task
def construct_etl_command():
    return "ls"
@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)
shell = ShellTask(stream_output=True)
with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
    shell = shell(command=construct_etl_command())
if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
Thomas Furmston
10/18/2021, 5:02 PM
Thomas Furmston
10/18/2021, 5:02 PM
Thomas Furmston
10/18/2021, 5:03 PM
Anna Geller
Thomas Furmston
10/18/2021, 5:38 PM
DockerRun doesn't break anything and the flow still runs. However, when I remove the task decorator from the construct_etl_command function, the error returns.
Thomas Furmston
10/18/2021, 5:39 PM
Thomas Furmston
10/18/2021, 5:39 PM
Thomas Furmston
10/18/2021, 5:40 PM
Thomas Furmston
10/18/2021, 5:41 PM
shell_task2.set_dependencies(upstream_tasks=[shell_task1])
Thomas Furmston
10/18/2021, 5:42 PM
ShellTask class in the context of the flow?
Thomas Furmston
10/18/2021, 5:43 PM
Anna Geller
shell = shell(command=construct_etl_command())
shell_2 = shell(command=construct_etl_command())
you are creating a copy of the task, which you can then refer to by its variable, e.g.
shell.set_downstream(shell_2)
Thomas Furmston
10/18/2021, 5:47 PM
Thomas Furmston
10/18/2021, 5:47 PM
Kevin Kho
ShellTask(), but the Parameter only exists in the Flow context manager. For example:
mytask = MyTask(x)
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask()
Is that right?
I think you need to make MyTask() configurable at runtime, such that
mytask = MyTask()
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask(x)
would work, because the init method is evaluated at build time, when the Parameter has no value yet, while the run method is deferred.
I believe the ShellTask can be configured at runtime, so you want to push more of the parameters to the run method, where the Parameter will have a value.
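The init-versus-run distinction can be sketched with a toy model in plain Python. This is not Prefect's implementation: `Deferred`, `ToyTask`, and `execute` are hypothetical names invented for illustration, and the only point is that constructor arguments are fixed when the object is built, while `run` arguments can be resolved later.

```python
class Deferred:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, name):
        self.name = name

    def resolve(self, runtime_values):
        # Look the value up once the run has actually started.
        return runtime_values[self.name]


class ToyTask:
    """Hypothetical task: __init__ runs at build time, run() at run time."""

    def __init__(self, prefix):
        # Anything passed here is captured immediately, at build time.
        self.prefix = prefix

    def run(self, x):
        # By the time run() is called, x is a concrete value.
        return "{}{}".format(self.prefix, x)


def execute(task, arg, runtime_values):
    """Resolve a deferred argument, then call the task's run method."""
    value = arg.resolve(runtime_values) if isinstance(arg, Deferred) else arg
    return task.run(value)


x = Deferred("x")                      # build time: no value yet
task = ToyTask(prefix="value=")        # build time: prefix is fixed now
result = execute(task, x, {"x": 0})    # run time: x resolves to 0
```

In this toy model, passing `x` to `ToyTask(...)` instead of to `run` would store the placeholder itself, which mirrors the problem described in the message.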
You can also do
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(x)
The first () is the init and the second () is the run.
Thomas Furmston
10/18/2021, 5:56 PM
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x))
in which the function f just constructs the command to be run in the shell task.
Thomas Furmston
10/18/2021, 5:56 PM
f is not decorated as a task.
Thomas Furmston
10/18/2021, 5:57 PM
Thomas Furmston
10/18/2021, 5:58 PM
Thomas Furmston
10/18/2021, 5:58 PM
Thomas Furmston
10/18/2021, 5:58 PM
Kevin Kho
Kevin Kho
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x()))
because that () after x will call the run of the task. Not 100% sure it will work.
Kevin Kho
from prefect import Flow, task, Parameter, Task
import prefect
class TestTask(Task):
    def run(self, x):
        logger = prefect.context.get("logger")
        logger.info(x)
        return "test_" + x
test = TestTask()
def mycallable(x):
    return "call_" + x
with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    test(mycallable(x))
flow.run()
Kevin Kho
with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    TestTask()(mycallable(x))
flow.run()
So I’m not quite sure what is up yet 😅
Thomas Furmston
10/18/2021, 6:05 PM
Thomas Furmston
10/18/2021, 6:05 PM
Kevin Kho
Thomas Furmston
10/18/2021, 6:18 PM
Thomas Furmston
10/18/2021, 6:19 PM
Thomas Furmston
10/19/2021, 10:14 AM
import prefect
from prefect import (
    Flow,
    Parameter,
    task,
)
from prefect.storage import Docker
from prefect.tasks.shell import ShellTask
# @task(log_stdout=True)
def construct_etl_command(num_days: str = None) -> str:
    return 'echo {0}'.format(num_days)
@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    """
    Log the given parameter to the Prefect logger.
    :param: parameter_name: The name of the parameter.
    :param: parameter_value: The value of the parameter.
    """
    prefect_logger = prefect.context.get('logger')
    prefect_logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)
num_days_parameter = Parameter('num_days', required=True)
shell_task = ShellTask(name='shell_task', stream_output=True)
with Flow(
        'minimal_broken_docker_flow',
        storage=Docker(
            base_image='python:3.9',
        )) as flow:
    log_parameter_value('num_days', num_days_parameter)
    shell_command = construct_etl_command(
        num_days=num_days_parameter,
    )
    shell_task = shell_task(command=shell_command)
Thomas Furmston
10/19/2021, 10:14 AM
Thomas Furmston
10/19/2021, 10:15 AM
# @task(log_stdout=True) fixes the issue, as expected
Thomas Furmston
10/19/2021, 10:17 AM
Kevin Kho
Thomas Furmston
10/19/2021, 2:59 PM
log_parameter_value is a task? I thought putting the parameter through a task added it to the dependency graph.
Kevin Kho
Kevin Kho
@task will have deferred execution, but that function call is evaluated. It is evaluated when the flow is serialized. By default, Prefect uses pickle-based storage, but you can opt to use script-based storage. See this for more info. With pickle-based storage, such calls are evaluated as the flow is registered. With script-based storage, the function is deferred until runtime.
Thomas Furmston
10/19/2021, 3:59 PM
Thomas Furmston
10/19/2021, 4:00 PM
Thomas Furmston
10/19/2021, 4:00 PM
Kevin Kho
Kevin Kho
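The `Parameter:: No such file or directory` error reported earlier in the thread is consistent with a command string being built before the parameter has a value. A minimal sketch of that effect follows, using a hypothetical `FakeParameter` stand-in rather than Prefect itself, and assuming the real object's string form looks like `<Parameter: num_days>`:

```python
class FakeParameter:
    """Hypothetical stand-in for a Prefect Parameter; not the real class."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        # Assumed string form, chosen to match the shell error in the thread.
        return "<Parameter: {}>".format(self.name)


def construct_etl_command(num_days=None):
    # Plain function, as in the thread's undecorated version: it formats
    # whatever object it receives, immediately.
    return "echo {0}".format(num_days)


# Called while the flow is being built: the placeholder object is formatted.
build_time_command = construct_etl_command(FakeParameter("num_days"))

# Called with a concrete value, as a task would be at run time.
run_time_command = construct_etl_command(3)
```

If that assumption holds, a shell would read the `<Parameter:` part of the build-time command as input redirection from a file named `Parameter:`, which would explain the message.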