https://prefect.io logo
#prefect-community
Title
# prefect-community
a

Adam

07/08/2022, 2:47 PM
hey folks. Hopefully simple question, I’m testing out prefect as a replacement to my org’s current tools. I want the ability to restart a failed flow manually, AFTER I make changes in the code base. Basically — I’m anticipating flows to fail and I’ll need to make tweaks to our credentials or logic in the code. But I don’t want to re-run the entire flow, just pick up at the current task where it failed after making changes. I can’t seem to register the new version of my edited flow as the same version id as the prior one. I’d need to do this because I need to restart the prior version that failed. The only thing I can do is publish the new version say (version id 15) and then version 14 that failed get’s archived and I can’t restart it with the new code base from version id 15
k

Kevin Kho

07/08/2022, 2:50 PM
You can’t do this because you need to re-register the changes (mostly). If you use script-based storage, changes can be reflected but it’s only for changes within a task. If you create more tasks or alter the DAG structure, it will throw an error. You can also use caching and start a new flow run so that it doesn’t re-run the previously run tasks
a

Adam

07/08/2022, 2:56 PM
k

Kevin Kho

07/08/2022, 2:56 PM
Yes exactly
Caching and Targets are just two different forms of caching.
a

Adam

07/08/2022, 2:57 PM
my flows in concept will be basic. I want to record if I’ve already downloaded a certain file so I can return the file url in a mapped task with that txt file defined and if it fails on file #10, it will skip the first 9 b/c they’ve been recorded already
ah so yes, that begs the question of how this works with a mapped task
I’ll test it out 🙂
k

Kevin Kho

07/08/2022, 3:00 PM
Use that with this for mapping
a

Adam

07/08/2022, 7:01 PM
@Kevin Kho
@Kevin Kho couldn’t get this to work. I used task_run_name={url} (url is my mapped input from a list of urls). And I did target = “{date:%A}/“{task_name}{url}.txt”
and that threw an error
“{date:%A}/“{task_name}.txt” works but it caches just the first URL and skips the rest b/c it found a return value in that file
k

Kevin Kho

07/08/2022, 7:04 PM
Is url an input into the task?
a

Adam

07/08/2022, 7:04 PM
it’s an input into the function.
k

Kevin Kho

07/08/2022, 7:04 PM
Like
Copy code
@task
def mytask(url):
a

Adam

07/08/2022, 7:04 PM
yup
and then I’m just doing mytask.map(listofURLS)
k

Kevin Kho

07/08/2022, 7:05 PM
You could try the callable way?
a

Adam

07/08/2022, 7:06 PM
I think I’d get the same output. So the task run name successfully changes to the specific url. in the UI I see 4 different tasks being run. (4 URLS)
but the target doesn’t update to the new task name. my result folder is just the base name of my function
I was able to use task_full_name and get 4 different txt file outputs
would be nice tho if I just had 1 generated file
k

Kevin Kho

07/08/2022, 7:11 PM
This is a working example
Copy code
import os
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

from prefect import Flow, task 
from prefect.engine.results import LocalResult

@task(result=LocalResult("/Users/kevinkho/Work/scratch"), target="{x}.txt")
def abc(x):
    return x

with Flow("test") as flow:
    abc.map(["a","b","c"])

flow.run()
Just change the directory to your current and you should see the 3 new files
a

Adam

07/08/2022, 7:13 PM
duh, I think I know why.. it’s a url. so there are chars in the url that can’t be in a filename
got it. thank you for the example, you
you’re best the best!
k

Kevin Kho

07/08/2022, 7:14 PM
oh lol that makes sense!
2 Views