#prefect-community

Christopher Stokes

09/20/2019, 9:13 PM
Hello folks. I feel like I'm missing something pretty basic. I have the following code that runs the `CreateRidAlert` task twice and throws warnings:
create_alert = CreateAlert()
create_rid_alert = CreateRidAlert()

with Flow('Alert Flow') as flow:
    alert_json = Parameter(name="alert_json", required=True)
    alert = create_alert(alert_json)
    rid_alert = create_rid_alert(alert)
    ifelse(has_rid(alert), rid_alert, None)
    rid = extract_rid(rid_alert)
I'd like to remove the `rid_alert = create_rid_alert(alert)` line, but then I don't know how to wire up the `ifelse` line with a result to pass to `extract_rid` for data flow. This may not be a clear question.
I'm trying to make a conditional fork in the DAG between alerts with rids and those without

Chris White

09/20/2019, 9:29 PM
Hi @Christopher Stokes! The warnings should go away in the latest release of Prefect (0.6.5); however, the double run definitely surprises me. Any chance you could rewrite the structure of your Flow with some “dummy” print tasks so I can recreate on my machine?

Christopher Stokes

09/20/2019, 9:47 PM
Hmm. This appears to be user error. The warnings claim upstream tasks are run regardless, and the viz suggests the same, but print statements show otherwise:
from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import ifelse

@task
def create_alert(input_param: str) -> str:
    print(f'created alert with param {input_param}')
    return input_param

@task
def always_true(alert: str):
    print('always_true called in ifelse')
    return True

@task
def enrich_alert(alert: str) -> str:
    print(f'enriching_alert {alert}')
    return f'{alert} enriched!'

@task
def post_alert(alert: str) -> None:
    print(f'posting {alert}!')


with Flow('Example flow') as flow:
    input_param = Parameter(name="input_param", required=True)
    alert = create_alert(input_param)
    enriched_alert = enrich_alert(alert)
    ifelse(always_true(alert), enriched_alert, None)
    post_alert(enriched_alert)
state = flow.run(parameters={'input_param': 'test-alert'})
flow.visualize(flow_state=state)
so I think I was just thrown by the warning:
prefect/tasks/control_flow/conditional.py:89: PrefectWarning: One of the tasks passed to the switch condition has upstream dependencies: <Task: enrich_alert>. Those upstream tasks could run even if the switch condition fails, which might cause unexpected results.
  prefect.utilities.exceptions.PrefectWarning,
apologies for wasting your time on a Friday!

Jeremiah

09/20/2019, 9:49 PM
Yeah, that warning is from about a year ago when we thought “Airflow-style” branching would be predominant, even though Prefect supports the style of branching you’re doing (which is more like “gating”). We removed it about a week ago 🙂
No worries at all, we’re glad to hear how you’re approaching Prefect, and it helps us adjust it to make it easier to use!
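
To make the "gating" idea concrete, here is a minimal plain-Python sketch of the execution order the print statements in the example flow suggest. It is only an illustration under assumptions: it does not use or reproduce Prefect's implementation, and `run_gated_flow`, `SKIPPED`, and the `log` list are hypothetical names made up for this example. The task upstream of the gate always runs (which is what the old warning was pointing at), the gated task runs once only when the condition holds, and the downstream task is skipped along with it.

```python
# Illustrative model only: this is NOT Prefect's implementation or API.
SKIPPED = object()  # hypothetical sentinel marking a task that did not run


def run_gated_flow(input_param, condition):
    """Mimic the order of the 'Example flow' with plain function calls."""
    log = []  # records which tasks actually ran, in order

    def create_alert(param):
        log.append("create_alert")
        return param

    def enrich_alert(alert):
        log.append("enrich_alert")
        return f"{alert} enriched!"

    def post_alert(alert):
        log.append("post_alert")

    # Upstream of the gate: runs whatever the condition turns out to be.
    alert = create_alert(input_param)

    # The gate: the branch task runs only when the condition holds.
    enriched = enrich_alert(alert) if condition(alert) else SKIPPED

    # Downstream of the gate: the skip propagates instead of running the task.
    if enriched is not SKIPPED:
        post_alert(enriched)

    return log


print(run_gated_flow("test-alert", lambda alert: True))
print(run_gated_flow("test-alert", lambda alert: False))
```

With a condition that is always true, all three tasks run exactly once; with a condition that is always false, only `create_alert` runs.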

Chris White

09/20/2019, 9:51 PM
Yea no worries! Glad you were able to figure it out

Christopher Stokes

09/20/2019, 9:58 PM
Bumped version from `0.6.4` to `0.6.5`. No more warnings! Appreciated