prefect-community
  • v

    Vadym Dytyniak

    11/23/2022, 1:01 PM
    Hi. What is the correct approach to mock Blocks in tests (sync and async versions)?
    b
    2 replies · 2 participants
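    A hedged sketch (not from the thread above): one common way to mock a Block in tests is to patch the block class's load method so the code under test receives a stub instead of hitting the API. Secret is used only as an example block type and "prod-token" is illustrative; async callers would need unittest.mock.AsyncMock rather than the default MagicMock.
    from unittest.mock import patch

    from prefect.blocks.system import Secret

    def test_flow_sees_stubbed_secret():
        # Build an in-memory block instead of loading a saved block document.
        fake_secret = Secret(value="dummy-token")
        # Patch Secret.load so code calling Secret.load("prod-token") gets the
        # stub back without touching the Prefect API.
        with patch.object(Secret, "load", return_value=fake_secret):
            assert Secret.load("prod-token").get() == "dummy-token"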
  • j

    Jared Robbins

    11/23/2022, 1:16 PM
    “see Dask worker logs from the Prefect UI.” YES!
    🙌 4
  • t

    Tibs

    11/23/2022, 1:30 PM
    Hi, is it possible to add a dependency for a subflow so it will wait for a task to complete, similar to wait_for when using a task?
    a
    3 replies · 2 participants
  • s

    Slackbot

    11/23/2022, 1:49 PM
    This message was deleted.
  • t

    Tim-Oliver

    11/23/2022, 2:11 PM
    Hello, I am running into an asyncio exception when using DaskTaskRunner which leads to tasks hanging and not completing. -->
    b
    k
    8 replies · 3 participants
  • k

    Khuyen Tran

    11/23/2022, 3:28 PM
    In Prefect Live at 3PM Eastern today, Allan Campopiano, a data scientist at Deepnote, will talk about machine learning in the warehouse with Snowpark and Deepnote. Come join us live on Twitch or YouTube.
    🎉 2
    🙌 2
    :party-parrot: 1
    :blob-attention-gif: 2
  • l

    Luca Schneider

    11/23/2022, 4:29 PM
    Hi all, regarding PREFECT_LOGGING_EXTRA_LOGGERS: does it have to be set in the flow, on the agent, or on the Orion server? Thanks
    ✅ 1
    r
    5 replies · 2 participants
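    A hedged note (the replies above aren't shown): PREFECT_LOGGING_EXTRA_LOGGERS generally has to be present in the environment of the process that executes the flow run, not on the Orion server, for example via the deployment's infrastructure block. A minimal sketch; the Process block, its name, and the logger list are assumptions:
    from prefect.infrastructure import Process

    # Assumption: flow runs execute via a Process infrastructure block.
    # The variable must be set in the flow-run process so records from the
    # named loggers are forwarded to Prefect's logging handlers.
    infra = Process(env={"PREFECT_LOGGING_EXTRA_LOGGERS": "boto3,urllib3"})
    infra.save("process-with-extra-loggers", overwrite=True)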
  • e

    Esdras Lopes Nani

    11/23/2022, 4:30 PM
    Hey everyone! I'm having some problems with my agents (2 agents): they are not submitting any flows, which has caused a backlog in my deployments (currently 2513 Late runs). They are deployed on an EC2 instance with systemctl. I restarted them and installed the latest version of Prefect (2.6.9), and the logs only show this (no submission or any error):
    Agent started! Looking for work from queue(s): xxxxx...
    Don't know what else I can do 😅 Thanks!
    r
    a
    20 replies · 3 participants
  • t

    Tim-Oliver

    11/23/2022, 4:59 PM
    I am encountering Permission denied errors with the GitHub storage blocks since today. Has anyone seen this as well? The error appears if the GitHub repo is already present from a previous run. If the repo is deleted, it is cloned again without any error.
    ✅ 1
    a
    p
    28 replies · 3 participants
  • c

    Chris Marchetti [Datateer]

    11/23/2022, 6:31 PM
    Hello Prefect community. We are getting an Unknown Opcode error in our Prefect flow. I looked up the error and saw several issues relating to mismatched Python versions. I have verified that we are using Python 3.8 (3.8.7 and 3.8) for everything. Does anyone have any ideas why this error may be occurring? Other tasks in our flow ran successfully; only one failed.
    m
    2 replies · 2 participants
  • a

    Ashley Felber

    11/23/2022, 6:40 PM
    Hey @George Coyne, thanks for your help. One last question for now: the deployment YAML file is now in the directory with all my code that is used to build the image. Will moving it cause any issues? Does it need to be in a certain place?
    m
    1 reply · 2 participants
  • b

    Braun Reyes

    11/23/2022, 9:01 PM
    In v2, is anyone packaging dependencies in storage along with the flow files? Seems like this could allow for per-flow dependencies in a mono-repo without needing a separate Docker image per flow, kind of like how AWS Lambda zip files work.
    r
    f
    5 replies · 3 participants
  • s

    Santiago Gonzalez

    11/23/2022, 9:45 PM
    Hi, this is kind of difficult to explain. I have a flow that performs a job on an EC2 instance; sometimes it fails, but most of the time it succeeds. The job does the following: 1) downloads a jar from somewhere, 2) downloads a script from GitHub, 3) executes the script with some arguments, which downloads data from AWS S3, processes the downloaded files, and then uploads the results. The errors I usually get are like:
    • Main class from jar could not be found
    • The output directory does not exist, so it could not be synchronized to AWS S3
    Do you have any idea why these types of issues happen from time to time? BTW: I am using the boto3 SSM agent to handle EC2 instance creation, execution, and termination.
    r
    1 reply · 2 participants
  • r

    Ryan Sattler

    11/24/2022, 4:42 AM
    Hi - it seems when a flow is registered to Prefect Cloud (v1) with a schedule, then re-registered without a schedule (i.e. flow.schedule = None), the schedule is preserved server-side and will stick around until manually toggled off via the UI. Is this behaviour intended? If it is, is there a way to override this programmatically by explicitly setting an empty schedule of some sort?
    r
    3 replies · 2 participants
  • d

    Deepanshu Aggarwal

    11/24/2022, 6:39 AM
    Running into an issue where the flow runs show Running status but the job has terminated on Kubernetes. The agent pod reading from the queue has restarted, so I can't see the logs. Please check the attached screenshots. cc @Taylor Curran
    1 reply · 1 participant
  • d

    Deepanshu Aggarwal

    11/24/2022, 7:02 AM
    Seeing this error for the first time... has anyone else faced this and been able to fix it?
    07:01:33.898 | ERROR   | Task run 'run_executor-a1954751-160' - Crash detected! Execution was interrupted by an unexpected exception: AssertionError
    m
    5 replies · 2 participants
  • e

    Eden

    11/24/2022, 7:16 AM
    Quick question 🙏🏻 I built a work queue with Concurrency: unlimited and it works perfectly fine. However, when I change Concurrency to, for example, 3, it fails to run jobs 😞
    m
    3 replies · 2 participants
  • d

    Deepanshu Aggarwal

    11/24/2022, 8:31 AM
    One small question! Is there a way to run deployments in parallel?
    a
    7 replies · 2 participants
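    A hedged sketch (the replies aren't shown, and this assumes the question means triggering several runs of one deployment at once): each flow run created from a deployment is independent, so creating several lets the agent execute them in parallel, subject to work-queue concurrency. The deployment ID and count below are illustrative.
    import asyncio
    from uuid import UUID

    from prefect.client import get_client

    async def trigger_parallel_runs(deployment_id: str, n: int = 3):
        # Each call creates an independent flow run for the deployment.
        async with get_client() as client:
            return await asyncio.gather(
                *[
                    client.create_flow_run_from_deployment(UUID(deployment_id))
                    for _ in range(n)
                ]
            )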
  • i

    iñigo

    11/24/2022, 8:43 AM
    Hi, I am trying to pass a list as a parameter via the UI, but it always comes through as a string. Thank you
    d
    2 replies · 2 participants
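    A hedged aside (the replies aren't included): the usual cause is a missing type annotation on the flow parameter, since Prefect builds the parameter schema from annotations and an unannotated UI value is passed through as a string. A minimal sketch; the flow and parameter names are illustrative:
    from typing import List

    from prefect import flow

    @flow
    def my_flow(items: List[str]):
        # With the List annotation, the value entered in the UI is coerced
        # into a real Python list rather than arriving as a raw string.
        print(items)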
  • d

    Deepanshu Aggarwal

    11/24/2022, 9:06 AM
    Is it just me, or are parallel task runs (using the default concurrent task runner) failing left, right, and centre today?
    a
    3 replies · 2 participants
  • s

    Sylvain Hazard

    11/24/2022, 10:10 AM
    Hey there! Starting to dip my toes into Prefect 2. Was wondering if there is an equivalent to the Task classes we could write in Prefect 1, where the only requirement was to override the run method. It felt like a good way to encapsulate complex tasks and improve code readability. Creating abstract tasks was also something I did sometimes. Is this behavior gone or has it evolved? I couldn't find much in the docs regarding this, unfortunately.
    a
    2 replies · 2 participants
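    A hedged sketch of one workaround (the replies aren't reproduced here): Prefect 2 has no Task subclassing, but task() accepts any callable, so a class with a run method can still be wrapped. The class and names below are hypothetical.
    from prefect import flow, task

    class CleanupJob:
        """Hypothetical 'task class' holding configuration plus a run method."""

        def __init__(self, prefix: str):
            self.prefix = prefix

        def run(self, x: int) -> str:
            return f"{self.prefix}-{x}"

    # Wrap the bound method as a Prefect task instead of subclassing Task.
    cleanup_task = task(CleanupJob(prefix="job").run, name="cleanup")

    @flow
    def demo_flow() -> str:
        return cleanup_task(1)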
  • t

    Tim Galvin

    11/24/2022, 12:59 PM
    Hi all -- has anyone seen an error like this before?
    Encountered exception during execution:
    Traceback (most recent call last):
      File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 612, in orchestrate_flow_run
        waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
      File "/software/projects/askaprt/tgalvin/setonix/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 1317, in wait_for_task_runs_and_report_crashes
        if not state.type == StateType.CRASHED:
    AttributeError: 'coroutine' object has no attribute 'type'
    I am running a known version of my workflow on a known dataset, which has worked perfectly fine dozens of times before. It seems to be saying that the state above is not an Orion model but rather a coroutine. All my tasks are using the normal task decorator around normal non-async Python functions.
    r
    8 replies · 2 participants
  • b

    Boris Tseytlin

    11/24/2022, 4:28 PM
    Hey guys. What's the best practice for testing a flow that uses blocks? I am creating a block with credentials for a test MinIO storage and running .save on it, but when I try to retrieve it later with load I get a 404 error from Prefect:
    ValueError: Unable to find block document named test-minio-url for block type string
    @pytest.fixture(autouse=True, scope="session")
    def prefect_test_fixture():
        with prefect_test_harness():
            yield
    
    
    @pytest.fixture(scope="session")
    def minio_blocks(prefect_test_fixture):
        minio_creds_block = MinIOCredentials(
            minio_root_user=Config.MINIO_USER,
            minio_root_password=Config.MINIO_PASSWORD,
        )
        minio_creds_block.save("test-minio-creds")
        minio_url_block = String(Config.MINIO_URL)
        minio_url_block.save("test-minio-url")
        return minio_creds_block, minio_url_block
    
    
    @pytest.fixture
    def dummy_mission(minio_blocks):
        minio_creds_block, minio_url_block = minio_blocks
        minio_url = String.load(minio_url_block).value # <- ERROR HERE
        minio_url = minio_url.split("/")[-1:][0]
        minio_creds = MinIOCredentials.load(minio_creds_block)
    ✅ 1
    r
    4 replies · 2 participants
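    One hedged observation on the snippet above (the thread's resolution isn't shown): Block.load() expects the name the block was saved under (a string), not the block instance itself. A minimal sketch of the loading side under that assumption; the MinIOCredentials import path from prefect-aws is assumed:
    from prefect.blocks.system import String
    from prefect_aws import MinIOCredentials

    # Load blocks by the names passed to .save(...), inside the same
    # prefect_test_harness session where they were saved.
    minio_url = String.load("test-minio-url").value
    minio_creds = MinIOCredentials.load("test-minio-creds")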
  • s

    Sami Serbey

    11/24/2022, 5:23 PM
    Hello Prefect community 🙂 I am curious if anyone here has been able to run Prefect behind an nginx server.
    r
    1 reply · 2 participants
  • r

    redsquare

    11/24/2022, 5:46 PM
    When using S3 storage, does a k8s job running the flow just download the files from the S3 path indicated in the deployment, rather than the whole bucket?
    a
    3 replies · 2 participants
  • How to disable Prefect logger for tests?
    d

    davzucky

    11/24/2022, 11:51 PM
    In Prefect 2, how can I test a flow or task which uses get_run_logger(), which is set from the context? You can find sample test code in the thread. The test keeps failing with the error:
    prefect.exceptions.MissingContextError: There is no active flow or task run context.
    p
    k
    +1
    16 replies · 4 participants
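    A hedged sketch of one approach that avoids the MissingContextError (the thread's answer isn't reproduced here): run the task inside a flow under prefect_test_harness so a run context, and therefore a run logger, exists. Newer Prefect 2 releases also provide a disable_run_logger() helper for calling task.fn() directly, if it is available on your version. The task and assertion below are illustrative.
    from prefect import flow, task, get_run_logger
    from prefect.testing.utilities import prefect_test_harness

    @task
    def add_one(x: int) -> int:
        get_run_logger().info("adding one to %s", x)
        return x + 1

    @flow
    def wrapper_flow(x: int) -> int:
        # Calling the task inside a flow provides the run context the logger needs.
        return add_one(x)

    def test_add_one():
        with prefect_test_harness():
            assert wrapper_flow(2) == 3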
  • w

    wonsun

    11/25/2022, 7:21 AM
    Hello experts~! I'm using Prefect 1.0 and looking for a way to visualize task progress on our own web page. I made a flow that post-processes data a user searched for on the web, and an event to download data from the web page triggers that flow. While Prefect is processing, we also want to show on our web page how far the task has progressed, like the Prefect UI shows the state of the task being executed. I found similar questions and answers on Google, but in our situation it is impossible to split the task into one per data item. (link) Is there a nice way to check the progress of a long-running task from our web page? Is there such a feature in Prefect 2.0?
    1️⃣ 1
    k
    1 reply · 2 participants
  • a

    Andrei Tulbure

    11/25/2022, 7:30 AM
    Hi. I need some quick help: we had some Prefect 1 flows that were working fine on Monday, and since Thursday they just freeze randomly. Like, one works, one just freezes. We are in the process of moving over to Prefect 2, but we still use 1.3.0 for what we have in prod now. I have been trying to debug it, since it runs some ECS tasks, but I was not able to (larger machines, checking the AWS side of things, etc.). It's weird that the code (minus some print statements) worked perfectly fine on Monday. Any suggestions?
    ✅ 1
    b
    2 replies · 2 participants
  • z

    Zinovev Daniil

    11/25/2022, 10:16 AM
    Hi all! I installed Prefect Orion on a Linux VM. I get an error when I open the web interface: "Can't connect to Orion API at https://my_ip/api. Check that it's accessible from your machine." I used different IPs in the prefect config set PREFECT_API_URL command. I think I'm not the first with this issue. Help)
    t
    r
    2 replies · 3 participants
  • r

    roady

    11/25/2022, 10:24 AM
    With Prefect 2, how can I add a dependency between mapped tasks? I want to skip any mapped downstream task if the corresponding mapped upstream task fails, but without a direct link between the tasks. This is what I have so far:
    # Prefect 2.6.9
    # Python 3.8
    from prefect import flow, task, get_run_logger
    
    @task
    def add_one(x):
        if x==1:
            raise Exception("Raised exception")
        return x+1
    
    @task
    def do_something(dummy):
        get_run_logger().info("Doing something")
        return
    
    @flow
    def mapped_flow_not_dependent():
        a = list([0,2,3])
        b = add_one.map(a, return_state=True)
        c = add_one.map(b, return_state=True)
        d = do_something.map(a, return_state=True, wait_for = [c])
        
        print(c)
        print(d)
        
        return "Flow completes"
    
    if __name__ == "__main__":
        mapped_flow_not_dependent()
    One state in c being failed means none of the following do_something tasks run, whereas I would like all of the do_something tasks to run apart from the ones where c is failed. I can get the desired behaviour by linking the tasks explicitly: changing the argument of do_something from a to c (and removing the wait_for kwarg).
    t
    p
    +3
    20 replies · 6 participants
t

Tim-Oliver

11/25/2022, 10:27 AM
I would like to know this too!
👍 1
p

Pekka

11/25/2022, 11:17 AM
Do I understand correctly - you're trying to infer from c how do_something should proceed with a?
r

roady

11/25/2022, 11:22 AM
yes - I only want to do_something for the elements in a (or some other unrelated variable) if the corresponding c state is completed
🤔 1
p

Pekka

11/25/2022, 11:25 AM
I don't remember the details, but is there some alternative to map, like submit with a parallel task runner?
Would that give the option to parameterize the function call?
Or maybe change the tasks to flows?
r

roady

11/25/2022, 11:32 AM
I'm not sure about submit - I would like to do it with map if possible. Likewise for changing the tasks to flows! If you replace
d = do_something.map(a, return_state=True, wait_for = [c])
with
d = do_something.map(c, return_state=True)
then the desired behaviour is retrieved. But I specifically want to be able to enforce dependence for mapped tasks which would otherwise not be dependent.
k

Khuyen Tran

11/30/2022, 5:15 PM
Currently, the only way you can force this to happen is to have c as an argument of do_something. If you want this to be something that Prefect supports, I encourage you to create a GitHub issue for it on the Prefect GitHub page.
r

roady

12/01/2022, 8:35 AM
Thanks @Khuyen Tran. So wait_for only works for tasks which aren't mapped?
k

Khuyen Tran

12/01/2022, 3:47 PM
It worked when you used task.map(), didn't it? I'm not sure if I understood your question.
r

roady

12/01/2022, 4:25 PM
Sorry @Khuyen Tran for the confusion. There are three different cases that I think help explain what I mean. 1. For unmapped tasks, you can use wait_for to not run downstream tasks if an upstream one does not reach a Completed state, even if the downstream tasks do not take the upstream state as an argument. 2. For mapped tasks which do take an upstream state as an argument, only the corresponding downstream tasks do not run if a given upstream task does not enter a Completed state. 3. For mapped tasks which use wait_for, all of the downstream tasks do not run if any one of the upstream tasks enters a failed state. Here's an example of the three different cases which I hope will help you understand what I mean:
# Prefect 2.6.9
# Python 3.8
from prefect import flow, task, get_run_logger

@task
def add_one(x):
    if x==1:
        raise Exception("Raised exception")
    return x+1

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return

@flow
def wait_for_mwe():
    # No mapping, using wait_for but not argument
    a = 1
    b = add_one(a, return_state=True)
    c_1 = do_something(a, return_state=True, wait_for = b)

    # Mapping, using argument
    a = list([1,2,3])
    b = add_one.map(a, return_state=True)
    c_2 = do_something.map(b, return_state=True)

    # Mapping, using wait_for but not argument
    a = list([1,2,3])
    b = add_one.map(a, return_state=True)
    c_3 = do_something.map(a, return_state=True, wait_for = b)

    return c_1, c_2, c_3

if __name__ == "__main__":
    c_1, c_2, c_3 = wait_for_mwe()

    # State is NotReady
    print("Expecting NotReady state:")
    print(c_1)

    # Two completed states and one NotReady state
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_2)

    # All states are NotReady! :(
    print("Expecting 1 NotReady and 2 Completed:")
    print(c_3)
a

Anna Geller

12/01/2022, 7:05 PM
have you tried allow_failure?
wait_for = [allow_failure(c)]
example:
from prefect import task, flow, get_run_logger, allow_failure


@task
def ingest_data():
    return 42


@task
def transform_data(x: int) -> int:
    if True:
        raise ValueError("Non-deterministic error has occured.")
    else:
        return x * 42


@task
def clean_up_task():
    logger = get_run_logger()
    logger.info("Cleaning up 🧹")


@flow
def allow_flaky_transformation_to_pass():
    data = ingest_data.submit()
    result = transform_data.submit(data)
    clean_up_task.submit(wait_for=[allow_failure(result)])


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()
🧹 1
👀 1
r

roady

12/02/2022, 11:07 AM
I tried allow_failure in my mwe, but it means that all of the mapped downstream tasks run, even if there was a failure in a corresponding upstream task. 😞
p

Peyton Runyan

12/02/2022, 1:40 PM
@Tim-Oliver I answered the question here: https://prefect-community.slack.com/archives/CL09KU1K7/p1669805071115219
🙌 2
:gratitude-thank-you: 1
✅ 1
k

Khuyen Tran

12/02/2022, 4:12 PM
This one should work as expected:
from prefect import flow, task, get_run_logger


@task
def add_one(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 1

@task 
def add_two(x):
    if x == 2:
        raise Exception("Raised exception")
    return x + 2 

@task
def do_something(dummy):
    get_run_logger().info("Doing something")
    return


@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    d = [
        do_something.submit(item)
        for future, item in zip(c, a)
        if future.wait().type == "COMPLETED"
    ]
    return "Flow completes"
🙌 1
✅ 1
Graph:
Or you can do this based on @Anna Geller’s suggestion:
@flow
def mapped_flow_not_dependent(a=[1, 2, 3]):

    b = add_one.map(a)
    c = add_two.map(b)
    d = [
        do_something.submit(item, return_state=True, wait_for=future)
        for item, future in zip(a, c)
    ]
    return "Flow completes"
🙌 1
✅ 1
a

Anna Geller

12/02/2022, 4:16 PM
some more examples:
from prefect import task, flow


@task
def upstream_task(item):
    if item == "c":
        raise Exception("this upstream task failed")
    return str(item) + "+1"


@task
def downstream_task(item):
    return str(item) + "+2"


@flow
def demo():
    items = ["a", "b", "c", "d"]
    first = upstream_task.map(items)
    downstream_task.map(first)  # runs only for a, b, and d. c is in NotReady state


if __name__ == "__main__":
    demo()
❌ 1
from prefect import flow, task, get_run_logger, allow_failure


@task
def extract():
    return [1, 2, 3]


@task
def add_one(x):
    if x == 2:
        raise Exception("Something is not right")
    return x + 1


@task
def add_two(x):
    return x + 2


@task
def cleanup_task():
    get_run_logger().info("Cleaning up e.g. removing temp Ray cluster")


@flow
def map_with_cleanup_task():
    a = extract()
    b = add_one.map(a)
    c = add_two.map(b)
    cleanup_task.submit(wait_for=[allow_failure(c)])


if __name__ == "__main__":
    map_with_cleanup_task()
❌ 1
r

roady

12/05/2022, 10:03 AM
Thanks guys! Edit: I didn't realise at first, but those last two suggestions don't result in the desired behaviour: in the first one the task takes an upstream task as an argument, which I was trying to avoid, and in the second one the cleanup task seems to only be submitted once despite there being two completed upstream tasks.
🙌 1