Sean Talia
03/29/2021, 4:28 PM
Sean Talia
03/29/2021, 4:29 PM
`PrefectSecret`) that Prefect thinks is downstream of a `Parameter` that I'm using only because it's the first place that the `PrefectSecret` is actually called; it's used in several other downstream tasks, but because of this first place it's used, Prefect thinks that the Secret task only ought to be run (and therefore the secret only retrieved) if the `Parameter` is `True`, whereas in my use case it's going to be `False` ~95% of the time
Dylan
Dylan
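Aside: a toy, Prefect-free sketch of the skip propagation described above. It assumes a task is skipped whenever any of its upstream tasks was skipped, unless its `skip_on_upstream_skip` flag is False. `ToyTask`, `run_toy_flow`, and `build` are hypothetical names made up for illustration; this is not Prefect's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyTask:
    name: str
    upstream: List[str] = field(default_factory=list)
    # Mirrors the flag name used in the thread; the semantics here are an assumption.
    skip_on_upstream_skip: bool = True


def run_toy_flow(tasks: List[ToyTask], gate_open: bool) -> Dict[str, str]:
    """Walk tasks in the listed (already topological) order; return name -> state."""
    states: Dict[str, str] = {}
    for t in tasks:
        if t.name == "case_gate":
            # The gate stands in for the case block: closed means "skipped".
            states[t.name] = "ran" if gate_open else "skipped"
            continue
        upstream_skipped = any(states[u] == "skipped" for u in t.upstream)
        if upstream_skipped and t.skip_on_upstream_skip:
            states[t.name] = "skipped"
        else:
            states[t.name] = "ran"
    return states


def build(secret_flag: bool) -> List[ToyTask]:
    """The secret is first used under the gate, so it picks up the gate as upstream."""
    return [
        ToyTask("case_gate"),
        ToyTask("secret", upstream=["case_gate"], skip_on_upstream_skip=secret_flag),
        ToyTask("snapshot_freshness", upstream=["case_gate", "secret"]),
        ToyTask("dbt_run", upstream=["secret"]),
    ]


if __name__ == "__main__":
    print(run_toy_flow(build(secret_flag=True), gate_open=False))
    print(run_toy_flow(build(secret_flag=False), gate_open=False))
```

With the gate closed and the flag left at its default, the skip reaches `dbt_run` through the secret; with the flag set to False on the secret, only the gated task is skipped.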
Sean Talia
03/29/2021, 4:43 PM
Sean Talia
03/29/2021, 4:47 PM
password = PrefectSecret(name="PASSWORD")
# dbt source
with case(source_snapshot_freshness(), True):
    result1 = task1(env={"PASSWORD": password})
result2 = task2(env={"PASSWORD": password})
result2.set_upstream(result1)
okay this is in essence what it looks like
Sean Talia
03/29/2021, 4:47 PM
`result2.set_upstream(result1)` is perhaps part of what's causing the issue here
Sean Talia
03/29/2021, 4:48 PM
skip_on_upstream_skip=False,
trigger=all_finished
Dylan
`with Flow() as flow:` context block?
Sean Talia
03/29/2021, 4:51 PM
`flow` context) but I can post the relevant stuff
Dylan
Sean Talia
03/29/2021, 4:54 PM
Sean Talia
03/29/2021, 4:54 PM
dbt_run_task = DbtShellTask(
    name="run_dbt",
    log_stderr=True,
    return_all=True,
    skip_on_upstream_skip=False,
    trigger=all_finished
)
dbt_snapshot_freshness_task = DbtShellTask(
    name="snapshot_freshness_dbt",
)
with Flow(...) as flow:
    source_snapshot_freshness = Parameter("source_snapshot_freshness", default=False)
    snowflake_password = PrefectSecret(
        name="PREFECT_TEST_SNOWFLAKE_PW"
        , skip_on_upstream_skip=False
        , trigger=all_finished
    )
    # dbt source
    with case(source_snapshot_freshness(), True):
        dbt_snapshot_freshness_result = dbt_snapshot_freshness_task(
            env={"SNOWFLAKE_PASSWORD": snowflake_password},
            command="<...>"
        )
    # dbt run
    dbt_run_command = set_dbt_run_command(
        debug,
        full_refresh,
        models,
        strict,
        target,
    )
    dbt_run_result = dbt_run_task(
        env={"SNOWFLAKE_PASSWORD": snowflake_password},
        command=dbt_run_command,
    )
Sean Talia
03/29/2021, 4:56 PM
`Parameter` / case block is not really an upstream dependency of the `PrefectSecret`, I don't care what ultimately the result is there:
Sean Talia
03/29/2021, 4:57 PM
Sean Talia
03/29/2021, 4:57 PM
`PrefectSecret` available for a lot of other stuff
Dylan
`snowflake_password`?
Sean Talia
03/29/2021, 5:00 PM
Sean Talia
03/29/2021, 5:01 PM
, skip_on_upstream_skip=False
, trigger=all_finished
should not really be a part of the task here
Sean Talia
03/29/2021, 5:01 PM
Sean Talia
03/29/2021, 5:01 PM
Dylan
Dylan
`dbt_run_task` later on isn't picking up the dependency for that secret
Dylan
Dylan
Sean Talia
03/29/2021, 5:06 PM
`else` block and define `snowflake_password = PrefectSecret(name="PREFECT_TEST_SNOWFLAKE_PW")` in there as well or something?
Dylan
with Flow(...) as flow:
    source_snapshot_freshness = Parameter("source_snapshot_freshness", default=False)
    snowflake_password = PrefectSecret(
        name="PREFECT_TEST_SNOWFLAKE_PW"
        , skip_on_upstream_skip=False
        , trigger=all_finished
    )
    # dbt source
    with case(source_snapshot_freshness(), True):
        dbt_snapshot_freshness_result = dbt_snapshot_freshness_task(
            env={"SNOWFLAKE_PASSWORD": snowflake_password},
            command="<...>"
        )
    with case(source_snapshot_freshness(), False):
        # dbt run
        dbt_run_command = set_dbt_run_command(
            debug,
            full_refresh,
            models,
            strict,
            target,
        )
        dbt_run_result = dbt_run_task(
            env={"SNOWFLAKE_PASSWORD": snowflake_password},
            command=dbt_run_command,
        )
Dylan
`merge` control flow utility
Sean Talia
03/29/2021, 5:07 PM
`dbt_run_task` is the one that I always want to have run
Sean Talia
03/29/2021, 5:07 PM
`source_snapshot_freshness` is False
Dylan
Sean Talia
03/29/2021, 5:08 PM
`dbt_snapshot_freshness_result`, and then use that merged result as an upstream dependency of `dbt_run_task`
Sean Talia
03/29/2021, 5:08 PM
Dylan
Dylan
`source_snapshot_freshness` is False, do just this"
Sean Talia
03/29/2021, 5:17 PM
`else` block + `merge`"
Dylan
Sean Talia
03/29/2021, 5:31 PM
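Aside: a toy, Prefect-free sketch of the "else block + merge" idea mentioned in the thread. It assumes that merging two conditional branches yields the result of whichever branch was not skipped, so a task placed downstream of the merged value runs either way. `SKIPPED`, `toy_case`, `toy_merge`, `toy_flow`, and the branch strings are hypothetical stand-ins, not Prefect's API.

```python
SKIPPED = object()  # hypothetical sentinel standing in for a skipped branch


def toy_case(condition_value, expected, branch):
    """Run the branch only when the condition matches; otherwise mark it skipped."""
    return branch() if condition_value == expected else SKIPPED


def toy_merge(*branch_results):
    """Return the first branch result that was not skipped (assumed merge behaviour)."""
    for result in branch_results:
        if result is not SKIPPED:
            return result
    return SKIPPED


def toy_flow(source_snapshot_freshness: bool) -> str:
    # "if" branch: only runs when the parameter is True.
    fresh = toy_case(source_snapshot_freshness, True, lambda: "freshness checked")
    # "else" branch: only runs when the parameter is False.
    not_requested = toy_case(
        source_snapshot_freshness, False, lambda: "freshness not requested"
    )
    # Exactly one branch ran, so the merged value is never SKIPPED here.
    merged = toy_merge(fresh, not_requested)
    # The run step depends on the merged value, so it executes for both settings.
    return f"dbt run after: {merged}"


if __name__ == "__main__":
    print(toy_flow(True))
    print(toy_flow(False))
```

The point of the design is that the always-run task depends on the merged value rather than on either branch directly, so a skipped branch never reaches it.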