
Federico Zambelli

02/15/2023, 11:03 PM
Hello everyone! I'm experimenting with Prefect 2.0 and I ran into a situation I don't know how to address; hope someone can help me here. I'm downloading a bunch of CSV files, saving them locally, and uploading them to BigQuery one by one. Since the data is quite large, I'm trying to skip the download task if the file has already been downloaded previously. Prefect 1 had the `target` functionality that seemed to fit my case, but it doesn't exist in Prefect 2.0. I tried many different ways but I can't figure out how. The reason I'm saving locally as an intermediate step is that I run OOM otherwise. My code (simplified) goes like this:
import pandas as pd
import requests
from prefect import flow, task

@task
def download(url, filename):
    file_path = f'/absolute/path/to/{filename}.csv'
    with open(file_path, 'wb') as file:
        res = requests.get(url+filename)
        file.write(res.content)
    return file_path

@task
def write_to_bq(file_path):
    df = pd.read_csv(file_path)
    df.to_gbq(...)

@flow
def download_all(url_list):
    paths = []
    for url in url_list:
        file_path = download(url)
        paths.append(file_path)
    return paths

@flow
def upload_all(paths):
    for path in paths:
        write_to_bq(path)

@flow
def main(url_list):
    paths = download_all(url_list)
    upload_all(paths)
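
The Prefect 1 `target` behaviour asked about here roughly amounts to an existence check in front of the download. Below is a minimal plain-Python sketch of that idea, not a Prefect feature: `download_if_missing` and the injected `fetch` callable are hypothetical names introduced only for illustration, and the temporary `.part` suffix is an assumption to avoid treating an interrupted download as complete.

```python
from pathlib import Path
from typing import Callable


def download_if_missing(url: str, file_path: str, fetch: Callable[[str], bytes]) -> str:
    """Return file_path, calling fetch(url) only when the file is not on disk yet."""
    path = Path(file_path)
    if path.exists():
        # Already downloaded on a previous run: skip the network call.
        return str(path)
    # Write to a temporary name first so an interrupted download
    # never leaves a half-written file that looks complete.
    tmp_path = path.with_name(path.name + ".part")
    tmp_path.write_bytes(fetch(url))
    tmp_path.replace(path)
    return str(path)
```

With `requests`, `fetch` could be something like `lambda u: requests.get(u).content`; the body of the `download` task could then delegate to this helper and still return a path.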

Ryan Peden

02/15/2023, 11:22 PM
Could you use task caching to avoid downloading a file that has already been downloaded? See this section of the docs for more information: https://docs.prefect.io/concepts/tasks/#caching
If you're trying to avoid repetition based on url and/or filename, I think that would accomplish the same thing you could achieve with targets in Prefect 1.

Federico Zambelli

02/15/2023, 11:25 PM
That was the idea, but I have no idea how to cache something that returns a `path` rather than the file data itself (see the task `download`). And if I return the data, it wouldn't be a `dataframe` but a bytes blob, given that I download it using `requests`. So yeah, I'm a bit at a loss here.

Ryan Peden

02/15/2023, 11:38 PM
If you've already downloaded the data for a given url and filename, will it already exist at `file_path`? If so, then
from prefect.tasks import task_input_hash
And then
@task(cache_key_fn=task_input_hash)
def download(url, filename):
    ...  # the rest of your code
This should just return the file path immediately without re-downloading if you've previously called `download` with a specific url + filename combo. Is that what you're trying to do? Apologies if I'm misunderstanding.
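
To make the idea behind input-based caching concrete, here is a simplified, in-memory stand-in. It is only a sketch of the concept, not Prefect's implementation: `input_key`, `cached_download`, and the `do_download` callable are hypothetical names, and Prefect keeps its cached states through its own API rather than in a module-level dict.

```python
import hashlib
import json

# Illustrative in-memory cache: cache key -> previously returned file path.
_cache = {}


def input_key(func_name: str, **kwargs) -> str:
    """Build a deterministic key from a function name and its keyword arguments."""
    payload = json.dumps({"fn": func_name, "args": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached_download(url: str, filename: str, do_download) -> str:
    """Call do_download only the first time a (url, filename) pair is seen."""
    key = input_key("download", url=url, filename=filename)
    if key not in _cache:
        _cache[key] = do_download(url, filename)
    # On a repeat call the stored path is returned without downloading again.
    return _cache[key]
```

One consequence of keying on inputs alone is that the cached value is the returned path, not the file: if the file is deleted from disk, a cache hit would presumably still hand back the old path.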

Federico Zambelli

02/15/2023, 11:40 PM
Yes, exactly what I'm trying to do, except that it then complains that my cache somehow doesn't exist. I get a `prefect.exceptions.MissingResult: State data is missing.` error.

Ryan Peden

02/15/2023, 11:51 PM
I'll see if I can find what's causing that error. It seems environment-specific because when I add caching to that function on my laptop, it works. Can you run `prefect version` and paste the output here?

Federico Zambelli

02/23/2023, 9:14 AM
@Ryan Peden hey, sorry I was on holiday 😅. My Prefect version is `2.7.11`, running on Python `3.10` (on Ubuntu Linux under WSL2).
@Ryan Peden a small update: I tried it again today without changing any code, and for some magical reason this time it worked. The only difference is that instead of clicking "Retry" on the task run (see screen 1), I ran it from the deployment. In fact, if I do click "Retry", even after successfully completing a run that used the cache, I get the same error I described above (see screen 2). Is this a bug, or is there something I'm not understanding about the "Retry" functionality in the Orion UI?