Federico Zambelli
02/15/2023, 11:03 PM
`target` functionality that seemed to fit my case, but it doesn't exist in Prefect 2.0. I've tried many different ways but can't figure out how to do it. The reason I'm saving locally as an intermediate step is that I run out of memory (OOM) otherwise.
My code (simplified) goes like this:
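```python
import pandas as pd
import requests
from prefect import flow, task


@task
def download(url, filename):
    file_path = f'/absolute/path/to/{filename}.csv'
    with open(file_path, 'wb') as file:
        res = requests.get(url + filename)
        file.write(res.content)
    # Return only the path, not the file data, so the task result stays small
    return file_path


@task
def write_to_bq(file_path):
    df = pd.read_csv(file_path)
    df.to_gbq(...)  # destination table and options elided


@flow
def download_all(url_list):
    paths = []
    # download() needs both a url and a filename, so each entry is
    # assumed here to be a (url, filename) pair
    for url, filename in url_list:
        file_path = download(url, filename)
        paths.append(file_path)
    return paths


@flow
def upload_all(paths):
    for path in paths:
        write_to_bq(path)


@flow
def main(url_list):
    paths = download_all(url_list)
    upload_all(paths)
```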
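If memory is still tight, both steps can be kept memory-bounded: `res.content` holds the whole response in memory, and `pd.read_csv` loads the whole file. Below is a minimal, stdlib-only sketch of the idea, assuming the data is plain CSV: it writes the download to disk chunk by chunk and reads it back in fixed-size batches. The helper names `stream_to_file` and `iter_csv_batches` are hypothetical. With requests, the chunks would come from `requests.get(url + filename, stream=True).iter_content(chunk_size=...)`. With pandas, `pd.read_csv(file_path, chunksize=...)` yields DataFrames batch by batch in the same spirit. The demo feeds hard-coded byte chunks instead of a real HTTP response.

```python
import csv
import os
import tempfile
from typing import Iterable, Iterator, List


def stream_to_file(chunks: Iterable[bytes], file_path: str) -> str:
    """Write byte chunks to disk one at a time so the full payload is never held in memory."""
    with open(file_path, "wb") as file:
        for chunk in chunks:
            if chunk:  # skip empty keep-alive chunks
                file.write(chunk)
    return file_path


def iter_csv_batches(file_path: str, batch_size: int) -> Iterator[List[dict]]:
    """Yield the CSV's rows as lists of at most batch_size dicts."""
    with open(file_path, newline="") as f:
        batch: List[dict] = []
        for row in csv.DictReader(f):
            batch.append(row)
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:  # leftover rows that did not fill a whole batch
            yield batch


if __name__ == "__main__":
    # Stand-in for requests.get(url + filename, stream=True).iter_content(chunk_size=1 << 20)
    fake_chunks = [b"a,b\n", b"1,2\n3,", b"4\n5,6\n"]
    with tempfile.TemporaryDirectory() as tmp:
        path = stream_to_file(fake_chunks, os.path.join(tmp, "data.csv"))
        for batch in iter_csv_batches(path, batch_size=2):
            print(len(batch), "rows")
```

Each batch could then be uploaded on its own (for example, with `DataFrame.to_gbq(..., if_exists="append")` when reading with pandas). That way peak memory follows the batch size rather than the file size.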
Ryan Peden
02/15/2023, 11:22 PM
Federico Zambelli
02/15/2023, 11:25 PM
`path` rather than the file data itself (see the task `download`)
`dataframe`, but a bytes blob, given that I download it using requests
Ryan Peden
02/15/2023, 11:38 PM
`file_path`?
If so, then
```python
from prefect.tasks import task_input_hash
```
And then
```python
@task(cache_key_fn=task_input_hash)
def download(url, filename):
    ...  # the rest of your code
```
Should just return the file path immediately, without re-downloading, if you've previously called `download` with a specific url + filename combo.
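To illustrate what caching on a hash of the task inputs buys here, the sketch below is a plain-Python analogy, not Prefect's implementation. A hypothetical `input_hash_cache` decorator keys an in-process dict on the function name plus an MD5 hash of the JSON-serialized arguments. A second call to `download` with the same url + filename therefore returns the stored path without running the body again. Prefect, by contrast, stores task results so that a cache hit can outlive a single Python process.

```python
import hashlib
import json
from functools import wraps
from typing import Any, Callable, Dict, List, Tuple


def input_hash_cache(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Skip fn's body when it has already run with the same arguments (in-process only)."""
    cache: Dict[str, Any] = {}

    @wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Cache key = function name + hash of the serialized inputs
        payload = json.dumps([args, kwargs], sort_keys=True, default=repr)
        key = f"{fn.__name__}-{hashlib.md5(payload.encode()).hexdigest()}"
        if key not in cache:
            cache[key] = fn(*args, **kwargs)
        return cache[key]

    return wrapper


executions: List[Tuple[str, str]] = []  # records only real (non-cached) runs


@input_hash_cache
def download(url: str, filename: str) -> str:
    executions.append((url, filename))
    # A real version would fetch url + filename and write it to disk here
    return f"/absolute/path/to/{filename}.csv"


if __name__ == "__main__":
    download("https://example.com/", "a")
    download("https://example.com/", "a")  # cache hit: body not executed
    print(executions)
```

One consequence of keying on inputs alone: a cache hit returns the stored path even if that file has since been deleted from disk. The cached path is only as reliable as the local directory it points to.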
Is that what you're trying to do? Apologies if I'm misunderstanding.
Federico Zambelli
02/15/2023, 11:40 PM
`prefect.exceptions.MissingResult: State data is missing.` error
Ryan Peden
02/15/2023, 11:51 PM
`prefect version` and paste the output here?
Federico Zambelli
02/23/2023, 9:14 AM
2.7.11
Running on Python 3.10