# prefect-server
Greg
I'm working on getting Results to be available for registered flows. I have a server deployed that is executing my flows nicely with a Dask cluster, and can run those flows when they are not registered. When I register a flow and create a flow run, I don't seem to be able to get a hold of any Results.
My server has the following `.prefect/config.toml`:
```toml
debug = "true"
backend = "server"
[server]
  [server.ui]
    graphql_url = "http://10.72.112.29:4200/graphql"
  [server.database]
    host = "prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com"
    port = 5432
    username = "postgres"
    password = "prefect1"
    connection_url = "postgresql://postgres:prefect1@prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com:5432/prefect_gdesmarais"
[flows]
  checkpointing = "true"
  [flows.defaults]
    [flows.defaults.storage]
      default_class = "prefect.engine.results.PrefectResult"
[tasks]
  checkpointing = "true"
```
My startup script for the server is:
```bash
export PREFECT__FLOWS__CHECKPOINTING=true
export PREFECT__TASKS__CHECKPOINTING=true
export PREFECT__FLOWS__DEFAULTS__STORAGE__DEFAULT_CLASS=prefect.engine.results.PrefectResult

prefect server start
```
My test flow, issued from my client, is:
```python
import prefect

from prefect import Flow, task, Client
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import PrefectResult
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3
from rightsize.distributed.scheduler import PREFECT_COMPOSE_HOST

prefect.context.config.cloud.graphql = 'http://10.72.112.29:4200/graphql'


@task(log_stdout=True, result=PrefectResult())
def say_hello():
    return 'done'


with Flow("Dask ECS Test 2", result=PrefectResult()) as flow:
    say_hello()

bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
flow.storage = S3(bucket, key=key)
executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786')
flow.environment = LocalEnvironment(executor=executor)
flow_id = flow.register()
# I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
print(f'Registered flow id: {flow_id}')

p_client = Client()
ret = p_client.create_flow_run(flow_id=flow_id)
print(f'Created flow run: {ret}')
```
When I go into the UI for the run, I don't see the results in the task, and I don't know how I would get those results back programmatically.
Am I missing something obvious?
I can do things like `p_client.get_flow_run_info(ret)`, but I still don't see results in there.
Julian
You should see the output in the dask-worker logs.
When you use a LocalExecutor you should see 'done' on the agent.
Also, you can pass the output of a task to a subsequent task, and you can use a logger to log intermediate results, which are then readable from the UI.
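For example, a minimal sketch of the logger approach (the task body here is hypothetical):
```python
import prefect
from prefect import task

@task
def compute():
    # grab the logger Prefect injects into the task's context
    logger = prefect.context.get("logger")
    result = 42  # hypothetical intermediate value
    logger.info(f"intermediate result: {result}")  # visible in the UI's log view
    return result  # return values can be passed to downstream tasks
```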
Chris White
Hi Greg - the server UI is currently quite out-of-date; you can get at task results by querying for the `serialized_state` of your task runs via the GraphQL API - you'll see a `location` attribute that stores the location of the task run's result (which in the case of a `PrefectResult` will be the JSON representation of the result).
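For example, a sketch of such a query issued through the Python client (the flow run id is a placeholder, and the exact shape of `serialized_state` can vary by version):
```python
from prefect import Client

client = Client()
flow_run_id = "your-flow-run-id"  # substitute the id from create_flow_run
query = """
query {
  task_run(where: {flow_run_id: {_eq: "%s"}}) {
    task { name }
    serialized_state
  }
}
""" % flow_run_id
result = client.graphql(query)
for tr in result.data.task_run:
    # for a PrefectResult, the location inside serialized_state
    # is the JSON-serialized return value of the task
    print(tr.task.name, tr.serialized_state)
```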
Greg
hrm...ok. Any chance there is any sample code on that @Chris White?
Cause it seems like a long way to go...
@Julian - thanks for the response. In my case I'm trying to retrieve results from various tasks in the flow from a client script.
@Chris White - any comment on which parts of my code snippets are unneeded? For example, if I set a `result=` on my flow, the tasks should default to that (unless they override it), right?
And are my settings in my `config.toml` correct? If so, I think my environment variables are complete duplicates of them.
And @Chris White - the UI is secondary - what I really want to be able to do is run a registered flow from a client and block until it is done, then return the results.
(btw - the help is appreciated - you guys must be slammed)
When I run `info = p_client.get_flow_run_info(ret)`, with `ret` being my flow run id obtained from `p_client.create_flow_run(flow_id=flow_id)` and `flow_id` from `flow_id = flow.register()`, I see all sorts of info, but no mention of results (other than blanks) and no mention of `serialized_state` or `location` (from my quick scan). I know I'm getting info on the flow run itself, but I hoped it would let me drill down into the tasks.
Chris White
Yea, Flow-level results will become the default result type for all tasks (unless you override it). You could do:
```python
from prefect import Client

c = Client()
data = c.get_flow_run_info("your-flow-run-id")
locations = [tr.state._result.location for tr in data.task_runs]
```
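Since a `PrefectResult` stores its value as its JSON representation in `location`, the values can then be recovered directly (a sketch, assuming every task uses a `PrefectResult`):
```python
import json

values = [json.loads(loc) for loc in locations if loc is not None]
```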
Greg
Hey - cool... I didn't dig down that particular path. Looks like it worked for a `PrefectResult` - I can explore other result types. I feel like a Redis-type service with timeouts on the objects might be good for small results, and S3 for big results. Maybe a result class that can decide based on some characteristic of the result itself.
Any thoughts on essentially 'joining' the flow like a thread? Many systems support a callback or something when a flow is done, which in many cases is just a polling process. I am going to implement a join-type method that polls and waits for the flow to reach some terminal state, but if you have a better idea, I'm all ears.
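Something like this sketch is what I have in mind (the function name and poll interval are my own invention):
```python
import time

from prefect import Client

def join_flow_run(flow_run_id: str, poll_interval: float = 5.0):
    """Block until the flow run reaches a terminal state, then return its info."""
    client = Client()
    while True:
        info = client.get_flow_run_info(flow_run_id)
        if info.state.is_finished():  # terminal states: Success, Failed, etc.
            return info
        time.sleep(poll_interval)
```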
Chris White
Not sure what you mean, but Flows can have state handlers that are called on every state change, which you can use to perform logic when the flow enters certain types of states.
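A sketch of what that looks like (the handler body is just an example):
```python
from prefect import Flow

def on_state_change(flow, old_state, new_state):
    # called on every flow state change; act when a terminal state is reached
    if new_state.is_finished():
        print(f"Flow finished in state: {new_state}")
    return new_state

with Flow("Dask ECS Test 2", state_handlers=[on_state_change]) as flow:
    ...
```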