Bastian Röhrig
09/08/2021, 8:51 AM
[...] create_flow_run and get_task_run_result to interact with child flows. Now I would love to set some general task properties like state_handlers and task_run_name on these. Maybe I am missing something, but I can't figure out how to do this without writing my own wrapper task. Is there a way to do this?
Kevin Kho
[...] create_flow_run call through parameters, if that is what you’re saying. If you’re saying you have a common state handler that you want to attach to all tasks, then yes, it’s very common for users to either use a function to set it or to make their own version of the task decorator.
Kevin Kho
from functools import partial, wraps
import traceback

from prefect import task


def custom_task(func=None, **task_init_kwargs):
    # Used with arguments, e.g. @custom_task(name="x"): return a decorator
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    # Any shared task settings can be passed through task_init_kwargs
    return task(safe_func, **task_init_kwargs)


@custom_task
def abc(x):
    return x
Bastian Röhrig
09/08/2021, 2:14 PM
[...] create_flow_run call through parameters. Do you happen to know why that is not supported, or are there plans to support it in the future?
Kevin Kho
state_handler needs to be attached during registration. The task is already registered by the time you call create_flow_run. But there is a workaround here: you can call prefect.context.get("parameters") inside a state_handler. This returns a dict of parameter values, and you can then do an if-else based on them. The task_run_name can be altered based on inputs, so you shouldn’t have a problem there.
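A rough sketch of that workaround, assuming Prefect 1.x, where a state handler takes (task, old_state, new_state) and returns a state. The parameter name alert_on_failure, the input name x, and all function names here are hypothetical and only for illustration. Passing a callable as task_run_name is my understanding of the 1.x API rather than something confirmed in this thread, so treat that part as an assumption.

```python
def should_alert(parameters, is_failed):
    """Pure if-else on flow parameters; 'alert_on_failure' is a hypothetical parameter."""
    return bool(is_failed and parameters.get("alert_on_failure", False))


def parameter_aware_handler(task, old_state, new_state):
    """State handler that branches on the flow run's parameters."""
    import prefect  # lazy import so should_alert can be used without Prefect installed

    # Parameters of the current flow run, read from context as described above
    parameters = prefect.context.get("parameters") or {}
    if should_alert(parameters, new_state.is_failed()):
        print(f"{task.name} failed and alert_on_failure is set")
    return new_state


def run_name_from_inputs(**kwargs):
    """Build a task run name from the task's inputs; 'x' is a hypothetical input."""
    return f"abc-{kwargs.get('x', 'unknown')}"


# Attaching both at definition time (requires Prefect 1.x, assumed usage):
# @task(state_handlers=[parameter_aware_handler], task_run_name=run_name_from_inputs)
# def abc(x):
#     return x
```

Keeping the decision in should_alert separate from the handler means the branching can be checked on its own, while the handler stays attached once at registration and changes behaviour per run through parameters.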