Tim Galvin
11/06/2022, 6:29 AM
def main(
    sbid,
    my_args,
    cluster,
):
    dask_runner = get_dask_runner(cluster)

    # Define flow
    @flow(
        name=f"Processing holography -- {sbid}",
        task_runner=dask_runner,
    )
    def my_flow():
        logger = get_run_logger()
        task_super_awesome_worker()

    my_flow()
This is a simplified example, but in short I am creating my flow function encapsulated in another function. I am doing this because some attributes of the flow's task runner need to be defined at runtime, depending on the SLURM cluster it is being executed on, the account running the flow, the compute resources for the SLURM job, etc. (specified via the dask_jobqueue.SLURMCluster module/class).
On the CLI my attempts to set the entrypoint
prefect deployment build /path/to/my/flow_script.py:my_flow
result in an error that amounts to my_flow not being in flow_script.py. If I try to replace my_flow with main, I get an error about main not being a Flow. In the docs I do not see an example of how to do this.
So, I am wondering how does one do something like this? main with a @flow without any bells and whistles and run with that? Or, is there something else that is better supported?
Mason Menges
11/07/2022, 4:43 PM
@task
def some_task():
    print("some_task")

@flow
def other_flow():
    some_task()

@flow
def main_flow():
    dask_runner = get_dask_runner(cluster)
    other_flow.with_options(task_runner=dask_runner)(some_params)

main_flow()
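One likely reason this layout works with an entrypoint such as flow_script.py:main_flow is that main_flow is a module-level name, whereas my_flow in the first snippet exists only inside main(). The plain-Python sketch below illustrates that difference without Prefect. It assumes the entrypoint lookup amounts to loading the file and fetching an attribute by name, which is an assumption about the mechanism rather than something taken from Prefect's source. SOURCE, load_module and resolve_entrypoint are hypothetical names used only for illustration.

```python
import types

# Hypothetical stand-in for flow_script.py: my_flow is defined inside main(),
# while main_flow is defined at module level.
SOURCE = '''
def main():
    def my_flow():
        return "nested"
    return my_flow()

def main_flow():
    return "top-level"
'''


def load_module(source: str, name: str = "flow_script") -> types.ModuleType:
    """Execute source text in a fresh module object, roughly like importing a file."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    return module


def resolve_entrypoint(module: types.ModuleType, object_name: str):
    """Look up object_name as a module attribute; raise if it is not there."""
    try:
        return getattr(module, object_name)
    except AttributeError:
        raise LookupError(f"{object_name!r} not found in {module.__name__}") from None


module = load_module(SOURCE)

# The module-level function is visible to an attribute lookup.
print(resolve_entrypoint(module, "main_flow")())

# The nested function is a local of main(), so the lookup fails.
try:
    resolve_entrypoint(module, "my_flow")
except LookupError as err:
    print(err)
```

Under that assumption, any value that is only known at runtime has to be computed inside a module-level flow, which is what main_flow does above before handing the task runner to other_flow through with_options.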
Tim Galvin
11/08/2022, 1:19 AM
.with_options method though, which would help streamline my codez. At the time it felt like a simple flow wrap was a little too cheeky and not really solving anything -- but this flow/subflow setup is perfectly supported by Prefect. I should not have been worried 😛