Noam polak
11/09/2021, 3:52 PMwith Flow(
flow_name,
run_config=run_config_provider(flow_name),
result=results_provider(os.environ["GCP_PROJECT"]),
storage=storage_provider(flow_name),
) as flow:
portfolio = fetch_portfolio(portfolio_uuid)
parser_results, results_log, csv_log, *input_row_size* = run_parser(
portfolio, excel_bucket_url, workflow_id
)
.... more tasks...
# run the next flow
with case(parser_results, "completed"):
start_flow_run = StartFlowRun(
flow_name=automation_flow_name,
project_name="project_name",
)
start_flow_run(
upstream_tasks=[updated, *input_row_size*],
parameters={
"portfolio_uuid": portfolio_uuid,
"input_row_size": *input_row_size*,
},
)
portfolio_uuid = string
input_row_size = int
When I run the flow it fails when attempting to run the new flow
here is the error message (in the message thread):
What did I do that was wrong?Kevin Kho
Noam polak
11/09/2021, 3:55 PMNoam polak
11/09/2021, 3:56 PMKevin Kho
@task(log_stdout=True)
def get_types(portfolio_uuid, input_row_size):
print(type(portfolio_uuid))
print(type(input_row_size))
return
Noam polak
11/09/2021, 4:03 PMNoam polak
11/09/2021, 4:25 PMNoam polak
11/09/2021, 4:25 PMNoam polak
11/09/2021, 4:29 PMNoam polak
11/09/2021, 4:29 PM