Thomas Furmston
10/18/2021, 9:49 AM
Anna Geller
num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)
Do you have some sensible defaults for those parameters?
Thomas Furmston
10/18/2021, 10:59 AM
num_days_parameter and num_back_fill_days_parameter, but I am not so sure about end_date_parameter
Thomas Furmston
10/18/2021, 10:59 AM
Thomas Furmston
10/18/2021, 11:00 AM
Anna Geller
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

clock_1 = CronClock(
    '41 10 * * 1-5',
    start_date=pendulum.now(tz='Europe/London'),
    parameter_defaults={
        'num_days': 1,
        'num_back_fill_days': 1,
        'end_date': '',
    },
)
schedule = Schedule(clocks=[clock_1])
or from UI:
Anna Geller
if end_date_parameter:
    end_date = end_date_parameter
else:
    end_date = pendulum.now(tz='Europe/London').to_date_string()
Thomas Furmston
10/18/2021, 11:56 AM
Thomas Furmston
10/18/2021, 11:58 AM
Thomas Furmston
10/18/2021, 11:58 AM
num_days_parameter = Parameter('num_days', default=1)
num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
end_date_parameter = Parameter('end_date', default=None)
Thomas Furmston
10/18/2021, 11:58 AM
Thomas Furmston
10/18/2021, 11:59 AM
@task
def calculate_flow_end_date(end_date: str):
    if end_date is not None:
        return end_date
    return prefect.context.get('scheduled_start_time').to_date_string()
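The fallback logic in the task above can be checked without a Prefect backend. The sketch below is plain Python: `resolve_end_date` is a hypothetical helper standing in for the task body, and a `datetime` value stands in for `prefect.context.get('scheduled_start_time')`. It treats both `None` and the empty string as "not supplied", which is an assumption that merges the two defaults used earlier in the thread (`''` in `parameter_defaults` and `None` in `Parameter(..., default=None)`).

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_end_date(end_date: Optional[str], scheduled_start: datetime) -> str:
    """Return the explicit end date, or the scheduled start date as YYYY-MM-DD."""
    # Both None and '' count as "not supplied" (an assumption for this sketch).
    if end_date:
        return end_date
    return scheduled_start.date().isoformat()


# Hypothetical scheduled start time, matching the 10:41 cron slot used above.
scheduled = datetime(2021, 10, 18, 10, 41, tzinfo=timezone.utc)

print(resolve_end_date(None, scheduled))
print(resolve_end_date("", scheduled))
print(resolve_end_date("2021-10-01", scheduled))
```

Using the scheduled start time rather than the current time keeps a late or retried run on the date it was originally scheduled for.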
Thomas Furmston
10/18/2021, 11:59 AM
Thomas Furmston
10/18/2021, 12:00 PM
Thomas Furmston
10/18/2021, 12:00 PM
common_flow_result = common_flow(parameters={
    'num_days': num_days_parameter,
    'num_back_fill_days': num_back_fill_days_parameter,
    'end_date': task_end_date,
})
Thomas Furmston
10/18/2021, 12:01 PM
Thomas Furmston
10/18/2021, 12:04 PM
Anna Geller
Anna Geller
Thomas Furmston
10/18/2021, 1:33 PM
Thomas Furmston
10/18/2021, 1:33 PM
Thomas Furmston
10/18/2021, 1:34 PM
Thomas Furmston
10/18/2021, 1:36 PM
Thomas Furmston
10/18/2021, 1:36 PM
Thomas Furmston
10/18/2021, 1:37 PM
Anna Geller
Task classes, including Parameters, are instantiated inside a with flow: block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)

shell = ShellTask(return_all=True)

with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
    shell_task = shell(command=f"echo {param}")

if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
Anna Geller
ShellTask before you call it within the Flow constructor, as the code snippet above shows.
Overall, you were right when you said that your child flow previously didn’t even get to the ShellTask, because the Parameter was instantiated in the flow but was not yet called. Now that we add this Parameter to the Flow through the log_param_value task (as a data dependency), it can be used in downstream tasks (your shell tasks and StartFlowRun).
The same happened with the ShellTask: it was instantiated within the Flow, but was not called or explicitly added to the flow via a data dependency or upstream/downstream dependencies. Now that we instantiate it beforehand and call it within the Flow constructor, it works as expected. Does that make sense to you?
Thomas Furmston
10/18/2021, 1:55 PM
Thomas Furmston
10/18/2021, 1:56 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 1:57 PM
Thomas Furmston
10/18/2021, 3:41 PM
Thomas Furmston
10/18/2021, 3:42 PM
@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)

num_days_parameter = Parameter('num_days', required=True)
num_back_fill_days_parameter = Parameter('num_back_fill_days', required=True)
end_date_parameter = Parameter('end_date', required=True)

shell_command = ShellTask(
    name='served_advert_task',
    stream_output=True,
)

with Flow(
        'my_flow',
        storage=Docker(
            base_image='my-docker-image:latest',
            local_image=True,
        )) as flow:
    log_parameter_value('num_days', num_days_parameter)
    log_parameter_value('num_back_fill_days', num_back_fill_days_parameter)
    log_parameter_value('end_date', end_date_parameter)
    shell_command(
        command=construct_etl_command(),
    )
Thomas Furmston
10/18/2021, 3:43 PM
Thomas Furmston
10/18/2021, 3:43 PM
/tmp/prefect-qc0rm5qd: line 1: Parameter:: No such file or directory
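A plausible reading of this error, offered as an assumption since the thread does not spell it out: if a Parameter object is formatted into the command string while the flow is being built, the string holds the object's representation rather than its runtime value. Assuming that representation looks like `<Parameter: num_days>`, a shell would read a word beginning with `<` as input redirection from a file named after it, which would be consistent with a complaint about a missing file called `Parameter:`. The `FakeParameter` class below is hypothetical and only mimics that representation; the `echo` command is a placeholder for whatever the real command builder produces.

```python
import shlex


class FakeParameter:
    """Hypothetical stand-in for a flow parameter that has no value yet."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        # Assumed to resemble the placeholder's repr; not taken from the thread.
        return "<Parameter: {0}>".format(self.name)


def construct_etl_command(num_days) -> str:
    # A plain function runs as soon as it is called, so it formats
    # whatever object it receives, placeholder or not.
    return "echo {0}".format(num_days)


command = construct_etl_command(FakeParameter("num_days"))
print(command)

# Split the string into whitespace-separated words; the second word
# begins with '<', the character a shell uses for input redirection.
tokens = shlex.split(command)
print(tokens)
```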
Thomas Furmston
10/18/2021, 3:44 PM
Kevin Kho
Anna Geller
from prefect import Flow, Parameter, task
from prefect.tasks.shell import ShellTask

@task
def construct_etl_command():
    return "ls"

@task(log_stdout=True)
def log_param_value(user_input: str):
    print(user_input)

shell = ShellTask(stream_output=True)

with Flow("shell-flow") as flow:
    param = Parameter("user_input", default="your_value")
    log_param_value(param)
    shell = shell(command=construct_etl_command())

if __name__ == '__main__':
    # flow.run()
    flow.run(parameters=dict(user_input="Hello World"))
Thomas Furmston
10/18/2021, 5:02 PM
Thomas Furmston
10/18/2021, 5:02 PM
Thomas Furmston
10/18/2021, 5:03 PM
Anna Geller
Thomas Furmston
10/18/2021, 5:38 PM
DockerRun doesn't break anything and the flow still runs. However, when I remove the task decorator on the construct_etl_command function, the error returns.
Thomas Furmston
10/18/2021, 5:39 PM
Thomas Furmston
10/18/2021, 5:39 PM
Thomas Furmston
10/18/2021, 5:40 PM
Thomas Furmston
10/18/2021, 5:41 PM
shell_task2.set_dependencies(upstream_tasks=[shell_task1])
Thomas Furmston
10/18/2021, 5:42 PM
ShellTask class in the context of the flow?
Thomas Furmston
10/18/2021, 5:43 PM
Anna Geller
shell = shell(command=construct_etl_command())
shell_2 = shell(command=construct_etl_command())
you are creating a copy of a task and you can call it by reference e.g.
shell.set_downstream(shell_2)
Thomas Furmston
10/18/2021, 5:47 PM
Thomas Furmston
10/18/2021, 5:47 PM
Kevin Kho
ShellTask(), but the Parameter only exists in the Flow context manager. For example:
mytask = MyTask(x)
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask()
Is that right?
I think you need to make MyTask() configurable during runtime such that
mytask = MyTask()
with Flow(...) as flow:
    x = Parameter("x", default=0)
    mytask(x)
would work because the init method is evaluated during build time when the Parameter is empty but the run method is deferred.
I believe the ShellTask can be configured during runtime so you want to push more of the parameters to the run method where the Parameter will have a value.
You can also do
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(x)
The first () is the init and the second () is the run
Thomas Furmston
10/18/2021, 5:56 PM
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x))
in which the function f just constructs the command to be run in the shell task.
Thomas Furmston
10/18/2021, 5:56 PM
f is not decorated as a task.
Thomas Furmston
10/18/2021, 5:57 PM
Thomas Furmston
10/18/2021, 5:58 PM
Thomas Furmston
10/18/2021, 5:58 PM
Thomas Furmston
10/18/2021, 5:58 PM
Kevin Kho
Kevin Kho
with Flow(...) as flow:
    x = Parameter("x", default=0)
    MyTask(...)(command=f(x()))
because that () after x will call the run of the task. Not 100% sure it will work.
Kevin Kho
from prefect import Flow, task, Parameter, Task
import prefect

class TestTask(Task):
    def run(self, x):
        logger = prefect.context.get("logger")
        logger.info(x)
        return "test_" + x

test = TestTask()

def mycallable(x):
    return "call_" + x

with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    test(mycallable(x))

flow.run()
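One possible reason the example above behaves differently from a helper that builds its string with `str.format`, stated as an assumption rather than something confirmed in the thread: if the placeholder object overloads `+`, then `"call_" + x` can itself stay deferred, while `str.format` has to produce a plain string on the spot. The `Deferred` class below is a hypothetical plain-Python stand-in, not Prefect's implementation; it only illustrates the difference between the two operations.

```python
class Deferred:
    """Hypothetical placeholder for a value that is only known at run time."""

    def __init__(self, resolve, label):
        self._resolve = resolve
        self.label = label

    def __radd__(self, other):
        # "prefix" + placeholder builds a new placeholder instead of a string.
        return Deferred(lambda ctx: other + self._resolve(ctx), "add")

    def __repr__(self):
        return "<Deferred: {0}>".format(self.label)

    def run(self, ctx):
        # Resolve the value once the run-time context is available.
        return self._resolve(ctx)


x = Deferred(lambda ctx: ctx["x"], "x")

concatenated = "call_" + x        # still a Deferred, resolved later
formatted = "call_{0}".format(x)  # a plain string, fixed immediately

print(type(concatenated).__name__)
print(concatenated.run({"x": "hello"}))
print(formatted)
```

Under that assumption, a helper that only concatenates would appear to work without a task decorator, whereas one that formats or interpolates the value would bake the placeholder's representation into the command.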
Kevin Kho
with Flow("aaa") as flow:
    x = Parameter("x", default="x")
    TestTask()(mycallable(x))

flow.run()
So I’m not quite sure what is up yet 😅
Thomas Furmston
10/18/2021, 6:05 PM
Thomas Furmston
10/18/2021, 6:05 PM
Kevin Kho
Thomas Furmston
10/18/2021, 6:18 PM
Thomas Furmston
10/18/2021, 6:19 PM
Thomas Furmston
10/19/2021, 10:14 AM
import prefect
from prefect import (
    Flow,
    Parameter,
    task,
)
from prefect.storage import Docker
from prefect.tasks.shell import ShellTask

# @task(log_stdout=True)
def construct_etl_command(num_days: str = None) -> str:
    return 'echo {0}'.format(num_days)

@task(log_stdout=True)
def log_parameter_value(parameter_name: str, parameter_value: (str, int, float)):
    """
    Log the given parameter to the Prefect logger.

    :param: parameter_name: The name of the parameter.
    :param: parameter_value: The value of the parameter.
    """
    prefect_logger = prefect.context.get('logger')
    prefect_logger.info('Parameter: (%s, %s)', parameter_name, parameter_value)

num_days_parameter = Parameter('num_days', required=True)
shell_task = ShellTask(name='shell_task', stream_output=True)

with Flow(
        'minimal_broken_docker_flow',
        storage=Docker(
            base_image='python:3.9',
        )) as flow:
    log_parameter_value('num_days', num_days_parameter)
    shell_command = construct_etl_command(
        num_days=num_days_parameter,
    )
    shell_task = shell_task(command=shell_command)
Thomas Furmston
10/19/2021, 10:14 AM
Thomas Furmston
10/19/2021, 10:15 AM
# @task(log_stdout=True)
fixes the issue, as expected
Thomas Furmston
10/19/2021, 10:17 AM
Kevin Kho
Thomas Furmston
10/19/2021, 2:59 PM
log_parameter_value is a task? I thought putting the parameter through a task added it to the dependency graph.
Kevin Kho
Kevin Kho
@task will have deferred execution, but that function call is evaluated eagerly. It is evaluated when the flow is serialized. By default, Prefect uses pickle-based storage, but you can opt to use script-based storage. See this for more info. When you use pickle-based storage, stuff is evaluated as the flow is registered. When you use script-based storage, the function is deferred until runtime.
Thomas Furmston
10/19/2021, 3:59 PM
Thomas Furmston
10/19/2021, 4:00 PM
Thomas Furmston
10/19/2021, 4:00 PM
Kevin Kho
Kevin Kho