Getting Error when trying to pass data from task W...
# prefect-community
y
Getting an error when trying to pass data from a task. What is the proper way to do this? See code in thread.
z
dt = datetime.strptime(dt, '%Y-%m-%d')
will need to occur in a task, as will
n_days = (start_date - dt).days
k
Hey @YD, could you move the code to the thread so we can keep the main channel clean? These two lines:
dt = datetime.strptime(dt, '%Y-%m-%d')
n_days = (start_date - dt).days
will not work because dt doesn't have a value until the Flow is run. You need to put these commands in tasks so that they run when the Flow runs. @Zanie was faster than me, but I'll leave my answer, which is the same.
z
😄
y
To here?
Getting an error when trying to pass data from a task. What is the proper way to do this?
@task(nout=2)
def get_dt():
    dt = (datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d')
    x = 5
    return x, dt


@task()
def t_1(x):
    logger = prefect.context.get("logger")
    logger.info(x * 10)


@task()
def t_2(x):
    logger = prefect.context.get("logger")
    logger.info(x * 100)


def main():
    start_date = datetime.now(timezone('US/Pacific'))
    schedule = Schedule(clocks=[CronClock('30 8 * * *', start_date=start_date)])

    with Flow("dt_passing", schedule=schedule, executor=LocalDaskExecutor()) as flow:
        x, dt = get_dt()
        dt = datetime.strptime(dt, '%Y-%m-%d')
        n_days = (start_date - dt).days

        with case(n_days > 4, True):
            t_1(x)
            
        with case(n_days > 4, False):
            t_2(x)

    flow.run()
    print(x, dt)
Error:
dt = datetime.strptime(dt, '%Y-%m-%d')
TypeError: strptime() argument 1 must be str, not GetItem
k
Yep thank you!
y
So, are you saying that n_days should be calculated in a task? Is the x value OK?
k
Yes, x is fine. Stuff that is not in a task is executed as the flow is constructed (build time), not at flow run time.
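A minimal sketch of the logic that would move into tasks, written as plain functions so it runs without Prefect installed. In the flow above, each function would presumably be decorated with @task and called inside the with Flow(...) block, so it receives the real string at run time instead of a placeholder. The names parse_dt and days_since are hypothetical, not from the thread. One assumption worth flagging: start_date in the original code is timezone-aware while strptime returns a naive datetime, and Python refuses to subtract the two, so the sketch attaches start_date's tzinfo to the parsed date as one simple way around that.

```python
from datetime import datetime, timezone


def parse_dt(dt_str: str) -> datetime:
    """Parse a 'YYYY-MM-DD' string; in a flow this would be a task."""
    return datetime.strptime(dt_str, "%Y-%m-%d")


def days_since(start_date: datetime, dt: datetime) -> int:
    """Whole days between dt and start_date; in a flow this would be a task."""
    # Aware minus naive raises TypeError, so give the naive value
    # the same tzinfo as start_date before subtracting (an assumption
    # about what the original code intended).
    if start_date.tzinfo is not None and dt.tzinfo is None:
        dt = dt.replace(tzinfo=start_date.tzinfo)
    return (start_date - dt).days


# Stand-in values; the thread uses datetime.now(timezone('US/Pacific')).
start_date = datetime(2022, 1, 10, 8, 30, tzinfo=timezone.utc)
n_days = days_since(start_date, parse_dt("2022-01-07"))
print(n_days)  # 3
```

Keeping the date arithmetic in small functions like these also makes it easy to check outside of a flow run.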
y
checking…
Works… when I print the data at the end of the flow
flow.run()
print(x, dt)
I get
<Task: get_dt[0]> <Task: get_dt[1]>
Is there a different way to print?
k
You have to print or use the logger in a task.
Or you can do something like this for local execution
y
Another question related to this flow. I use conditional logic and am not sure about the following:
• Can we merge values from tasks that are not in a “case” statement?
• How does merge work? It is not actually merging the data, but just waiting for a task to be done? Is this a valid flow?
with Flow("conditional-branches") as flow:
    val3 = some_task()
    
    cond = check_condition()

    with case(cond, True):
        val1 = action_if_true()

    with case(cond, False):
        val2 = action_if_false()

    val = merge(val1, val2, val3)

    another_action(val)
k
I think you can, because merge just takes the first non-null value. In this case, though, I don't think val3 will ever be picked up.
y
Should I just add a parameter at the end of each case, case_true = True and case_false = True, and then val = merge(case_true, case_false)?
k
I am not sure what that is intended to do. merge takes care of merging task results.
The example here just gets the first non-null value for merge.
From the API docs:
The merge will return the first real result it encounters, or None. If multiple tasks might return a result, group them with a list.
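To make the quoted behaviour concrete, here is a toy model of "first real result, or None" in plain Python. It is not Prefect's implementation: first_real_result is a hypothetical helper, and a skipped branch is represented simply as None. Under those assumptions it shows why val3 in the conditional-branches flow above would never be picked up, since one of the two case branches always produces a value first.

```python
from typing import Any, Optional


def first_real_result(*results: Any) -> Optional[Any]:
    """Return the first result that is not None, or None if all are None."""
    for result in results:
        if result is not None:
            return result
    return None


val3 = "from_some_task"

# cond is True: the False branch is skipped, so val2 has no result.
print(first_real_result("from_true_branch", None, val3))   # from_true_branch

# cond is False: the True branch is skipped, so val1 has no result.
print(first_real_result(None, "from_false_branch", val3))  # from_false_branch
```

If val3 is needed downstream, passing it to another_action as a separate argument is probably simpler than including it in the merge.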