Kyle Pierce
05/25/2022, 6:53 PMcheck_file_generated.map(
api.map(task_scheduler(configs,
single_date=single_date,
target_dates=target_dates)
)
)
Kevin Kho
unmapped()
for one of the items?Kyle Pierce
05/25/2022, 7:05 PMKevin Kho
Kyle Pierce
05/25/2022, 7:11 PMtask_scheduler
and it goes to api
and one of items doesnt return so it returns 2 items to check_file_generated
that would cause it to use the last output from api
3 times?Kevin Kho
A -> B -> C
and all are map and A is 2 items but B is 3 items and C is 3 items, only 2 will run. Let me show an example.Kyle Pierce
05/25/2022, 7:15 PMFAIL
signal because the file isnt ready but ultimately has a Success
signal.
Would that be a 3 > 2 > 1 (repeated 3 times) output?Kevin Kho
from prefect import Flow, task
@task
def A(x):
return x
@task
def B(x):
if x == 2:
raise ValueError()
return x
@task(log_stdout=True)
def C(x):
print(x)
return x
with Flow("..") as flow:
a = A.map([1,2,3])
b = B.map(a)
c = C.map(b)
flow.run()
It should just show TriggerFailed
. Do you think you can edit this example to clarify the behavior? Sorry it’s not 100% in my head yet.with Flow("..") as flow:
C.map(B.map(A()))
I also don’t see how the length of the list could change between those unless you have other inputs to B
and C
Kyle Pierce
05/25/2022, 7:24 PMfrom prefect import Flow, task
@task
def A(arg1, arg2):
return [arg1A, arg1B, arg2A, arg2B]
@task
def B(x, y):
if x == 2:
raise ValueError()
return x
@task(log_stdout=True)
def C(x, y):
print(x)
return x
with Flow("..") as flow:
a = A(arg1, arg2)
b = B.map(a)
c = C.map(b)
flow.run()
A creates a list from multiple argsA
function that is creating the list of arguments for function B, but function B spits out the values correctly. There is just something happening in step C that isnt use the out generated.Kevin Kho
A
is not a mapped call. Should still be fine. I had an argument y
for B
and C
because I was trying something else but you can remove those and run and I think it should be as expectedKyle Pierce
05/25/2022, 7:33 PM