Christopher Stokes
09/20/2019, 9:13 PMCreateRidAlert
task twice and throws warnings:
create_alert = CreateAlert()
create_rid_alert = CreateRidAlert()
with Flow('Alert Flow') as flow:
alert_json = Parameter(name="alert_json", required=True)
alert = create_alert(alert_json)
rid_alert = create_rid_alert(alert)
ifelse(has_rid(alert), rid_alert, None)
rid = extract_rid(rid_alert)
I'd like to remove the rid_alert = create_rid_alert(alert)
line but then I don't know how to wire up the ifelse
line with a result to pass to extract_rid
for data flow. This may not be a clear question.Chris White
09/20/2019, 9:29 PMChristopher Stokes
09/20/2019, 9:47 PM@task
def create_alert(input_param: str) -> str:
print(f'created alert with param {input_param}')
return input_param
@task
def always_true(alert: str):
print('always_true called in ifelse')
return True
@task
def enrich_alert(alert: str) -> str:
print(f'enriching_alert {alert}')
return f'{alert} enriched!'
@task
def post_alert(alert: str) -> None:
print(f'posting {alert}!')
with Flow('Example flow') as flow:
input_param = Parameter(name="input_param", required=True)
alert = create_alert(input_param)
enriched_alert = enrich_alert(alert)
ifelse(always_true(alert), enriched_alert, None)
post_alert(enriched_alert)
state = flow.run(parameters={'input_param': 'test-alert'})
flow.visualize(flow_state=state)
prefect/tasks/control_flow/conditional.py:89: PrefectWarning: One of the tasks passed to the switch condition has upstream dependencies: <Task: enrich_alert>. Those upstream tasks could run even if the switch condition fails, which might cause unexpected results.
prefect.utilities.exceptions.PrefectWarning,
Jeremiah
09/20/2019, 9:49 PMChris White
09/20/2019, 9:51 PMChristopher Stokes
09/20/2019, 9:58 PM0.6.4
to 0.6.5
. No more warnings! Appreciated