Hey guys, I have a question regarding triggering ...
# prefect-community
b
Hey guys, I have a question regarding triggering
CloudFlowRunner
flows. Locally I have a flow setup and registered to the cloud backend. I can trigger the flow via the web-ui, however when running locally I get the following:
Copy code
>> cloud_flow = CloudFlowRunner(flow)
>> flow_state = cloud_flow.run(return_tasks=flow.tasks)
>> flow_state
[2020-07-14 09:37:58] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'Map Reduce'
<Failed: "Could not retrieve state from Prefect Cloud">
Am I missing something here?
j
Hi @Bernard Greyling when running your flow locally you should be using a local
FlowRunner
(mainly through calling `flow.run`) When running with a backend (core’s server or Cloud) you will want to use Agents to deploy your flow runs. In this case you might want to be using the Local Agent
b
Hi Josh. thanks for your response. I still don't follow. I have a kubernetes agent that can run the flow if scheduled via the web ui. Is there a straightforward way to trigger/schedule my local flow on that cluster?
j
The best way to schedule your local flow on that cluster would be to register it with the backend then start a flow run
b
That's the one! So what your'e saying is the following ?
Copy code
>> flow_id = flow.register('My Project')

>> state = prefect.core.client.create_flow_run(flow_id=flow_id)
>> state

<state object>
j
Ah here’s what you would want to do:
Copy code
flow_id = flow.register('My Project')

flow_run_id = prefect.core.client.create_flow_run(flow_id=flow_id)

# call this whenever to get current run info including state
result = prefect.core.client.get_flow_run_info(flow_run_id=flow_run_id)

result.state
b
Champion!
Hey @josh Put this aside for a while. Got it going with:
Copy code
from prefect.client.client import Client

# init client
client = Client()

# register flow
flow_id = flow.register('My Project')

# run and retrieve state
flow_run_id = client.create_flow_run(flow_id=flow_id)
result = client.get_flow_run_info(flow_run_id=flow_run_id)
result.state