Hugo Slepicka

    Hugo Slepicka

    11 months ago
    Hi, I have a flow that was registered with a local Prefect server. If I invoke
    flow.run()
    , instead of it running on the registered server with the local agent it is running on my notebook. I could not find on the docs how to trigger the execution of the registered flow using the local server. Could you point me to an example or docs (which I likely missed)? What I am looking for is a way to execute the registered flow on my server via Python.
    Kevin Kho

    Kevin Kho

    11 months ago
    Hey @Hugo Slepicka, a couple of ways. You can do to the UI of server and trigger the Flow Run there. From your notebook, you can use the Prefect
    Client
    and do
    Client.create_flow_run
    . There are also two tasks for
    StartFlowRun
    and
    create_flow_run
    that can be found here
    There are Tasks but tasks can be invoked by calling the
    run()
    method.
    You can also look at how the call was made in the source code if you want.
    Hugo Slepicka

    Hugo Slepicka

    11 months ago
    So I would replace my:
    state = flow.run(executor=executor)
    with:
    from prefect.tasks.prefect.flow_run import StartFlowRun
    state = StartFlowRun().run(flow_name="Distgen->Astra&Impact", project_name="lume-orchestration")
    ?
    Kevin Kho

    Kevin Kho

    11 months ago
    That will work yeah
    Hugo Slepicka

    Hugo Slepicka

    11 months ago
    @Kevin Kho and how can I monitor it for finished status and fetch results back?
    The run succeeded with the StartFlowRun task… now I just have that last question about fetch state while it runs to wait or not for the task to finish and how to fetch results back. Thank you!
    Kevin Kho

    Kevin Kho

    11 months ago
    So that is two separate things. To wait for the state, you can do
    wait=True
    , for fetching results back, you need to use the GraphQL API. So a while ago I mentioned there is also a
    create_flow_run
    task. There are three tasks which go together.
    create_flow_run
    ,
    wait_for_flow_run
    and
    get_task_run_result
    . All are in the task library.
    create_flow_run
    and
    wait_for_flow_run
    were meant to decouple the
    StartFlowRun(…,wait=True)
    . Basically to fetch results you want the
    get_task_run_result
    And then you want to use 
    task.run()
     for these if they are outside of a flow
    Hugo Slepicka

    Hugo Slepicka

    11 months ago
    @Kevin Kho what is the task_slug?
    Kevin Kho

    Kevin Kho

    11 months ago
    The slug is the identifier cuz a task can be used multiple times so often there will be a number for multiple invocations. If you click inside the task there in the UI you might find the slug.
    Basically the identifier for the task that has the results you want to pull
    You can also specify it when you create the task. Docs
    Hugo Slepicka

    Hugo Slepicka

    11 months ago
    In the case of this screenshot the
    task_slug
    would be:
    brainy-ocelot
    and etc?
    Kevin Kho

    Kevin Kho

    11 months ago
    Yes I think those are the slugs
    Hugo Slepicka

    Hugo Slepicka

    11 months ago
    Thanks for the help. I ended up going with this route here:
    from prefect.backend import FlowRunView
    flow_run = FlowRunView.from_flow_run_id(state)
    all_tasks = flow_run.get_all_task_runs()
    g, a, i = [all_tasks[x].get_result() for x in range(3)]
    Here is what I was doing: https://github.com/hhslepicka/lume-prefect-demo/blob/main/lume-flow-server.ipynb Let me know if you have suggestions! Thanks again for the guidance.