Greg Desmarais
07/28/2020, 8:04 PM.prefect/config.toml
debug = "true"
backend = "server"
[server]
[server.ui]
graphql_url = "<http://10.72.112.29:4200/graphql>"
[server.database]
host = "<http://prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com|prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com>"
port = 5432
username = "postgres"
password = "prefect1"
connection_url = "<postgresql://postgres:prefect1@prefect-gdesmarais.cluster-cyx6dhev9iel.us-east-1.rds.amazonaws.com:5432/prefect_gdesmarais>"
[flows]
checkpointing = "true"
[flows.defaults]
[flows.defaults.storage]
default_class = "prefect.engine.results.PrefectResult"
[tasks]
checkpointing = "true"
export PREFECT__FLOWS__CHECKPOINTING=true
export PREFECT__TASKS__CHECKPOINTING=true
export PREFECT__FLOWS__DEFAULTS__STORAGE__DEFAULT_CLASS=prefect.engine.results.PrefectResult
prefect server start
import prefect
from prefect import Flow, task, Client
from prefect.engine.executors import DaskExecutor
from prefect.engine.results import PrefectResult
from prefect.environments import LocalEnvironment
from prefect.environments.storage import S3
from rightsize.distributed.scheduler import PREFECT_COMPOSE_HOST
prefect.context.config.cloud.graphql = '<http://10.72.112.29:4200/graphql>'
@task(log_stdout=True, result=PrefectResult())
def say_hello():
return f'done'
with Flow("Dask ECS Test 2", result=PrefectResult()) as flow:
say_hello()
bucket = 'celsius-temp-data'
key = 'datasciences/prefect_flows/dask_ecs_flow_test_2'
flow.storage = S3(bucket, key=key)
executor = DaskExecutor(address=f'{PREFECT_COMPOSE_HOST}:38786')
flow.environment = LocalEnvironment(executor=executor)
flow_id = flow.register()
# I'd love for the flow to be populated with more info - e.g. the version (not sure what else)
print(f'Registered flow id: {flow_id}')
p_client = Client()
ret = p_client.create_flow_run(flow_id=flow_id)
print(f'Created flow run: {ret}')
p_client.get_flow_run_info(ret)
, but I still don't see results in there.Julian
07/28/2020, 8:53 PMChris White
serialized_state
of your task runs via the GraphQL API - you’ll see a location
attribute that stores the location of the task run (which in the case of a PrefectResult
will be the JSON representation of the result)Greg Desmarais
07/28/2020, 9:30 PMconfig.toml
correct? Which makes me think my environment variables are complete duplicate?info = p_client.get_flow_run_info(ret)
with ret
being my flow run id obtained from p_client.create_flow_run(flow_id=flow_id)
, and flow_id
from flow_id = flow.register()
, I see all sorts of info, but no mention of results (other than blanks) and no mention of `serialized_state`or `location`(from my quick scan). I know I'm getting info on the flow run itself, but I hoped it would let me drill down into the tasks.Chris White
from prefect import Client; c = Client()
data = c.get_flow_run_info("your-flow-run-id")
locations = [tr.state._result.location for tr in data.task_runs]
Greg Desmarais
07/29/2020, 3:46 AMChris White