Hello! What is a replacement for prefect.signals f...
# prefect-community
y
Hello! What is a replacement for prefect.signals from 1.0? I used it a lot, especially for skipping unnecessary tasks. Should I do it in a native pythonic way, like passing None as a result of a task to show that there are no need to do something in downstream tasks? The second question is about manually setting downstream/upstream tasks. I was able to do it with
.set_upstream()
method in 1.0. But it looks like there are no more such method. I.e. there are two tasks: loading data into some BigQuery table and then merging it with another table. Obviously these tasks need to be run in exact order. But there are no need to pass any results from the first task to the second, so it wasn’t obvious for Prefect 1.0 that there is some kind of order. That’s why I used
.set_upstream()
. What should i do now? And the third question is about
.map()
for tasks. So, now i should just use standard pythonic map? Will it be parallelized in the same manner as it was in Prefect 1.0 with Concurrent/Parallel task runner?
copy to answer your question inline: In Orion, you can raise a custom exception directly in the flow, which will result in a
Failed
task run:
Copy code
from prefect import task

@task
def signal_task(message):
    if message == 'stop_immediately!':
        raise RuntimeError(message='Got a signal to end the task run!')
Alternatively, your flow may return a specific state 3:
Copy code
from prefect import task
from prefect.orion.schemas.states import Failed

@task
def signal_task(message):
    if message == 'stop_immediately!':
        return Failed(message='Stopping the task run immediately!')
y
okay, and what about setting upstream tasks and mapping them?
a
There are also migration topics for that Answers here: wait_for for dependencies and mapping exists in 2.0 too
👀 1
m
@Anna Geller Do't suppose you could point us in the direction of the upstream task documentation?
a
It may be missing in the docs, but check out the arguments of a task in your IDE or API reference for a task - I believe the current syntax is task2.submit(wait_for=[task1])
👍 1