Pooyan
11/13/2023, 12:39 AM
```python
from prefect import flow, task
from prefect.tasks import exponential_backoff

@task(name='my_task', description='This will trigger a kubernetes spark job', retries=5, retry_delay_seconds=exponential_backoff(backoff_factor=600))
def my_task(processing_parameters):
    # call_spark_end_point is the user's own helper (not shown in the thread)
    call_spark_end_point(processing_parameters)
    return

@flow(name='my_flow')
def my_flow(processing_parameters):
    my_task(processing_parameters)
    return
```
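The `retry_delay_seconds=exponential_backoff(backoff_factor=600)` setting above can be sanity-checked with a little arithmetic. Prefect documents `backoff_factor` as the base delay for the first retry, with each later retry increasing the delay by powers of 2. The stdlib-only sketch below reproduces that schedule for `retries=5`. It assumes the delay for retry `r` is `backoff_factor * 2**r` with no jitter (`retry_jitter_factor` unset). `backoff_schedule` is a hypothetical helper for illustration, not Prefect's own code.

```python
from typing import List


def backoff_schedule(backoff_factor: float, retries: int) -> List[float]:
    """Hypothetical helper: seconds to wait before each retry.

    Assumes the documented Prefect pattern: the first retry waits
    backoff_factor seconds and each later retry doubles the delay.
    Jitter is ignored.
    """
    return [backoff_factor * 2**r for r in range(retries)]


delays = backoff_schedule(backoff_factor=600, retries=5)
print(delays)  # [600, 1200, 2400, 4800, 9600]

# Total time spent waiting between attempts if every retry is used.
total_minutes = sum(delays) / 60
print(total_minutes)  # 310.0
```

Under this assumption the fifth retry waits 9600 s (2 h 40 min). A run that exhausts all five retries spends about 5 h 10 min just waiting between attempts, which may matter for a job that triggers a Spark endpoint.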
Marvin
11/13/2023, 12:40 AM
`my_task` to complete before it returns.
To achieve this, you can use Prefect's `wait_for_task` function. This function will block the flow until the specified task has completed. Here's how you can modify your code to use `wait_for_task`:
```python
from prefect import task, flow, wait_for_task

@task(name='my_task', description='This will trigger a kubernetes spark job', retries=5, retry_delay_seconds=exponential_backoff(backoff_factor=600))
def my_task(processing_parameters):
    call_spark_end_point(processing_parameters)
    return

@flow(name='my_flow')
def my_flow(processing_parameters):
    task_result = my_task(processing_parameters)
    wait_for_task(task_result)
    return
```
In this modified code, `wait_for_task` is called with the result of `my_task`. This will cause the flow to block until `my_task` has completed.
Please note that `wait_for_task` is a blocking operation, and it will prevent the flow from doing anything else until the specified task has completed. If you have other tasks that can be run in parallel, you might want to consider running them before you call `wait_for_task`.
Nate
11/13/2023, 12:42 AM
Pooyan
11/13/2023, 12:44 AM
`wait_for_task` 🙃
Nate
11/13/2023, 1:53 AM
Pooyan
11/13/2023, 2:14 AM
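For comparison with the `wait_for_task` suggestion in this thread: in Prefect 2.x, calling a task directly inside a flow (as in `my_task(processing_parameters)`) runs it and returns its result, so the flow already waits for it. Only `my_task.submit(...)` returns a `PrefectFuture`, and calling `.result()` or `.wait()` on that future blocks until the task finishes. The sketch below illustrates the same direct-call versus submit-then-wait distinction using the standard library's `concurrent.futures`. This is an analogy only and is not Prefect's API. `fake_spark_job` is a hypothetical stand-in for `call_spark_end_point`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_spark_job(params: dict) -> str:
    """Hypothetical stand-in for call_spark_end_point: simulates some work."""
    time.sleep(0.1)
    return f"done: {params['job']}"


def run_direct(params: dict) -> str:
    # Direct call: the caller blocks until the job returns,
    # analogous to calling a Prefect task directly inside a flow.
    return fake_spark_job(params)


def run_submitted(params: dict) -> str:
    with ThreadPoolExecutor(max_workers=1) as pool:
        # submit() returns a Future immediately, analogous to my_task.submit(...).
        future = pool.submit(fake_spark_job, params)
        # Other work could run here while the job is in progress.
        # result() blocks until the job finishes, analogous to PrefectFuture.result().
        return future.result()


print(run_direct({"job": "a"}))     # done: a
print(run_submitted({"job": "b"}))  # done: b
```

In both cases the caller gets the finished result before moving on. The submit form only helps when there is other work to overlap with the job.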