# ask-community
d
Hi, I have a few custom parameters that take in a user-supplied string and apply some standardisation (in the run method). However, result/target templating still sees the original string. Is there a way to use the transformed parameter instead?
a
@Dan Zhao it’s hard to tell without seeing the flow, but overall the target templating is set at the task decorator, so I would rather think that modifying this from the `.run()` method is not possible. But can you share your example? Maybe we can find a way.
d
A toy example:
```python
from prefect import Parameter, Task, Flow, Context

class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + ' - Customised'

class MyTask(Task):
    def run(self):
        context.logger.info(context.task_run_name)

my_param = MyParameter(name='my_param')
my_task = MyTask(name='my_task', task_run_name='Run {my_param}')

with Flow('my_flow') as flow:
    flow.add_task(my_param)
    my_task()

flow.run(parameters={'my_param': 'test'})
```
a
Thank you for the example - it clarifies a lot! So you want to change task run names rather than targets. This documentation page can be helpful: https://docs.prefect.io/core/idioms/task-run-names.html
@Dan Zhao, here are some edits I made to your example:
• `context` should be imported and accessed from `prefect`, e.g. `prefect.context.get("logger")`
• to access the variable `my_param` in `MyTask`, I added it to the `.run()` method
• if you move the parameter tasks into the flow constructor (`with Flow() as flow:`), it’s more convenient, and they are then added to your flow automatically
• Parameters should have defaults, so I added one; otherwise, if you scheduled this flow, the run would fail because there would be no default value to use
```python
import prefect
from prefect import Parameter, Task, Flow


class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + " - Customised"


class MyTask(Task):
    def run(self, my_param):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))

my_task = MyTask(name="my_task", task_run_name="Run {my_param}")


with Flow("my_flow") as flow:
    my_param = MyParameter(name="my_param", default="test")
    my_task(my_param)

flow.run(parameters={"my_param": "test"})
```
d
Hi Anna, thanks for the quick reply. Sorry for not making myself clearer. In the original example, what I am aiming for is to have the task_run_name set to 'Run test - Customised' instead of 'Run test'.
Also, my actual use case is to template the target based on the 'modified parameter'. I am showing you this task_run_name example because it's easier to write.
a
So the flow example I sent you does exactly that - the task run name is `Run test - Customised`:
```
[2021-11-09 13:36:33+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_param': Starting task run...
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_param': Finished task run for task with final state: 'Success'
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_task': Starting task run...
[2021-11-09 13:36:33+0100] INFO - prefect.my_task | Run test - Customised
[2021-11-09 13:36:33+0100] INFO - prefect.TaskRunner | Task 'my_task': Finished task run for task with final state: 'Success'
[2021-11-09 13:36:33+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
d
Ah, sorry for missing that part - I'll study your example.
a
btw I tested this with targets and it should work, too:
```python
import prefect
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer
from prefect import Task, Flow, Parameter


FLOW_NAME = "templatable_target"


class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + " - Customised"


class MyTask(Task):
    def run(self, param):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))
        return param


my_task = MyTask(
    name="my_task",
    target="Run {param}.txt",
    checkpoint=True,
    result=LocalResult(
        dir="/Users/anna/community", serializer=JSONSerializer()
    ),
    task_run_name="Run {param}",
)


with Flow(FLOW_NAME) as flow:
    my_param = MyParameter(name="param", default="test")
    my_task(my_param)
```
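But you need to register the flow and run it using the backend, e.g. from the UI or CLI. Using `flow.run()` in a local process will not generate a result file, because checkpointing is disabled for local runs by default (it can be enabled with the environment variable `PREFECT__FLOWS__CHECKPOINTING=true`).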
d
Thanks. So going back to the previous example, it seems when I change the parametrisation of the task run method, i.e.
```python
class MyTask(Task):
    def run(self, my_param2):
        logger = prefect.context.get("logger")
        logger.info(prefect.context.get("task_run_name"))
```
it stops working.
According to this doc https://docs.prefect.io/core/concepts/templating.html, the objects available for templating are:
• parameters
• context
• task inputs
I think your example works because 'my_param' happens to be a task input as well.
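The behaviour described here can be reproduced with a small plain-Python model. It is a sketch under an assumption, not Prefect's code: the template is filled with `str.format` from a single dict built from the raw flow parameters, then the context, then the task inputs, with later sources overriding earlier ones. Under that assumption, the customised value only reaches `{my_param}` when it arrives as a task input with exactly that name. `render_template` is a hypothetical helper.

```python
def render_template(template, parameters, context, task_inputs):
    # Simplified model (assumption): later sources override earlier ones,
    # so a task input with the same name shadows the raw parameter value.
    formatting_kwargs = {**parameters, **context, **task_inputs}
    return template.format(**formatting_kwargs)


raw_parameters = {"my_param": "test"}  # what the user supplied at run time
customised = raw_parameters["my_param"] + " - Customised"  # MyParameter.run() output
context = {"task_name": "my_task"}

# Task input named my_param: the customised value shadows the raw parameter.
print(render_template("Run {my_param}", raw_parameters, context, {"my_param": customised}))
# Run test - Customised

# Task input renamed to my_param2: {my_param} falls back to the raw parameter.
print(render_template("Run {my_param}", raw_parameters, context, {"my_param2": customised}))
# Run test
```

Renaming the task input to `my_param2` leaves only the raw parameter under the `my_param` key, which matches the "it stops working" observation above.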
a
That was my understanding of what you want to achieve. So far, I understood that you have a Parameter task, and then another task, MyTask, that receives this parameter as a data dependency; therefore, you can use task input templating for it. If I misunderstood your use case, can you describe it a bit more?
d
Sorry again for not being crystal clear. We have quite a few flows, and all of them have the same set of parameters (modified similarly to my toy example). Within each flow, we want to checkpoint some tasks (by setting 'target'). However, not all parameters are direct inputs to these tasks. My hope is to use the modified params (i.e. the return value of the parameter run method), rather than the raw strings, when templating the target.
a
I see. In that case, perhaps it would make sense to build it directly into your custom tasks? Especially if you use the imperative API, you can subclass the Task class and build your target logic this way. I can have a look later; here is documentation that explains this: https://docs.prefect.io/core/about_prefect/thinking-prefectly.html#task-classes
d
Understood - we'll look into it. Just curious: is exposing the parameter's original string (rather than the run result) to templating a specific design choice?
a
I’m not sure I understand the question - can you explain? In the examples we discussed, the parameter value was passed as a data dependency to the next task, and therefore it could be used as a template variable via task input templating. But `Parameter("name", default="some value")` is a Prefect task, not a string, so to pass it downstream, the run method of this Prefect task must be executed, and it then returns the parameter value, which could be a string. Does that make sense?
d
So in the templating document, one thing that's available for templating is the flow parameters. From what I have observed, what is really available is the user-supplied parameter value or the default value, rather than the output of the parameter's run method. Is this behaviour a design choice?
a
You mean this one? https://docs.prefect.io/core/concepts/templating.html#using-flow-parameters Now I understand. Let me test it with some examples and get back to you later.
d
Exactly
Great - thanks!
a
@Dan Zhao I’ve tried that with the example from the docs, and your use case works well as long as you add the Parameter task to your flow before registration (`flow.add_task(result_basepath)`). The parameter doesn’t have to be an input to your task, and you can point the results to a target based on your custom Parameter value provided at runtime (or the default value if no value was provided at runtime).
```python
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer


result_basepath = Parameter("result_basepath", default="~")


def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/prefect_results/{task_name}.prefect"


@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99


with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
```
You can see in the main block that we first add this parameter to the flow, then we register the flow. Then, when I run it with the default parameter value, the result is stored in `~/prefect_results/my_task.prefect`.

When I run it with a custom parameter value:
```
prefect run --id f3c076a7-6703-47bc-9d59-e78dad8baebd --param result_basepath="/Users/anna/repos" --watch
```
my result is stored in `/Users/anna/repos/prefect_results/my_task.prefect`. LMK if something is unclear or if you have any other questions about it.
d
Hi Anna, slightly modifying your example:
```python
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer


class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + '/customized'


result_basepath = MyParameter("result_basepath", default="~")


def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/prefect_results/{task_name}.prefect"


@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99


with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
```
If I run with `{'result_basepath': 'some_path'}`, it will save results to `some_path/prefect_results/my_task.prefect`.
But instead, I am hoping to use the 'modified' version of the parameter, i.e. `some_path/customized`. Ideally, the result gets saved to `some_path/customized/prefect_results/my_task.prefect`.
a
Can you use this?
```python
def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/customized/prefect_results/{task_name}.prefect"
```
d
Preferably (and logically), this modification logic stays within the parameter's run method, since the modified parameter is also an input to some tasks.
a
@Dan Zhao but you need to run it with the backend, not just `flow.run()`, i.e. you need to register the flow and then trigger it from the UI or with the `prefect run` CLI.
@Dan Zhao I think the only way to make it work with a custom Parameter class would be to call it within the `format_location` function:
```python
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer


class MyParameter(Parameter):
    def run(self):
        param = super().run()
        return param + '/customized'


result_basepath = MyParameter("result_basepath", default="~")
# result_basepath = Parameter("result_basepath", default="~")


def format_location(task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    path = result_basepath.run()
    return f"{os.path.expanduser(path)}/prefect_results/{task_name}.prefect"


@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99


with Flow("local-result-parametrized") as flow:
    my_task()

if __name__ == "__main__":
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
    flow.register("p")
```
I would still recommend the approach below, but you can choose the option that you prefer:
```python
def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/customized/prefect_results/{task_name}.prefect"
```
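If the goal is to keep the customisation defined once while still using the recommended `format_location` signature, one option is to move it into a plain function that both `MyParameter.run()` and the location callable call. This is a sketch under assumptions: `standardise_basepath` is a hypothetical helper name, and the Prefect-side wiring is shown only as a comment, based on the `MyParameter` class from the examples above.

```python
import os


def standardise_basepath(value: str) -> str:
    # Hypothetical shared helper: the single definition of the
    # customisation that MyParameter.run() applies.
    return value + "/customized"


# In the flow module, the Parameter subclass would reuse the helper, e.g.:
# class MyParameter(Parameter):
#     def run(self):
#         return standardise_basepath(super().run())


def format_location(result_basepath, task_name, **kwargs):
    # result_basepath arrives as the raw (user-supplied or default) value,
    # so apply the same helper before expanding "~" and building the path.
    base = os.path.expanduser(standardise_basepath(result_basepath))
    return f"{base}/prefect_results/{task_name}.prefect"


print(format_location("some_path", "my_task"))
# some_path/customized/prefect_results/my_task.prefect
```

The trade-off is that the transformation runs twice (once in the Parameter task, once when the result location is built), which is harmless as long as it is a cheap, deterministic string operation.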
d
understood
Another slightly related question: is there a way to specify which tasks run first in a flow (without adding dependencies)?
a
You can pass it as `upstream_tasks`:
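```python
from prefect import task, Flow
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

# format_location: the location callable defined in the earlier examples


@task(result=LocalResult(location=format_location, serializer=JSONSerializer()))
def my_task():
    return 99

@task
def task_a():
    return "a"


with Flow("local-result-parametrized") as flow:
    a = task_a()
    # upstream_tasks makes my_task wait for task_a without passing its output
    my_task(upstream_tasks=[a])
```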
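`upstream_tasks=[a]` adds an edge from `task_a` to `my_task` in the flow's graph, a state dependency that carries no data. To illustrate why such an edge fixes the order, here is a toy model using Python's standard-library `graphlib` (Python 3.9+). It is not Prefect's scheduler, and `run_order` is a hypothetical helper; it only shows how edges constrain the set of valid execution orders.

```python
from graphlib import TopologicalSorter


def run_order(tasks, edges):
    """Return one valid execution order for `tasks` given (upstream, downstream) edges."""
    # Start with every task having no predecessors.
    sorter = TopologicalSorter({name: set() for name in tasks})
    for upstream, downstream in edges:
        sorter.add(downstream, upstream)  # downstream must wait for upstream
    return list(sorter.static_order())


# No edges: both tasks are independent, so either may run first.
print(run_order(["my_task", "task_a"], []))

# With upstream_tasks=[a], the graph holds the edge task_a -> my_task.
print(run_order(["my_task", "task_a"], [("task_a", "my_task")]))
# ['task_a', 'my_task']
```

Without the edge, any order is valid; adding it leaves exactly one, which is why ordering tasks this way necessarily introduces a dependency.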
d
Does this create a dependency between tasks?
More specifically, in the FlowRunner.run() method, there is an argument named 'task_states'. It seems you can specify which tasks to run first. Is it possible to make use of this feature when scheduling flow runs? https://docs.prefect.io/api/latest/engine/flow_runner.html#flowrunner-2
Sorry, please ignore my previous comments - I need to think my questions through.