bral

08/18/2020, 4:10 PM
Hi all! I have a problem with my flow again :)
```python
from prefect import task, Flow, case
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta


@task
def is_running():
    return False


def get_files():
    return [i for i in range(0, randint(0, 15))]


@task
def _print(lst):
    print(lst)
    print(len(lst))


@task
def read_file(file):
    return True


@task
def preprocess_file(file):
    return True


@task
def save_file(file):
    return True


schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("process", schedule=schedule) as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        for file in files[0:3]:
            r = read_file(file)
            p = preprocess_file(file)
            s = save_file(file)
            r.set_downstream(p)
            p.set_downstream(s)

flow.run()
```
On every run, the variable 'files' is the same. Why?

Alex Cano

08/18/2020, 4:15 PM
If you're running the script and waiting for the schedule to kick it off, my guess is that the `files = get_files()` line only gets executed once, and thus never changes. If you want it to change, you can wrap it in a task and return it!

Jim Crist-Harif

08/18/2020, 4:15 PM
You're calling `get_files` as a function, not a task (there's no `@task` decorator on it). So `get_files` runs at flow build time, while the rest of your functions run at flow run time. This means that the value of `files` is frozen in your flow and won't update each run.
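To see why the value freezes, here is a toy model of build time versus run time in plain Python. It does not use Prefect's API: `Deferred`, `toy_task`, `get_files_plain`, `get_files_task` and `count` are hypothetical names made up for this sketch, which only mimics the idea that calling a task records a node in a graph, while calling a plain function executes immediately.

```python
import random

# Hypothetical toy model of a flow graph (NOT Prefect's API).


class Deferred:
    """A graph node: a function plus its (possibly deferred) inputs."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def evaluate(self):
        # Resolve upstream nodes first, then call the wrapped function.
        resolved = [a.evaluate() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*resolved)


def toy_task(fn):
    """Hypothetical decorator: calling the wrapped function builds a node instead of running it."""

    def build(*args):
        return Deferred(fn, *args)

    return build


def get_files_plain():
    # Plain function: runs immediately wherever it is called.
    return [i for i in range(0, random.randint(0, 15))]


@toy_task
def get_files_task():
    # Same body, but calling it only records a node in the graph.
    return [i for i in range(0, random.randint(0, 15))]


@toy_task
def count(files):
    return len(files)


# "Build time": the plain call executes once, here, and its list is baked into the graph.
frozen_files = get_files_plain()
frozen_count = count(frozen_files)
# The task call is deferred, so its input is recomputed on every evaluation.
fresh_count = count(get_files_task())

# "Run time": each scheduled run evaluates the graph again.
frozen_runs = [frozen_count.evaluate() for _ in range(5)]
fresh_runs = [fresh_count.evaluate() for _ in range(5)]
print(frozen_runs)  # the same number five times
print(fresh_runs)   # generally varies between runs
```

In Prefect itself the fix is the one described above: put `@task` on `get_files` so the call becomes a node that is re-evaluated on every scheduled run.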

bral

08/18/2020, 4:26 PM
Thanks! I added the decorator. Now how do I iterate over 'files'?

Jim Crist-Harif

08/18/2020, 4:27 PM
You'll want to look at Prefect's mapping concept:
https://docs.prefect.io/core/concepts/mapping.html#prefect-approach
The loop over `files` needs to be encoded as a mapped task (or a series of mapped tasks) rather than an explicit for loop.
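The idea behind mapping can be sketched in plain Python: the per-file steps are declared once, and the fan-out over the list only happens when the flow runs, so the number of branches follows whatever the upstream step returns on that run. This is a hypothetical toy (`Node` and the stand-in step bodies are made up here), not Prefect's implementation.

```python
import random


class Node:
    """Toy flow-graph node (hypothetical, NOT Prefect's API)."""

    def __init__(self, fn, upstream=None):
        self.fn = fn
        self.upstream = upstream

    def map(self, fn):
        # Declare a step that will run fn once per element of this node's result.
        return Node(fn, upstream=self)

    def run(self):
        # A root node just calls its function; a mapped node first evaluates
        # its upstream, so the fan-out size is only known at run time.
        if self.upstream is None:
            return self.fn()
        return [self.fn(v) for v in self.upstream.run()]


def get_files():
    return [i for i in range(0, random.randint(0, 15))]


# Hypothetical stand-ins for the real read/preprocess/save steps.
def read_file(file):
    return f"raw-{file}"


def preprocess_file(raw):
    return raw.upper()


def save_file(processed):
    return processed  # pretend to persist it, then pass it along


# Declared once, with no explicit for loop over the (not yet known) file list.
files = Node(get_files)
saved = files.map(read_file).map(preprocess_file).map(save_file)

for _ in range(3):  # three "scheduled runs", each with its own list length
    print(saved.run())
```

In the Prefect Core API of that era, the corresponding chain uses `Task.map`, roughly `p = preprocess_file.map(read_file.map(files))` and then `save_file.map(p)`, assuming each step takes the previous step's output as its argument.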

bral

08/18/2020, 8:59 PM
@Jim Crist-Harif thanks!