Greg Desmarais
07/28/2020, 8:04 PM
Greg Desmarais
07/28/2020, 8:05 PM
.prefect/config.toml
debug = "true"
backend = "server"
[server]
[server.ui]
graphql_url = "http://10.72.112.29:4200/graphql"
[server.database]
host = "prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com"
port = 5432
username = "postgres"
password = "prefect1"
connection_url = "postgresql://postgres:prefect1@prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com:5432/prefect_gdesmarais"
[flows]
checkpointing = "true"
[flows.defaults]
[flows.defaults.storage]
default_class = "prefect.engine.results.PrefectResult"
[tasks]
checkpointing = "true"
Greg Desmarais
07/28/2020, 8:05 PM
export PREFECT__FLOWS__CHECKPOINTING=true
export PREFECT__TASKS__CHECKPOINTING=true
export PREFECT__FLOWS__DEFAULTS__STORAGE__DEFAULT_CLASS=prefect.engine.results.PrefectResult
prefect server start
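The exported variables above follow Prefect's environment-variable convention, in which a `PREFECT__` prefix plus double-underscore separators address nested config keys. A minimal sketch of that mapping, assuming this convention; the helper name `env_to_config_key` is hypothetical and not part of Prefect:

```python
def env_to_config_key(name: str) -> str:
    """Translate a PREFECT__A__B style variable name into the dotted config path a.b."""
    prefix = "PREFECT__"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} is not a Prefect config variable")
    # Double underscores separate nesting levels; keys are lower-cased.
    return ".".join(part.lower() for part in name[len(prefix):].split("__"))


for var in (
    "PREFECT__FLOWS__CHECKPOINTING",
    "PREFECT__TASKS__CHECKPOINTING",
    "PREFECT__FLOWS__DEFAULTS__STORAGE__DEFAULT_CLASS",
):
    print(var, "->", env_to_config_key(var))
```

Read this way, each exported variable targets a key that the `config.toml` shown earlier already sets.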
Greg Desmarais
07/28/2020, 8:06 PM
import prefect
from prefect import Flow, task, Client
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import PrefectResult
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3
from rightsize.distributed.scheduler import PREFECT_COMPOSE_HOST
prefect.context.config.cloud.graphql = 'http://10.72.112.29:4200/graphql'
@task(log_stdout=True, result=PrefectResult())
def say_hello():
    return f'done'
with Flow("Dask ECS Test 2", result=PrefectResult()) as flow:
    say_hello()
bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
flow.storage = S3(bucket, key=key)
executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786')
flow.environment = LocalEnvironment(executor=executor)
flow_id = flow.register()
# I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
print(f'Registered flow id: {flow_id}')
p_client = Client()
ret = p_client.create_flow_run(flow_id=flow_id)
print(f'Created flow run: {ret}')
Greg Desmarais
07/28/2020, 8:10 PM
Greg Desmarais
07/28/2020, 8:10 PM
Greg Desmarais
07/28/2020, 8:11 PM
`p_client.get_flow_run_info(ret)`, but I still don't see results in there.
Julian
07/28/2020, 8:53 PM
Julian
07/28/2020, 8:56 PM
Julian
07/28/2020, 8:58 PM
Chris White
`serialized_state` of your task runs via the GraphQL API - you’ll see a `location` attribute that stores the location of the task run's result (which in the case of a `PrefectResult` will be the JSON representation of the result)
Greg Desmarais
07/28/2020, 9:30 PM
Greg Desmarais
07/28/2020, 9:30 PM
Greg Desmarais
07/28/2020, 9:31 PM
Greg Desmarais
07/28/2020, 9:32 PM
Greg Desmarais
07/28/2020, 9:32 PM
`config.toml` correct? Which makes me think my environment variables are complete duplicates?
Greg Desmarais
07/28/2020, 9:35 PM
Greg Desmarais
07/28/2020, 9:35 PM
Greg Desmarais
07/28/2020, 9:42 PM
`info = p_client.get_flow_run_info(ret)` with `ret` being my flow run id obtained from `p_client.create_flow_run(flow_id=flow_id)`, and `flow_id` from `flow_id = flow.register()`, I see all sorts of info, but no mention of results (other than blanks) and no mention of `serialized_state` or `location` (from my quick scan). I know I'm getting info on the flow run itself, but I hoped it would let me drill down into the tasks.
Chris White
from prefect import Client; c = Client()
data = c.get_flow_run_info("your-flow-run-id")
locations = [tr.state._result.location for tr in data.task_runs]
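Since a `PrefectResult` keeps the JSON representation of the value in `location`, the collected locations can be turned back into Python values with `json.loads`. A minimal sketch: the helper name `decode_prefect_locations` is hypothetical, the sample data is made up for illustration, and task runs without a stored result are assumed to report a `location` of `None`:

```python
import json
from typing import Any, List, Optional


def decode_prefect_locations(locations: List[Optional[str]]) -> List[Any]:
    """Decode PrefectResult locations (JSON strings); missing locations stay None."""
    return [json.loads(loc) if loc is not None else None for loc in locations]


# Illustrative data only; real values would come from
# [tr.state._result.location for tr in data.task_runs]
sample_locations = ['"done"', None]
print(decode_prefect_locations(sample_locations))
```

This only applies to `PrefectResult`; for other result types, `location` would be expected to point at wherever the data was written rather than contain it.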
Greg Desmarais
07/29/2020, 3:46 AM
Greg Desmarais
07/29/2020, 3:47 AM
Chris White