Tom Shaffner
12/03/2021, 12:28 AM
Kevin Kho
Tom Shaffner
12/03/2021, 12:30 AM
Tom Shaffner
12/03/2021, 9:48 PM
Kevin Kho
Tom Shaffner
12/03/2021, 10:04 PM
inc_or_negate function isn't actually a task, and I made mine mirroring that documentation, so while I can add templated task run names to my equivalents of the inc or negate functions, both the inc_or_negate function and the case, create_flow_run, and wait_for_flow_run functions in it all have these generic mapped names.
Kevin Kho
Tom Shaffner
12/03/2021, 10:09 PM
Tom Shaffner
12/06/2021, 5:18 PM
Kevin Kho
Kevin Kho
task_args like this:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge


@task
def inc(x):
    return x + 1


@task
def negate(x):
    return -x


@task()
def is_even(x):
    if x == 2:
        raise ValueError()
    return x % 2 == 0


def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x, task_args={"name": "test-{x}"})
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)


with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))

flow.run()
Kevin Kho
Tom Shaffner
12/06/2021, 5:58 PM
<Task: Constant[list]> instead of the name (I'm passing a list to the apply_map function). Is there a way to pull the value from it instead of the type?
Kevin Kho
Tom Shaffner
12/06/2021, 6:01 PM
Tom Shaffner
12/06/2021, 6:08 PM
current_flow = create_flow_run(
    flow_name=flow_name,
    run_name=f'{datetime.now()} {flow_name} attempt.',
    project_name=PROJECT_NAME,
    task_args={"name": "Retry: {flow_name}"},
)
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {flow_name}"})
In those cases flow_name is one of the mapped values that I pass in to my equivalent of the inc_or_negate function.
Kevin Kho
Kevin Kho
create_flow_run for the run_name, there is a chance that gets fixed to registration time
Kevin Kho
Tom Shaffner
12/07/2021, 8:17 PM
task_args={"name":"test-{x}"} which is pretty much the same as mine, task_args={"name":"Wait for Retry: {flow_name}"}, where flow_name is my equivalent of x (input to the function). Am I missing something?
When I try it that way I still don't get any value; it all just shows up as {flow_name}. Weird.
Kevin Kho
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {flow_name}"})
so {flow_name} is not the name of the input into the function. I think you need this to be:
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {current_flow}"})
Kevin Kho
wait_for_flow_run(current_flow, raise_final_state=True, task_args={"name": "Wait for Retry: {flow_run_id}"})
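One way to picture the point about input names: a templated name can only be filled from the inputs of the task it is attached to. The sketch below is plain Python, not Prefect code. render_name is a hypothetical helper, and it assumes the template is filled with str.format from the task's own keyword inputs, which is a simplification of whatever Prefect does internally. Leaving an unmatched placeholder untouched is also an assumption, chosen to mirror the literal {flow_name} reported earlier in the thread.

```python
def render_name(template: str, task_inputs: dict) -> str:
    """Fill a name template from a task's inputs (hypothetical helper).

    Placeholders that do not match any input are left untouched.
    """
    try:
        return template.format(**task_inputs)
    except KeyError:
        # The template refers to a name the task never receives.
        return template


# Assumption: the waiting task's relevant input is called flow_run_id.
wait_inputs = {"flow_run_id": "abc-123"}

bad = render_name("Wait for Retry: {flow_name}", wait_inputs)
good = render_name("Wait for Retry: {flow_run_id}", wait_inputs)

print(bad)
print(good)
```

Under these assumptions only the second template resolves, because flow_name is a variable in the enclosing function and never an input of the waiting task.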
Kevin Kho
Tom Shaffner
12/07/2021, 8:42 PM
task_args={"name":"test-{x}"}?
If so I must be doing something else different; the only difference I can see that should matter is that x is a list in my case, but everything I do seems to either simply use the variable name I put in it or give me back <Task: Constant[list]>
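A possible reason for seeing <Task: Constant[list]>: while the flow is being built, the arguments handed to the mapped function are task objects standing in for values, so any string assembled at that point sees the object and not the value. The sketch below imitates this in plain Python. ConstantStandIn is a hypothetical class, not Prefect's own, and its repr format is copied from the message above.

```python
class ConstantStandIn:
    """Hypothetical stand-in for a task that wraps a constant value."""

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f"<Task: Constant[{type(self.value).__name__}]>"

    def run(self):
        # The wrapped value only becomes available when the task runs.
        return self.value


flow_name = ConstantStandIn(["flow-a", "flow-b"])

# Formatting while the flow is being built sees the wrapper object.
build_time_name = f"Retry: {flow_name}"

# Formatting after the task has run sees the actual value.
run_time_name = f"Retry: {flow_name.run()}"

print(build_time_name)
print(run_time_name)
```

If this is what is happening, an f-string evaluated inside the flow definition cannot pick up the mapped value; the name has to be resolved at run time instead.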
Kevin Kho
flow.run() so it’s easy to test