Powered by Linen
prefect-community
  • j

    Jon Young

    01/11/2023, 2:57 PM
    Hey all, in Prefect 1, how do I exit a flow if a task fails? When my task fails, the flow continues to run tasks that are not dependent on that task.
  • m

    Matthew Scanlon

    01/11/2023, 3:33 PM
    What is the recommended way to send a Slack notification from inside a task? It looks like the prefect-slack task does not work if called as a function?
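One workaround, sketched under assumptions: instead of calling the prefect-slack task as a function, post to a Slack incoming webhook directly from inside the task using only the Python standard library. This is not the prefect-slack API; the webhook URL is a placeholder, and `build_slack_request` / `send_slack_message` are hypothetical helper names.

```python
import json
import urllib.request

# Placeholder only: substitute the incoming-webhook URL Slack generates for your channel.
WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def build_slack_request(webhook_url: str, text: str) -> urllib.request.Request:
    """Build a POST request carrying a minimal incoming-webhook payload."""
    payload = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_slack_message(webhook_url: str, text: str) -> int:
    """Send the message and return the HTTP status code (performs network I/O)."""
    request = build_slack_request(webhook_url, text)
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Inside a task body you would call something like `send_slack_message(WEBHOOK_URL, "task finished")`. Keeping request construction separate from sending makes the payload easy to check without network access.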
  • e

    eddy davies

    01/11/2023, 3:56 PM
    From reading other messages, I am aware that stateful classes do not work well with class methods as tasks or flows. However, I have written my Python code using classes; I like them for keeping my code in manageable segments. As long as I do not need to pass any self attributes from one task to another, I assume I am OK to use a class.
  • j

    Jason Noxon

    01/11/2023, 4:04 PM
    Question: did prefect2 get rid of prefect.config?
  • i

    Ilya Galperin

    01/11/2023, 4:28 PM
    Hi all - we’re seeing sporadic `BrokenPipeError: [Errno 32] Broken pipe` crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of these tasks will enter a Crashed state with this error and cause our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
  • j

    Jeremiah

    01/11/2023, 5:29 PM
    Just a reminder, our usually-friendly bot Marvin is here to help answer any questions! Isn’t that right @Marvin?
  • j

    jack

    01/11/2023, 5:41 PM
    Is there a way to change the location of the temporary working directory that is used by prefect v2?
  • y

    Yaron Levi

    01/11/2023, 5:53 PM
    Hi guys 👋 Does anyone have experience with DaskTaskRunner() and running tasks in parallel? We can’t get tasks to run in parallel. I’ve detailed the problem here: https://discourse.prefect.io/t/tasks-are-not-running-in-parallel-when-using-dasktaskrunner/2180/1
  • f

    FuETL

    01/11/2023, 6:20 PM
    Hi everyone, can someone explain this behaviour to me? I have a flow with a `case` for when the condition is `True` and another for when it is `False`. But I noticed that when the case is `False`, `task_b()` is always SKIPPED. After some struggling I figured out that the task was being SKIPPED because inside the `True` case I'm reassigning the `task_result` variable (if I rename that variable, the task runs on the `False` condition). Is this the desired behaviour?
    from prefect import task, Flow, case
    
    
    @task()
    def task_a():
        print("task_a")
        return "task_a"
    
    
    @task()
    def task_condition():
        return False
    
    
    @task()
    def task_b(value: str):
        print("task_b")
        return f"task_b={value}"
    
    
    @task()
    def task_c():
        print("task_c")
    
    
    with Flow("test-flow") as flow:
        task_result = task_a()
        cond = task_condition()
    
        with case(cond, True):
            task_result = task_b(task_result)
            task_c()
    
        with case(cond, False):
            task_result = task_b(task_result)
    
    flow.run()
  • i

    Ilya Galperin

    01/11/2023, 6:29 PM
    Hi all - we’re seeing sporadic `BrokenPipeError: [Errno 32] Broken pipe` crashes in one of our flows on 2.7.7, running on DaskTaskRunner. This flow runs ~1000 tasks; occasionally one of them will enter a Crashed state with this error and cause our flow to enter a Failed state. Retries on these crashed tasks don’t seem to work (I’m guessing Crashed state tasks are excluded from retry logic). Full traceback in the thread. Any ideas? Thank you!
  • j

    Jeremiah

    01/11/2023, 6:38 PM
    @Marvin give us a joke about data engineering
  • a

    Anna Geller

    01/11/2023, 6:56 PM
    @Marvin tell a joke about data science
  • a

    Alexander Kloumann

    01/11/2023, 7:10 PM
    Hi there, I'm using VSCode to build a data ingest pipeline, but finding debugging in Prefect very difficult. When using the debugger I'm unable to "step into" any functions labeled as a Prefect task, and if I try to print out the results from a Prefect task outside of that function, it just gives me a Prefect object of some kind. Is there any way to debug functions labeled Prefect tasks? I'm using Prefect 1.0 as we're trying to get this up and running asap but will switch to 2.0 soon.
  • j

    Jeff Hale

    01/11/2023, 7:30 PM
    Reminder that @Christopher Boyd and I are going to be chatting about K8s and Prefect in 30 minutes on PrefectLive! 🙂 https://prefect-community.slack.com/archives/C036FRC4KMW/p1673379080230649
  • e

    eddy davies

    01/11/2023, 9:33 PM
    Why am I getting this error running my agent deployment:
    /Users/macbook/.pyenv/versions/3.10.5/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
      warn(RuntimeWarning(msg))
  • a

    Aaron Goebel

    01/11/2023, 10:27 PM
    I am trying to use the orchestrator pattern to chain together `deployments` dynamically at run time. I already have a DAG defined with how the deployments should be chained, and I also determine that some outputs of deployment runs should be used as input to other `run_deployment` invocations. I'd like to use Prefect for the scheduling of everything, so my approach has been to wrap the `run_deployment` in a `task`, e.g.:
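    from prefect import task
    from prefect.client import get_client
    
    @task
    async def run_deployment_task(depl_id: str, parameters: dict):
        # get_client() returns a client meant to be used as an async context manager
        async with get_client() as client:
            run = await client.create_flow_run_from_deployment(deployment_id=depl_id, parameters=parameters)
            # FlowRun exposes .state; right after creation this is the initial (Scheduled) state, not a result
            return run.state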
    So, given a set of deployments, their dependencies, and their parameters, I basically want to run something like this:
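    from prefect import flow
    
    @flow
    async def run_deployments_flow(graph: dict):
        futures = {}
        # Submit in dependency order so each upstream future exists before it is referenced
        # (assumes the keys of `graph` are already topologically sorted).
        for name, flow_params in graph.items():
            upstream = [futures[dep] for dep in flow_params.get('dependencies', [])]
            # PrefectFuture has no .wait_for() method; dependencies go through submit(wait_for=...)
            futures[name] = await run_deployment_task.submit(
                depl_id=flow_params['deployment_id'],
                parameters=flow_params['inputs'],
                wait_for=upstream,
            )
    
        # await all tasks
        return {name: await future.result() for name, future in futures.items()}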
    In this way, the task runner deals with scheduling, managing concurrency, etc. HOWEVER, my issue is that the output of some task runs needs piping into downstream tasks. This doesn't work natively with tasks, so with this approach I'd need to handle the looping, polling of task status, etc. manually. Is there any way I can use Prefect to get this kind of result? TL;DR: I want to dynamically chain deployments together, where the outputs of one may be inputs of another, and I want Prefect to manage the orchestration of it all.
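The scheduling half of this question can be sketched without Prefect, under stated assumptions: the standard library's graphlib decides which nodes are ready, a thread pool runs ready nodes concurrently, and each upstream output is merged into its dependents' parameters. `run_graph` and the `run` callable are hypothetical; `run` stands in for whatever actually triggers a deployment and waits for its output. The sketch does not solve how a deployment's return value gets back to the orchestrator, which is the harder part of the question.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Any, Callable, Dict


def run_graph(
    graph: Dict[str, dict],
    run: Callable[[str, dict], Any],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run every node once its dependencies finish, feeding their outputs forward.

    Assumed shape: graph[name] = {"deployment_id": ..., "inputs": {...},
    "dependencies": [...]}, with every dependency also a key of `graph`.
    Upstream results are added to the parameters under the upstream node's
    name (a hypothetical convention).
    """
    deps = {name: list(spec.get("dependencies", [])) for name, spec in graph.items()}
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results: Dict[str, Any] = {}
    pending = {}  # future -> node name

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Submit every node whose dependencies have all finished.
            for name in sorter.get_ready():
                params = dict(graph[name].get("inputs", {}))
                for dep in deps[name]:
                    params[dep] = results[dep]
                future = pool.submit(run, graph[name]["deployment_id"], params)
                pending[future] = name
            # Block until at least one running node finishes, then unlock its dependents.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                name = pending.pop(future)
                results[name] = future.result()  # re-raises if `run` failed
                sorter.done(name)
    return results
```

Independent branches run at the same time, while a node only starts after every upstream result it consumes is available.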
  • y

    YD

    01/11/2023, 10:37 PM
    Pricing question… Prefect 1.0 was free below some level of usage. In Prefect 2.0 the cheapest plan is $39/month. Is this correct?
  • k

    Khyaati Jindal

    01/12/2023, 8:10 AM
    My code runs as a system service on an EC2 instance.
  • k

    kent

    01/12/2023, 8:13 AM
    I'm using Prefect 2. Suppose you have a flow that is scheduled every 5 minutes:
    from prefect import flow, task
    
    @task
    def my_favorite_function():
        raise ValueError("This flow immediately fails")
    
    @task
    def one_return():
        return 1
    
    @task
    def two_return():
        return 2
    
    @flow
    def run_flow():
        my_favorite_function()
        one_return()
        two_return()
    I want to PEND or skip an upcoming scheduled run. I read the documentation but couldn't figure out the specifics of how to implement this.
  • t

    Tim Galvin

    01/12/2023, 8:21 AM
    In the same way that one can do `if 'PATH' in os.environ.keys()`, is there a way to test the names of blocks that have been stored? Is there a best-practice approach here, or does one just have to catch the error if the block does not exist?
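A minimal sketch of the catch-the-error approach, assuming (as Prefect 2.x appears to do) that loading a block that does not exist raises `ValueError`; verify that on your version. `block_exists` is a hypothetical helper that takes the loader as an argument, so it works for any block class.

```python
from typing import Any, Callable


def block_exists(load: Callable[[str], Any], name: str) -> bool:
    """Return True if `load(name)` succeeds, False if it raises ValueError.

    Assumption: `load` is a block loader such as `Secret.load`, and a missing
    block document surfaces as ValueError. Other errors (e.g. the API being
    unreachable) are deliberately not swallowed.
    """
    try:
        load(name)
    except ValueError:
        return False
    return True
```

With Prefect 2 this might be called as `block_exists(Secret.load, "my-secret")`, with `Secret` imported from `prefect.blocks.system`, from synchronous code (in an async context `Block.load` returns an awaitable, so an async variant would be needed). It loads the whole block, so it is an existence check by side effect rather than a cheap name lookup.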
  • p

    PB

    01/12/2023, 8:45 AM
    I have three questions from someone who works in an AWS environment:
    1. What's the most efficient way to deploy and manage Prefect Server on AWS, assuming Prefect Cloud is not an option for now? We are looking for the option with the least overhead, since we lack DevOps resources.
    2. Are there any blog posts/samples you would recommend with examples of server + agents deployed on AWS, showing what that looks like and how it works there?
    3. Can a workflow be triggered by an S3 CreateObject event (similar to Lambda triggers, for example)?
    In general I'm trying to find materials on how Prefect can function entirely within AWS, to see which is better for a small team: Prefect or MWAA.
  • t

    Torstein Molland

    01/12/2023, 9:59 AM
    Hi, we are having an issue where sometimes only two flows are executed concurrently and the remaining flows are stuck in the "Pending" state. Our work queue (currently only one) is set up with unlimited concurrency and we are not enforcing any concurrency limits. We are running the flows in AWS using ECS. I noticed that our memory usage is sometimes high, at about 80%. Could this be the reason the concurrency is sometimes limited? Thank you kindly for your time 😊
  • j

    James Zhang

    01/12/2023, 10:03 AM
    Hi all, just wondering if there’s an easy way to define the flow run name instead of just having the randomly generated names, without using `run_deployment(flow_run_name="xxx")`, because I want to start the flow run from the UI…
  • n

    Nimesh Kumar

    01/12/2023, 10:58 AM
    Hi everyone, can anyone please tell me how I can implement an on_failure callback on a task in Prefect? I am using Prefect 2.7.7.
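If your Prefect version has no built-in failure hook for tasks, one version-independent workaround is a plain decorator applied underneath `@task`: it calls a callback when the function raises and then re-raises, so Prefect still records the failure. The `on_failure` decorator and `alert` handler below are hypothetical helpers, not Prefect APIs.

```python
import functools
from typing import Any, Callable


def on_failure(callback: Callable[[str, BaseException], Any]):
    """Decorator factory: call `callback(name, exc)` when the wrapped function raises."""

    def decorator(fn):
        @functools.wraps(fn)  # keeps __name__, and inspect.signature follows __wrapped__
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                callback(fn.__name__, exc)
                raise  # re-raise so the run still ends up Failed

        return wrapper

    return decorator


def alert(name: str, exc: BaseException) -> None:
    # Hypothetical handler: replace with a Slack message, email, etc.
    print(f"task {name!r} failed: {exc!r}")


# Usage with Prefect 2 (not executed here):
#
#     from prefect import task
#
#     @task
#     @on_failure(alert)
#     def load_data(path: str) -> str:
#         ...
```

Because the callback runs inside the task body, it fires once per failed attempt if the task has retries. As written, the wrapper only suits synchronous functions; an `async def` task would need an async wrapper.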
  • j

    Jo Tryti

    01/12/2023, 12:22 PM
    Is there a way to add authentication to a self-hosted Orion server? We have a simple setup where we self-host Prefect in Azure: the Orion server runs in an Azure App Service, a PostgreSQL database is hosted on a Flexible Server, and one agent runs in a Container Instance. Everything is working great, but we want to enable authentication for the App Service. As with most of our self-hosted in-house applications, we just want to enable AAD authentication on the App Service, but when we do so the agent can't communicate with the API since it's not authorized. Does anyone know a way to get the agent to use its service principal to authenticate with the API?
  • a

    Ankit Choudhary

    01/12/2023, 12:25 PM
    Hello, has anyone set up Prefect 2 with Bitbucket as a storage option? Is that even a thing?
  • d

    Denis

    01/12/2023, 2:08 PM
    Hello everyone, is there an alternative to clocks, filters and adjustments for creating complex schedules in Prefect 2?
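As far as I know, Prefect 2 expresses complex schedules as cron, interval or RRule (RFC 5545 recurrence rule) schedules rather than clocks with filters and adjustments. A sketch, assuming python-dateutil is installed: preview what an rrule string will produce before attaching it to a deployment. The rule and the `preview_runs` helper are illustrative, not taken from Prefect.

```python
import itertools
from datetime import datetime

from dateutil.rrule import rrulestr  # python-dateutil, assumed installed

# Illustrative rule: Mondays, Wednesdays and Fridays at 09:00.
RULE = "FREQ=WEEKLY;BYDAY=MO,WE,FR;BYHOUR=9;BYMINUTE=0;BYSECOND=0"


def preview_runs(rule: str, start: datetime, count: int) -> list:
    """Return the first `count` occurrences of `rule`, starting from `start`.

    dateutil only yields `start` itself if it matches the rule.
    """
    return list(itertools.islice(rrulestr(rule, dtstart=start), count))


if __name__ == "__main__":
    for run in preview_runs(RULE, datetime(2023, 1, 2), 4):
        print(run.isoformat())
```

Once the rule looks right, the same string could be handed to Prefect 2's `RRuleSchedule` (e.g. `RRuleSchedule(rrule=RULE)`); the import path, `prefect.orion.schemas.schedules` around 2.7.x, is an assumption to check against your installed version.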
  • s

    Sean Talia

    01/12/2023, 2:22 PM
    hello! Has anyone ever had success with Prefect v1 in creating a flow that dynamically creates a number of `StartFlowRun` tasks at runtime? My scenario is that I want a flow (call it `orchestratorFlow`) to orchestrate the execution of several instances of another flow that I have (call that one `ecsFlow`). The issue at hand is, I don't know how many instances of my `ecsFlow` I need to kick off until `orchestratorFlow` runtime; it could be 1, 8, or 50+ (each flow run of `ecsFlow` would be run with a different set of parameters), and would be determined based on a parameter passed to `orchestratorFlow`. I've tried to follow the Prefect v1 map paradigm by doing `StartFlowRun.map([<config_parameters>])`, where `[<config_parameters>]` is a dynamically generated list of flow run kwargs returned by an upstream task, but that's not doing the trick. Thank you for any help!
  • p

    Paul Lucas

    01/12/2023, 3:16 PM
    Hi all, is there any way to change the email address for the user in our Prefect Cloud instance? We have it set to a personal email for testing purposes but would like to change it to a process email that isn’t tied to an individual. Thanks
  • m

    Mrityunjoy Das

    01/12/2023, 9:42 AM
    Hello, I am using prefect-dask's DaskTaskRunner for concurrent task processing, but somehow it always runs only 2 tasks concurrently; the other 5 tasks stay in the Pending state. Is this because of the server's resources, or do I need to change some configuration? Thanks in advance

Christopher Boyd

01/12/2023, 3:25 PM
Hi @Mrityunjoy Das - I think this needs a bit more detail to answer properly. How many resources are available? Are you using a local Dask cluster, or connecting to an existing Dask cluster? What does your Dask configuration look like for # workers / threads? How are you submitting the work to Dask (e.g. the actual task code doing the submission)?