# ask-community
l
I couldn't find this in the docs, but I'm hoping it's a simple question: is there a way to pass the name of the current run as an argument to a task? In a flow like the one below, I want to pass the name of the current run (e.g. `electric-albatross`) to the `app_sync_flow_claim_lock.R` script. (The reason is that I want the claim operation to fail only if the lock is already held and the holder is not this run, so the claim script needs to know the name of the current run.)
with Flow("App sync") as flow:
    app_claim_lock = ShellTask(
        command = "sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args = dict(
            name = "app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )
k
Hi @Louis Eisenberg, you can access this with `prefect.context.flow_run_name`, as with anything else in the context. But this has to be done inside a task, because task execution is deferred to run time while the code in the Flow block is not (the graph is built eagerly). So I think you need to:
1. Get the name from a task
2. Pass it to the `.run` method of the ShellTask
For tasks made with the Task class, like the ones in the Task Library, you can pass the run arguments in a second pair of parentheses:
with Flow(..) as flow:
    ShellTask(init stuff here)(run stuff here)
or
shell = ShellTask(init stuff here)
with Flow(..) as flow:
    shell(run stuff here)
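The eager-versus-deferred point can be illustrated with a small standard-library sketch. This is a toy model and not Prefect's API: `ToyFlow`, `read_name`, and the module-level `context` dict are hypothetical stand-ins, assumed only to mimic a context that is populated while a run is in progress.

```python
# Toy model (NOT Prefect's API): a context that is only populated during a run.
context = {}


def read_name():
    # Looks up the run name at the moment the function is called.
    return context["flow_run_name"]


class ToyFlow:
    """Hypothetical stand-in for a flow that stores callables and runs them later."""

    def __init__(self):
        self.tasks = []

    def add(self, fn):
        # Register the function without calling it (deferred execution).
        self.tasks.append(fn)

    def run(self, run_name):
        # The run name exists only for the duration of the run.
        context["flow_run_name"] = run_name
        try:
            return [fn() for fn in self.tasks]
        finally:
            context.clear()


flow = ToyFlow()

# Eager: evaluated while the flow is being built, before any run exists.
try:
    eager_value = read_name()
except KeyError:
    eager_value = None

# Deferred: the function is only registered here and is called inside run().
flow.add(read_name)
results = flow.run("electric-albatross")
```

In this sketch the eager lookup finds nothing because no run has started yet, while the registered function sees the name once `run()` fills in the context. That is the same reason the lookup has to live inside a task.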
l
Thanks! I want to make sure I'm understanding your advice properly. So I could do something like this and then I would have `FLOW_RUN_NAME` available as an env var in the spawned process?
shell = ShellTask(
    env=dict(FLOW_RUN_NAME=context.flow_run_name),
)

with Flow("App sync") as flow:
    app_claim_lock = shell(
        command = "sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        task_args = dict(
            name = "app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )
k
No because
shell = ShellTask(
    env=dict(FLOW_RUN_NAME=context.flow_run_name),
)
gets evaluated at registration time, but `flow_run_name` only exists at run time. Also, I don’t think you can do `from prefect import context`. You need to do:
import prefect
prefect.context.flow_run_name
So `flow_run_name` needs to be pulled in a task, because task execution is deferred to run time, and then you can pass it in the `run` of the ShellTask. I think it will be like this:
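import datetime

import prefect
from prefect import Flow, task
from prefect.tasks.shell import ShellTask

shell = ShellTask()

@task
def get_flow_run_name():
    # Runs at flow run time, when prefect.context is populated
    return prefect.context.flow_run_name

with Flow("App sync") as flow:
    flow_run_name = get_flow_run_name()
    # command and env are run-time arguments, so env can take a task result
    app_claim_lock = shell(
        command="sudo make run-interactive scripts/deploy/app_sync_flow_claim_lock.R",
        env=dict(FLOW_RUN_NAME=flow_run_name),
        task_args=dict(
            name="app sync flow claim lock",
            max_retries=5,
            retry_delay=datetime.timedelta(minutes=10),
        )
    )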
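To show what passing `env` amounts to on the receiving side, here is a minimal standard-library sketch of a parent process handing `FLOW_RUN_NAME` to a child process. It does not use Prefect and is not how ShellTask is implemented; `run_with_run_name` is a hypothetical helper, and the child Python one-liner is only a stand-in for the claim script.

```python
import os
import subprocess
import sys


def run_with_run_name(flow_run_name):
    # Copy the parent environment and add the one extra variable.
    env = dict(os.environ, FLOW_RUN_NAME=flow_run_name)
    # Stand-in for the claim script: a child process that echoes the variable.
    child = [sys.executable, "-c", "import os; print(os.environ['FLOW_RUN_NAME'])"]
    completed = subprocess.run(
        child, env=env, capture_output=True, text=True, check=True
    )
    return completed.stdout.strip()


seen_by_child = run_with_run_name("electric-albatross")
```

In the R script the same variable would be read with `Sys.getenv("FLOW_RUN_NAME")`.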
l
Oh, OK, I get it now. Sorry to make you spell it out! Thanks again.
k
No worries at all! It can be a bit unintuitive