Kaleb King
06/06/2024, 3:33 PMfrom prefect import task, flow
from extract import extract_data
@task
def extract_task(secret_block_name, sql_file_path, log_file):
return extract_data(secret_block_name, sql_file_path, log_file)
@flow(name="Test Extraction Flow")
def test_flow():
sql_file_path = 'queries/customer.sql'
log_file = 'logs/customer_test.log'
secret_block_name = 'esser-openedge-production-connection-string'
return extract_task(secret_block_name, sql_file_path, log_file)
if __name__ == "__main__":
test_flow()
Marvin
06/06/2024, 3:34 PMtest_flow
to take input parameters:
1. Define the parameters in the test_flow
function.
2. Update the function call in the main
section to pass the arguments dynamically.
Here's your modified code:
python
from prefect import task, flow
from extract import extract_data
@task
def extract_task(secret_block_name, sql_file_path, log_file):
return extract_data(secret_block_name, sql_file_path, log_file)
@flow(name="Test Extraction Flow")
def test_flow(secret_block_name: str, sql_file_path: str, log_file: str):
return extract_task(secret_block_name, sql_file_path, log_file)
if __name__ == "__main__":
# Example arguments
secret_block_name = 'esser-openedge-production-connection-string'
sql_file_path = 'queries/customer.sql'
log_file = 'logs/customer_test.log'
# Pass parameters to the flow
test_flow(secret_block_name, sql_file_path, log_file)
This allows you to call test_flow
with different parameters. More detailed documentation on passing parameters to flows can be found here.
For an example of a Prefect flow that takes parameters, you can refer to this code example.