Kaleb King
07/05/2024, 4:51 PM
[...] `connect`, `extract`, `load`, and `transform`. Each task should wait to execute until the previous task is complete. `connect` takes in multiple arguments from the flow and returns multiple variables. `extract` takes in multiple arguments, including one variable from `connect`, and returns multiple variables. `load` takes in multiple arguments, including one variable from `connect` and one from `extract`, but doesn't return any variables. `transform` takes in multiple arguments, including one variable from `connect` and one from `extract`, but doesn't return any variables.
Marvin
07/05/2024, 4:52 PM
```python
from prefect import flow, task


@task
def connect(arg1, arg2):
    # logic for connecting (placeholder return values)
    return "connect_result1", "connect_result2"


@task
def extract(arg1, arg2, connect_result):
    # logic for extracting (placeholder return values)
    return "extract_result1", "extract_result2"


@task
def load(arg1, arg2, connect_result, extract_result):
    # logic for loading
    pass


@task
def transform(arg1, arg2, connect_result, extract_result):
    # logic for transforming
    pass


@flow
def my_flow(arg1, arg2):
    conn_result1, conn_result2 = connect(arg1, arg2)
    extract_result1, extract_result2 = extract(arg1, arg2, conn_result2)
    load(arg1, arg2, conn_result1, extract_result1)
    transform(arg1, arg2, conn_result1, extract_result1)


if __name__ == "__main__":
    my_flow("arg1_value", "arg2_value")
```
In this example:
- `connect` is the first task, taking in multiple arguments and returning multiple values.
- `extract` depends on the output of `connect`.
- `load` and `transform` both depend on the outputs of `connect` and `extract`.
Source: Code Example
Kaleb King
07/05/2024, 4:54 PM
[...] `wait_for` keyword?
Marvin
07/05/2024, 4:54 PM
The `wait_for` keyword can be used to create task dependencies within a flow. It makes a task wait for other tasks to complete before it starts executing. Here is how you can use it:
### Code Example using `wait_for`
```python
from prefect import task, flow


@task
def connect(arg1, arg2):
    return "connect_result"


@task
def extract(arg1, arg2, connect_result):
    return "extract_result"


@task
def load(arg1, arg2, connect_result, extract_result):
    print("Loading...")


@task
def transform(arg1, arg2, connect_result, extract_result):
    print("Transforming...")


@flow
def my_flow(arg1, arg2):
    connect_result = connect(arg1, arg2)
    extract_result = extract(arg1, arg2, wait_for=[connect_result])
    load(arg1, arg2, connect_result, wait_for=[connect_result, extract_result])
    transform(arg1, arg2, connect_result, wait_for=[connect_result, extract_result])


if __name__ == "__main__":
    my_flow("arg1_value", "arg2_value")
```
### Quick Explanation
- `connect` runs first.
- `extract` will only start after `connect` completes.
- `load` and `transform` will only start after both `connect` and `extract` complete.
Source: GitHub Pull Request - wait_for Keyword
Kaleb King
07/05/2024, 4:59 PM
prefect.exceptions.ParameterBindError: Error binding parameters for function 'extract': missing a required argument: 'connect_result'.
Function 'extract' has signature 'arg1, arg2, connect_result' but received args: ('arg1_value', 'arg2_value') and kwargs: [].
Marvin
07/05/2024, 4:59 PM
[...] `extract` task. You need to properly pass the results of the previous tasks as arguments. Here is a corrected example demonstrating how to create dependencies between tasks using `wait_for` and properly passing arguments:
### Corrected Code Example
```python
from prefect import flow, task


@task
def connect(arg1, arg2):
    # logic for connecting
    return "connect_result"


@task
def extract(arg1, arg2, connect_result):
    # logic for extracting
    return "extract_result"


@task
def load(arg1, arg2, connect_result, extract_result):
    # logic for loading
    pass


@task
def transform(arg1, arg2, connect_result, extract_result):
    # logic for transforming
    pass


@flow
def my_flow(arg1, arg2):
    conn_result = connect(arg1, arg2)
    extract_result = extract(arg1, arg2, conn_result)
    load(arg1, arg2, conn_result, extract_result)
    transform(arg1, arg2, conn_result, extract_result)


if __name__ == "__main__":
    my_flow("arg1_value", "arg2_value")
```
### Explanation
- `connect` runs first and returns `conn_result`.
- `extract` waits for `connect` to complete and then uses its result.
- `load` and `transform` wait for both `connect` and `extract` to complete before running.
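The `ParameterBindError` reported earlier in the thread can be reproduced without Prefect. The sketch below uses only the standard library's `inspect.signature(...).bind`, on the assumption that Prefect checks task arguments against the function signature in a comparable way before running a task. `bind_call` is a hypothetical helper introduced for illustration; it accepts `wait_for` but never forwards it, modelling the idea that `wait_for` declares ordering only and does not supply a value for any parameter.

```python
import inspect


def extract(arg1, arg2, connect_result):
    return "extract_result"


def bind_call(fn, *args, wait_for=None, **kwargs):
    """Hypothetical helper: bind call arguments to fn's signature.

    `wait_for` is accepted but deliberately not forwarded to the binding,
    because it only expresses ordering and fills no parameter.
    """
    return inspect.signature(fn).bind(*args, **kwargs).arguments


# Passing only `wait_for` leaves the `connect_result` parameter unbound.
try:
    bind_call(extract, "arg1_value", "arg2_value", wait_for=["connect_result"])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# Passing the upstream value as a normal argument satisfies the signature.
bound = bind_call(
    extract, "arg1_value", "arg2_value", "connect_result",
    wait_for=["connect_result"],
)

print(error_message)
print(dict(bound))
```

The first call fails because `connect_result` has no value, which is the same complaint as in the traceback; the second call binds all three parameters.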
Source: Basic Task Example
Kaleb King
07/05/2024, 5:00 PM
[...] `wait_for` keyword.
Marvin
07/05/2024, 5:00 PM
[...] `wait_for` keyword correctly to create the task dependencies:
### Proper `wait_for` Usage Example
```python
from prefect import task, flow


@task
def connect(arg1, arg2):
    return "connect_result"


@task
def extract(arg1, arg2, connect_result):
    return "extract_result"


@task
def load(arg1, arg2, connect_result, extract_result):
    print("Loading...")


@task
def transform(arg1, arg2, connect_result, extract_result):
    print("Transforming...")


@flow
def my_flow(arg1, arg2):
    connect_result = connect(arg1, arg2)
    extract_result = extract(arg1, arg2, connect_result, wait_for=[connect_result])
    load(arg1, arg2, connect_result, extract_result, wait_for=[connect_result, extract_result])
    transform(arg1, arg2, connect_result, extract_result, wait_for=[connect_result, extract_result])


if __name__ == "__main__":
    my_flow("arg1_value", "arg2_value")
```
### Explanation
1. `connect` runs first.
2. `extract` waits for `connect` to complete, specified by `wait_for=[connect_result]`.
3. `load` and `transform` wait for both `connect` and `extract` to complete, specified by `wait_for=[connect_result, extract_result]`.
This ensures that the tasks run in sequence, adhering to the dependencies you've outlined.
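When a downstream task already receives an upstream result as an argument, that data dependency is enough to fix the order; `wait_for` matters mainly for tasks that must wait without consuming the upstream value. The sketch below models this distinction with the standard library's `concurrent.futures` rather than with Prefect. `submit_after` and the `events` list are hypothetical names introduced for illustration, and nothing here claims to reflect how Prefect's engine is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

events = []  # records the order in which the tasks actually run


def connect():
    events.append("connect")
    return "connect_result"


def extract(connect_result):
    events.append("extract")
    return "extract_result"


def load():
    events.append("load")


def transform():
    events.append("transform")


def submit_after(pool, fn, *args, wait_for=()):
    """Hypothetical helper: run fn(*args) once every future in wait_for is done."""
    def runner():
        for upstream in wait_for:
            upstream.result()  # block until the upstream task finishes
        # Replace any futures passed as arguments with their values.
        resolved = [a.result() if hasattr(a, "result") else a for a in args]
        return fn(*resolved)
    return pool.submit(runner)


with ThreadPoolExecutor(max_workers=4) as pool:
    connect_future = submit_after(pool, connect)
    # Data dependency: extract consumes the value produced by connect.
    extract_future = submit_after(pool, extract, connect_future)
    # State-only dependency: load and transform take no upstream data here.
    load_future = submit_after(pool, load, wait_for=[connect_future, extract_future])
    transform_future = submit_after(pool, transform, wait_for=[connect_future, extract_future])
    load_future.result()
    transform_future.result()

print(events)
```

In this sketch `load` and `transform` may run in either order relative to each other; making `transform` wait for `load` as well would mean adding `load_future` to its `wait_for` list. In Prefect, the comparable pattern is to call tasks with `.submit()`, which returns futures that can be listed in `wait_for`.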
### Reference
To learn more about using Prefect and the `wait_for` keyword effectively, please take a look at the Prefect documentation and the GitHub Pull Request about `wait_for`.