
    Greg Desmarais

    2 years ago
    I'm working on getting Results to be available for registered flows. I have a server deployed that is executing my flows nicely with a Dask cluster, and can run those flows when they are not registered. When I register a flow and create a flow run, I don't seem to be able to get a hold of any Results.
    My server has the following .prefect/config.toml:
    debug = "true"
    backend = "server"
    [server]
      [server.ui]
        graphql_url = "http://10.72.112.29:4200/graphql"
      [server.database]
        host = "prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com"
        port = 5432
        username = "postgres"
        password = "prefect1"
        connection_url = "postgresql://postgres:prefect1@prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com:5432/prefect_gdesmarais"
    [flows]
      checkpointing = "true"
      [flows.defaults]
        [flows.defaults.storage]
          default_class = "prefect.engine.results.PrefectResult"
    [tasks]
      checkpointing = "true"
    My startup script for the server is:
    export PREFECT__FLOWS__CHECKPOINTING=true
    export PREFECT__TASKS__CHECKPOINTING=true
    export PREFECT__FLOWS__DEFAULTS__STORAGE__DEFAULT_CLASS=prefect.engine.results.PrefectResult
    
    prefect server start
    My test flow, issued from my client, is:
    import prefect
    
    from prefect import Flow, task, Client
    from prefect.engine.executors import DaskExecutor
    from prefect.engine.results import PrefectResult
    from prefect.environments import LocalEnvironment
    from prefect.environments.storage import S3
    from rightsize.distributed.scheduler import PREFECT_COMPOSE_HOST
    
    prefect.context.config.cloud.graphql = 'http://10.72.112.29:4200/graphql'
    
    
    @task(log_stdout=True, result=PrefectResult())
    def say_hello():
        return 'done'
    
    
    with Flow("Dask ECS Test 2", result=PrefectResult()) as flow:
        say_hello()
    
    bucket = 'celsius-temp-data'
    key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
    flow.storage = S3(bucket, key=key)
    executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786')
    flow.environment = LocalEnvironment(executor=executor)
    flow_id = flow.register()
    # I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
    print(f'Registered flow id: {flow_id}')
    
    p_client = Client()
    ret = p_client.create_flow_run(flow_id=flow_id)
    print(f'Created flow run: {ret}')
    When I go into the UI for the run, I don't see the results in the task, and I don't know how I would get those results back programmatically.
    Am I missing something obvious?
    I can do things like p_client.get_flow_run_info(ret), but I still don't see results in there.

    Julian

    2 years ago
    You should see the output in the dask-worker logs.
    When you use a LocalExecutor, you should see 'done' on the agent.
    You can also pass the output of a task to a subsequent task, and you can use a logger to log intermediate results, which are then readable from the UI.
    Chris White

    2 years ago
    Hi Greg - the server UI is currently quite out-of-date; you can get task results by querying for the serialized_state of your task runs via the GraphQL API. There you'll see a location attribute that stores the location of the task run's result (which, in the case of a PrefectResult, will be the JSON representation of the result).
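A minimal sketch of that query from Python, assuming Prefect 0.x's `Client.graphql(query, variables=...)` and a Server GraphQL schema with a `task_run` table exposing `serialized_state` and a `task { name }` relation. The `_result` key under which the location is read is also an assumption about how the state is serialized. The network call is isolated in `fetch_task_runs`, so the parsing helpers can be tried on plain dicts.

```python
import json

# Assumed Prefect Server GraphQL schema: task_run rows filtered by flow_run_id,
# each carrying its serialized_state (JSON) and the name of its task.
TASK_RUN_QUERY = """
query($flow_run_id: uuid!) {
  task_run(where: {flow_run_id: {_eq: $flow_run_id}}) {
    task { name }
    serialized_state
  }
}
"""


def fetch_task_runs(flow_run_id):
    """Run the query against a live server (needs prefect installed and reachable)."""
    from prefect import Client  # lazy import keeps the helpers below usable offline

    response = Client().graphql(TASK_RUN_QUERY, variables={"flow_run_id": flow_run_id})
    return response["data"]["task_run"]


def result_locations(task_runs):
    """Map task name -> result location read from each serialized_state.

    Assumes the location lives under a "_result" key; yields None when absent.
    """
    locations = {}
    for run in task_runs:
        state = run.get("serialized_state") or {}
        if isinstance(state, str):  # accept a JSON string as well as a decoded dict
            state = json.loads(state)
        locations[run["task"]["name"]] = (state.get("_result") or {}).get("location")
    return locations


def decode_prefect_result(location):
    """A PrefectResult's location is the JSON form of the value, so decode it."""
    return None if location is None else json.loads(location)
```

Typical use would be `decode_prefect_result(result_locations(fetch_task_runs(flow_run_id))["say_hello"])`. Note that this sketch keys by task name, so mapped tasks (several runs of one task) would overwrite each other.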

    Greg Desmarais

    2 years ago
    Hrm... ok. Any chance there is sample code for that, @Chris White?
    It seems like a long way to go...
    @Julian - thanks for the response. In my case I'm trying to retrieve the results of various tasks in the flow from a client script.
    @Chris White - any comment on which parts of my code snippets are unneeded? For example, if I set a result= on my flow, the tasks should default to that (unless they override it), right?
    And are the settings in my config.toml correct? If so, doesn't that make my environment variables a complete duplicate?
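One way to check the duplication question, assuming Prefect's documented naming convention in which `PREFECT__SECTION__KEY` maps to the lower-cased `section.key` entry of config.toml. The helper names below are hypothetical, and the boolean casting is a simplified stand-in for Prefect's own string-to-type conversion.

```python
def _normalize(value):
    """Treat "true"/"false" strings and real booleans alike (simplified casting)."""
    if isinstance(value, str):
        return {"true": True, "false": False}.get(value.lower(), value)
    return value


def _lookup(tree, keys):
    """Walk nested dicts along a key path; return None if any step is missing."""
    for key in keys:
        if not isinstance(tree, dict) or key not in tree:
            return None
        tree = tree[key]
    return tree


def redundant_env_vars(environ, toml_tree, prefix="PREFECT__"):
    """Names of PREFECT__* variables that only repeat a value already in config.toml."""
    duplicates = []
    for name, raw in environ.items():
        if not name.startswith(prefix):
            continue
        # PREFECT__FLOWS__CHECKPOINTING -> ["flows", "checkpointing"]
        keys = name[len(prefix):].lower().split("__")
        found = _lookup(toml_tree, keys)
        if found is not None and _normalize(found) == _normalize(raw):
            duplicates.append(name)
    return sorted(duplicates)
```

Feeding it `os.environ` and the parsed config.toml (for example via `tomllib.load` on Python 3.11+) would flag all three exports from the startup script above as redundant.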
    And @Chris White - the UI is secondary - what I really want to be able to do is run a registered flow from a client and block until it is done, then return the results.
    (btw - the help is appreciated - you guys must be slammed)
    When I run info = p_client.get_flow_run_info(ret), with ret being my flow run id obtained from p_client.create_flow_run(flow_id=flow_id), and flow_id from flow_id = flow.register(), I see all sorts of info, but no mention of results (other than blanks) and no mention of serialized_state or location (from my quick scan). I know I'm getting info on the flow run itself, but I hoped it would let me drill down into the tasks.
    Chris White

    2 years ago
    Yeah, flow-level results will become the default result type for all tasks (unless a task overrides it). You could do:
    from prefect import Client
    
    c = Client()
    data = c.get_flow_run_info("your-flow-run-id")
    # for a PrefectResult, each location is the JSON representation of the task's return value
    locations = [tr.state._result.location for tr in data.task_runs]

    Greg Desmarais

    2 years ago
    Hey - cool... I hadn't gone down that particular path. It looks like it worked for a PrefectResult; I can explore other result types. I feel like a Redis-type service with timeouts on the objects might be good for small results, and S3 for big results. Maybe a result class that can decide based on some characteristic of the result itself.
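The idea of a result class that picks a backend based on the result itself could look roughly like the sketch below. It is not a Prefect `Result` subclass: the class, its `threshold_bytes` parameter, and the two dict-like stores (stand-ins for something like Redis and S3) are all hypothetical, and pickle is used only to measure and hold the payload.

```python
import pickle
import uuid


class SizeRoutedStore:
    """Hypothetical store: small payloads go to one backend, large ones to another."""

    def __init__(self, small_store, large_store, threshold_bytes=64 * 1024):
        self.backends = {"small": small_store, "large": large_store}
        self.threshold_bytes = threshold_bytes

    def write(self, value):
        """Serialize value, choose a backend by payload size, and return its location."""
        payload = pickle.dumps(value)
        tier = "small" if len(payload) <= self.threshold_bytes else "large"
        key = uuid.uuid4().hex
        self.backends[tier][key] = payload
        return f"{tier}:{key}"

    def read(self, location):
        """Inverse of write: split the location and unpickle from the matching backend."""
        tier, key = location.split(":", 1)
        return pickle.loads(self.backends[tier][key])
```

Encoding the tier in the location string means a reader never has to guess which backend holds a value; a real version would also need expiry on the small tier, as suggested above.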
    Any thoughts on essentially 'joining' the flow like a thread? Many systems support a callback or something similar when a flow is done, which in many cases is just a polling process. I am going to implement a join-type method that polls until the flow reaches some terminal state, but if you have a better idea, I'm all ears.
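A polling "join" of that kind might be sketched as follows. The function and exception names are hypothetical; the only Prefect assumption is that a Prefect 0.x state object offers `is_finished()`, so `get_state` could be `lambda: client.get_flow_run_info(flow_run_id).state`. The sleep and clock functions are injectable so the loop can be exercised without waiting.

```python
import time


class FlowRunTimeout(Exception):
    """Raised when the flow run is still not finished after `timeout` seconds."""


def wait_for_flow_run(get_state, timeout=600.0, poll_interval=5.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until the returned state reports is_finished(), then return it.

    get_state is any zero-argument callable returning a state-like object.
    """
    deadline = clock() + timeout
    while True:
        state = get_state()
        if state.is_finished():
            return state
        if clock() >= deadline:
            raise FlowRunTimeout(f"flow run not finished after {timeout} seconds")
        sleep(poll_interval)
```

Once this returns, the task results can be collected from the finished run, for example with the `get_flow_run_info` approach from the thread.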
    Chris White

    2 years ago
    Not sure what you mean, but flows can have state handlers, which are called on every state change; you can use them to perform logic when the flow enters certain types of states.
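A minimal sketch of such a handler, assuming the Prefect 0.x handler signature `(flow, old_state, new_state)` and the `is_finished()` method on states; `make_finished_handler` and the `callback` it wraps are hypothetical names.

```python
def make_finished_handler(callback):
    """Build a flow state handler that calls callback(flow, new_state)
    on the transition into a finished state."""

    def handler(flow, old_state, new_state):
        if new_state.is_finished() and not old_state.is_finished():
            callback(flow, new_state)
        return new_state  # returning the new state leaves the transition unchanged

    return handler
```

It would be attached with something like `Flow("Dask ECS Test 2", state_handlers=[make_finished_handler(notify)])`, where `notify` is your own function. Keep in mind that the handler runs wherever the flow itself runs (the agent or executor side), not in the client script that created the run, so a client that wants to block until completion still needs some form of polling.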