Sergiy Krutsenko
12/18/2020, 5:32 PMFlow_a: Created 3 chunks
Flow_b: Got result from previous flow None
import prefect as pf
from prefect import Client, task, Parameter, Flow
from prefect.engine.results import LocalResult
from prefect.environments.storage import Local
from prefect.utilities.configuration import set_temporary_config
from prefect.tasks.prefect import FlowRunTask
from prefect.environments import LocalEnvironment
import socket
def get_server_config(server, port):
return {
"cloud.api": "http://{}:{}".format(server, port),
"cloud.graphql": "http://{}:{}/graphql".format(server, port),
"backend": "server",
}
def get_logger():
return pf.context.get('logger')
@task()
def create_chunks(inputs):
logger = get_logger()
chunks = ['a', 'b', 'c']
<http://logger.info|logger.info>('Created %d chunks', len(chunks))
return chunks
@task()
def accept_results(result):
logger = get_logger()
<http://logger.info|logger.info>('Got result from previous flow %s', result)
return result
def main():
hostname = socket.gethostname()
labels = [hostname]
env = LocalEnvironment(labels=labels)
with set_temporary_config(get_server_config('xxx', '4200')):
with Flow('flow_a', storage=Local(), environment=env,
result=LocalResult(dir='c:/temp/flows/flow_a',
location='{flow_run_id}_{task_name}_{map_index}.txt')
) as flow_a:
inputs = Parameter('inputs', required=True)
create_chunks(inputs)
with Flow('flow_b', storage=Local(), environment=env,
result=LocalResult(dir='c:/temp/flows/flow_b',
location='{flow_run_id}_{task_name}_{map_index}.txt')
) as flow_b:
result = Parameter('result', required=True)
accept_results(result)
fa = FlowRunTask(flow_name="flow_a", project_name="Test", wait=True)
fb = FlowRunTask(flow_name="flow_b", project_name="Test", wait=True)
with Flow('flow_c', storage=Local(), environment=env,
result=LocalResult(dir='c:/temp/flows/flow_c',
location='{flow_run_id}_{task_name}_{map_index}.txt')
) as flow_c:
a = fa(parameters={'inputs': {}})
b = fb(upstream_tasks=[a], parameters={'result': a.result})
client = Client()
id_a = client.register(flow_a, project_name="Test")
id_b = client.register(flow_b, project_name="Test")
id_c = client.register(flow_c, project_name="Test")
client.create_flow_run(flow_id=id_c)
if __name__ == "__main__":
main()
nicholas
Sergiy Krutsenko
12/18/2020, 6:50 PMnicholas
Sergiy Krutsenko
12/18/2020, 9:14 PMwith Flow('flow_c') as flow:
a = flow_a(parameters={...})
b = flow_b(upstream_tasks=[a], parameters={"p":a.result})
nicholas
flow_a
? If you wouldn't mind sharing your example code as you're working with it now, that'd be helpful 🙂Sergiy Krutsenko
12/18/2020, 9:48 PMwith Flow('flow_a', storage=Local(), result=LocalResult(dir='c:/temp/flow_a', location='{flow_run_id}_{task_name}_{map_index}.txt')) as flow: input = Parameter('input_params', required=True)
create_chunks(input)
nicholas
Sergiy Krutsenko
12/21/2020, 3:04 PMnicholas
Sergiy Krutsenko
12/23/2020, 3:45 PMnicholas
wait
kwarg on the StartFlowRunTask
?Sergiy Krutsenko
12/23/2020, 5:12 PMnicholas
Sergiy Krutsenko
12/23/2020, 6:24 PMnicholas
StartFlowRun
task returns the flow run id, you can use that in a downstream task to fetch results:
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun
@task
def get_id(input):
# State message for this task are returned in the format:
# <<flow run id>> finished in state << state >>
# so we can grab the id easily from that message
id = input.state.message.split(" ", 1)[0]
print(id)
return id
with Flow("Orchestrator") as flow_c:
a = StartFlowRun(
project_name="<<project>>",
wait=True,
)(flow_name="flow_a")
get_id(a)
# output:
# d2605193-3eb3-45ef-9976-dcbe7446186ee
nicholas
get_id
task on any StartFlowRun
tasks to get the flow run ids you need, and you can access the state itself for any sort of conditional logic (maybe if the run failed you don't want to access any results from it)Sergiy Krutsenko
12/24/2020, 2:38 PMnicholas