https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • v

    Vadym Dytyniak

    11/30/2022, 2:46 PM
    Hi. Is it correct statement that Prefect 2 map function does not guarantee the order? Example:
    l1 = task1.map([1, 2, 3])
    l2 = task2.map(l1)
    
    task3.map(l1=l1, l2=l2)
    Prefect does not guarantee that in
    task3
    list
    l1
    elements will be passed with corresponding
    l2
    elements?
    a
    • 2
    • 7
  • a

    Anna Geller

    11/30/2022, 5:46 PM
    Teaser: look what just got merged 🎉 starting from the next Prefect release, you'll be able to do:
    @flow(log_prints=True)
    👀 11
    :party-parrot: 10
    :blob-attention-gif: 6
    🙌 6
    :marvin: 1
    💙 1
  • n

    Nate Roberts

    11/30/2022, 6:28 PM
    Hi there, I’m experiencing the same problem as this previous post. When testing a slack incoming web hook, the test fails with no error message. What would be the best way to further triage this issue?
    k
    b
    • 3
    • 5
  • a

    Ashley Felber

    11/30/2022, 7:06 PM
    Hello, I have an agent set up on ECS and built a deployment using the ECStask block to run the flow as a Fargate task. I tried to run the deployment and the flow run failed. I have no information as to why though. The logs are completely empty, it says "This run didn't generate Logs". There is also no logs in AWS for the task I was intending to run so it seems like it failed before the task even started.
    m
    • 2
    • 16
  • k

    Kevin McCraney

    11/30/2022, 8:15 PM
    Hey folks, my teammates and I are trying to migrate a monorepo of tasks & flows from Prefect 1 to Prefect 2. As part of that, we're naturally doing some testing. We were previously using the
    is_successful()
    method in an assert in our testing, and to move that to Prefect 2, we were thinking we might call
    get_run_context()
    to get the flow run state and return some value (likely 'Completed' from the
    state_name
    parameter). We've observed that calling
    get_run_context()
    within a flow's context causes the flow to hang. Has anyone observed this, or does anyone have any suggestions for remedy?
    m
    • 2
    • 9
  • s

    Simon Macklin

    11/30/2022, 8:20 PM
    Hey prefect when creating a flow which would run in kubernetes. v1 adds automatically flow_id to the pod labels. But V2 doesn’t appear to so. How would I add flow meta data to the flow pod ?
    :kubernetes: 1
    ✅ 1
    m
    • 2
    • 9
  • s

    Sean Conroy

    11/30/2022, 9:34 PM
    Following up on Kevin's question above, what is the equivalent for the
    is_successful()
    method from Prefect 1 for Prefect 2? In other words, is testing the state name = 'Completed' the best way to check for success?
    myflow_state = my_flow(return_state=True)
    
    # Test if state is successfull:
    if not myflow_state.name == 'Completed':
        raise Exception("myflow failed")
    ✅ 1
    b
    • 2
    • 2
  • m

    Michael Kahan

    11/30/2022, 10:00 PM
    Hey all - Has anybody used
    prefect-airbyte
    to trigger syncs on Airbyte Cloud? Or is it only for open source hosted instances?
    ✅ 1
    k
    • 2
    • 3
  • b

    Boggdan Barrientos

    11/30/2022, 10:07 PM
    Hi! I'm running a flow that run two subflows. One subflow execute a task that uses
    run_deployment
    for execute two deployments that has to be completed the next subflow. I'm having issues in the execution of the seconds subflows, the agent dies after one hour of waiting. How can I apply this logic successfully? I need to execute n deployments and when the n deployments has been completed, execute another flow. Image 1: The deployments runs properly. Image 2: The tasks that runs the deployments never ends.
    k
    • 2
    • 8
  • w

    wonsun

    12/01/2022, 4:09 AM
    Hi all~ I am trying to migrate to the prefect 2.0 version!! :marvin: In prefect 1.0, the flow version was well displayed as a number in the upper right corner of the UI(below image, left), but does the orion UI only show the flow version id for each flow? (below image, right) I thought it was much more intuitive to show it in numbers like prefect 1.0. Is it i can't find it at the Orion? Or was it intended what just show flow version id?
    ✅ 1
    k
    • 2
    • 1
  • k

    Khyaati Jindal

    12/01/2022, 5:01 AM
    Hi guy, I am running a agent from my ec2 machine , and after a while, the agent throws this error
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/prefect/agent.py", line 154, in get_and_submit_flow_runs
        queue_runs = await self.client.get_runs_in_work_queue(
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/prefect/client/orion.py", line 763, in get_runs_in_work_queue
        response = await <http://self._client.post|self._client.post>(
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1848, in post
        return await self.request(
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/prefect/client/base.py", line 160, in send
        await super().send(*args, **kwargs)
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
        response = await self._send_handling_auth(
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
        response = await self._send_single_request(request)
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
        response = await transport.handle_async_request(request)
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 352, in handle_async_request
        with map_httpcore_exceptions():
      File "/usr/lib/python3.10/contextlib.py", line 153, in __exit__
        self.gen.throw(typ, value, traceback)
      File "/home/ubuntu/5c-ed-sumary-telegram-bot/env/lib/python3.10/site-packages/httpx/_transports/default.py", line 77, in map_httpcore_exceptions
        raise mapped_exc(message) from exc
    httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
    Any idea why this is happening ? I have deployed lot of deployments before, and never had such a issue
    m
    b
    • 3
    • 7
  • r

    Russell Brooks

    12/01/2022, 6:09 AM
    The RemoteFileSystem example for MinIO doesn’t work when I try to build deployment with it as storage. I suspect their also needs to be a folder at the end of basepath or it will complain about malformed xml. Also once I add my folder I get past the xml error, but it tells me the specified bucket does not exist. How do I find out what’s going on? Been blocked on this for some hours now. As context the same MinIO bucket and folder worked in Prefect 1.0 . In Prefect 1.0 I used S3(bucket=S3_BUCKET, key=f”scripts/{flow_name}”, client_options=dict(endpoint_url=S3_ENDPOINT_URL, aws_access_key_id=KEY, aws_secret_access_key=SECRET_KEY). And this worked. Any ideas how to get that working in 2.0?
    m
    f
    • 3
    • 8
  • a

    Arnoldas Bankauskas

    12/01/2022, 12:26 PM
    Hi Question: did you run your Python (ETL) app at the same container as prefect is living or you create separate container for this ?
    ✅ 1
    a
    • 2
    • 3
  • j

    Joshua Greenhalgh

    12/01/2022, 5:25 PM
    Could someone help me with this please? I want to be able to set SLAs on my flows to kill long running runs - can't seem to work it out from the automations section https://prefect-community.slack.com/archives/CL09KU1K7/p1669375801736019?thread_ts=1668773970.203059&amp;cid=CL09KU1K7
    ✅ 1
    a
    m
    m
    • 4
    • 32
  • j

    Joshua Grant

    12/01/2022, 7:04 PM
    Currently having issues with flows hanging when calling
    get_run_context()
    from within a flow. MRE in 🧵
    ✅ 1
    m
    l
    • 3
    • 18
  • m

    Michael Kahan

    12/01/2022, 7:52 PM
    Hey all, I'm still new to Prefect but haven't been able resolve a deployment error. I'm able to successfully run my main flow file (daily_run.py), but when I run
    deployment build
    I get the following errors (in thread). Perhaps they are related, but at this point I'm stumped. Not sure why it runs but cant be deployed. Any help would be appreciated and I'm happy to provide more info as needed. Thanks!
    ✅ 1
    j
    • 2
    • 6
  • s

    Sam Cook

    12/01/2022, 8:25 PM
    Is there a most correct way to pass mapped results between tasks in a flow? I have a flow that generates a list of items and then repeatedly calls
    map
    to pass those items/results to follow on tasks. Consistently, if I have a large number of items (50+) then the entire process will lock up during the follow on tasks and fail to terminate cleanly. In the UI the root job is marked as
    crashed
    but the tasks are always stuck in a running state. I'm running in a Kubernetes environment as well so the stuck jobs have to be manually cleaned up whenever this occurs as they never reach completion. I think this might possibly be related to @Boggdan Barrientos question from yesterday.
    ✅ 1
    👀 1
    m
    • 2
    • 5
  • b

    Ben Muller

    12/01/2022, 9:45 PM
    Hey prefect - this is a strange one - I got two errors from flows overnight with
    State message: Submission failed. IndexError: list index out of range
    Unfortunately there are no logs in the agent with any failures either 🤷 This did lead me to see some other errors in the agent 🧵
    m
    s
    t
    • 4
    • 20
  • s

    Scott Walsh

    12/01/2022, 11:09 PM
    The
    prefect kubernetes manifest agent
    command seems to only take in one queue. I'd like the agent to read from multiple queues, is there syntax I can change or is this a bug? using prefect==2.6.7
    (scott/CD-562-prefect-poc) prefect kubernetes manifest agent -q test1 -q test2
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: prefect-agent
      namespace: default
      labels:
        app: prefect-agent
    spec:
      selector:
        matchLabels:
          app: prefect-agent
      replicas: 1
      template:
        metadata:
          labels:
            app: prefect-agent
        spec:
          containers:
          - name: agent
            image: prefecthq/prefect:2.6.7-python3.9
            command: ["prefect", "agent", "start", "-q", "test2"]
            imagePullPolicy: "IfNotPresent"
            env:
              - name: PREFECT_API_URL
    m
    m
    • 3
    • 4
  • f

    Faheem Khan

    12/02/2022, 1:06 AM
    prefect 2.7 is it possible to create a bucket on minio using remote file system block. What I am doing at the moment is; when creating a deployment, I create a bucket on mino(from minio UI) first, otherwise I will get an error "bucket doesn't exist"
    ✅ 1
    j
    • 2
    • 1
  • b

    Ben Muller

    12/02/2022, 3:35 AM
    strange one here - I was trying to investigate the state object just by playing around with it in a dummy flow, like so:
    import time
    
    from prefect import flow, get_run_logger
    from prefect.deployments import run_deployment
    
    
    @flow
    def run():
        x = run_deployment(name="dro-gallops-fields/default", idempotency_key=str(int(time.time())), timeout=0)
    
        while x:
            time.sleep(5)
    
            get_run_logger().info(x.state)
            get_run_logger().info(x.state_name)
            get_run_logger().info(x.state_type)
            get_run_logger().info(x.state.message)
            get_run_logger().info(x.state.result)
    
    
    if __name__ == "__main__":
        run()
    This does everything I expect in Prefect Cloud but the logs on the parent flow "run" show something like this ( forever - it never exits the Scheduled state and updates in line with its actual state ? ) : While the subflow run has the flow makred as
    Complete
    - is there a different way to ping the state in prefect 2 ?
    ✅ 1
    m
    • 2
    • 7
  • a

    Andreas Nigg

    12/02/2022, 7:29 AM
    Hey folks - what happens, if a flow runs and prefect 2.0 cloud gets updated? 🤓 🤓 I'm asking because I very rarely but still get the following error when running flows - and it now was 2 times that it somehow was in the same timerange when a release note was published 😅 But obviously that's highly speculative on my side - is there anything I can provide or investigate the issue?
    Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Server error '500 Internal Server Error' for url '<https://api.prefect.cloud/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/5d8fc4a8-5ce7-444d-8749-135dce75d9be/set_state>'
    Response: {'detail': {'exception_message': 'Internal Server Error'}}
    For more information check: <https://httpstatuses.com/500>
    EDIT: Ok I found a thread in the best-practices-coordination - channel - there it was confirmed that there was a small service interruption. Therefore this question is obsolete.
    ✅ 1
  • d

    Deepanshu Aggarwal

    12/02/2022, 8:30 AM
    hey! is there a way to read the state from flow run object ( returned by run_deployment function) in python?
    b
    • 2
    • 3
  • s

    Sylvain Hazard

    12/02/2022, 9:49 AM
    Hey ! I'm looking at how we are going to deploy our Orion Server on Kubernetes. prefect-helm looks like a really good starting point but we'd like to put an OAuth2 authentication in front of the server for security i.e. making sure only people from our org have access. Is there a recommended way to do this ? Ideally we'd want to deploy flows/blocks from Gitlab CI which means being able to authenticate from terminal.
    ✅ 1
    d
    a
    • 3
    • 3
  • s

    Slackbot

    12/02/2022, 2:14 PM
    This message was deleted.
  • b

    Braun Reyes

    12/02/2022, 3:45 PM
    hola! does anyone know if the snowflake task library connector can support can support private keys with passphrase?
    ✅ 1
    • 1
    • 5
  • x

    Xavier Babu

    12/02/2022, 3:49 PM
    Dear Prefect Community, In Prefect Orion, what is the best way to get the "flow_run_id" (dynamically) in the main flow while running that flow? If you can share any sample code, that would be great,
    ✅ 1
    a
    • 2
    • 2
  • p

    Patrick Alves

    12/02/2022, 4:19 PM
    Hi there, I have dozens of workflows on Prefect Orion (v2.6.5) All my workflows are executed as
    KubernetesJobs
    . I’ve experiencing an issue when I run workflows from UI. 1. I select a Flow -> RUN 2. It schedules a Flow for the next seconds. 3. But the flow keep on pending forever. 4. It actually creates another flow that runs (with no tags, see the images) On the image every squared are actually the same RUN created twice. The original never run, and it creates a new one that runs.
    k
    a
    • 3
    • 13
  • Can I change the task run state from the UI in Prefect 2?
    j

    jack

    12/02/2022, 4:44 PM
    In prefect v1 is there an option to re-run a single task? We had a single task fail, and none of the other tasks depend on it. So other tasks were already in progress and wanted to allow them to keep running, but it would be nice to re-run the failed one.
    ✅ 1
    a
    • 2
    • 4
  • s

    Sean Conroy

    12/02/2022, 5:12 PM
    Any advice on these errors? Seeing this with Flask + Prefect 2.6.8 running in K8's on GCP. Doesn't seem to affect anything, just gives this weird error in the logs:
    Traceback (most recent call last):
    ERROR | asyncio - Task exception was never retrieved
    "future: <Task finished name='Task-36' coro=<<async_generator_athrow without __name__>()> exception=KeyError(139836083032272)>"
    File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 263, in on_shutdown yield
    GeneratorExit
    During handling of the above exception, another exception occurred:
    File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
        EVENT_LOOP_GC_REFS.pop(key)
    KeyError: 139836083032272"
    b
    m
    • 3
    • 18
Powered by Linen
Title
s

Sean Conroy

12/02/2022, 5:12 PM
Any advice on these errors? Seeing this with Flask + Prefect 2.6.8 running in K8's on GCP. Doesn't seem to affect anything, just gives this weird error in the logs:
Traceback (most recent call last):
ERROR | asyncio - Task exception was never retrieved
"future: <Task finished name='Task-36' coro=<<async_generator_athrow without __name__>()> exception=KeyError(139836083032272)>"
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 263, in on_shutdown yield
GeneratorExit
During handling of the above exception, another exception occurred:
File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
    EVENT_LOOP_GC_REFS.pop(key)
KeyError: 139836083032272"
b

Bianca Hoch

12/02/2022, 5:18 PM
Hello Sean, looks like we may have a GitHub issue open that somewhat mirrors the behavior you're seeing here. This is currently being investigated.
s

Sean Conroy

12/02/2022, 5:19 PM
OK good to know - I'll keep my eye on this for a fix - thanks so much!
🙌 1
m

Michael Adkins

12/02/2022, 6:29 PM
Hey! Would you mind throwing the traceback and any additional details into https://github.com/PrefectHQ/prefect/issues/7709
:upvote: 1
:gratitude-thank-you: 1
This is related to our cleanup of async database engines, which is really sketchy because Python does not actually provide a way to perform cleanup on event loop closure.
Are you running your API separately from your flows? ie with
prefect orion start
?
s

Sean Conroy

12/02/2022, 6:32 PM
Gotcha. Ok so I think unfortunately what I posted above is all the traceback I'm seeing for this... The API is getting created by Flask, I'm not calling
prefect orion start
anywhere.
import os
import json
from flask import Flask, request
import logging
import traceback
from prefect.orion.schemas.states import StateType

app = Flask(__name__)

#################################################
#
#  Flask routes
#
#################################################
@app.route('/', methods=['POST'])
def default_route_main():

    force_exception = location_success = False
    try:
        # Run flow
        flow_state = my_flow(return_state=True)

        force_exception = flow_state.result()
        flow_success = flow_state.type == (StateType.COMPLETED)
This is the basics of the Flask wrapper
m

Michael Adkins

12/02/2022, 6:44 PM
If you point to a existing API, the database connection won’t be managed inside of Flask which should be more stable
s

Sean Conroy

12/02/2022, 6:45 PM
Gotcha...happy to give that a shot. Sorry for the dumb question - but do you have an example somewhere I could look at for how to do that?
m

Michael Adkins

12/02/2022, 6:48 PM
Basically you run
prefect orion start
then set the
PREFECT_API_URL
to localhost in your config.. let me see
https://docs.prefect.io/tutorials/deployments/#run-a-prefect-orion-server
s

Sean Conroy

12/02/2022, 6:49 PM
Thank you!
m

Michael Adkins

12/02/2022, 6:50 PM
For Kubernetes, I think there are better examples
👀 1
Like, you can run this on another pod.
Here’s a helm chart: https://github.com/PrefectHQ/prefect-helm
s

Sean Conroy

12/02/2022, 6:51 PM
Ok yeah - that will help - thanks
m

Michael Adkins

12/02/2022, 6:52 PM
👍 also
prefect kubernetes manifest orion
View count: 1