An Hoang
12/13/2021, 1:55 PM
flow_result = flow.run(...)? I'm missing a couple of output files and I have no idea where they were saved, so I'm trying to hunt them down to prevent a memory leak (they are not in my expected folder and also not in ~/.prefect/results).
Anna Geller
Kevin Kho
An Hoang
12/13/2021, 4:59 PM
@task(target=target_file_str, result=LocalResult(dir="./test_prefect"))
def task_A(...):
    helper_func_A()
I need to change directory for specific tasks/parts of the flow. Here is my code for changing directory, mimicking cd in bash:
import os
from contextlib import contextmanager
from functools import wraps


@contextmanager
def cd(newdir):
    # Temporarily switch the working directory, restoring it on exit.
    prevdir = os.getcwd()
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        os.chdir(prevdir)


def change_cwd_dir(new_dir):
    # Decorator form: run the wrapped function inside new_dir.
    def decorator(func):
        @wraps(func)
        def wrapped_func(*args, **kwargs):
            with cd(new_dir):
                func_result = func(*args, **kwargs)
            return func_result
        return wrapped_func
    return decorator
1. Changing where I call flow.run
with cd("new/dir"):
    flow.run(...)
2. Changing directory within a task
@task(target=target_file_str, result=LocalResult(dir="./test_prefect"))
def task_A(...):
    with cd("new/dir"):
        helper_func_A()
3. Wrapping the function in a decorator before wrapping it with task
@task(target=target_file_str, result=LocalResult(dir="./test_prefect"))
@change_cwd_dir("new/dir")
def task_A(...):
    helper_func_A()
None of these worked. Some targets went to the correct directory ./test_prefect, some were in the wrong place, some went to ~/.prefect/results, and some went somewhere that I don't know, but I'm sure they were persisted because those tasks were skipped in a repeat run. The wrong directories are not related to new/dir.
It was a humbling and frustrating experience 😅. Any insights as to what might have gone wrong here?
Kevin Kho
An Hoang
12/13/2021, 5:09 PM
prefect run process there lol... Definitely needs to be looked at though. I think that since the context changes, it changes the meaning of where ./test_prefect is.
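To make that hunch concrete, here is a minimal stdlib-only sketch (no Prefect involved, so it does not confirm what Prefect itself does) of how the same relative path points at different places depending on when it is resolved. The function where_does_it_point and the new_dir folder are hypothetical stand-ins made up for the illustration; cd is the helper from earlier in the thread.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def cd(newdir):
    """Same helper as in the thread: chdir in, restore on exit."""
    prevdir = os.getcwd()
    os.chdir(os.path.expanduser(newdir))
    try:
        yield
    finally:
        os.chdir(prevdir)


def where_does_it_point(relative_dir="./test_prefect"):
    """Resolve the same relative path before and inside a cd() block."""
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp).resolve()
        (base / "new_dir").mkdir()  # hypothetical stand-in for "new/dir"
        with cd(base):
            # Anchored once, up front: unaffected by any later chdir.
            anchored = Path(relative_dir).resolve()
            with cd("new_dir"):
                # Resolved lazily: follows whatever the cwd is right now.
                lazy = Path(relative_dir).resolve()
        return base, anchored, lazy


if __name__ == "__main__":
    base, anchored, lazy = where_does_it_point()
    print(anchored.relative_to(base))
    print(lazy.relative_to(base))
```

If this is what is going on, one thing worth trying is building the directory as an absolute path up front, e.g. LocalResult(dir=os.path.abspath("./test_prefect")), so a later os.chdir cannot change what it means. That is an assumption to test, not a confirmed fix. Also keep in mind that os.chdir is process-wide, so tasks running concurrently in threads would all see the changed directory.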