Dan Zhao
11/09/2021, 11:39 AM
Anna Geller
.run() method is not possible. But can you share your example? Maybe we can find a way.
Dan Zhao
11/09/2021, 12:10 PM
from prefect import Parameter, Task, Flow, Context

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + ' - Customised'

class MyTask(Task):
    def run(self):
        context.logger.info(context.task_run_name)

my_param = MyParameter(name='my_param')
my_task = MyTask(name='my_task', task_run_name='Run {my_param}')

with Flow('my_flow') as flow:
    flow.add_task(my_param)
    my_task()

flow.run(parameters={'my_param': 'test'})
Anna Geller
prefect.context.get("logger")
• To access the variable my_param in MyTask, I added it as an argument of the .run() method.
• If you move the parameter tasks into the flow constructor (`with Flow() as flow:`), it’s more convenient, because they are then added to your flow automatically.
• Parameters should have defaults, so I added one. Otherwise, if you scheduled this flow, the run would fail because it would have no parameter value to use.
import prefect
from prefect import Parameter, Task, Flow

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + " - Customised"

class MyTask(Task):
    def run(self, my_param):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))

my_task = MyTask(name="my_task", task_run_name="Run {my_param}")

with Flow("my_flow") as flow:
    my_param = MyParameter(name="my_param", default="test")
    my_task(my_param)

flow.run(parameters={"my_param": "test"})
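The `task_run_name="Run {my_param}"` template in the snippet above is filled in by name from the task's inputs, which is why `my_param` has to be an argument of `run()`. A rough plain-Python sketch of name-based templating follows. It is not Prefect's implementation, and `render_run_name` is a hypothetical helper used only for illustration.

```python
def render_run_name(template: str, inputs: dict) -> str:
    """Fill a run-name template from task inputs, matching placeholders by name."""
    # str.format raises KeyError when a placeholder has no matching input
    return template.format(**inputs)


# The placeholder matches the input name, so the template renders
rendered = render_run_name("Run {my_param}", {"my_param": "test - Customised"})
print(rendered)

# A mismatched input name leaves the placeholder without a value
try:
    render_run_name("Run {my_param}", {"other_name": "test - Customised"})
    mismatch_error = None
except KeyError as exc:
    mismatch_error = exc.args[0]
print(mismatch_error)
```

In this sketch, the template only renders when the placeholder and the input share the same name; renaming one without the other raises a `KeyError`.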
Dan Zhao
11/09/2021, 12:38 PM
Dan Zhao
11/09/2021, 12:39 PM
Anna Geller
[2021-11-09 13:36:33+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_param': Starting task run...
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_param': Finished task run for task with final state: 'Success'
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_task': Starting task run...
[2021-11-09 13:36:33+0100] INFO - prefect.my_task | Run test - Customised
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_task': Finished task run for task with final state: 'Success'
[2021-11-09 13:36:33+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Dan Zhao
11/09/2021, 12:51 PM
Anna Geller
import prefect
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer
from prefect import Task, Flow, Parameter

FLOW_NAME = "templatable_target"

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + " - Customised"

class MyTask(Task):
    def run(self, param):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))
        return param

my_task = MyTask(
    name="my_task",
    target="Run {param}.txt",
    checkpoint=True,
    result=LocalResult(
        dir="/Users/anna/community", serializer=JSONSerializer()
    ),
    task_run_name="Run {param}",
)

with Flow(FLOW_NAME) as flow:
    my_param = MyParameter(name="param", default="test")
    my_task(my_param)
But you need to register the flow and run it using the backend, e.g. from the UI or CLI. Using flow.run() in a local process will not generate a result file by default, since checkpointing is disabled for local runs unless you enable it.
Dan Zhao
11/09/2021, 2:07 PM
class MyTask(Task):
    def run(self, my_param2):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))
it stops working.
Dan Zhao
11/09/2021, 2:10 PM
Anna Geller
Dan Zhao
11/09/2021, 2:20 PM
Anna Geller
Dan Zhao
11/09/2021, 2:33 PM
Anna Geller
Parameter("name", default="some value") is a Prefect task, not a string. To pass it downstream, the run method of this task must be executed first; it then returns the parameter value, which could be a string. Does that make sense?
Dan Zhao
11/09/2021, 2:44 PM
Anna Geller
Dan Zhao
11/09/2021, 2:54 PM
Dan Zhao
11/09/2021, 2:54 PM
Anna Geller
flow.add_task(result_basepath)
), it doesn’t have to be passed to your task, and you can point the result location to a path built from the custom Parameter value provided at runtime (or from the default value if none was provided).
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

result_basepath = Parameter("result_basepath", default="~")

def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/prefect_results/{task_name}.prefect"

@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99

with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
You can see in the main block that we first add this parameter to the flow and then register the flow. When I run it with the default parameter, the result is stored in ~/prefect_results/my_task.prefect.
When I run it with a custom parameter value:
```
prefect run --id f3c076a7-6703-47bc-9d59-e78dad8baebd --param result_basepath="/Users/anna/repos" --watch
```
My result is stored in /Users/anna/repos/prefect_results/my_task.prefect
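The `location` callable above receives its values as keyword arguments and absorbs the ones it does not need through `**kwargs`. A minimal plain-Python sketch of that call pattern follows, assuming a hypothetical `fake_context` dict that stands in for whatever values are supplied at runtime; its keys are illustrative only.

```python
import os


def format_location(result_basepath, task_name, **kwargs):
    # Expand "~" at call time; unused keyword arguments are ignored
    return f"{os.path.expanduser(result_basepath)}/prefect_results/{task_name}.prefect"


# Hypothetical stand-in for the runtime values (not Prefect's real context)
fake_context = {
    "result_basepath": "/Users/anna/repos",
    "task_name": "my_task",
    "flow_name": "local-result-parametrized",
}

location = format_location(**fake_context)
print(location)
```

Because of `**kwargs`, the function keeps working when extra keys such as `flow_name` are passed alongside the two it uses.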
LMK if something is unclear or if you have any other questions about it.
Dan Zhao
11/09/2021, 4:03 PM
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + '/customized'

result_basepath = MyParameter("result_basepath", default="~")

def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/prefect_results/{task_name}.prefect"

@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99

with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
Dan Zhao
11/09/2021, 4:06 PM
Dan Zhao
11/09/2021, 4:07 PM
Anna Geller
def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/customized/prefect_results/{task_name}.prefect"
Dan Zhao
11/09/2021, 4:16 PM
Dan Zhao
11/09/2021, 4:19 PM
Anna Geller
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + '/customized'

result_basepath = MyParameter("result_basepath", default="~")
# result_basepath = Parameter("result_basepath", default="~")

def format_location(task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    path = result_basepath.run()
    return f"{os.path.expanduser(path)}/prefect_results/{task_name}.prefect"

@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99

with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
I would still recommend the approach below, but you can choose the option that you prefer:
def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/customized/prefect_results/{task_name}.prefect"
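Both options aim at the same final path: the suffix can be appended by the custom parameter or inside the location function. A small plain-Python comparison follows, assuming an absolute base path. The two function names are hypothetical, and no Prefect code is involved.

```python
import os


def location_with_custom_parameter(param_value, task_name):
    # The parameter value already carries the "/customized" suffix
    return f"{os.path.expanduser(param_value)}/prefect_results/{task_name}.prefect"


def location_with_suffix_in_function(result_basepath, task_name):
    # The location function appends the "/customized" suffix itself
    return f"{os.path.expanduser(result_basepath)}/customized/prefect_results/{task_name}.prefect"


base = "/Users/anna/repos"
via_parameter = location_with_custom_parameter(base + "/customized", "my_task")
via_function = location_with_suffix_in_function(base, "my_task")
print(via_parameter == via_function)
```

The second form keeps the path logic in one place, which is one reason to prefer it.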
Dan Zhao
11/09/2021, 4:45 PM
Dan Zhao
11/09/2021, 4:46 PM
Anna Geller
@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99

@task
def task_a():
    return "a"

with Flow("local-result-parametrized") as flow:
    a = task_a()
    my_task(upstream_tasks=[a])
Dan Zhao
11/09/2021, 4:51 PM
Dan Zhao
11/09/2021, 5:00 PM
Dan Zhao
11/09/2021, 5:10 PM