# ask-community
m
Good morning everyone, question for y’all. I have an experimental setup with:
1. `settings.py`: basically metadata in a dictionary
2. `pipeline.py`: where I define my flow
3. a `tasks` folder: with several task .py files (with different names, of course)

For now `settings.py` holds the metadata of a single experiment, including each experiment directory. Hence it is imported in each task file as well as the main pipeline file, so that the metadata acts as a global variable shared across files. For example, the results folder path is passed to the task decorators so that each task knows where to save its result and where to look when caching (I am using file-based caching). This approach works perfectly when I run a single experiment, for which I fill in the metadata in `settings.py` manually.

However, I have been trying to set up a battery of experiments where only one or a few values of the metadata change. For that I added a second settings file (`battery_experiments.py`) containing lists of values for certain arguments in `settings.py`. These are used in a for loop where I call the `@flow` in `pipeline.py` several times, each time updating the `settings.py` metadata with values from `battery_experiments.py`. The issue is that even though the global variables in `settings` change on each iteration (I print them out), the task decorator seems to keep the value from the first iteration. I am guessing this happens because the decorators are evaluated before anything is run, so they keep the values `settings.py` had before any update was made. I am not sure how to make the iterative updates to the global variables be reflected in the task decorator input. Does anyone know if this is possible at all? Thanks a lot!
a
I'm not sure I totally follow, might need to see your code, but tasks are instantiated/defined at import time, so if the settings are being passed to the task decorator, that won't change over the course of the runtime
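To make the import-time point concrete, here is a minimal plain-Python sketch (the names are illustrative, not from this thread): the value handed to a decorator factory is captured once when the module loads, so mutating the settings dict afterwards is never seen by the decorator.
```python
# Illustrative sketch: decorator arguments are evaluated once, at import time.
settings = {"path": "original"}

def configured(path):
    # `path` is evaluated here, when the decorator line runs at module load
    def wrap(fn):
        def inner(*args, **kwargs):
            print(f"using path: {path}")
            return fn(*args, **kwargs)
        return inner
    return wrap

@configured(settings["path"])   # captures "original"
def do_work():
    pass

settings["path"] = "updated"    # mutating the dict later has no effect
do_work()                       # still prints: using path: original
```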
m
@Austin Weisgrau I see. Is there a way to pass a list to a `@task` decorator argument at import time and use each value at each iteration of the flow call? I guess I could move the import of the tasks inside the loop so that the value in the task decorator changes at each iteration? That may actually work, but it leaves the code in a weird structure with an import at the end instead of all at the very beginning.
a
I don't totally follow but if you paste some code examples for what you're thinking I could try and help
m
My code is fairly large, but I can put together a made-up example with the logic simplified. Will do that in a second.
a
I think if there are values that you want to be different at runtime, you'd want to pass them as arguments to the task functions themselves rather than passing them to the task decorator...
```python
from prefect import flow, task

values = [1, 2, 3]

@task
def mytask(value):
    ...

@flow
def myflow(value):
    mytask(value)

for value in values:
    myflow(value)
```
m
```python
# settings.py

metadata = {
    'path': 'to_be_defined'
}
```
```python
# task.py

from prefect import task
from prefect.filesystems import LocalFileSystem

import settings
# cache_file_based_fn comes from my own file-based caching helper

@task(
    cache_key_fn=cache_file_based_fn,
    persist_result=True,
    result_storage=LocalFileSystem(basepath=settings.metadata['path']),
)
def do_task(args):
    ...
    return
```
a
ah i see...you could potentially instantiate the task at runtime within the flow and execute it immediately:
```python
def mytask():
    ...

@flow
def myflow():
    # wrap the plain function with task() at runtime, then call it immediately
    task(mytask, result_storage=LocalFileSystem(basepath=settings.metadata['path']))()
```
m
```python
# pipeline.py

from prefect import flow

import settings
from do_task import do_task

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    pipeline(path)
```
I think the example is correct now
a
yeah the way you've set it up won't work because of the reason i stated above, the task decorator is evaluated at import time but you're changing the metadata at run time
but I think my little example should work for what you want to do
m
Isn't that the same as importing the function within the loop?
a
```python
@flow()
def pipeline(path):
    settings.metadata["path"] = path
    # wrap the (undecorated) do_task function at runtime instead of using @task
    wrapped_do_task = task(
        do_task,
        cache_key_fn=cache_file_based_fn,
        persist_result=True,
        result_storage=LocalFileSystem(basepath=settings.metadata['path']),
    )
    wrapped_do_task(args)
```
rather than using the @task decorator
m
Oh! You can change the decorator arguments in a task call?
```python
# pipeline.py

from prefect import flow

import settings

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    from do_task import do_task
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    pipeline(path)
```
a
a decorator is evaluated at import time, so if you use a decorator, the value you pass the decorator is going to be whatever it is when the program first loads
m
Oh I see
That would actually be pretty clean I think
a
a decorator is just syntactic sugar. this:
```python
from prefect import task

@task
def mytask():
    ...
```
is the same as:
```python
from prefect import task

def mytask():
    ...

mytask = task(mytask)
```
m
What is passed at run time overrides the task decorator?
a
the difference is that when you use the decorator, it is evaluated immediately when the module is imported, whereas in the second example you can control when the "decorator" is evaluated
m
Gotcha
a
no it doesn't override, you have to EITHER use the decorator OR do the second method i'm describing
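Note that for the runtime-wrapping method to work with the files shown earlier, the task module has to expose the plain, undecorated function, since the flow wraps it itself. A minimal sketch of that variant, assuming the same simplified layout; `cache_file_based_fn`, the `caching` import path, and the example paths are placeholders, not Prefect APIs:
```python
# do_task.py: keep the function undecorated so the flow can wrap it per run
def do_task(path):
    ...

# pipeline.py
from prefect import flow, task
from prefect.filesystems import LocalFileSystem

import settings
from do_task import do_task as plain_do_task
from caching import cache_file_based_fn  # hypothetical path to the caching helper

@flow
def pipeline(path):
    settings.metadata["path"] = path
    # wrap the plain function now, so result_storage reads the current path
    run_task = task(
        plain_do_task,
        cache_key_fn=cache_file_based_fn,
        persist_result=True,
        result_storage=LocalFileSystem(basepath=settings.metadata["path"]),
    )
    return run_task(path)

for path in ["results/exp1", "results/exp2"]:
    pipeline(path)
```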
m
Okay!
```python
# pipeline.py

from prefect import flow

import settings

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task(args)

for path in paths:
    from do_task import do_task
    pipeline(path)
```
how about this?
It should be analogous, correct?
a
🤔 not quite, because the first time you import do_task the decorator will evaluate, and the second import won't re-run the module and reevaluate the decorator
m
Yeah, but isn't it importing at every loop iteration?
So it should iteratively evaluate?
a
no, once a module is imported, it won't reimport
you miiiiight be able to use importlib.reload to get that to work https://stackoverflow.com/questions/1254370/reimport-a-module-while-interactive
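For reference, this is the behaviour in play: Python caches imported modules in `sys.modules`, so repeating an import statement is a no-op, whereas `importlib.reload` re-executes the module body (and therefore the `@task` decorator). A small sketch, using the `do_task` module name from the simplified examples above:
```python
import importlib
import sys

import do_task   # first import: runs do_task.py, so the @task decorator evaluates
import do_task   # no-op: the module is already cached in sys.modules

assert "do_task" in sys.modules

# re-executes do_task.py from top to bottom, so the decorator is evaluated
# again with whatever settings.metadata holds at this moment
do_task = importlib.reload(do_task)
```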
m
I was just looking at that yeah
```python
import importlib
importlib.reload(module)
```
@Austin Weisgrau I should use it like this yeah?
```python
# pipeline.py

from prefect import flow

import settings
import importlib
import do_task  # import the module so importlib.reload has something to reload

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    do_task.do_task(args)  # do_task.py defines the task function do_task

for path in paths:
    importlib.reload(do_task)
    pipeline(path)
```
a
I'm honestly not sure, I haven't used that method before, I just know it exists
m
Okay, thanks a lot for all the help!
In case it is of help to anyone: quick update, the importlib method works if specified in the following way:
```python
# pipeline.py

from prefect import flow

import settings
import importlib
import do_task

paths = [path1, path2, path3]

@flow()
def pipeline(path):
    settings.metadata["path"] = path
    # reload inside the flow so the @task decorator re-reads the updated metadata
    importlib.reload(do_task)
    do_task.do_task(args)

for path in paths:
    pipeline(path)
```
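As a possible alternative to reloading modules: depending on the Prefect 2 version installed, `Task.with_options` may allow overriding settings such as `result_storage` per call without touching the module system. A sketch of that idea (the `do_task` module name and the `results/exp` paths are placeholders, and the exact options `with_options` accepts vary by version):
```python
from prefect import flow
from prefect.filesystems import LocalFileSystem

from do_task import do_task  # the @task-decorated function from do_task.py

@flow
def pipeline(path):
    # build a per-run copy of the task whose storage points at this experiment's folder
    do_task_for_run = do_task.with_options(
        result_storage=LocalFileSystem(basepath=path)
    )
    do_task_for_run(path)

for path in ["results/exp1", "results/exp2"]:
    pipeline(path)
```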
Update 2.0: I ran into another issue now. As I said before, I am doing file-based caching using `cache_file_based_fn` from this repo I created a couple of months ago. Everything works beautifully on the first run. But when I rerun the pipeline once it has already been run (i.e. the tasks' resulting files exist in the corresponding locations), the caching does not work properly. The expected behavior would be that each task whose file exists in the specified path is cached and the file is loaded. The first task is cached and its file is loaded correctly; however, the second one is also cached, but the file that gets loaded is not the one it generated in the previous run. In fact it loads the one generated by the preceding task, which is very bizarre. Has anyone had this happen? I will put together a simplified code version of what is going on for illustrative purposes.
The structure of `settings.py` is as defined above: a metadata variable holder that is imported in the task1.py and task2.py files.
```python
# pipeline.py

from prefect import flow

import settings
from tasks import task1, task2
import importlib

@flow
def pipeline(path):
    settings.metadata['path'] = path

    # each task module defines a task function of the same name
    importlib.reload(task1)
    file1 = task1.task1(args)

    importlib.reload(task2)
    file2 = task2.task2(args)

    return

paths = [path1, path2]

for path in paths:
    pipeline(path)
```
For a bit more context, I am also getting the following warning:
```
UserWarning: A task named 'task1' and defined at 'path/to/task1' conflicts with another task. Consider specifying a unique 'name' parameter in the task definition: `@task(name='my_unique_name', ...)`
```
It seems like running the same pipeline in a for loop messes up Prefect's run tracking.
Could it be that by reloading the whole task1.py and task2.py modules some of the cache state is lost? I am just speculating here.
Let me know if anything is unclear
I can provide more details of my workflow
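One note on the name-conflict warning above: it appears because each reload re-registers a task with the same name. The warning's own suggestion is to give the task a unique name, and since the module is re-executed on every reload, that name can be derived from the current metadata. A hypothetical sketch of one task file (the `caching` import path is a placeholder for the thread's own helper):
```python
# task1.py (hypothetical): derive a unique task name from the current metadata,
# which works here because importlib.reload re-runs this module every iteration
from prefect import task
from prefect.filesystems import LocalFileSystem

import settings
from caching import cache_file_based_fn  # placeholder import for the caching helper

@task(
    name=f"task1-{settings.metadata['path']}",  # unique per experiment path
    cache_key_fn=cache_file_based_fn,
    persist_result=True,
    result_storage=LocalFileSystem(basepath=settings.metadata['path']),
)
def task1(args):
    ...
```
Whether this also resolves the wrong-file cache behaviour is unclear; loading the preceding task's file suggests the computed cache key may not distinguish the two tasks, which would be worth checking inside `cache_file_based_fn`.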