https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • c

    Cole Murray

    11/03/2022, 11:36 PM
    Hi all, I’m experiencing an issue with passing a large parameter in a flow that leads to the UI becoming unresponsive and crashing. There is discussion here specific to dataframes, https://github.com/PrefectHQ/prefect/issues/6962, and a fix in PR that will use orjson, but the issue with the UI crashing is not specific to dataframes, and any large parameter (e.g. array with 1MM elements) would cause the same issue. How Prefect is designed, the parameters passed to a flow are retrieved as part of the flow runs api. If a parameter produces a large string when serialized, this will led to a massive response coming back and either causing a timeout, or crashing the server dependent on the size. A workaround here is to avoid passing large parameters, write to disk/s3 and then pass the URL as the parameter. This has the downside increasing runtime and creating an IO bottleneck. Curious on the communities / Prefect teams thoughts about this paradigm? Wanted to start a discussion here as it is technically not a bug and working as intended, but has side-effects in standard use-cases. Additional information added in reply.
    a
    m
    • 3
    • 10
  • b

    Ben Muller

    11/04/2022, 2:40 AM
    Hey Prefect, I have a task in 2,0 that I want to test, in that task I use the
    get_run_logger()
    . In order to test that function I am using
    .fn()
    but I can not test it because
    get_run_logger()
    failed every time because
    E           prefect.exceptions.MissingContextError: There is no active flow or task run context.
    Besides possibly using an optional kwarg in every single task that I have, how can I get around this to test my tasks logic properly?
    ✅ 1
    n
    c
    • 3
    • 13
  • s

    Slackbot

    11/04/2022, 3:37 AM
    This message was deleted.
  • i

    Ikkyu Choi

    11/04/2022, 6:01 AM
    Hi, I have a question .. I got a some flow id on cli (i.e., prefect get flows), and then i try to get flow’s status like is_running. So, I use FlowRunView.from_flow_run_id(‘my flow id’) but I got a ValueError: No flow runs found while querying for flow runs where ~~. Could you any one help me?
    m
    • 2
    • 2
  • d

    Deepanshu Aggarwal

    11/04/2022, 6:41 AM
    hi! i was trying to use the default dask task runner on our self hosted agent and orion. im running into
    AttributeError: 'coroutine' object has no attribute 'type'
    and the flow run crashes. any idea why this happened. adding detailed logs in the comment
    k
    j
    +8
    • 11
    • 43
  • b

    Ben Muller

    11/04/2022, 8:13 AM
    Hey Prefect, I am completely lost on this one, I am trying to deploy a flow from a docker container and getting this error 🧵
    k
    • 2
    • 5
  • m

    Mathijs Carlu

    11/04/2022, 9:25 AM
    Hi all, I'm currently running Prefect 2.6.5 on Kubernetes, running all flows in Kubernetes Jobs with flows baked in docker images. I seem to run into task run crashes which cause my flow run to crash, while I have retrying on these specific tasks. The task that reports the crash is a mapped task, over 12 inputs. The task is executed in a loop, with sleeping in between loop iterations. It mostly manages to execute a certain amount of iterations before crashing (like 30ish). Does anyone have an idea on how I could solve this, or what exactly is going wrong? Stack trace in thread
    s
    f
    +3
    • 6
    • 12
  • s

    Shruti Hande

    11/04/2022, 9:45 AM
    Getting this error while execution of flow sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) unable to open database file Also when i changed backend database to postgres this error comes up: sqlalchemy.exc.ProgrammingError: (sqlalchemy.dialects.postgresql.asyncpg.ProgrammingError) <class 'asyncpg.exceptions.Unde finedFunctionError'>: function gen_random_uuid() does not exist HINT: No function matches the given name and argument types. You might need to add explicit type casts.
    a
    j
    • 3
    • 14
  • n

    Nic

    11/04/2022, 10:55 AM
    Can anyone supply me with the correct format for environment variables in the docker container block?
    ✅ 1
    a
    • 2
    • 2
  • t

    Tony Yun

    11/04/2022, 11:51 AM
    When we have to loop a task to run multiple times, how can we avoid memory issue? I found that each time a task run, the memory steadily going up but not going down even if the variable is overwritten every time. (i’m using Prefect 2)
    m
    n
    • 3
    • 4
  • v

    Vadym Dytyniak

    11/04/2022, 1:26 PM
    Hello. Have few questions about the flow runs in Prefect 2: 1. How to restart flow run from the start(as I understand retry will restart only failed tasks)? 2. How to cancel the flow run(Currently see only 'Delete' option)? Thanks
    ✅ 1
    m
    • 2
    • 2
  • m

    Miremad Aghili

    11/04/2022, 1:34 PM
    Is there a way to cancel the flows from cli (prefect 1)
    ✅ 1
    k
    • 2
    • 2
  • g

    Giuliano Mega

    11/04/2022, 4:22 PM
    Hey folks, me again. 🙂 I'm trying to get a grip as to how Prefect behaves when you have somewhat fine-grained tasks and try to enforce a concurrency limit on them. I'm running it on a GKE cluster with autopilot. My understanding is that tag-based concurrency limits are enforced clusterwise, meaning there must be some central mechanism for coordination, is that assumption correct? Then, I'm trying the following flow, which is the kind of thing I'll have to run when doing external API calls to enrich thousands of data items:
    import time
    
    from prefect import flow, task
    
    from lib.prefect import get_run_logger
    
    
    @task
    def get_inputs():
        return range(1, 600)
    
    
    @task(tags=['sequential-scraper'])
    def process_input(item):
        logger = get_run_logger()
        <http://logger.info|logger.info>(f'Processing item {item}')
        time.sleep(1)
        <http://logger.info|logger.info>(f'Processed item {item}')
    
    
    @flow(description='Example of a very simple, API scraping-like Pipeline. ' 
                      'Feel free to run it as it does not do anything harmful.')
    def sample_pipeline():
        new_tokens = get_inputs()
        process_input.map(new_tokens)
    And what I see essentially is my task runner (Job) taking a very long time until it gets wiped out without running any tasks at all, and the UI (Prefect cloud) leaves the flow in an eternal "running" state. Enabling debugging shows a lot of:
    16:18:26.175 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
    16:18:26.868 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
    16:18:27.375 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
    16:18:28.344 | DEBUG   | prefect.engine - Received wait instruction for 30s: Concurrency limit for the sequential-scraper tag has been reached
    Am I misusing this in any way?
    m
    k
    • 3
    • 16
  • a

    Amogh Kulkarni

    11/04/2022, 5:19 PM
    Hi Team. I frequently see this error in our flows. It occurs randomly across flows. I am not sure where to investigation from. Can you provide any insights into the error?
    Crash detected! Request to <http://prefect-url:4200/api/task_runs/e26b6472-281f-4af1-933c-82516953f4a7/set_state> failed.
    m
    • 2
    • 8
  • s

    Slackbot

    11/04/2022, 5:26 PM
    This message was deleted.
  • j

    JP

    11/04/2022, 6:21 PM
    I’m seeing following exception irregularly on our prefect agent ec2
    Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: 18:03:30.621 | ERROR   | prefect.agent - Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED
    Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: Traceback (most recent call last):
    Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]:   File "/home/prefect/insights-prefect-flows/venv/lib/python3.10/site-packages/h2/connection.py", line 224, in process_input
    Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]:     func, target_state = self._transitions[(self.state, input_)]
    Nov 04 18:03:30 ip-172-31-30-84 prefect[242862]: KeyError: (<ConnectionState.CLOSED: 3>, <ConnectionInputs.RECV_PING: 14>)
    Full trace in attachment This started to happen as soon as we upgraded from prefect 2.4.0 to 2.6.5 We are using Prefect Cloud. Is this a known issue? 😕
    Untitled.txt
    ✅ 1
    m
    j
    • 3
    • 20
  • s

    Sowmiya Anand

    11/04/2022, 7:51 PM
    Hi everyone, I am trying to test the new run_deployment() that was released with 2.5 version, I have upgraded prefect in my local to 2.6.6 however I am getting this error --> ImportError: cannot import name 'run_deployment' from 'prefect.client.utilities' any inputs?
    ✅ 1
    r
    • 2
    • 1
  • j

    James Vaughan

    11/04/2022, 7:59 PM
    Hey folks! I'm working in Presto 1.0 trying to sort out how to debug this error:
    Version group 92f0a1ae-7793-41bd-a662-d4cabbe103bf has no unarchived flows
    This same error crops up when I use the Flow ID itself. I'm trying to do this with CURL just to play with the UI and build some code out as I can't just use the python lib due to some restrictions. Can someone check out the code in the thread and let me know if you have any thoughts here?
    • 1
    • 4
  • a

    Alexandru Anghel

    11/04/2022, 8:34 PM
    Hi guys, What's the proper way of running a one time only schedule flow in Orion? I can't find the DatesClock class anymore in Orion. In Prefect 1.X i was using something like this to start the flow 10 seconds after the registration:
    def get_schedule(schedule: str):
        return Schedule(clocks=[DatesClock([pendulum.now().add(seconds=10)])]) if schedule == "one time" else CronSchedule(cron=schedule)
    Thanks!
    ✅ 1
    m
    n
    • 3
    • 10
  • b

    benson

    11/05/2022, 1:38 AM
    Hello! New to prefect. I am using a free version of prefect cloud. However one of my work queues is “unhealthy” and scheduled run is “late”. Wondering how to remedy, and what an “unhealthy” queue means.
    ✅ 1
    j
    • 2
    • 8
  • s

    Slackbot

    11/05/2022, 5:53 AM
    This message was deleted.
  • c

    chicago-joe

    11/06/2022, 12:24 AM
    any help on deleting a block type that I registered?
    An exception occurred.
    ➜  flows prefect blocks type delete listblock
    Cannot delete Block Type 'listblock'!
    I can verify the block type slug is correct
    b
    • 2
    • 1
  • f

    Faheem Khan

    11/06/2022, 5:59 AM
    Prefect 2.6.6, local. Any idea why my flows starting at the same time, although I have put a different schedule(start time)? Cheers
    b
    j
    • 3
    • 8
  • t

    Tim Galvin

    11/06/2022, 6:29 AM
    Hi all -- I am reading the docs for setting up deployments and hoping to set one up for my demo flow. The crux of my problem is that I have a set up similar to this:
    def main(
        sbid,
        my_args,
        cluster,
    ):
    
        dask_runner = get_dask_runner(cluster)
    
        # Define flow
        @flow(
            name=f"Processing holography -- {sbid}",
            task_runner=dask_runner,
        )
        def my_flow():
            logger = get_run_logger()
    
            task_super_awesome_worker()
    
        my_flow()
    This is a simplified example - but in short I am creating my flow function encapsulated in another function. I am going this as some attributed of the flow's task runner need to be defined at runtime depending on the SLURM cluster it is being executed on, account running the flow, compute resources for the slurm job etc (specified via the
    dask_jobqeue.SLURMCluster
    module/class). On the CLI my attempts to set the entrypoint
    prefect deployment build /path/to/my/flow_script.py:my_flow
    results in an error that amounts to
    my_flow
    not being in
    flow_script.py
    . If I try to replace
    my_flow
    with
    main
    , I get an error about
    main
    not being a
    Flow
    . In the docs I do not see an example of how to do this. So, I am wondering how does one do something like this?
    m
    • 2
    • 3
  • y

    YD

    11/06/2022, 6:00 PM
    Hi (Prefect migration question) In Prefect 1.0 I could see all the agents and could setup an “Automation” that monitors agent status and sends an alert if an agent stops working. How would you do those two things in Prefect 2.0? What is the equivalent to the triggers functionality from Prefect 1.0 https://docs-v1.prefect.io/api/latest/triggers.html#functions (is it the tasks
    .submit()
    ,
    .wait()
    , `.result()`… ?) In Prefect 1.0 the “task” had “timeout” and “retry_delay”, so that I was able to limit the run time of a task, but wait some time before trying to run it again. (this is useful since sometimes a task get stack due to some cluster resource issue, and I want to let some time pass before rerunning it) (https://docs-v1.prefect.io/api/latest/core/task.html) How would you do such a thing with Prefect 2.0 ? (I see that t he flow decorator has those options, but not the task)
    k
    • 2
    • 1
  • r

    Rich Tata

    11/06/2022, 9:42 PM
    So DST caused some havoc in my flows today. After the time change, every flow has been running two times, once at it's normal time, and again one hour in the future. I set them using cron string in my yaml and adjusting the timezone to my local and thought based on documentation that it would auto adjust (pictured). So what's the plan of action here, is it something I can change in the UI, or do I have to re-deploy all of these flows?
    ➕ 1
    ✅ 1
    b
    a
    +2
    • 5
    • 16
  • c

    Carlo

    11/06/2022, 10:28 PM
    Hello. We just upgraded our server/agents from 2.6.4 to -> 2.6.6. We are seeing resetting connections waiting for run_deployment. Thoughts? Thread for details
    👀 1
    ✅ 1
    b
    m
    • 3
    • 15
  • f

    Faheem Khan

    11/06/2022, 11:34 PM
    prefect 2.6.6 local server: I am using run_deployment() to run two flows i.e. Flow A and Flow B. second flow starts after first flow finishes. Whenever I triggers the flows from run_deployment() flow. the flow fails with an error although the Flow A completes without an error.
    👀 1
    ✅ 1
    c
    b
    • 3
    • 10
  • u

    2j

    11/07/2022, 3:18 AM
    Hi all! I’m assessing prefect2 as a potential job orchestration solution. We have a mixture of python / non-python jobs packaged via Docker and we run on k8s. It seems like flows consisting of https://docs-v1.prefect.io/api/latest/tasks/kubernetes.html#runnamespacedjob best suits my needs, but is there a similar concept in prefect 2? Or is prefect2 not really the right tool to orchestrate arbitrary dockerized jobs?
    m
    • 2
    • 2
  • b

    Ben Muller

    11/07/2022, 3:56 AM
    Is there a way to run a flow locally and NOT have it show up on the UI ( 2.0 ) ?
    ✅ 1
    k
    j
    m
    • 4
    • 12
Powered by Linen
Title
b

Ben Muller

11/07/2022, 3:56 AM
Is there a way to run a flow locally and NOT have it show up on the UI ( 2.0 ) ?
✅ 1
I know I could technically run under a non cloud profile, but I also want to be able to access my cloud blocks.
@Khuyen Tran , any help with this?
k

Khuyen Tran

11/07/2022, 4:44 PM
I assume you want to test your flows and don’t want for it to show on the UI? There is currently no way for you to hide the flows on Prefect Cloud. However you can: • tag the flows as
test
and delete all flows with the
test
tag • recreate the blocks in a different backend databases
b

Ben Muller

11/07/2022, 6:17 PM
😔
Is this on the road map?
j

Jeff Hale

11/07/2022, 8:35 PM
I don’t believe so, Ben. I think this the first time I’ve seen this request. If you want to open a feature enhancement issue in the repo, that would be great.
b

Ben Muller

11/07/2022, 8:42 PM
Yeah sure. I can't image I'd be the only person to want this. I'm blown away tbh. Not sure why you'd ever want local runs to show up on the UI at all.
:thank-you: 1
Side note on this. If I had notifications for all flows on (which I do). Then I'd be spamming my entire company when I'm developing locally!
That's very undesirable from my point of view
j

Jeff Hale

11/07/2022, 8:46 PM
I hear you.
👍 1
m

Michael Adkins

11/07/2022, 8:54 PM
Hm, what if there was a nice way to sync your Cloud blocks with a local profile?
🤔 2
b

Ben Muller

11/07/2022, 9:13 PM
Created here
:thank-you: 2
View count: 2