Matic Pecovnik
04/03/2024, 11:53 AM
@task
def request_signal_download_task(sig_down_configuration: SignalPackageRequest) -> SignalPackageDto:
    """Forward the download request to ``request_signal_download``.

    Args:
        sig_down_configuration: The signal package request to submit.

    Returns:
        Whatever ``request_signal_download`` returns for this request.
    """
    requested_package = request_signal_download(sig_down_configuration)
    return requested_package
@task
def monitor_signal_download_task(signal_package: SignalPackageDto) -> SignalPackageDto:
    """Hand the package to ``monitor_signal_download`` and return its result.

    Args:
        signal_package: The package returned by the request step.

    Returns:
        The package as returned by ``monitor_signal_download``.
    """
    monitored_package = monitor_signal_download(signal_package)
    return monitored_package
@task
def check_signal_download_status(signal_package: SignalPackageDto) -> None:
    """Raise unless the package's process status is ``ProcessStatus.DONE``.

    Args:
        signal_package: The package whose ``processStatus`` is inspected.

    Raises:
        SignalDownloadError: When the status is anything other than ``DONE``;
            the message comes from ``get_signal_download_message``.
    """
    download_finished = signal_package.processStatus == ProcessStatus.DONE
    if not download_finished:
        failure_message = get_signal_download_message(signal_package)
        raise SignalDownloadError(failure_message)
@flow
def signals_package_flow(signals_params: list[SignalPackageRequest]) -> list[SignalPackageDto]:
    """Run request -> monitor -> status check for each request, one after another.

    Args:
        signals_params: The signal package requests to process, in order.

    Returns:
        The monitored packages, in the same order as ``signals_params``.

    An exception raised by any of the three calls propagates immediately, so
    the remaining requests are not processed.
    """

    def _download_one(request: SignalPackageRequest) -> SignalPackageDto:
        # The three steps depend on each other, so they stay strictly ordered.
        requested = request_signal_download_task(request)
        monitored = monitor_signal_download_task(requested)
        check_signal_download_status(monitored)
        return monitored

    # The comprehension evaluates sequentially: one request is fully handled
    # before the next one starts.
    return [_download_one(request) for request in signals_params]
