Tom Shaffner
11/22/2021, 4:23 PM
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)
    if summary_view_name is not None and history_table_name is not None:
        logger.info("Initiating history upload process.")
        summary_df, summary_data_empty = pull_summary_data_via(sql=f"SELECT * FROM {summary_view_name}")
        if summary_data_empty:
            delete_today_from_history_if_exists(df=df, history_table=history_table_name)
        upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
    else:
        logger.info("Skipping summary view run: summary view name and/or history table name missing.")
To address this I tried to make the dependencies explicit by adding `upstream_tasks` arguments to two of the above lines, like so:
    summary_df, summary_data_empty = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
    delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
This doesn't seem to fix the issue, though; when I run the flow, later tasks still initiate before the Oracle pull, which should occur before everything else. Does anyone see what I'm doing wrong? The documentation seems to indicate that feeding result data from one task to another makes dependencies work correctly, but that doesn't seem to be happening here.
Kevin Kho
with Flow(...) as flow:
    a = task_a()
    b = task_b(upstream_tasks=[a])
Right now I think you are doing:
    b = task_b(upstream_tasks=[task_a])
which can work if there is only one instance of task_a.
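[Editor's note] Kevin's point can be demonstrated with a minimal pure-Python mock. This is not Prefect's actual implementation; the `task` decorator, `TaskRun` class, and `flow` list below are invented for illustration. The idea is that each call to a task inside a flow block creates a new node, so the returned instance, not the task function itself, is what unambiguously identifies a node:

```python
from dataclasses import dataclass, field

flow = []  # stand-in for the DAG being built inside `with Flow(...):`

@dataclass
class TaskRun:
    """One node in the mock DAG; created per *call*, not per task."""
    name: str
    upstream: list = field(default_factory=list)

def task(fn):
    """Mock decorator: each call to the task records a new TaskRun node."""
    def wrapper(*args, upstream_tasks=(), **kwargs):
        run = TaskRun(fn.__name__, list(upstream_tasks))
        flow.append(run)
        return run
    return wrapper

@task
def task_a():
    pass

@task
def task_b():
    pass

a1 = task_a()   # first call: one node
a2 = task_a()   # second call: a different node of the same task
assert a1 is not a2

# Passing the *instance* pins the dependency to a specific node;
# passing the function `task_a` could not distinguish a1 from a2.
b = task_b(upstream_tasks=[a1])
assert b.upstream == [a1]
assert len(flow) == 3
```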
Then also, you can't really use a Python `if` on task output. You need to use the `case` task instead: the `if` evaluates during build time, while the `case` evaluates during run time. You can use `if` in the Flow block only if you are using it on a variable that isn't a task.
Tom Shaffner
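[Editor's note] The build-time vs. run-time distinction can be sketched with a tiny mock (again, not Prefect itself; `TaskRun` is invented for illustration). Inside the flow block a task call returns a node object, not data, so a Python `if` tests the truthiness of that object at build time and always takes the same branch:

```python
class TaskRun:
    """Stand-in for what a task call returns inside a Flow block:
    a node in the graph, not the data the task will later produce."""
    def __init__(self, name):
        self.name = name

# Build time: no query has actually run yet; this is just a graph node.
summary_data_empty = TaskRun("pull_summary_data_via")

# Python's `if` executes *now*, on the node object itself. Ordinary
# objects are truthy, so this branch is taken no matter what the task
# returns at run time; that is why Prefect 1.x needs `case` instead.
branch_taken = False
if summary_data_empty:
    branch_taken = True

assert branch_taken  # always True, regardless of the real data
```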
11/22/2021, 4:32 PM
Kevin Kho
with Flow(...) as flow:
    a = task_a(x, y, z)
    b = task_b(t, u, v, upstream_tasks=[a])
Tom Shaffner
11/22/2021, 4:35 PM
Tom Shaffner
11/22/2021, 4:36 PM
Kevin Kho
with case(task(lambda df: df.shape[0] > 0)(df), True):
    ...
Kevin Kho
`upstream_tasks`. Will look again at your code
Kevin Kho
`if` in the DAG construction
Tom Shaffner
11/22/2021, 4:38 PM
Tom Shaffner
11/29/2021, 8:51 PM
`if`. Think that simplifies it nicely, and I believe it fits better into the Prefect paradigm (assuming I'm understanding it better).
That means my code is now this:
with Flow(flow_name) as flow:
    logger.info(f"{flow_name} Task Flow initiated, running in {file_path}")
    df = pull_oracle_data_via(oracle_query_sql=oracle_query_sql_path, prod=use_oracle_prod)
    df = set_data_types(df)
    upload_to_table(df, destination_table=data_destination_table_name)
    logger.info("Initiating history upload process.")
    summary_df = pull_summary_data_via(upstream_tasks=[upload_to_table], sql=f"SELECT * FROM {summary_view_name}")
    delete_today_from_history_if_exists(upstream_tasks=[pull_summary_data_via], df=df, history_table=history_table_name)
    upload_to_history_table(df=summary_df, destination_table=history_table_name, append=True)
When I load it, though, the resulting schematic is the one attached. Oddly, it seems to create a separate, parallel path that executes immediately (and thus fails) based on the dependency on `upload_to_table`, rather than putting those tasks later in the first path. Instead of waiting until that point in the process to begin, the flow starts both paths simultaneously. Do you see what I'm doing wrong here?
Kevin Kho
with Flow(flow_name) as flow:
    u = upload_to_table(df, destination_table=data_destination_table_name)
    ...
    summary_df = pull_summary_data_via(upstream_tasks=[u], sql=f"SELECT * FROM {summary_view_name}")
Calling a task in the flow block creates a copy (you can use tasks multiple times), so you want to point it to the instance.
Tom Shaffner
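[Editor's note] The fix can be checked with the same kind of mock (invented for illustration, not Prefect's API): saving the return value of `upload_to_table` and passing that instance makes the dependency edge land on the node already in the main path, instead of spawning a parallel copy of the task:

```python
flow = []  # stand-in for the DAG being built inside `with Flow(...):`

class TaskRun:
    """Mock graph node; one is created per task *call*."""
    def __init__(self, name, upstream=()):
        self.name = name
        self.upstream = list(upstream)

def task(fn):
    """Mock decorator: each call inside the flow block appends a fresh node."""
    def wrapper(*args, upstream_tasks=(), **kwargs):
        run = TaskRun(fn.__name__, upstream_tasks)
        flow.append(run)
        return run
    return wrapper

@task
def upload_to_table(df):
    pass

@task
def pull_summary_data_via(sql):
    pass

df = object()  # placeholder for the real DataFrame
u = upload_to_table(df)                          # keep the returned instance
summary_df = pull_summary_data_via("SELECT ...", upstream_tasks=[u])

# The dependency edge points at the node already in the flow,
# so no second, parallel copy of upload_to_table is created:
assert summary_df.upstream == [u]
assert [t.name for t in flow] == ["upload_to_table", "pull_summary_data_via"]
```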
11/29/2021, 9:06 PM_upstream_tasks_=[summary_df]
in the above?
That does indeed seem to work, thanks!Kevin Kho
Tom Shaffner
11/29/2021, 9:39 PM
Tom Shaffner
12/01/2021, 2:47 PM
`if` statements. Then I saw https://www.prefect.io/orion/, which would seem to mean I wouldn't even need to make those tweaks; is it still in early beta, or is it close enough to production level that I could start using it in my setup here?
Kevin Kho
Tom Shaffner
12/01/2021, 2:55 PM
Kevin Kho
Tom Shaffner
12/01/2021, 2:57 PM