Nicolas Ouporov
03/19/2024, 2:39 AM
Nate
03/19/2024, 3:25 AM
`return_state` here, in combination with retries:
```python
import httpx
from prefect import flow, task
from prefect.states import Failed


@task(retries=1)
def read_file(url: str):
    try:
        r = httpx.get(url)
        r.raise_for_status()
    except httpx.HTTPStatusError as e:
        # Return a Failed state instead of raising, so the caller can inspect it
        return Failed(message=f"got a {e.response.status_code}")


@flow
def ocr():
    # return_state=True hands back the final State instead of raising on failure
    state = read_file("http://google.com/api", return_state=True)
    if state.is_failed():
        if "404" in state.message:  # or whatever you need to check for
            print("doing something specific + trying again")
            read_file("https://www.prefect.io")
        else:
            raise RuntimeError(state.message)
    return "all good"
```
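Matching `"404"` inside `state.message` works, but it is string matching on a human-readable message. If you want to try the branching logic on its own, without Prefect or the network, here is a plain-Python sketch of the same decision keyed on an integer status code. `FetchResult`, `run`, and `fake_fetch` are hypothetical names, and `FetchResult` only stands in for a Prefect state. Unlike the flow above, it also raises if the fallback request fails.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FetchResult:
    """Hypothetical stand-in for a task's final state."""

    ok: bool
    status_code: Optional[int] = None


def run(fetch: Callable[[str], FetchResult], url: str, fallback_url: str) -> str:
    result = fetch(url)
    if not result.ok:
        # Branch on the integer status code rather than searching a message string
        if result.status_code == 404:
            print("doing something specific + trying again")
            fallback = fetch(fallback_url)
            if not fallback.ok:
                raise RuntimeError(f"fallback got a {fallback.status_code}")
        else:
            raise RuntimeError(f"got a {result.status_code}")
    return "all good"


def fake_fetch(url: str) -> FetchResult:
    # Hypothetical fetcher with no network access: only the fallback URL "exists"
    if url == "https://www.prefect.io":
        return FetchResult(ok=True, status_code=200)
    return FetchResult(ok=False, status_code=404)


print(run(fake_fetch, "http://google.com/api", "https://www.prefect.io"))  # all good
```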
Nicolas Ouporov
03/19/2024, 4:10 AM
Nicolas Ouporov
03/19/2024, 11:15 PM
```python
from typing import List

from prefect import flow

# document_intelligence_clients, chunks, run_ocr and process_ocr_results
# are defined elsewhere in this code base (not shown in the thread)


@flow
async def run_ocr_in_batches(
    all_file_contents: List[bytes], metadata: List[dict], batch_size: int = 10
):
    tasks = []
    client_count = len(document_intelligence_clients)
    for i, batch in enumerate(chunks(zip(all_file_contents, metadata), batch_size)):
        batch_list = list(batch)
        # Round-robin: each batch goes to the next client in the list
        client = document_intelligence_clients[i % client_count]
        # file_meta avoids shadowing the `metadata` parameter
        for file_content, file_meta in batch_list:
            # Schedule the task for execution and store the future
            future = run_ocr.submit(
                client,
                file_content,
                file_meta["file_path"],
                file_meta["page_count"],
                i % client_count,
            )
            tasks.append((future, file_meta["file_path"]))

    # Collect each future's result and post-process it
    results = []
    for future, file_path in tasks:
        ocr_result = future.result()
        processed_result = process_ocr_results(ocr_result, file_path)
        results.append(processed_result)
    return results
```
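The flow relies on a `chunks` helper that isn't shown in the thread. Assuming it splits an iterable into consecutive lists of at most `batch_size` items, a standard-library version could look like the sketch below. `chunks` and `assign_clients` are hypothetical names, and the client strings stand in for `document_intelligence_clients`; `assign_clients` isolates the `i % client_count` round-robin so you can check which client each item lands on.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Sequence, Tuple, TypeVar

T = TypeVar("T")
C = TypeVar("C")


def chunks(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items (hypothetical helper)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def assign_clients(
    items: Iterable[T], clients: Sequence[C], batch_size: int
) -> List[Tuple[C, T]]:
    """Pair every item with a client, moving to the next client once per batch."""
    pairs: List[Tuple[C, T]] = []
    for i, batch in enumerate(chunks(items, batch_size)):
        client = clients[i % len(clients)]  # round-robin over batches, not items
        pairs.extend((client, item) for item in batch)
    return pairs


print(list(chunks(range(5), 2)))  # [[0, 1], [2, 3], [4]]
print(assign_clients(range(5), ["client-a", "client-b"], 2))
# [('client-a', 0), ('client-a', 1), ('client-b', 2), ('client-b', 3), ('client-a', 4)]
```

Because the rotation happens per batch, every file in a batch shares one client, which matches the loop above. On Python 3.12+, `itertools.batched` does the same splitting but yields tuples instead of lists.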
Nate
03/19/2024, 11:16 PM
`return_state` and `retries` should work the same way, you just have to `await` your async stuff.
Nate
03/19/2024, 11:18 PM
Nicolas Ouporov
03/19/2024, 11:18 PM
Nate
03/19/2024, 11:23 PM
```python
from prefect import flow, task, unmapped


@task
def foo(dynamic, static):
    print(f"got {dynamic} and {static}")
    return dynamic


@flow(log_prints=True)
def f():
    futures = foo.map(range(10), unmapped(42))
    more_futures = foo.map(futures, unmapped("baz"))  # futures get resolved by us
    print([fut.result() for fut in more_futures])


f()  # prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
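As a rough mental model of `foo.map(range(10), unmapped(42))`: one submission per item of the iterable, with the unmapped argument passed unchanged to every call. The sketch below imitates that with the standard library's `ThreadPoolExecutor`. `map_submit` is a hypothetical helper, and this is only an analogy, not how Prefect implements `map` (Prefect also tracks task runs and resolves upstream futures itself).

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def map_submit(
    executor: ThreadPoolExecutor, fn: Callable[..., Any], items: Iterable[Any], static: Any
) -> List[Future]:
    """Submit fn(item, static) once per item; `static` is shared, like unmapped(...)."""
    return [executor.submit(fn, item, static) for item in items]


def foo(dynamic, static):
    return dynamic


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = map_submit(pool, foo, range(10), 42)
    # Here we resolve the upstream futures by hand; Prefect's map does this for you
    more_futures = map_submit(pool, foo, [fut.result() for fut in futures], "baz")
    results = [fut.result() for fut in more_futures]

print(results)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```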
Nate
03/19/2024, 11:23 PM
`map` calls `submit` for each item in an iterable input, and you can have `unmapped` (or static) parameters that don't get mapped over.
Nicolas Ouporov
03/19/2024, 11:24 PM
Nate
03/19/2024, 11:24 PM
Nicolas Ouporov
03/19/2024, 11:24 PM
Nicolas Ouporov
03/20/2024, 12:34 AM
`@task(retries=2, retry_delay_seconds=[1, 30], retry_jitter_factor=0.5)` when the error code is 429, and use the custom logic you suggested when the error code is 500?
Nate
03/20/2024, 1:11 AM
`retry_condition_fn`
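The question of retrying on a delay schedule only for 429 while sending 500s to custom logic comes down to a predicate that says whether a failure is retryable. In Prefect that predicate goes into `retry_condition_fn`; my understanding from the Prefect 2 docs is that it is called with the task, the task run, and the state and returns a bool, but check the docs for your version. The sketch below is plain Python, not Prefect: `HTTPError`, `retry_on_429`, and `call_with_retries` are hypothetical names, the parameters only mirror `retries`, `retry_delay_seconds`, and `retry_jitter_factor`, and the jitter formula is an assumption rather than Prefect's.

```python
import random
from typing import Callable, Optional, Sequence, TypeVar

T = TypeVar("T")


class HTTPError(Exception):
    """Hypothetical error type that carries an HTTP status code."""

    def __init__(self, status_code: int):
        super().__init__(f"got a {status_code}")
        self.status_code = status_code


def retry_on_429(exc: Exception) -> bool:
    # Retry on the schedule only when rate-limited; anything else (e.g. 500) is not retried
    return isinstance(exc, HTTPError) and exc.status_code == 429


def call_with_retries(
    fn: Callable[[], T],
    retries: int,
    delays: Sequence[float],
    jitter_factor: float,
    should_retry: Callable[[Exception], bool],
    sleep: Callable[[float], None],
    rng: Optional[random.Random] = None,
) -> T:
    """Call fn, retrying up to `retries` times while should_retry(exc) is True."""
    rng = rng or random.Random()
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == retries or not should_retry(exc):
                raise  # out of retries, or a failure we don't retry
            # Delay for this attempt; the last entry is reused if the list runs out
            base = delays[min(attempt, len(delays) - 1)] if delays else 0.0
            # Assumed jitter: stretch the delay by up to jitter_factor (not Prefect's formula)
            sleep(base * (1 + rng.uniform(0, jitter_factor)))
    raise AssertionError("unreachable")


# Usage: two 429s, then success; sleeps are recorded instead of actually waiting
outcomes = iter([HTTPError(429), HTTPError(429), "ok"])


def flaky() -> str:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


slept: list = []
result = call_with_retries(flaky, 2, [1, 30], 0.5, retry_on_429, slept.append)


def always_500() -> str:
    raise HTTPError(500)


slept_500: list = []
caught_status = None
try:
    call_with_retries(always_500, 2, [1, 30], 0.5, retry_on_429, slept_500.append)
except HTTPError as exc:
    caught_status = exc.status_code  # hand this to the custom 500 logic instead
```

Injecting `sleep` keeps the example instant and testable; in real code you would pass `time.sleep`.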
Nate
03/20/2024, 1:12 AM
Nicolas Ouporov
03/20/2024, 1:15 AM
Nate
03/20/2024, 1:17 AM
Nicolas Ouporov
03/20/2024, 1:23 AM
Nate
03/20/2024, 1:26 AM
Nate
03/20/2024, 1:27 AM
Nicolas Ouporov
03/20/2024, 1:27 AM
Nate
03/20/2024, 1:29 AM
Nicolas Ouporov
03/20/2024, 1:31 AM