
YD

03/01/2022, 6:14 PM
I'm getting an error when trying to pass data from a task. What is the proper way to do this? See the code in the thread.

Zanie

03/01/2022, 6:18 PM
`dt = datetime.strptime(dt, '%Y-%m-%d')` will need to occur in a task, as will `n_days = (start_date - dt).days`.

Kevin Kho

03/01/2022, 6:18 PM
Hey @YD, could you move the code to the thread so we can keep the main channel clean? These two lines:
dt = datetime.strptime(dt, '%Y-%m-%d')
n_days = (start_date - dt).days
will not work because `dt` doesn't have a value until the Flow is run. You need to put these commands in tasks so that they run when the Flow runs. @Zanie was faster than me, but I'll leave my answer, which is the same.
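A minimal sketch of the computation that both replies say belongs inside a task. The function name `days_since` and its parameters are hypothetical; in Prefect 1.x it would carry the `@task` decorator and be called inside the `with Flow(...)` block, so that it receives a real string at run time. The sketch also assumes `start_date` may be timezone-aware, so the parsed date is given the same tzinfo before subtracting.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper; in Prefect 1.x it would be decorated with @task
# so that it runs at flow run time, when dt_str is a real string.
def days_since(dt_str: str, start_date: datetime) -> int:
    """Return the whole days between the date in dt_str and start_date."""
    dt = datetime.strptime(dt_str, "%Y-%m-%d")
    # Assumption: reuse start_date's tzinfo so that an aware start_date
    # can be subtracted from without a naive/aware TypeError.
    dt = dt.replace(tzinfo=start_date.tzinfo)
    return (start_date - dt).days


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    three_days_ago = (now - timedelta(days=3)).strftime("%Y-%m-%d")
    print(days_since(three_days_ago, now))
```

Called inside the flow block on a task output, a helper like this would return another task result rather than an integer, which is why the value only exists once the flow runs.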

Zanie

03/01/2022, 6:19 PM
😄

YD

03/01/2022, 6:20 PM
Moved to here?
I'm getting an error when trying to pass data from a task. What is the proper way to do this?
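# Imports assumed for Prefect 1.x (not part of the original post)
from datetime import datetime, timedelta

import prefect
from prefect import Flow, case, task
from prefect.executors import LocalDaskExecutor
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from pytz import timezone


@task(nout=2)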
def get_dt():
    dt = (datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d')
    x = 5
    return x, dt


@task()
def t_1(x):
    logger = prefect.context.get("logger")
    logger.info(x * 10)


@task()
def t_2(x):
    logger = prefect.context.get("logger")
    logger.info(x * 100)


def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8 * * *', start_date=start_date)])

    with Flow("dt_passing", schedule=schedule, executor=LocalDaskExecutor()) as flow:
        x, dt = get_dt()
        dt = datetime.strptime(dt, '%Y-%m-%d')
        n_days = (start_date - dt).days

        with case(n_days > 4, True):
            t_1(x)
            
        with case(n_days > 4, False):
            t_2(x)

    flow.run()
    print(x, dt)
Error:
dt = datetime.strptime(dt, '%Y-%m-%d')
TypeError: strptime() argument 1 must be str, not GetItem

Kevin Kho

03/01/2022, 6:23 PM
Yep thank you!

YD

03/01/2022, 6:24 PM
So, are you saying that `n_days` should be calculated in a task? Is the `x` value OK?

Kevin Kho

03/01/2022, 6:28 PM
Yes, `x` is fine. Anything that is not in a task is executed while the flow is being constructed (build time), not at flow run time.
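The build-time versus run-time distinction can be illustrated without Prefect. The sketch below is a toy model, not Prefect's implementation: `Placeholder` is a hypothetical stand-in for the object a task call returns while the flow is being built. Passing it to `datetime.strptime` fails with a `TypeError`, as in the posted code, while deferring the call until a real string exists succeeds.

```python
from datetime import datetime


class Placeholder:
    """Hypothetical stand-in for a not-yet-computed task result."""

    def __init__(self, compute):
        self.compute = compute  # callable that produces the real value later


def get_dt():
    # "Build time": nothing is computed yet, only a placeholder is returned.
    return Placeholder(lambda: "2022-02-26")


dt = get_dt()

# Build time: strptime receives the placeholder, not a string.
try:
    datetime.strptime(dt, "%Y-%m-%d")
    build_time_error = None
except TypeError as exc:
    build_time_error = exc

# "Run time": the value is materialized first, then parsed.
parsed = datetime.strptime(dt.compute(), "%Y-%m-%d")

print(type(build_time_error).__name__)
print(parsed.date())
```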

YD

03/01/2022, 6:42 PM
Checking…
It works. But when I print the data at the end of the flow:
flow.run()
print(x, dt)
I get:
<Task: get_dt[0]> <Task: get_dt[1]>
Is there a different way to print?

Kevin Kho

03/01/2022, 6:51 PM
You have to print or use the logger in a task.
Or you can do something like this for local execution

YD

03/01/2022, 7:19 PM
Another question related to this flow: I use conditional logic and I'm not sure about the following:
• Can we merge values from tasks that are not in a `case` statement?
• How does `merge` work? Is it not actually merging the data, but just waiting for the tasks to be done?
Is this a valid flow?
with Flow("conditional-branches") as flow:
    val3 = some_task()
    
    cond = check_condition()

    with case(cond, True):
        val1 = action_if_true()

    with case(cond, False):
        val2 = action_if_false()

    val = merge(val1, val2, val3)

    another_action(val)

Kevin Kho

03/01/2022, 7:34 PM
I think you can, because `merge` just returns the first non-null value. In this case, though, I don't think `val3` will ever be picked up.

YD

03/01/2022, 7:36 PM
Should I just add a parameter at the end of each case, `case_true = True` and `case_false = True`, and then call `val = merge(case_true, case_false)`?

Kevin Kho

03/01/2022, 7:42 PM
I am not sure what that is intended to do. `merge` takes care of merging task results.
The example here just gets the first non-null value for `merge`.
From the API docs:
The merge will return the first real result it encounters, or None. If multiple tasks might return a result, group them with a list.
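The quoted behavior, first real result or None, can be modeled in plain Python. This is a toy sketch, not Prefect's `merge`: the function `first_real_result` is hypothetical, and it treats `None` as the result of a branch that did not run.

```python
from typing import Any, Optional


def first_real_result(*results: Any) -> Optional[Any]:
    """Return the first argument that is not None, or None if all are."""
    return next((r for r in results if r is not None), None)


# The True branch ran; the False branch was skipped (modeled as None).
val1, val2, val3 = "from_true_branch", None, "from_some_task"
print(first_real_result(val1, val2, val3))  # from_true_branch

# The False branch ran instead; val3 is still never reached.
print(first_real_result(None, "from_false_branch", val3))  # from_false_branch

# Nothing produced a result.
print(first_real_result(None, None))  # None
```

In this model `val3` is returned only when both branch results are `None`, which is consistent with the earlier remark that it would not normally be picked up.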