# prefect-community
b
Hi all! I have a problem with flow again :)
```python
from prefect import task, Flow, case
from random import randint
from prefect.schedules import IntervalSchedule
from datetime import timedelta

@task
def is_running():
    return False

def get_files():
    return [i for i in range(0, randint(0, 15))]

@task
def _print(lst):
    print(lst)
    print(len(lst))

@task
def read_file(file):
    return True

@task
def preprocess_file(file):
    return True

@task
def save_file(file):
    return True

schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("process", schedule=schedule) as flow:
    condition = is_running()
    with case(condition, False) as cond:
        files = get_files()
        _print(files)
        for file in files[0:3]:
            r = read_file(file)
            p = preprocess_file(file)
            s = save_file(file)
            r.set_downstream(p)
            p.set_downstream(s)

flow.run()
```
Every time the flow runs, the variable `files` is the same. Why?
a
If you’re running the script and waiting until the schedule kicks it off, my guess is that the `files = get_files()` line only gets executed once, so its value never changes. If you want it to change, you can wrap it in a task and return the list from there!
j
You're calling `get_files` as a function, not a task (there's no `@task` decorator on it). So `get_files` runs at flow build time, while the rest of your functions run at flow run time. This means that the value of `files` is frozen in your flow and won't update each run.
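To make the build-time vs run-time distinction concrete, here is a plain-Python analogy. It is not Prefect code and only models the timing. `get_files` is a hypothetical stand-in that returns a longer list on each call (a counter replaces `randint` so the behavior is predictable), and `run_frozen` / `run_deferred` are hypothetical names for two ways of wiring it into a "flow".

```python
from itertools import count

# Hypothetical call counter so each get_files() call returns a different list.
_calls = count(1)


def get_files():
    # Stand-in for the question's get_files: a new, longer list per call.
    return list(range(next(_calls)))


# "Build time": a plain call runs once, while the flow is being defined,
# and the resulting list is baked into everything that runs later.
frozen = get_files()


def run_frozen():
    # Every "run" reuses the list computed at definition time.
    return frozen


def run_deferred():
    # The call is deferred until the run itself, so it is re-evaluated
    # each time (roughly what happens once get_files is a @task).
    return get_files()
```

Calling `run_frozen()` repeatedly returns the same `[0]` list, while successive `run_deferred()` calls return `[0, 1]` and then `[0, 1, 2]`. Adding `@task` to `get_files` moves it from the first pattern to the second.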
b
Thanks! I added the decorator, but now how do I iterate over `files`?
j
You'll want to look at Prefect's mapping concept: https://docs.prefect.io/core/concepts/mapping.html#prefect-approach

Now that `get_files` is a task, `files` is only a placeholder at build time, so a plain Python `for` loop can't iterate over it. The loop over `files` needs to be expressed as a mapped task (or a series of mapped tasks) instead.
b
@Jim Crist-Harif thanks!