Stéphan Taljaard
10/31/2022, 1:20 PM
prefect.context, notably the flow_name, flow_run_name, and flow_run_id. Is there a way to pass the entire flow run context to a task? Or is there a way to access the flow run context from within a task?
If I uncomment the middle line in my flow function (see in the thread), the flow run seems to become unresponsive (I guess it's because it's waiting for "this flow run"'s state, but the flow is still running...)
from prefect import flow, task, get_run_logger
from prefect.context import get_run_context

@task
def log_task(something):
    logger = get_run_logger()
    logger.info(something)

@flow
def main():
    flow_run_context = get_run_context()
    # log_task(flow_run_context)  # If I uncomment this, the flow does not complete
    log_task("The flow is done")

main()
ctx = get_run_context()
some_task(some, other, params, ..., ctx.flow.name, ctx.flow_run.id, ctx.flow_run.name, ...)
But I don't necessarily want to pass many arguments this way.
Calling get_run_context() within the task gives the task run context, but I can't get the flow name and other info from that.
I could take the flow_run_id from the TaskRunContext, then use that as input for the client to look up the flow_run_name, flow_name, etc.
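from prefect import task
from prefect.client.orion import get_client
from prefect.context import get_run_context

@task
async def stuff():  # async, because the client is used with "async with" / "await"
    # The task run context only carries the ID of the parent flow run
    flow_run_id = get_run_context().task_run.flow_run_id
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)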
I think this way it's possible to end up having to chain API calls.
Just wanted to confirm what the intention was when it was developed?
Nate
10/31/2022, 2:04 PM
"But I don't necessarily want to pass many arguments this way."
out of curiosity, what qualms do you have with doing it this way?
Stéphan Taljaard
10/31/2022, 2:33 PM
"out of curiosity, what qualms do you have with doing it this way?"
Personal preference, I guess. I don't like "extracting" many variables and passing them as args, which causes long lines of code in two places.
Nate
10/31/2022, 2:59 PM
from prefect import flow, task
from prefect.context import get_run_context
from pydantic import BaseModel
from uuid import UUID

class ImportantFlowRunContext(BaseModel):
    name: str
    id: UUID
    flow_id: UUID

@task
def task_needing_context(flow_context: ImportantFlowRunContext):
    print(
        f"This task is running in a flow run called {flow_context.name}, "
        f"which has ID: {flow_context.id} "
        f"and where the flow has ID: {flow_context.flow_id}"
    )

@flow
def some_flow():
    important_context = ImportantFlowRunContext(
        **dict(get_run_context().flow_run)
    )
    task_needing_context(flow_context=important_context)

if __name__ == "__main__":
    some_flow()
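(Editor's note: the pattern above, copying a few fields out of the flow run once and handing them to the task as a single argument, can be tried without Prefect or pydantic installed. The sketch below is only an illustration using the standard library. fake_flow_run is a hypothetical stand-in for get_run_context().flow_run, and from_flow_run and describe are made-up helper names, not part of any Prefect API.)

```python
from dataclasses import dataclass, fields
from types import SimpleNamespace
from uuid import UUID, uuid4


@dataclass(frozen=True)
class ImportantFlowRunContext:
    """The handful of flow run fields a task actually needs."""

    name: str
    id: UUID
    flow_id: UUID

    @classmethod
    def from_flow_run(cls, flow_run):
        # Copy only the declared fields; anything else on flow_run is ignored.
        return cls(**{f.name: getattr(flow_run, f.name) for f in fields(cls)})


def describe(ctx: ImportantFlowRunContext) -> str:
    # Stand-in for a task body: it receives one argument instead of three.
    return f"flow run {ctx.name} ({ctx.id}) of flow {ctx.flow_id}"


# Hypothetical stand-in for get_run_context().flow_run (assumption, not Prefect).
fake_flow_run = SimpleNamespace(
    name="example-run", id=uuid4(), flow_id=uuid4(), tags=["extra"]
)
ctx = ImportantFlowRunContext.from_flow_run(fake_flow_run)
print(describe(ctx))
```

(In a real flow, the stand-in object would be replaced by the flow run from get_run_context(), and describe by the task itself. A small, plain object like this keeps the task signature short, which was the concern raised earlier in the thread.)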
Stéphan Taljaard
10/31/2022, 3:28 PM