Zachary Loertscher

08/11/2023, 4:11 PM
Hi all! Is there a way to make a flow decorator argument dynamic based on a flow parameter? For example, here are my @flow decorator arguments:
@flow(
    name=f'{dev_stage} - Airbyte',
    log_prints=True,
    task_runner=DaskTaskRunner(  # allows for parallel execution
        cluster_kwargs={
            "processes": False,  # use threads instead of processes
            "n_workers": 8,
            "threads_per_worker": 1,  # number of threads per worker
        }
    ),
)
I want to make n_workers dynamic based on a parameter I pass in at deployment time (i.e. through the CLI command prefect deployment build --param n_workers=8). Is this possible? For context, I have a couple of lists of airbyte_connections that I want to run with different n_workers values because of the size of the tables being loaded. But I don't want to create a whole new flow just to run a different section of airbyte_connections (that breaks DRY principles). It seems that parameters are local to a flow and cannot be used outside of it.

Tim Galvin

08/11/2023, 4:19 PM
Functions decorated with flow have a with_options method added to them, which lets you override the decorator arguments (such as task_runner) at call time. That is the immediate thing that comes to mind. I am not sure how that would integrate with the deployment use case you have here.

Zachary Loertscher

08/11/2023, 4:35 PM
Hmmm... perhaps something like this?
if __name__ == "__main__":
    # Hardcoded for testing; at flow-run time these are passed in (set in main.yml).
    # Note: sys.argv[0] is the script path, so the first passed argument is sys.argv[1].
    job_name = 'redshift'  # sys.argv[1]
    run_frequency = 'daily'  # sys.argv[2]
    if job_name == 'redshift':
        redshift_dask_cluster_kwargs = {
            "processes": False,  # use threads instead of processes
            "n_workers": 3,
            "threads_per_worker": 1,  # number of threads per worker
        }
        # with_options returns a copy of the flow with the task runner overridden
        flow_airbyte.with_options(
            task_runner=DaskTaskRunner(  # allows for parallel execution
                cluster_kwargs=redshift_dask_cluster_kwargs
            )
        )(job_name, run_frequency)
    else:
        # fall back to the task runner set in the @flow decorator
        flow_airbyte(job_name, run_frequency)
I'll test this out - the job_name comes from the --params passed in with prefect deployment build and informs how to set the DaskTaskRunner cluster_kwargs. Just unsure if it will run.
This appears to work - thank you @Tim Galvin!
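If more jobs end up needing their own worker counts, the if/else could probably be replaced by a small lookup. This is only a sketch: N_WORKERS_BY_JOB, DEFAULT_N_WORKERS, dask_cluster_kwargs and run_with_sized_cluster are made-up names, the numbers are placeholders, and it assumes the flow takes (job_name, run_frequency) as above and that prefect-dask is installed.

```python
# Hypothetical per-job worker counts; names and numbers are placeholders.
N_WORKERS_BY_JOB = {"redshift": 3}
DEFAULT_N_WORKERS = 8


def dask_cluster_kwargs(job_name: str) -> dict:
    """Build the cluster_kwargs dict for DaskTaskRunner for a given job."""
    return {
        "processes": False,  # use threads instead of processes
        "n_workers": N_WORKERS_BY_JOB.get(job_name, DEFAULT_N_WORKERS),
        "threads_per_worker": 1,
    }


def run_with_sized_cluster(flow_fn, job_name: str, run_frequency: str):
    """Run flow_fn with a Dask task runner sized for job_name."""
    # Imported here so the helper above works even without prefect-dask installed.
    from prefect_dask import DaskTaskRunner

    sized_flow = flow_fn.with_options(
        task_runner=DaskTaskRunner(cluster_kwargs=dask_cluster_kwargs(job_name))
    )
    return sized_flow(job_name, run_frequency)
```

The main block would then reduce to run_with_sized_cluster(flow_airbyte, job_name, run_frequency), and adding a new job is just one more entry in the dict.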

Tim Galvin

08/11/2023, 5:33 PM
💥 nice work - glad I could help!