Noam polak
12/13/2021, 7:31 AM
Anna Geller
Noam polak
12/13/2021, 9:22 AM
Anna Geller
Noam polak
12/13/2021, 9:24 AM
Noam polak
12/13/2021, 9:24 AM
Noam polak
12/13/2021, 9:26 AM
start_flow_run = StartFlowRun(flow_name=FS_FQ_FLOW, project_name="default")
internal_input_data_updated = update_internal_input_data(internal_input_data)
flow.set_dependencies(start_flow_run, [internal_input_data_updated, exists, run_config])
start_flow_run(
upstream_tasks=[company_ex_id, exists, internal_input_data_updated],
parameters={
"input_data": input_data,
"internal_input_data": internal_input_data_updated,
"request_id": request_id,
"fq_run_id": fq_run_id,
},
run_config=run_config,
)
Noam polak
12/13/2021, 9:29 AM
with Flow(
"flow_name",
run_config=KubernetesRun(
job_template_path="flows_path/kubernetes_job_template.yaml",
image=f"gcr.io/some_project:{DOCKER_IMAGE_TAG}"
),
storage=GCS(bucket=BUCKET_NAME),
state_handlers=[post_to_handler, update_state_handler],
terminal_state_handler=require_success_terminal_state_handler,
) as flow:
input_data = Parameter("input_data", required=True)
internal_input_data = Parameter("internal_input_data", required=True)
fq_run_id = Parameter("fq_run_id", required=True)
request_id = Parameter("request_id", required=True)
notification_recipient = Parameter(
"notification_recipient", default=None
)
currency = input_data["key"]
# flow tasks....
Anna Geller
flow.set_dependencies(start_flow_run, [internal_input_data_updated, exists, run_config])
#2: Here you’re calling it correctly
start_flow_run(
upstream_tasks=[company_ex_id, exists, internal_input_data_updated],
parameters={
"input_data": input_data,
"internal_input_data": internal_input_data_updated,
"request_id": request_id,
"fq_run_id": fq_run_id,
},
run_config=run_config,
)
A better and easier approach is to instantiate the StartFlowRun
class before the Flow constructor:
start_flow_run = StartFlowRun(flow_name=FS_FQ_FLOW, project_name="default")
with Flow(FLOW_NAME) as flow:
internal_input_data_updated = update_internal_input_data(internal_input_data)
start_flow_run_task = start_flow_run(
upstream_tasks=[company_ex_id, exists, run_config, internal_input_data_updated],
parameters={
"input_data": input_data,
"internal_input_data": internal_input_data_updated,
"request_id": request_id,
"fq_run_id": fq_run_id,
},
run_config=run_config,
)
Alternatively, you can replace StartFlowRun with `create_flow_run`:
from prefect.tasks.prefect import create_flow_run
with Flow(FLOW_NAME) as flow:
internal_input_data_updated = update_internal_input_data(internal_input_data)
start_flow_run_task = create_flow_run(flow_name=FS_FQ_FLOW, project_name="default",
upstream_tasks=[company_ex_id, exists, run_config, internal_input_data_updated],
parameters={
"input_data": input_data,
"internal_input_data": internal_input_data_updated,
"request_id": request_id,
"fq_run_id": fq_run_id,
},
run_config=run_config,
)
Anna Geller
Noam polak
12/13/2021, 10:16 AM