a
Hello, could I change the task name at runtime? (The main problem: I need to cast a dict to a BaseModel, but I need to keep the parameter name.) I've provided a minimal example with the flow in the comment.
Copy code
from pydantic import BaseModel
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run
from typing import Type


class TestForPrefect(BaseModel):
    id_test: str
    nmb_test: int


@task(name="dict_to_pydantic")
def dict_to_pydantic(data: dict, pydantic_class: Type[BaseModel]) -> BaseModel:
    return pydantic_class(**data)


@task(log_stdout=True)
def change_class_value(test: TestForPrefect) -> TestForPrefect:
    test.id_test = "2"  # id_test is declared as str on the model
    test.nmb_test = 2
    return test


with Flow("child") as child_flow:
    test_class = dict_to_pydantic(
        Parameter("test_class", required=True), TestForPrefect
    )

    change_class_value(child_flow.get_tasks("test_class")[0])


with Flow("parent") as parent_flow:
    test_class = TestForPrefect(id_test="1", nmb_test=1).dict()
    child_run_id = create_flow_run(
        flow_name=child_flow.name,
        parameters=dict(test_class=test_class),
    )
Copy code
AttributeError: 'dict' object has no attribute 'id_test'
I understand that I can do the following, but it isn't what I need :(
Copy code
with Flow("child") as child_flow:
    data_size = Parameter("data_size", default=5)
    test_class = dict_to_pydantic(
        Parameter("test_class", required=True), TestForPrefect
    )

    change_class_value(test_class)
    data = create_some_data(data_size)
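The AttributeError above is consistent with the name lookup returning the Parameter task itself: get_tasks("test_class") matches the task named test_class, while the casting task is named dict_to_pydantic, so change_class_value receives the raw dict. Below is a minimal sketch of that difference. It uses a standard-library dataclass as a stand-in for the pydantic model, an assumption made only so the snippet runs without extra dependencies.

```python
from dataclasses import dataclass


@dataclass
class TestForPrefect:  # stand-in for the pydantic model in the thread
    id_test: str
    nmb_test: int


def change_class_value(test):
    # Same attribute assignments as the task in the thread.
    test.id_test = "2"
    test.nmb_test = 2
    return test


# What the parameter holds at run time: a plain dict.
raw = {"id_test": "1", "nmb_test": 1}

try:
    change_class_value(raw)
except AttributeError as exc:
    # Dicts do not support attribute assignment.
    error_message = str(exc)

# Casting first gives the function an object with real attributes.
changed = change_class_value(TestForPrefect(**raw))
```

Passing the cast result (the test_class variable in the second flow above) avoids the lookup entirely.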
a
@Aleksandr Liadov thanks for moving the code to the thread! In general, it’s possible to customize the task name and task run name, as you want. E.g. you can change the task run names based on inputs. If I understand correctly, you only want to change the task name, not the task run name?
this line will not work, because TestForPrefect is not a Prefect task:
Copy code
test_class = TestForPrefect(id_test="1", nmb_test=1).dict()
but you could do something like this - is this what you were trying to do?
Copy code
from pydantic import BaseModel
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run


class TestForPrefect(BaseModel):
    id_test: str
    nmb_test: int


@task(log_stdout=True)
def get_test_dict():
    return TestForPrefect(id_test="1", nmb_test=1).dict()  # both fields are required


with Flow("parent") as parent_flow:
    data_size = Parameter("data_size", default=5)
    test_dict = get_test_dict()
    child_run_id = create_flow_run(
        flow_name="name",
        parameters=dict(test_dict=test_dict, data_size=data_size),
    )
a
@Anna Geller thanks for your quick response! What should I do if I need to pass test_class from the parent flow to the subflow (child_flow)? If I understand correctly, we can only pass a dict?
a
in the code above we do pass a dictionary, not a class
the data that you pass to the child flow’s parameters can be passed in the same way as you pass data between Prefect tasks, e.g.:
Copy code
@task(log_stdout=True)
def return_some_data():
    return "some data"


with Flow("parent") as parent_flow:
    some_data = return_some_data()
    child_run_id = create_flow_run(
        flow_name="name",
        parameters=dict(some_data=some_data),
    )
a
OK, I understand! Thanks! The second part of my problem: how can I recast this parameter in the child flow while keeping the task name?
a
You can use task_args for it, e.g.:
Copy code
with Flow("parent") as parent_flow:
    test_dict = get_test_dict(task_args=dict(name="Your Custom Task Name"))
a
I want to do this:
Copy code
change_class_value(child_flow.get_tasks("test_class")[0])
But I cannot, because test_class is a dict.
a
it’s probably easier to set the task name using task_args as above, or even directly on the task decorator:
Copy code
@task(name="Some Custom Task Name")
def get_test_dict():
    return TestForPrefect(id_test="1", nmb_test=1).dict()  # both fields are required
a
OK, in this simple case I agree that it’s easier to change only the task name using the decorator. The real problem comes when I do this (every Feature inherits from Task):
Copy code
for feature in dependency_ordered_features:
    feature_results = feature()(
        **{
            parameter: flow.get_tasks(parameter)[0]
            for parameter in feature.serializable_parameters
        }
    )
In every feature we specify the names of its serializable parameters. The parameters are passed from the parent flow as dicts, but the features expect a BaseModel. Is it possible to cast them (using one common function, dict_to_pydantic from my example) and keep the right name (from the parameters)?
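A rough sketch of the idea in this question: one common casting function, driven by a mapping from parameter name to the class it should become, so that each cast value stays under its original name. The registry PARAMETER_MODELS, the DataConfig class, and cast_parameters are hypothetical names introduced for illustration. Dataclasses stand in for pydantic models so the snippet runs on the standard library alone.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass
class TestForPrefect:  # stand-in for a pydantic model
    id_test: str
    nmb_test: int


@dataclass
class DataConfig:  # hypothetical second model, for illustration only
    data_size: int


# Hypothetical registry: parameter name -> class it should be cast to.
PARAMETER_MODELS: Dict[str, Type[Any]] = {
    "test_class": TestForPrefect,
    "data_config": DataConfig,
}


def dict_to_model(data: dict, model_class: Type[Any]) -> Any:
    # Same shape as dict_to_pydantic in the thread: unpack the dict as kwargs.
    return model_class(**data)


def cast_parameters(raw: Dict[str, dict]) -> Dict[str, Any]:
    # Keys are preserved, so each cast value stays under its parameter name.
    return {
        name: dict_to_model(value, PARAMETER_MODELS[name])
        for name, value in raw.items()
    }


cast = cast_parameters(
    {
        "test_class": {"id_test": "1", "nmb_test": 1},
        "data_config": {"data_size": 5},
    }
)
```

Inside a flow, the same name-keyed mapping could hold the cast tasks, so the feature loop reads from it instead of calling get_tasks(parameter), which matches the raw Parameter.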
a
hard to say immediately. Can you build a minimal complete example of a child and parent flow I could reproduce?
a
Yes, I’ll ping you when it’s ready! Thanks for your help 🙂
@Anna Geller , your solution:
Copy code
test_dict = get_test_dict(task_args=dict(name="Your Custom Task Name"))
Works nicely! Thanks!
@Côme Arvis, @Jean-David Fiquet