Zachary Loertscher
08/11/2023, 4:11 PM
@flow(
    name=f'{dev_stage} - Airbyte',
    log_prints=True,
    task_runner=DaskTaskRunner(  # allows for parallel execution
        cluster_kwargs={
            "processes": False,  # use threads instead of processes
            "n_workers": 8,
            "threads_per_worker": 1,  # number of threads per worker
        }
    ),
)
I want to make n_workers dynamic based on a parameter I pass in at deployment time (i.e. through the CLI command prefect deployment build --param n_workers=8). Is this possible?
For context, I have a couple of lists of airbyte_connections that I want to run with different n_workers values because the tables being loaded differ in size. But I don't want to create a whole new flow just to run a different section of airbyte_connections (breaks DRY principles).
It seems that parameters are local to a flow and cannot be used outside of it.

Tim Galvin
08/11/2023, 4:19 PM
Flows have a with_options method that lets you override a setting such as task_runner. That is the immediate thing that comes to mind. I am not sure how that would integrate with the deployment use case you have here.

Zachary Loertscher
08/11/2023, 4:35 PM
if __name__ == "__main__":
    # sys.argv[0] is the script path, so the first argument passed in at
    # flow-run time (set in main.yml) would be sys.argv[1]
    job_name = 'redshift'  # sys.argv[1]
    run_frequency = 'daily'  # sys.argv[2]
    if job_name == 'redshift':
        redshift_dask_cluster_kwargs = {
            "processes": False,  # use threads instead of processes
            "n_workers": 3,
            "threads_per_worker": 1,  # number of threads per worker
        }
        # with_options returns a copy of the flow with the new task runner
        flow_airbyte.with_options(
            task_runner=DaskTaskRunner(  # allows for parallel execution
                cluster_kwargs=redshift_dask_cluster_kwargs
            )
        )(job_name, run_frequency)
    else:
        flow_airbyte(job_name, run_frequency)
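The branching above could probably be collapsed into a small helper, so that n_workers is either looked up per job or passed in directly. This is only a sketch under assumptions: dask_cluster_kwargs, run_airbyte, WORKERS_BY_JOB and DEFAULT_WORKERS are hypothetical names, the worker counts just echo the 8 and 3 used in this thread, and it assumes prefect-dask is installed and that flow_fn is a decorated flow such as flow_airbyte.

```python
from typing import Any, Dict, Optional

# Hypothetical defaults: 8 mirrors the decorator earlier in the thread,
# 3 mirrors the redshift branch above.
DEFAULT_WORKERS = 8
WORKERS_BY_JOB: Dict[str, int] = {"redshift": 3}


def dask_cluster_kwargs(job_name: str, n_workers: Optional[int] = None) -> Dict[str, Any]:
    """Build DaskTaskRunner cluster_kwargs for a job.

    An explicit n_workers wins; otherwise fall back to the per-job table,
    then to DEFAULT_WORKERS.
    """
    if n_workers is None:
        n_workers = WORKERS_BY_JOB.get(job_name, DEFAULT_WORKERS)
    if n_workers < 1:
        raise ValueError(f"n_workers must be >= 1, got {n_workers}")
    return {
        "processes": False,       # use threads instead of processes
        "n_workers": n_workers,
        "threads_per_worker": 1,  # number of threads per worker
    }


def run_airbyte(flow_fn, job_name: str, run_frequency: str,
                n_workers: Optional[int] = None):
    """Call flow_fn with a DaskTaskRunner sized for this job."""
    # Imported lazily so dask_cluster_kwargs works without prefect-dask installed.
    from prefect_dask import DaskTaskRunner

    runner = DaskTaskRunner(cluster_kwargs=dask_cluster_kwargs(job_name, n_workers))
    # with_options returns a copy of the flow; the original keeps its own runner.
    return flow_fn.with_options(task_runner=runner)(job_name, run_frequency)
```

With this, run_airbyte(flow_airbyte, job_name, run_frequency) would stand in for the if/else. Whether a value given via --param can reach it depends on where run_airbyte is called from; calling it inside a thin parent flow that takes n_workers as a parameter is one option that may work, but it is untested here.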
I'll test out the with_options approach. The job_name comes from the --params passed in with prefect deployment build and determines how the DaskTaskRunner cluster_kwargs are set. I'm just unsure if it will run.

Tim Galvin
08/11/2023, 5:33 PM