Louis Eisenberg
01/05/2022, 9:42 PM
electric-albatross) to the app_sync_flow_claim_lock.R script. (The reason is that I want the claim operation to fail only if the lock is already held and the holder is not this run. So the claim script needs to know the name of the current run.)
with Flow("App sync") as flow:
    app_claim_lock = ShellTask(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )
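The claim rule described above (fail only when the lock is held by a different run) can be sketched in a few lines. The real script is written in R and is not shown in this thread, so the function name `claim_lock` and its arguments are hypothetical and only illustrate the rule.

```python
from typing import Optional


def claim_lock(current_holder: Optional[str], run_name: str) -> str:
    """Return the new lock holder, or raise if a different run holds the lock.

    Hypothetical helper: `current_holder` is None when the lock is free.
    """
    if current_holder is not None and current_holder != run_name:
        # The lock is held by some other run, so the claim must fail.
        raise RuntimeError(f"lock is held by {current_holder!r}")
    # The lock is free, or this run already holds it (e.g. after a retry).
    return run_name
```

Under this rule a retry of the same run can claim the lock again, which is why the script needs the current run's name.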
Kevin Kho
prefect.context.flow_run_name, as with any of the things in the context. But this has to be done inside a task, because tasks are deferred while the code in the Flow block is not (the graph is built eagerly). So I think you need to:
1. Get the name from a task
2. Pass it to the .run method of the ShellTask
For tasks made with the Task class, like the ones in the Task Library, you can pass the run arguments in a second set of parentheses:
with Flow(..) as flow:
    ShellTask(init stuff here)(run stuff here)
or
shell = ShellTask(init stuff here)
with Flow(..) as flow:
    shell(run stuff here)
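The difference between eager graph building and deferred task execution can be illustrated with a plain-Python analogy. This is not Prefect code: the `context` dict below is a stand-in for a run-time context, and the "orchestrator" step is simulated by hand.

```python
# Stand-in for a run-time context; it is empty while the flow is being defined.
context = {}


def get_flow_run_name():
    # Deferred: the lookup happens only when this function is called.
    return context["flow_run_name"]


# Build time: an eager lookup fails because the key has not been set yet.
try:
    eager_value = context["flow_run_name"]
except KeyError:
    eager_value = None

# Run time: the orchestrator (simulated here) fills in the context,
# and only then is the task body executed.
context["flow_run_name"] = "electric-albatross"
deferred_value = get_flow_run_name()
```

The eager lookup has nothing to read, while the same lookup wrapped in a function succeeds once the context has been populated.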
Louis Eisenberg
01/05/2022, 10:06 PM
FLOW_RUN_NAME available as an env var in the spawned process?
shell = ShellTask(
    env=dict(FLOW_RUN_NAME=context.flow_run_name),
)
with Flow("App sync") as flow:
    app_claim_lock = shell(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )
Kevin Kho
shell = ShellTask(
    env=dict(FLOW_RUN_NAME=context.flow_run_name),
)
gets evaluated at registration time, but flow_run_name only exists at run time. Also, I don’t think you can do
from prefect import context
You need to do:
import prefect
prefect.context.flow_run_name
So flow_run_name needs to be pulled in a task, because task execution is deferred to run time, and then you can pass it in the run of the ShellTask.
Kevin Kho
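import datetime

import prefect
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

shell = ShellTask()

@task
def get_flow_run_name():
    # Runs at flow run time, when the context holds the run name.
    return prefect.context.flow_run_name

with Flow("App sync") as flow:
    flow_run_name = get_flow_run_name()
    app_claim_lock = shell(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        # Passed to the task's run, so it is resolved at run time.
        env=dict(FLOW_RUN_NAME=flow_run_name),
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )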
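To show what "available as an env var in the spawned process" means, here is a minimal standard-library sketch. It does not use ShellTask and is not its implementation; `run_with_flow_run_name` is a hypothetical helper, and the child process is a Python one-liner standing in for the R script.

```python
import os
import subprocess
import sys


def run_with_flow_run_name(flow_run_name: str) -> str:
    """Spawn a child process with FLOW_RUN_NAME set and return what it reads."""
    env = os.environ.copy()  # keep the parent's environment
    env["FLOW_RUN_NAME"] = flow_run_name  # add the run name for the child
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.environ['FLOW_RUN_NAME'])"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

An R script would read the same variable with Sys.getenv("FLOW_RUN_NAME").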
Louis Eisenberg
01/05/2022, 10:11 PM
Kevin Kho