# ask-community
s
is there a way to explicitly remove a task from another task's set of upstream dependencies?
I have a task (actually a `PrefectSecret`) that prefect thinks is downstream of a `Parameter` that I'm using only because it's the first place that the `PrefectSecret` is actually called; it's used in several other downstream tasks, but because of this first place it's used, Prefect thinks that the Secret task only ought to be run (and therefore the secret only retrieved) if the `Parameter` is `True`, whereas in my use case it's going to be `False` ~95% of the time
d
Hi @Sean Talia! That is pretty interesting. Can you share a little code with me? Your Flow schematic would also be helpful
My first thought is that configuration seems a bit strange
s
certainly, just give me a sec
password = PrefectSecret(name="PASSWORD")

# dbt source
with case(source_snapshot_freshness(), True):
  result1 = task1(env={"PASSWORD": password})

result2 = task2(env={"PASSWORD": password})

result2.set_upstream(result1)
okay this is in essence what it looks like
i realize that the `result2.set_upstream(result1)` is perhaps part of what's causing the issue here
although in my task2 initialization I do have:
skip_on_upstream_skip=False,
trigger=all_finished
d
Can you post your full `with Flow() as flow:` context block?
s
it's a little gnarly (i also do some task initialization outside of the `flow` context) but I can post the relevant stuff
d
That’s totally okay 😄
s
okay I think this has all of it:
dbt_run_task = DbtShellTask(
    name="run_dbt",
    log_stderr=True,
    return_all=True,
    skip_on_upstream_skip=False,
    trigger=all_finished
)

dbt_snapshot_freshness_task = DbtShellTask(
    name="snapshot_freshness_dbt",
)

with Flow(...) as flow:
  source_snapshot_freshness = Parameter("source_snapshot_freshness", default=False)

  snowflake_password = PrefectSecret(
        name="PREFECT_TEST_SNOWFLAKE_PW"
      , skip_on_upstream_skip=False
      , trigger=all_finished
  )

  # dbt source
  with case(source_snapshot_freshness(), True):
      dbt_snapshot_freshness_result = dbt_snapshot_freshness_task(
          env={"SNOWFLAKE_PASSWORD": snowflake_password},
          command="<...>"
      )

  # dbt run
  dbt_run_command = set_dbt_run_command(
      debug,
      full_refresh,
      models,
      strict,
      target,
  )
  dbt_run_result = dbt_run_task(
      env={"SNOWFLAKE_PASSWORD": snowflake_password},
      command=dbt_run_command,
  )
like really what I'm trying to do is tell prefect "hey, this `Parameter` / case block is not really an upstream dependency of the `PrefectSecret`, I don't care what the result ultimately is there:
sometimes i need that case block to execute, but usually not
but regardless of whether or not it executes i need that `PrefectSecret` available for a lot of other stuff"
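To make the symptom described above concrete, here is a small plain-Python toy model of skip propagation. It is only a sketch and does not use or reproduce Prefect's implementation: `ToyTask`, `run_toy_flow`, `simulate`, and the placeholder secret value `"hunter2"` are all hypothetical names invented for illustration. It assumes, as the thread describes, that the secret gets treated as downstream of the case condition, and that a task skips when an upstream task skipped unless it opts out (loosely mirroring `skip_on_upstream_skip=False`).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Set, Tuple

SKIPPED = "Skipped"
SUCCESS = "Success"


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task; this is not Prefect's Task class."""
    name: str
    fn: Callable[..., Any]
    upstream: List["ToyTask"] = field(default_factory=list)
    # Loosely mirrors the skip_on_upstream_skip flag discussed in the thread.
    skip_on_upstream_skip: bool = True


def run_toy_flow(tasks: List[ToyTask], force_skip: Set[str]) -> Dict[str, Tuple[str, Any]]:
    """Run tasks in the order given (assumed to be topologically sorted).

    force_skip names tasks that are skipped outright, e.g. a case
    condition that did not match. Returns {task name: (state, result)}.
    """
    out: Dict[str, Tuple[str, Any]] = {}
    for t in tasks:
        upstream_skipped = any(out[u.name][0] == SKIPPED for u in t.upstream)
        if t.name in force_skip or (upstream_skipped and t.skip_on_upstream_skip):
            out[t.name] = (SKIPPED, None)
        else:
            # In this toy model, a skipped upstream task hands None downstream.
            args = [out[u.name][1] for u in t.upstream]
            out[t.name] = (SUCCESS, t.fn(*args))
    return out


def simulate(secret_under_case: bool) -> Dict[str, Tuple[str, Any]]:
    """Model the flow with the parameter set to False."""
    case_true = ToyTask("case(True)", fn=lambda: True)
    secret = ToyTask(
        "secret",
        fn=lambda *_: "hunter2",  # placeholder value, not a real secret
        upstream=[case_true] if secret_under_case else [],
    )
    freshness = ToyTask(
        "snapshot_freshness_dbt", fn=lambda *_: "fresh", upstream=[case_true, secret]
    )
    run_dbt = ToyTask(
        "run_dbt",
        fn=lambda password: f"ran with password={password!r}",
        upstream=[secret],
        skip_on_upstream_skip=False,  # as in the run_dbt task posted above
    )
    # The parameter is False, so the case(True) branch does not match.
    return run_toy_flow([case_true, secret, freshness, run_dbt], force_skip={"case(True)"})


if __name__ == "__main__":
    for under_case in (True, False):
        print(under_case, simulate(under_case)["run_dbt"])
```

In this model, when the secret hangs off the case condition, `run_dbt` still runs (it opted out of skip propagation) but receives no password; once the secret no longer depends on the case condition, it is retrieved regardless of the parameter value.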
d
Just out of curiosity, why are you setting a trigger on the `snowflake_password`?
s
oh sorry yes that part is just from me messing around to try to get this to work
the:
, skip_on_upstream_skip=False
, trigger=all_finished
should not really be a part of the task here
i just wanted to see if it would work
but yeah, please ignore that
d
Okay
So I think the real problem here is that the `dbt_run_task` later on isn’t picking up the dependency for that secret
I think that’s happening because Prefect is expecting a second case statement
s
okay i think you pre-empted my question which was going to be, is the easier way to take care of this just to add an `else` block and define `snowflake_password = PrefectSecret(name="PREFECT_TEST_SNOWFLAKE_PW")` in there as well or something?
d
If you instead have something like:
with Flow(...) as flow:
  source_snapshot_freshness = Parameter("source_snapshot_freshness", default=False)

  snowflake_password = PrefectSecret(
        name="PREFECT_TEST_SNOWFLAKE_PW"
      , skip_on_upstream_skip=False
      , trigger=all_finished
  )

  # dbt source
  with case(source_snapshot_freshness(), True):
      dbt_snapshot_freshness_result = dbt_snapshot_freshness_task(
          env={"SNOWFLAKE_PASSWORD": snowflake_password},
          command="<...>"
      )

  with case(source_snapshot_freshness(), False):
      # dbt run
      dbt_run_command = set_dbt_run_command(
          debug,
          full_refresh,
          models,
          strict,
          target,
      )
      dbt_run_result = dbt_run_task(
          env={"SNOWFLAKE_PASSWORD": snowflake_password},
          command=dbt_run_command,
      )
I think if you want to merge the conditional branches back you need to explicitly use the `merge` control flow utility
s
well the `dbt_run_task` is the one that I always want to have run
like i essentially just want to do nothing if `source_snapshot_freshness` is False
d
Yup, totally makes sense
s
I could have some dummy task execute, merge the dummy task result with the `dbt_snapshot_freshness_result`, and then use that merged result as an upstream dependency of `dbt_run_task`
but in considering that i was like i have to be doing something wrong here
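The dummy-task-plus-merge idea described here can be sketched in plain Python to show the shape of the control flow. This is a toy illustration only, not Prefect code: `run_branch`, `toy_merge`, `toy_flow`, and the `SKIPPED` marker are hypothetical names, and the sketch assumes a merge step simply yields the result of whichever branch actually ran.

```python
from typing import Any, Callable


class _Skipped:
    """Hypothetical marker for a branch that did not run."""

    def __repr__(self) -> str:
        return "SKIPPED"


SKIPPED = _Skipped()


def run_branch(condition: bool, expected: bool, fn: Callable[[], Any]) -> Any:
    """Run fn only when condition equals expected, like a case block."""
    return fn() if condition == expected else SKIPPED


def toy_merge(*branch_results: Any) -> Any:
    """Return the first branch result that was not skipped, else None."""
    for result in branch_results:
        if result is not SKIPPED:
            return result
    return None


def toy_flow(source_snapshot_freshness: bool) -> str:
    # Branch taken when the parameter is True: the freshness check.
    checked = run_branch(source_snapshot_freshness, True, lambda: "freshness checked")
    # Branch taken when the parameter is False: the do-nothing dummy task.
    noop = run_branch(source_snapshot_freshness, False, lambda: "nothing to check")
    # Join the two branches so downstream work has one non-skipped upstream.
    gate = toy_merge(checked, noop)
    # run_dbt depends only on the merged value, so it runs on either path.
    return f"run_dbt after: {gate}"


if __name__ == "__main__":
    print(toy_flow(True))
    print(toy_flow(False))
```

Because exactly one of the two branches runs for any parameter value, the merged value is never skipped, which is what lets the final step run unconditionally in this sketch.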
d
So you shouldn’t have to have dummy tasks I don’t think
You should be able to say “if `source_snapshot_freshness` is False, do just this”
s
sorry when you say, "you should be able to...", do you mean "you ought be able to, but right now you can't", or "i believe what you're trying to do will work by using an `else` block + `merge`"
d
The latter haha
s
alright i'll give this a try