How can I rewrite this code to perform the sequences of tasks in the for loop in parallel?
Marvin
04/03/2024, 11:53 AMI have the following snippet:```@task( cache_key_fn=task_input_hash, name="Request Signal Package Download", persist_result=True, result_serializer=RESULT_SERIALIZER, result_storage=RESULT_STORAGE, ) def request_signal_download_task(sig_down_configuration: SignalPackageRequest) -> SignalPackageDto: return request_signal_download(sig_down_configuration) @task( cache_key_fn=task_input_hash, name="Monitor Signal Package Download", persist_result=True, result_serializer=RESULT_SERIALIZER, result_storage=RESULT_STORAGE, ) def monitor_signal_download_task(signal_package: SignalPackageDto) -> SignalPackageDto: return monitor_signal_download(signal_package) @task(cache_key_fn=task_input_hash, name="Check Signal Package Download Status") def check_signal_download_status(signal_package: SignalPackageDto) -> None: if signal_package.processStatus == ProcessStatus.DONE: return message = get_signal_download_message(signal_package) raise SignalDownloadError(message) @flow( name="Signal Package Download Subflow", persist_result=True, result_serializer=RESULT_SERIALIZER, result_storage=RESULT_STORAGE, ) def signals_package_flow(signals_params: list[SignalPackageRequest]) -> list[SignalPackageDto]: signal_packages: list[SignalPackageDto] = [] for signal_param in signals_params: signal_package = request
Matic Pecovnik
04/03/2024, 11:54 AM
@task
def request_signal_download_task(sig_down_configuration: SignalPackageRequest) -> SignalPackageDto:
    """Forward the download request to ``request_signal_download``.

    Args:
        sig_down_configuration: The signal package request to submit.

    Returns:
        Whatever ``request_signal_download`` returns for this request.
    """
    requested_package = request_signal_download(sig_down_configuration)
    return requested_package
@task
def monitor_signal_download_task(signal_package: SignalPackageDto) -> SignalPackageDto:
    """Hand the package to ``monitor_signal_download`` and return its result.

    Args:
        signal_package: The package returned by the request step.

    Returns:
        The package as returned by ``monitor_signal_download``.
    """
    monitored_package = monitor_signal_download(signal_package)
    return monitored_package
@task
def check_signal_download_status(signal_package: SignalPackageDto) -> None:
    """Raise unless the package's process status is ``ProcessStatus.DONE``.

    Args:
        signal_package: The package whose ``processStatus`` is inspected.

    Raises:
        SignalDownloadError: When the status is anything other than ``DONE``;
            the message comes from ``get_signal_download_message``.
    """
    download_finished = signal_package.processStatus == ProcessStatus.DONE
    if not download_finished:
        failure_message = get_signal_download_message(signal_package)
        raise SignalDownloadError(failure_message)
@flow
def signals_package_flow(signals_params: list[SignalPackageRequest]) -> list[SignalPackageDto]:
    """Run request -> monitor -> status check for each request, one after another.

    Args:
        signals_params: The signal package requests to process, in order.

    Returns:
        The monitored packages, in the same order as ``signals_params``.

    An exception raised by any of the three calls propagates immediately, so
    the remaining requests are not processed.
    """

    def _download_one(request: SignalPackageRequest) -> SignalPackageDto:
        # The three steps depend on each other, so they stay strictly ordered.
        requested = request_signal_download_task(request)
        monitored = monitor_signal_download_task(requested)
        check_signal_download_status(monitored)
        return monitored

    # The comprehension evaluates sequentially: one request is fully handled
    # before the next one starts.
    return [_download_one(request) for request in signals_params]
How can I rewrite this code to perform the sequences of tasks in the for loop in parallel?
Marvin
04/03/2024, 11:55 AM
[…] `asyncio` library. You would need to define your tasks and the flow with async/await. Here's an example of how to do it:
python
from prefect import task, flow, get_run_logger
from prefect.tasks import unmapped
from typing import List
@task
async def request_signal_download_task(sig_down_configuration: SignalPackageRequest) -> SignalPackageDto:
    """Await ``request_signal_download`` for the given request and return its result.

    Args:
        sig_down_configuration: The signal package request to submit.

    Returns:
        The awaited result of ``request_signal_download``.
    """
    # NOTE(review): this only works if request_signal_download returns an
    # awaitable - confirm it is (or can be made) a coroutine function.
    requested_package = await request_signal_download(sig_down_configuration)
    return requested_package
@task
async def monitor_signal_download_task(signal_package: SignalPackageDto) -> SignalPackageDto:
    """Await ``monitor_signal_download`` for the given package and return its result.

    Args:
        signal_package: The package returned by the request step.

    Returns:
        The awaited result of ``monitor_signal_download``.
    """
    # NOTE(review): this only works if monitor_signal_download returns an
    # awaitable - confirm it is (or can be made) a coroutine function.
    monitored_package = await monitor_signal_download(signal_package)
    return monitored_package
@task
async def check_signal_download_status(signal_package: SignalPackageDto) -> None:
    """Raise unless the package's process status is ``ProcessStatus.DONE``.

    Args:
        signal_package: The package whose ``processStatus`` is inspected.

    Raises:
        SignalDownloadError: When the status is anything other than ``DONE``;
            the message comes from ``get_signal_download_message``.
    """
    download_finished = signal_package.processStatus == ProcessStatus.DONE
    if not download_finished:
        failure_message = get_signal_download_message(signal_package)
        raise SignalDownloadError(failure_message)
@flow
async def signals_package_flow(signals_params: List[SignalPackageRequest]) -> List[SignalPackageDto]:
    """Download every requested signal package, running the per-package sequences concurrently.

    Each request goes through its own request -> monitor -> status-check chain,
    and all chains are scheduled together with ``asyncio.gather``. A slow
    package therefore never holds back the others: there is no barrier where
    every request must finish before any monitoring can start.

    Args:
        signals_params: The signal package requests to process.

    Returns:
        The monitored packages, in the same order as ``signals_params``
        (``asyncio.gather`` preserves the order of its arguments).

    Raises:
        Any exception raised by one of the tasks, e.g. ``SignalDownloadError``
        from ``check_signal_download_status``. The first failure propagates
        out of ``asyncio.gather``.
    """

    async def _download_one(signal_param: SignalPackageRequest) -> SignalPackageDto:
        # Within one package the steps depend on each other, so they are
        # awaited strictly in order.
        requested = await request_signal_download_task(signal_param)
        monitored = await monitor_signal_download_task(requested)
        await check_signal_download_status(monitored)
        return monitored

    # One coroutine per request; gather runs them concurrently on the event loop.
    pipelines = [_download_one(signal_param) for signal_param in signals_params]
    return await asyncio.gather(*pipelines)
# Add import statement
# asyncio is needed at module level because the flow above calls asyncio.gather.
# NOTE(review): imports conventionally live at the top of the file - consider
# moving this line there.
import asyncio

# Entry point of your script
if __name__ == "__main__":
    # NOTE(review): `signals_params` is not defined anywhere in this snippet;
    # build the list of SignalPackageRequest objects before running the flow.
    asyncio.run(signals_package_flow(signals_params))
Explanation of changes:
- Converted the `@task`-decorated functions to `async def` to allow for asynchronous execution.
- Used `asyncio.gather` to wait for multiple asynchronous tasks to complete, allowing us to run the for-loop sequences in parallel.
- Added `async` to the `@flow`-decorated function and removed the loop that was collecting the results again (since that is now handled by `asyncio.gather`).
This example assumes that `request_signal_download` and `monitor_signal_download` are asynchronous functions or can be made asynchronous, as they will need to support the `await` keyword.
This information was extracted from the code examples available in the Prefect GitHub repository; see the raw code examples there for more information and coding patterns with Prefect.