Marco Barbero Mota
06/07/2023, 5:14 PM
@flow
in pipeline.py, each time updating settings.py metadata with values from battery_experiments.py.
The issue is that even though the global variables within settings change on each iteration (I print them out), it seems like the task decorator always keeps the value from the first iteration. I am guessing this happens because the decorators are evaluated at import time, before anything is run, so they keep the values settings.py held before any update was made. I am not sure how to make the iterative updates to the global variables be reflected in the task decorator input.
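This import-time behavior can be reproduced in plain Python, with no Prefect involved (`configured`, `settings`, and `work` below are made-up illustrative names, not Prefect APIs):

```python
# A toy decorator factory that, like a parameterized decorator such as
# Prefect's @task(...), receives its arguments when the function is
# DEFINED (at import time), not when it is called.
def configured(path):
    def decorator(fn):
        def wrapper(*args, **kwargs):
            return path, fn(*args, **kwargs)
        return wrapper
    return decorator

settings = {"path": "first"}

@configured(settings["path"])  # settings["path"] is read once, right here
def work():
    return "done"

settings["path"] = "second"    # later updates are invisible to the decorator
print(work())                  # -> ('first', 'done')
```

The decorator captured the string `"first"` when the `def` statement executed; mutating the dictionary afterwards changes nothing, because the decorator never looks at the dictionary again.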
Does any of you know if this is possible at all?
Thanks a lot!
Austin Weisgrau
06/07/2023, 5:17 PM
Marco Barbero Mota
06/07/2023, 5:20 PM
@task
decorator argument at import time and use each value at each iteration of the flow call?
I guess I could set the import of the tasks within the loop so that at each iteration the value in the task decorator changes? That may actually work, but it leaves the code in a weird structure, with an import at the end instead of all at the very beginning.
Austin Weisgrau
06/07/2023, 5:22 PM
Marco Barbero Mota
06/07/2023, 5:23 PM
Austin Weisgrau
06/07/2023, 5:23 PM
values = [1, 2, 3]

@task
def mytask(value):
    ...

@flow
def myflow(value):
    mytask(value)

for value in values:
    myflow(value)
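A plain-Python analog of this pattern (dummy names, no Prefect) shows why passing the changing value through as a parameter sidesteps the import-time problem entirely:

```python
# The changing value travels as an argument, so nothing is frozen at
# import time. `mytask` and `myflow` here are ordinary functions, not
# Prefect tasks/flows.
def mytask(value):
    return value * 2

def myflow(value):
    return mytask(value)

results = [myflow(v) for v in [1, 2, 3]]
print(results)  # -> [2, 4, 6]
```

Each call sees its own `value`; no module-level global needs to be mutated between iterations.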
Marco Barbero Mota
06/07/2023, 5:26 PM
settings.py
metadata = {
    'path': 'to_be_defined'
}
task.py
import settings

@task(cache_key_fn=cache_file_based_fn,
      persist_result=True,
      result_storage=LocalFileSystem(basepath=settings.metadata['path'])
)
def do_task(args):
    ...
    return
Austin Weisgrau
06/07/2023, 5:29 PM
def mytask():
    ...

@flow
def myflow():
    task(mytask, settings)(args)
Marco Barbero Mota
06/07/2023, 5:31 PM
pipeline.py
import settings
import do_task

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    pipeline(path)
Marco Barbero Mota
06/07/2023, 5:33 PM
Austin Weisgrau
06/07/2023, 5:33 PM
Austin Weisgrau
06/07/2023, 5:33 PM
Marco Barbero Mota
06/07/2023, 5:34 PM
Austin Weisgrau
06/07/2023, 5:35 PM
@flow()
def pipeline(path):
    settings.metadata["path"] = path
    wrapped_task = task(do_task, cache_key_fn=cache_file_based_fn,
                        persist_result=True,
                        result_storage=LocalFileSystem(basepath=settings.metadata['path']))
    wrapped_task(args)
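The idea of applying the decorator inside the flow body can be sketched in plain Python (illustrative names only, no Prefect): because the decorator factory now runs at call time, it sees whatever the settings contain at that moment.

```python
def configured(path):
    """Toy stand-in for a decorator factory like task(fn, **options)."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            return path, fn(*args, **kwargs)
        return wrapper
    return decorator

settings = {"path": "initial"}

def work():
    return "done"

def pipeline(path):
    settings["path"] = path
    # Wrap at call time: the CURRENT value of settings["path"] is captured.
    wrapped = configured(settings["path"])(work)
    return wrapped()

print(pipeline("a"))  # -> ('a', 'done')
print(pipeline("b"))  # -> ('b', 'done')
```

The trade-off is that the wrapping happens on every call, but each iteration now gets a decorator configured with that iteration's value.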
Austin Weisgrau
06/07/2023, 5:35 PM
Marco Barbero Mota
06/07/2023, 5:35 PM
Marco Barbero Mota
06/07/2023, 5:35 PM
pipeline.py
import settings

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    import do_task
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    pipeline(path)
Austin Weisgrau
06/07/2023, 5:36 PM
Marco Barbero Mota
06/07/2023, 5:36 PM
Marco Barbero Mota
06/07/2023, 5:36 PM
Austin Weisgrau
06/07/2023, 5:37 PM
from prefect import task

@task
def mytask():
    ...

is the same as:

from prefect import task

def mytask():
    ...

mytask = task(mytask)
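This equivalence holds for any decorator, since `@decorator` is only syntactic sugar for the rebinding shown above. A runnable check with a toy decorator:

```python
# A simple decorator that uppercases the wrapped function's result.
def shout(fn):
    def wrapper():
        return fn().upper()
    return wrapper

@shout
def greet_a():
    return "hi"

def greet_b():
    return "hi"

greet_b = shout(greet_b)  # identical effect to the @shout line above

print(greet_a(), greet_b())  # -> HI HI
```

Both spellings execute the decorator at definition time, which is exactly why decorator arguments read from a settings module are frozen then and there.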
Marco Barbero Mota
06/07/2023, 5:37 PM
Austin Weisgrau
06/07/2023, 5:37 PM
Marco Barbero Mota
06/07/2023, 5:37 PM
Austin Weisgrau
06/07/2023, 5:37 PM
Marco Barbero Mota
06/07/2023, 5:38 PM
Marco Barbero Mota
06/07/2023, 5:38 PM
pipeline.py
import settings

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    import do_task
    pipeline(path)
Marco Barbero Mota
06/07/2023, 5:38 PM
Marco Barbero Mota
06/07/2023, 5:39 PM
Austin Weisgrau
06/07/2023, 5:39 PM
Marco Barbero Mota
06/07/2023, 5:40 PM
Marco Barbero Mota
06/07/2023, 5:40 PM
Austin Weisgrau
06/07/2023, 5:40 PM
Austin Weisgrau
06/07/2023, 5:41 PM
Marco Barbero Mota
06/07/2023, 5:43 PM
import importlib
importlib.reload(module)
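`importlib.reload` re-executes the module's top-level code and updates the existing module object in place, which is why module-level decorators run again with whatever globals they now see. A minimal demonstration using a stdlib module:

```python
import importlib
import json

# reload() re-runs the module body and rebinds its names on the SAME
# module object, so existing references to the module see the update.
reloaded = importlib.reload(json)
assert reloaded is json  # the same module object is returned
```

One caveat worth noting: only names accessed through the module object (`module.name`) pick up the reload; anything bound earlier via `from module import name` keeps pointing at the old object.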
Marco Barbero Mota
06/07/2023, 6:23 PM
pipeline.py
import settings
import importlib
import do_task

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    importlib.reload(do_task)
    pipeline(path)
Austin Weisgrau
06/07/2023, 6:26 PM
Marco Barbero Mota
06/07/2023, 6:31 PM
Marco Barbero Mota
06/07/2023, 7:53 PM
pipeline.py
import settings
import importlib
import do_task

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    importlib.reload(do_task)
    do_task(args)

for path in paths:
    pipeline(path)
Marco Barbero Mota
06/07/2023, 9:19 PM
cache_file_based_fn
from this repo I created a couple of months ago.
Everything works beautifully on the first run. But when I rerun the pipeline after it has already run (i.e. the tasks' resulting files exist in the corresponding locations), the caching does not work properly.
The expected behavior would be that each task whose file exists in the specified path is cached and the file loaded.
The first task is cached and its file is loaded correctly; however, the second one is cached but the file that is loaded is not the one it generated in the previous run. In fact, it loads the one generated by the preceding task, which is very bizarre. Has anyone had this happen?
I will put together a simplified version of the code for illustrative purposes.
Marco Barbero Mota
06/07/2023, 9:33 PM
settings.py
is, as defined above, a metadata variable holder that is imported in the task1 and task2 .py files.
pipeline.py
import settings
from tasks import task1, task2
import importlib

@flow
def pipeline(path):
    settings.metadata['path'] = path
    importlib.reload(task1)
    file1 = task1(args)
    importlib.reload(task2)
    file2 = task2(args)
    return

paths = [path1, path2]
for path in paths:
    pipeline(path)
Marco Barbero Mota
06/07/2023, 9:36 PM
warnings.warn( UserWarning: A task named 'task1' and defined at 'path/to/task1' conflicts with another task. Consider specifying a unique 'name' parameter in the task definition:
It seems like running the same pipeline in a for loop messes up Prefect's run tracking.
Marco Barbero Mota
06/07/2023, 9:37 PM
Marco Barbero Mota
06/07/2023, 9:48 PM
Marco Barbero Mota
06/07/2023, 9:48 PM