# ask-community
t
Hey All, I'm setting up a new instance of Prefect and I seem to be misunderstanding something about flows. I've got a bunch of methods with @task decorators, and then I set them up in what should be a single flow. When I load this into Prefect, though, I get two split DAGs that seem to function independently. Flow setup here and picture of the result attached.
Copy code
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)

    if summary_view_name is not None and history_table_name is not None:
        logger.info("Initiating history upload process.")
        summary_df, summary_data_empty = pull_summary_data_via(sql=f"SELECT * FROM {summary_view_name}")
        if summary_data_empty:
            delete_today_from_history_if_exists(df=df, history_table=history_table_name)
            upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
    else:
        logger.info("Skipping summary view run: summary view name and/or history table name missing.")
To address this I tried to make the dependencies explicit by adding `upstream_tasks` arguments to two of the above lines, like so:
summary_df, summary_data_empty = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
This doesn't seem to fix the issue though; when I run the flow, later tasks still seem to initiate before the Oracle pull, which should occur before everything. Anyone see what I'm doing wrong? The documentation would seem to indicate that feeding result data from one task to another would make dependencies work correctly, but that doesn't seem to be happening here.
k
A couple of things that might help. You can assign the task instance to a variable like this:
Copy code
with Flow(...) as flow:
    a = task_a()
    b = task_b(upstream_tasks=[a])
right now I think you are doing:
Copy code
b = task_b(upstream_tasks=[task_a])
which can work if there is only one instance of task_a. Then also, you can’t really use the Python `if` on task output; you need to use the `case` task. The `if` evaluates during build time, while `case` evaluates during run time. You can use `if` in the Flow block if you are using it on a variable that isn’t a task.
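For reference, here is a minimal runnable sketch of the difference (Prefect 1.x; the task names are made up):
Copy code
from prefect import Flow, task, case

@task
def check_summary():
    # stand-in for a real check, e.g. whether a dataframe has rows
    return True

@task
def upload_history():
    print("uploading history")

with Flow("case-example") as flow:
    ready = check_summary()
    # `if ready:` here would test the Task object itself at build time;
    # `case` defers the comparison until the task's result exists at run time
    with case(ready, True):
        upload_history()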
t
Thanks, that's helpful. If I assign a task to a variable as above, do I then also call it separately to pass in arguments? The example at https://docs.prefect.io/core/tutorial/02-etl-flow.html#second-step had led me to believe I just called the functions as I was; do I just add the above lines, or do they change my structure here? And I hadn't heard of the `case` task yet; I'll go research that, thanks!
k
No need to call separately:
Copy code
with Flow(...) as flow:
    a = task_a(x, y, z)
    b = task_b(t, u, v, upstream_tasks=[a])
t
Great! I'll add that then, thanks! On the `case` instances, is there a way to execute a check on the return object? I'm trying to proceed only if my dataframe isn't empty, but the `case` function compares to a value; it looks like I wouldn't be able to execute a function on the result there. Is there another way?
Also, one more on the flow: in the above, task_a doesn't seem to be passing a result as input to task_b; does that happen automatically here? That automatic passing of data between tasks is what I was relying on from the link above.
k
You need to use a task, but you can also do it inline:
Copy code
with case(task(lambda df: df.shape[0] > 0)(df), True):
    ...
👍 1
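Fleshed out, that one-liner might look like this runnable sketch (hypothetical task names, assuming pandas):
Copy code
import pandas as pd
from prefect import Flow, task, case

@task
def pull_summary_data():
    # stand-in for the real summary pull
    return pd.DataFrame({"a": [1, 2, 3]})

@task
def upload_history(df):
    print(f"uploading {len(df)} rows")

# wrapping the check in a task defers it until the real dataframe exists at run time
has_rows = task(lambda df: df.shape[0] > 0)

with Flow("df-check") as flow:
    df = pull_summary_data()
    with case(has_rows(df), True):
        upload_history(df)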
If it passes data, you shouldn’t need the `upstream_tasks`. Will look again at your code. I think you are running into problems with the `if` in the DAG construction.
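To illustrate the data-passing point, a minimal sketch with made-up tasks; the data edge alone is enough to order them:
Copy code
from prefect import Flow, task

@task
def extract():
    return [1, 2, 3]

@task
def load(data):
    print(sum(data))

with Flow("data-dependency") as flow:
    data = extract()
    load(data)  # passing the result creates the edge; no upstream_tasks needed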
t
That's my confusion. And I'd note one additional difficulty: in one case here I'm uploading data to a DB and then downloading from a view based on that data. In that instance the dependency isn't visible to the code, so I probably do need to create that link manually, but it should be only on that step, I think.
@Kevin Kho, finally got to my question here. First, in the above, it struck me that all I was really trying to accomplish with the second `if` was to stop on failure, so I added an assert statement in the underlying function to cause it to fail on that condition and removed the `if`. I think that simplifies it nicely, and I believe it fits better into the Prefect paradigm (assuming I'm understanding it correctly). That means my code is now this:
Copy code
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)

    logger.info("Initiating history upload process.")
    summary_df = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
    delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
    upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
When I load it, though, the schematic that results is the one attached. Oddly, it seems to create a separate, parallel path that executes immediately (and thus fails) based on the dependency on `upload_to_table`, rather than placing those tasks later in the first path. This means that instead of waiting until that point in the process to begin, the flow starts both paths simultaneously. Do you see what I'm doing wrong here?
k
Yeah this will work if you store the reference to a task:
Copy code
with Flow(flow_name) as flow:
    u = upload_to_table(df, destination_table=data_destination_table_name)
    ...
    summary_df = pull_summary_data_via(upstream_tasks=[u], sql=f"SELECT * FROM {summary_view_name}")
Calling a task in the flow block creates a copy (you can use tasks multiple times) so you want to point it to the instance.
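A small sketch of that copy behavior (hypothetical task name):
Copy code
from prefect import Flow, task

@task
def upload():
    print("upload")

with Flow("copies") as flow:
    first = upload()   # first instance of the task in the DAG
    second = upload()  # calling again adds a second, independent copy
    # with two copies, upstream_tasks=[upload] is ambiguous;
    # reference a specific instance instead, e.g. upstream_tasks=[first]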
t
Oh, I see! And presumably then, even when I'm getting real output from a task, I can use that output as the upstream task too? As in `upstream_tasks=[summary_df]` in the above? That does indeed seem to work, thanks!
k
You can denote that as upstream even if there is no data dependency, yep
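So the returned reference doubles as an ordering handle; a minimal sketch with made-up task names:
Copy code
from prefect import Flow, task

@task
def pull_summary():
    return {"rows": 10}

@task
def clean_history():
    print("cleaning history table")

with Flow("result-as-upstream") as flow:
    summary_df = pull_summary()
    # summary_df is itself a task reference, so it works in upstream_tasks
    clean_history(upstream_tasks=[summary_df])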
t
Picture attached; success! Thanks so much! I think my confusion was that when a function doesn't return something, I should still assign it to a variable and use that variable as the upstream task, while when a function does return a value, I can use that return value in the same way. I didn't get that at first; kinda cool.
👍 1
@Kevin Kho Follow-up question on this. The above works for me but required some tweaks/changes, particularly removing my `if` statements. Then I saw https://www.prefect.io/orion/ which would seem to mean I wouldn't even need those tweaks; is it still in early beta, or is it close enough to production level that I could start using it in my setup here?
k
No it’s not meant for production use cases yet
t
Gotcha. The release article (https://www.prefect.io/blog/announcing-prefect-orion/) mentions early 2022 though; is that still the target? If so, I should probably at least keep an eye on it and think about transition plans; if not, I should drop it and focus on building out the current approach.
k
I think so and we will help with transition plans whenever it’s ready
👍 1
t
Awesome. Excited to see that; looks like it will address basically all the minor annoyances I've had so far.