Aleksandr Liadov
11/26/2021, 2:44 PMAleksandr Liadov
11/26/2021, 2:46 PMfrom pydantic import BaseModel
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run
from typing import Type
class TestForPrefect(BaseModel):
id_test: str
nmb_test: int
@task(name="dict_to_pydantic")
def dict_to_pydantic(data: dict, pydantic_class: Type[BaseModel]) -> BaseModel:
return pydantic_class(**data)
@task(log_stdout=True)
def change_class_value(test: TestForPrefect) -> TestForPrefect:
test.id_test = 2
test.nmb_test = 2
return test
with Flow("child") as child_flow:
test_class = dict_to_pydantic(
Parameter("test_class", required=True), TestForPrefect
)
change_class_value(child_flow.get_tasks("test_class")[0])
with Flow("parent") as parent_flow:
test_class = TestForPrefect(id_test="1", nmb_test=1).dict()
child_run_id = create_flow_run(
flow_name=child_flow.name,
parameters=dict(test_class=test_class),
)
Aleksandr Liadov
11/26/2021, 2:46 PMAttributeError: 'dict' object has no attribute 'id_test'
Aleksandr Liadov
11/26/2021, 2:46 PMwith Flow("child") as child_flow:
data_size = Parameter("data_size", default=5)
test_class = dict_to_pydantic(
Parameter("test_class", required=True), TestForPrefect
)
change_class_value(test_class)
data = create_some_data(data_size)
Anna Geller
Anna Geller
test_class = TestForPrefect(id_test="1", nmb_test=1).dict()
Anna Geller
from pydantic import BaseModel
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import create_flow_run
class TestForPrefect(BaseModel):
id_test: str
nmb_test: int
@task(log_stdout=True)
def get_test_dict():
return TestForPrefect().dict()
with Flow("parent") as parent_flow:
data_size = Parameter("data_size", default=5)
test_dict = get_test_dict()
child_run_id = create_flow_run(
flow_name="name",
parameters=dict(test_dict=test_dict, data_size=data_size),
)
Aleksandr Liadov
11/26/2021, 2:54 PMtest_class
from the parent flow to subflow (child_flow)?
If I good understand we can pass only dict
?Anna Geller
Anna Geller
@task(log_stdout=True)
def return_some_data():
return "some data"
with Flow("parent") as parent_flow:
some_data = return_some_data()
child_run_id = create_flow_run(
flow_name="name",
parameters=dict(some_data=some_data),
)
Aleksandr Liadov
11/26/2021, 2:57 PMAnna Geller
with Flow("parent") as parent_flow:
test_dict = get_test_dict(task_args=dict(name="Your Custom Task Name"))
Aleksandr Liadov
11/26/2021, 2:58 PMchange_class_value(child_flow.get_tasks("test_class")[0])
But I cannot, because test_class
is a dictAnna Geller
task_args
as above, or even directly on the task decorator:
@task(name="Some Custom Task Name")
def get_test_dict():
return TestForPrefect().dict()
Aleksandr Liadov
11/26/2021, 3:17 PMfor feature in dependency_ordered_features:
feature_results = feature()(
**{
parameter: flow.get_tasks(parameter)[0]
for parameter in feature.serializable_parameters
}
)
In every feature we specify the name of the serializable parameters.
So parameters are passed from parent flow like dicts.
But features wait for BaseModel.
Is it possible to cast them(using one common function dict_to_pydantic
from my example) and keep the right name (from parameters)?Anna Geller
Aleksandr Liadov
11/26/2021, 3:23 PMAleksandr Liadov
11/26/2021, 3:30 PMtest_dict = get_test_dict(task_args=dict(name="Your Custom Task Name"))
Work nice!
Thanks!Aleksandr Liadov
11/26/2021, 3:35 PMBring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by