Michael Hadorn
02/11/2022, 8:21 AM
flow.visualize() in Orion?
I understand the concept and the benefits of using decorators; it's easy, but not that flexible.
Thanks a lot for any clarification.
Anna Geller
Michael Hadorn
02/11/2022, 2:18 PM
DeploymentSpec as prefect deployment create xx will not result in the flow being displayed in the GUI). -> Maybe it's a bug?
About my second question:
I'll try to explain the process of building my flow:
• I have one task (one function)
• This task has to be executed twice (in general, an arbitrary number of times)
◦ e.g. task_run names a, b
• Each task run (a, b) knows its own dependencies
◦ e.g. a depends on b
With Prefect Core I could create all tasks and then set all references (and every reference was valid, because all tasks already existed).
With Orion it seems I cannot create tasks and set the deps later.
At the moment I set up task_run a(), I cannot set wait_for to b, because b is not a task yet.
How can I add deps later? I understand from your answer that there is a way to still change deps at runtime. How can I do this?
I would prefer not to have to work out the call order of my tasks myself.
I also tried to code this. My problem is the block at # second: set deps according to external logic
from prefect import task, flow, get_run_logger
from prefect.deployments import DeploymentSpec


@task
def do_stuff(part):
    logger = get_run_logger()
    logger.info(f"{part} - here I am")


@flow(name='late_deps')
def late_deps():
    # key = task_run-name; value = [deps]
    task_exec_config = {
        'a': ['b'],  # a needs b
        'b': None,   # b can start
    }
    # collection of PrefectFutures with task_run-name as key to maintain references
    task_exec = {}

    # first: creation of all existing tasks
    for task_exec_name, task_deps_config in task_exec_config.items():
        task_exec[task_exec_name] = do_stuff.with_options(name=task_exec_name)(task_exec_name)
        # do_stuff.with_options(name=task_exec_name)(task_exec_name)

    # second: set deps according to external logic -> this will not work
    for task_exec_name, task_deps_config in task_exec_config.items():
        for dep in task_deps_config or []:  # None means "no dependencies"
            print(f'Task {task_exec_name} needs task {dep}')


# if __name__ == '__main__':
#     late_deps()

DeploymentSpec(
    flow=late_deps,
    name="late_deps",
)
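One possible workaround for the late-dependency problem described above is to sort the config so that every task run is submitted only after the runs it depends on. The futures it needs to wait for then already exist at call time. The following is a minimal sketch, not a confirmed Prefect recipe: `submit_in_dependency_order` and the `submit` callback are hypothetical names introduced here for illustration, and the ordering relies on `graphlib` from the Python 3.9+ standard library.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def submit_in_dependency_order(config, submit):
    """Submit each task run after the runs it depends on.

    config: maps a task-run name to a list of upstream names (or None).
    submit: hypothetical callable (name, upstream_futures) -> future.
    """
    # Normalize None to an empty list so every value is iterable.
    graph = {name: list(deps or []) for name, deps in config.items()}
    futures = {}
    # static_order() yields a name only after all of its predecessors;
    # it raises graphlib.CycleError if the dependencies are circular.
    for name in TopologicalSorter(graph).static_order():
        upstream = [futures[dep] for dep in graph.get(name, [])]
        futures[name] = submit(name, upstream)
    return futures


# Stand-in for a real task call, used only to show the resulting order.
calls = []


def fake_submit(name, upstream):
    calls.append((name, list(upstream)))
    return f"future-{name}"


futures = submit_in_dependency_order({"a": ["b"], "b": None}, fake_submit)
# calls == [("b", []), ("a", ["future-b"])]
```

Inside the flow, the callback could presumably be something like `lambda name, upstream: do_stuff.with_options(name=name)(name, wait_for=upstream)`, assuming the task call accepts `wait_for` as mentioned earlier in the thread. The dependency order is still computed in user code, but only once and generically, rather than by hand for each task.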
Anna Geller
Michael Hadorn
02/14/2022, 10:01 AM
Anna Geller
Michael Hadorn
02/14/2022, 10:21 AM
Anna Geller
Michael Hadorn
02/14/2022, 10:53 AM