Samuel Kohlleffel
03/22/2023, 8:07 PM@task(task_run_name="{input_dict['key']}")
def task(input_dict: dict):
    """Print the value stored under the ``'key'`` entry of *input_dict*.

    Raises:
        KeyError: if *input_dict* has no ``'key'`` entry.
    """
    value = input_dict['key']
    print(value)
Bianca Hoch
03/22/2023, 9:55 PMfrom prefect import task, flow
@task(task_run_name="{greeting}-{name}")
def my_task(name, greeting):
    """Print a fixed confirmation message.

    ``name`` and ``greeting`` are not read inside the body; they are only
    referenced by the ``task_run_name`` template given to the decorator.
    """
    # Plain literal: the message has no placeholders, so no f-prefix is needed.
    print("I got a new name!")
@flow()
def my_flow():
    """Call ``my_task`` with a name looked up from a dict and return its result."""
    person = {"name": "Bianca"}
    greeting_text = "My New Name is "
    return my_task(name=person["name"], greeting=greeting_text)
# Invoke the flow defined above when this snippet is executed.
my_flow()
Samuel Kohlleffel
03/22/2023, 10:29 PMBianca Hoch
03/23/2023, 5:23 PMOuail Bendidi
03/29/2023, 9:05 AMimport pandas as pd
class TemplatedStr(str):
    """A string whose ``format`` can reference values nested inside dicts.

    Nested dict keyword arguments are flattened by joining the keys with
    ``_``, and every flattened value that has a length is additionally
    exposed as ``len_<flattened key>``.

    Example:
    >>> d = {"user": {"name": "John"}, "work": {"tasks": [1, 2, 3]}}
    >>> s = TemplatedStr("Hello {user_name}, you have {len_work_tasks} tasks to do")
    >>> s.format(**d)
    'Hello John, you have 3 tasks to do'
    """

    def format(self, *args, **kwargs):
        """Format the string, exposing flattened and ``len_*`` fields.

        Args:
            *args: positional fields, forwarded unchanged to ``str.format``.
            **kwargs: named fields; nested dicts are flattened so that
                ``{'a': {'b': 1}}`` is also reachable as ``{a_b}``.

        Returns:
            The formatted ``str``.

        Raises:
            KeyError / IndexError: as raised by ``str.format`` when the
                template references a field that was not supplied.
        """
        # convert {'a': {'b': 1}} into {'a_b': 1}
        flattened = dict(self._flatten(kwargs))
        # add len_* for every flattened value that has a length
        lengths = {
            f"len_{key}": len(value)
            for key, value in flattened.items()
            if hasattr(value, "__len__")
        }
        # The caller's own kwargs go last so they win over any derived name
        # that happens to collide with them.
        fields = {**flattened, **lengths, **kwargs}
        return super().format(*args, **fields)

    @staticmethod
    def _flatten(mapping, prefix=""):
        """Yield ``(flattened_key, value)`` pairs for every non-dict leaf."""
        for key, value in mapping.items():
            path = f"{prefix}{key}"
            if isinstance(value, dict):
                # Only dicts are descended into; lists and other containers
                # are kept whole so their length can be reported.
                yield from TemplatedStr._flatten(value, f"{path}_")
            else:
                yield path, value