# prefect-community
a
hey folks. Hopefully simple question, I’m testing out prefect as a replacement for my org’s current tools. I want the ability to restart a failed flow manually, AFTER I make changes in the code base. Basically, I’m anticipating flows to fail and I’ll need to make tweaks to our credentials or logic in the code. But I don’t want to re-run the entire flow, just pick up at the task where it failed after making changes. I can’t seem to register the new version of my edited flow under the same version id as the prior one. I’d need to do this because I need to restart the prior version that failed. The only thing I can do is publish the new version (say version id 15), and then version 14 that failed gets archived and I can’t restart it with the new code base from version id 15
k
You can’t do this because you need to re-register the changes (mostly). If you use script-based storage, changes can be reflected but it’s only for changes within a task. If you create more tasks or alter the DAG structure, it will throw an error. You can also use caching and start a new flow run so that it doesn’t re-run the previously run tasks
a
k
Yes exactly
Caching and Targets are just two different forms of caching.
a
my flows in concept will be basic. I want to record if I’ve already downloaded a certain file so I can return the file url in a mapped task with that txt file defined and if it fails on file #10, it will skip the first 9 b/c they’ve been recorded already
ah so yes, that begs the question of how this works with a mapped task
I’ll test it out 🙂
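For readers following along, the "skip what's already recorded" idea can be sketched in plain Python. This is only an illustration of the concept, not Prefect's implementation: `run_with_target` and `download` are hypothetical names, and the `{x}.txt` template is an assumption chosen for the example.

```python
import os
import tempfile


def run_with_target(func, value, target_template, result_dir):
    """Run func(value) unless a file at the rendered target already exists."""
    path = os.path.join(result_dir, target_template.format(x=value))
    if os.path.exists(path):
        # Target found: reuse the stored value instead of running func again.
        with open(path) as fh:
            return fh.read(), "cached"
    result = str(func(value))
    with open(path, "w") as fh:
        fh.write(result)
    return result, "ran"


calls = []


def download(name):
    # Hypothetical stand-in for the real download step.
    calls.append(name)
    return f"downloaded {name}"


with tempfile.TemporaryDirectory() as result_dir:
    # First pass writes one file per input.
    first = [run_with_target(download, n, "{x}.txt", result_dir) for n in ["a", "b", "c"]]
    # Second pass only does real work for the new input "d".
    second = [run_with_target(download, n, "{x}.txt", result_dir) for n in ["a", "b", "c", "d"]]
```

On the second pass only `"d"` reaches `download`, which is roughly the behaviour being asked for when a mapped run fails partway through and is started again.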
k
Use that with this for mapping
a
@Kevin Kho
@Kevin Kho couldn’t get this to work. I used task_run_name={url} (url is my mapped input from a list of urls). And I did target="{date:%A}/{task_name}{url}.txt"
and that threw an error
"{date:%A}/{task_name}.txt" works but it caches just the first URL and skips the rest b/c it found a return value in that file
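A rough way to see why only the first URL gets cached: a template with no per-input field renders to the same path for every mapped child. The sketch below uses plain `str.format`; the `base_context` values and the short `urls` list are made-up stand-ins, and it assumes the target is rendered from values like these.

```python
from datetime import datetime

# Hypothetical values standing in for what is available when a target is rendered.
base_context = {"date": datetime(2021, 6, 7), "task_name": "mytask"}
urls = ["u1", "u2", "u3", "u4"]


def render(template, url):
    # Fields that the template does not mention are simply ignored by str.format.
    return template.format(url=url, **base_context)


# No per-input field: every mapped child renders the same path.
shared = {render("{date:%A}/{task_name}.txt", u) for u in urls}

# With the mapped input in the template, each child gets its own path.
per_input = {render("{date:%A}/{task_name}-{url}.txt", u) for u in urls}
```

Here `shared` ends up with a single path while `per_input` has one per URL, so with the first template the second child already finds a file at its target and is skipped.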
k
Is url an input into the task?
a
it’s an input into the function.
k
Like
@task
def mytask(url):
a
yup
and then I’m just doing mytask.map(listofURLS)
k
You could try the callable way?
a
I think I’d get the same output. So the task run name successfully changes to the specific url. in the UI I see 4 different tasks being run. (4 URLS)
but the target doesn’t update to the new task name. my result folder is just the base name of my function
I was able to use task_full_name and get 4 different txt file outputs
would be nice tho if I just had 1 generated file
k
This is a working example
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import Flow, task 
from prefect.engine.results import LocalResult

@task(result=LocalResult("/Users/kevinkho/Work/scratch"), target="{x}.txt")
def abc(x):
    return x

with Flow("test") as flow:
    abc.map(["a","b","c"])

flow.run()
Just change the directory to your current directory and you should see the 3 new files
a
duh, I think I know why.. it’s a url. so there are chars in the url that can’t be in a filename
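One possible workaround, sketched with the standard library only: turn the URL into a filename-safe string before it goes into the target. `safe_filename` and `url_target` are hypothetical helpers, not part of Prefect. The assumption here is that the "callable way" mentioned above hands the task inputs to the callable as keyword arguments, so something like `url_target` could be passed as `target=`; that wiring is not shown or tested here.

```python
import hashlib
import re


def safe_filename(url: str, max_len: int = 80) -> str:
    """Turn a URL into a string that is safe to use as a file name."""
    # Replace every run of characters outside [A-Za-z0-9._-] with one underscore.
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", url).strip("_")
    # A short hash of the full URL keeps names distinct after truncation.
    digest = hashlib.sha1(url.encode("utf-8")).hexdigest()[:8]
    return f"{cleaned[:max_len]}-{digest}"


def url_target(**kwargs) -> str:
    """Hypothetical callable target: expects the mapped input under the name 'url'."""
    return f"{safe_filename(kwargs['url'])}.txt"


# Example URL is made up for illustration.
example = url_target(url="https://example.com/a/b.csv?x=1")
```

The hash suffix is a design choice: two long URLs that differ only near the end would otherwise collapse to the same truncated name and share one target file.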
got it. thank you for the example, you’re the best!
k
oh lol that makes sense!