prefect-community
  • a

    Arun Giridharan

    09/07/2022, 6:10 PM
    Is there a way to not run some tasks on scheduled flows, but run it on non-scheduled flows?
    ✅ 1
    m
    • 2
    • 3
  • c

    Chris Reuter

    09/07/2022, 7:00 PM
    @justabill and @Jeff Hale are going live on PrefectLive now! Join us on YouTube or Twitch.
    👍 3
    📺 2
  • c

    Chris Gunderson

    09/07/2022, 7:01 PM
    Hi Team - I'm getting an error when attempting to use this library in another class from the task. I'm not sure why it is failing
    from prefect.blocks.system import *
    sraDatabaseSecretName = String.load("sra-database") #This is the name of the secret
    Traceback (most recent call last):
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client.py", line 1268, in read_block_document_by_name
        response = await self._client.get(
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1751, in get
        return await self.request(
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client.py", line 279, in send
        response.raise_for_status()
      File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client.py", line 225, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'http://ephemeral-orion/api/block_types/slug/string/block_documents/name/sra-database?include_secrets=true'
    Response: {'detail': 'Block document not found'}
    For more information check: https://httpstatuses.com/404
    n
    • 2
    • 16
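A note on the traceback above: the host in the failing URL is `ephemeral-orion`, which is the address the Prefect 2 client uses when no API URL is configured and it falls back to an in-process API. One possible reading is that the block was saved in a different API (Cloud or a remote Orion server) than the one this code talks to. The sketch below only pulls the relevant parts out of the error URL; `describe_block_url` is a hypothetical helper, not part of Prefect.

```python
from urllib.parse import urlparse


def describe_block_url(url: str) -> dict:
    """Split a block-document URL from a Prefect 404 error into its parts."""
    parsed = urlparse(url)
    parts = [p for p in parsed.path.split("/") if p]
    # Expected path shape:
    # api/block_types/slug/<slug>/block_documents/name/<name>
    slug = parts[parts.index("slug") + 1]
    name = parts[parts.index("name") + 1]
    return {
        "host": parsed.hostname,
        "ephemeral": parsed.hostname == "ephemeral-orion",
        "block_type_slug": slug,
        "block_name": name,
    }


info = describe_block_url(
    "http://ephemeral-orion/api/block_types/slug/string/"
    "block_documents/name/sra-database?include_secrets=true"
)
print(info)
```

If `ephemeral` is true, it may be worth checking whether `PREFECT_API_URL` is set in the environment where the task runs.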
  • m

    Mike Vanbuskirk

    09/07/2022, 7:14 PM
    Is it not possible to run ECS tasks with public IP disabled if the private subnet has a NATGW?
    ✅ 1
    m
    a
    • 3
    • 8
  • j

    Joshua Massover

    09/07/2022, 7:55 PM
    While running with a kubernetes agent, I see my flows being killed "randomly". I see various things:
    • in the agent:
    [2022-09-07 19:14:16,051] DEBUG - agent | Deleting job prefect-job-c10e0512
    • the killing event in my k8s cluster
    apiVersion: v1
    count: 1
    eventTime: null
    firstTimestamp: "2022-09-07T19:13:23Z"
    involvedObject:
      apiVersion: v1
      ...
      kind: Pod
      name: prefect-job-c10e0512-9wm4j
      ...
    kind: Event
    lastTimestamp: "2022-09-07T19:13:23Z"
    message: Stopping container prefect-container-prepare
    ...
    reason: Killing
    ....
    type: Normal
    • i can see via metrics that i am not oom'ing or doing anything that seems like it should trigger the job being killed
    • a single flow is running on its own node controlled via the kubernetes cluster autoscaler
    • i don't see any reason why the cluster autoscaler would be killing this node, and safe-to-evict is set to false.
    • my application logs always just end, there's nothing suspicious in the logs
    • there aren't obvious patterns to me. it's not the same job, it's not happening after x amount of minutes.
    • i've switched to threaded heartbeats, and then most recently turned off heartbeats entirely, and it hasn't fixed it
    1. there's a chicken/egg that i'm not sure about. in the agent log, is it issuing a request to the k8s cluster to kill a job? or is it deleted after the kubernetes job kills it for some reason?
    2. Any suggestions for how to debug a killed flow in a kubernetes cluster using cluster autoscaling? I can see that it's being killed by the event, it's a herculean task to figure out why it's killed.
    ✅ 1
    r
    • 2
    • 5
  • j

    John Mizerany

    09/07/2022, 10:03 PM
    We are trying to find a way to import custom modules alongside our flows. I found this article to see if there was a way to do this, and tried by using
    sys.path.append
    to include the module in our PYTHONPATH but that did not work. We are using Git Remote storage but it seems the agent we are using is not able to pick up on the custom files/modules we wrote in the subdirectory (we are still using prefect cloud 1.0 and the UI when we create a run gives us
    Failed to load and execute flow run: ModuleNotFoundError
    )
    n
    • 2
    • 14
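One detail that sometimes matters with `sys.path.append` is that a relative path is resolved against the working directory of the process, which on an agent may not be the directory of the flow file. The sketch below resolves the module directory relative to the flow file instead. `add_module_dir` and the directory name are hypothetical, and this only helps if the subdirectory actually exists on the machine executing the flow, which this thread does not confirm for Git storage.

```python
import sys
from pathlib import Path


def add_module_dir(flow_file: str, subdir: str) -> Path:
    """Put <directory of flow_file>/<subdir> on sys.path (once) and return it."""
    module_dir = (Path(flow_file).resolve().parent / subdir).resolve()
    if str(module_dir) not in sys.path:
        # Insert at the front so these modules win over same-named ones
        sys.path.insert(0, str(module_dir))
    return module_dir
```

At the top of a flow file this would be called as `add_module_dir(__file__, "my_modules")` before importing anything from that directory.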
  • a

    Ankur Sheth

    09/07/2022, 10:21 PM
    Hi All, I am trying to install Prefect 2.0 on a Kubernetes cluster using the helm chart. Is there any documentation I can refer to? I am having challenges deploying it on an on-prem Kubernetes cluster. I have successfully deployed version 1 of Prefect and can also use a Dask cluster. Any help is appreciated. I am getting an error with the VolumeBind Filter for the backend database.
    r
    • 2
    • 1
  • e

    Emerson Franks

    09/07/2022, 11:40 PM
    Hello, I recently upgraded my package from 2.2.0 -> 2.3.2. When attempting to run a deployment build command that worked on 2.2.0, I am now getting the error in my following reply. If I go back to 2.2.0 things work fine. My flow is just the health_check that is in documentation. Did I miss a step when upgrading the package?
    👀 1
    ✅ 1
    k
    s
    • 3
    • 8
  • m

    Mark Li

    09/08/2022, 1:52 AM
    Hi All, I’ve been working on deploying the prefecthq/prefect-orion helm chart to an AKS namespace using the PostgreSQL database. After conversations with our enterprise AKS team, they are trying to reduce the number of databases in the AKS cluster and have strongly suggested we move to a managed database (external to the AKS cluster). This makes sense to me in the long run for productionizing Prefect when we need to redeploy. However, I’m still trying to understand how to set up the connection from the managed database to the Prefect instance and how to define that connection in the helm chart. I tried to look through the documentation for any hints but couldn’t find anything. Would love to hear everyone’s experiences or thoughts on this. Are there better practices for productionizing Prefect, or am I headed in the right direction? Thanks!
    ✅ 1
    a
    • 2
    • 8
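For context on the question above: Prefect 2 (Orion) reads its database location from the `PREFECT_ORION_DATABASE_CONNECTION_URL` setting, and for Postgres the URL uses the `postgresql+asyncpg` scheme. How that value is passed through the helm chart is not covered here. The sketch below only shows building such a URL safely; the host, user, and database names are placeholders, and `orion_database_url` is a hypothetical helper.

```python
from urllib.parse import quote


def orion_database_url(user, password, host, database, port=5432):
    """Build a postgresql+asyncpg URL for an external (managed) database."""
    # Percent-encode credentials so characters like '@' or '/' do not
    # break the URL structure
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


url = orion_database_url("prefect", "p@ss/word", "mydb.example.com", "orion")
print(url)
```

The resulting string is what would typically be supplied to the Orion server as the value of that setting, for example from a Kubernetes secret.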
  • y

    Young Ho Shin

    09/08/2022, 4:28 AM
    Hello all. I am running into various
    sqlalchemy
    errors when running a test flow with many tasks (>10000) locally. Here's the code I'm running: https://gist.github.com/yhshin11/1832bc945446a62c5c6152abb9c1a0a5 It seems like the problem has to do with the fact that there are too many tasks that are trying to write to the Orion database at the same time. I tried switching to a Postgres database as described in the [docs](https://docs.prefect.io/concepts/database/), and also adding concurrency limit of 10. Neither seems to fix the issues. Any ideas about how to fix this? Here's an example of the kind of errors I'm getting:
    sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: <https://sqlalche.me/e/14/3o7r>)
    ✅ 1
    h
    a
    • 3
    • 2
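The error above says the pool (5 connections plus 10 overflow) was fully in use for 30 seconds. One general way to keep pressure below such a ceiling is to cap how much work is in flight at once. The sketch below shows that idea with plain `asyncio`; it is not Prefect API, `run_bounded` and `fake_write` are hypothetical names, and whether this resolves the Orion errors is not confirmed in this thread.

```python
import asyncio


async def run_bounded(items, worker, limit=10):
    """Run worker(item) for every item with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items))


# Demo: record the highest number of workers active at the same time
in_flight = 0
peak = 0


async def fake_write(i):
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0)  # yield control, as a real database call would
    in_flight -= 1
    return i * 2


results = asyncio.run(run_bounded(range(100), fake_write, limit=10))
```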
  • z

    Zac Hooper

    09/08/2022, 4:31 AM
    Hey all, is there any way to remotely remove/stop a local agent? A colleague has started a local agent with the same label as our regular agents and has been unreachable today to stop it, so some of our flows are failing when sent there. I'm hoping there's a solution other than changing the label on all the flows. We're still on Prefect V1. Thanks 🙂
    ✅ 1
    r
    • 2
    • 2
  • a

    Andreas Nord

    09/08/2022, 9:06 AM
    Hi! What is the proper way of scheduling a set of flows to run at, for example, midnight, but with only one flow running at a time, i.e. in a sequence (where order does not matter)? I tried just scheduling all the flows at the same time; one flow runs and the rest fail. I am running with Docker and the error message does not seem to be working properly:
    Untitled.txt
    r
    • 2
    • 3
  • b

    Brad

    09/08/2022, 9:08 AM
    Hey team - I'm having some troubles running the orion agent under supervisor..
    ✅ 1
    a
    • 2
    • 7
  • d

    David Peláez

    09/08/2022, 9:58 AM
    Hi dear community, I am new at prefect, and I wanted to use the REST API with the prefect cloud. But the documentation does not specify which is the base URL and how to authenticate to it. Could you please clarify that for me. Thanks in advance
    a
    • 2
    • 1
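As a rough sketch for the question above, and assuming the Prefect Cloud 2 conventions of an account- and workspace-scoped base URL under `https://api.prefect.cloud/api` with the API key sent as a bearer token: the code below only builds a request object and does not send it. The IDs and key are placeholders, and `cloud_request` is a hypothetical helper.

```python
import urllib.request


def cloud_request(account_id, workspace_id, api_key, path):
    """Build (but do not send) a POST request against a Cloud workspace."""
    base = (
        "https://api.prefect.cloud/api"
        f"/accounts/{account_id}/workspaces/{workspace_id}"
    )
    return urllib.request.Request(
        base + path,
        data=b"{}",  # empty JSON body, i.e. no filters
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = cloud_request("ACCOUNT_ID", "WORKSPACE_ID", "API_KEY", "/flow_runs/filter")
print(req.full_url)
```

Passing `req` to `urllib.request.urlopen` would perform the call; the account and workspace IDs appear in the browser URL when a workspace is open in the Cloud UI.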
  • v

    Vadym Dytyniak

    09/08/2022, 10:47 AM
    Hello. I am trying to apply a deployment and I see that the whole project, where I have a lot of flows, will be uploaded to S3 (my storage block). Why do I need all these files there? I expected to upload only the one file with the flow code, right?
    a
    • 2
    • 7
  • b

    Bradley Collins

    09/08/2022, 12:30 PM
    Hi everyone. We currently have a task which should take roughly 8 hours to finish running. This pipeline (wrapped in a task) is still in development and now and again it will not terminate. From what I can tell we can set a timeout in Prefect on the long running task but in this case we do not want to stop the task but rather send out a Slack notification to enable manual intervention. Is there a pattern/feature in Prefect 1.0 that allows for this?
    r
    • 2
    • 1
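The pattern asked about above (alert when a task is overdue, but let it keep running) can be sketched independently of Prefect with a timer that fires a callback. This is a plain-Python illustration, not a Prefect 1.0 feature; `run_with_overdue_alert` and the `notify` callback are hypothetical, and the Slack call itself is left out.

```python
import threading
import time


def run_with_overdue_alert(func, overdue_after, notify):
    """Run func(); if it is still running after `overdue_after` seconds,
    call notify() once. func itself is never interrupted."""
    timer = threading.Timer(overdue_after, notify)
    timer.daemon = True
    timer.start()
    try:
        return func()
    finally:
        timer.cancel()  # no alert if func finished in time


alerts = []


def slow_pipeline():
    time.sleep(0.3)
    return "done"


result = run_with_overdue_alert(
    slow_pipeline, 0.05, lambda: alerts.append("overdue")
)
```

Inside a task, `notify` would be whatever function posts the message to Slack, and `overdue_after` would be set somewhat above the expected 8 hours.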
  • a

    Anirudh MV

    09/08/2022, 12:38 PM
    Hi all, I’m getting error
    No such command 'backend'.
    Please help me fix this. Could this be because my prefect client and prefect server might be running different versions?
    r
    • 2
    • 1
  • e

    Eli Treuherz

    09/08/2022, 2:04 PM
    I’ve got a backfilling flow that is supposed to set off several thousand tasks, which ideally I’d have on the ConcurrentTaskRunner. When running locally on a powerful laptop this works fine, but on an Agent its CPU goes right to its limit but only a few tasks actually complete before it just stops doing anything. Is this to be expected at the minute or a surprise? Would this go better if I broke it up into lots of subflows instead of lots of tasks?
    r
    • 2
    • 1
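On the subflow idea raised above: splitting several thousand items into fixed-size batches is the first step either way, with each batch then handed to one subflow or one task. The sketch below shows only the batching, in plain Python; `batched` is a hypothetical helper, and whether batching fixes the agent's CPU behaviour is not confirmed here.

```python
from itertools import islice


def batched(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


batches = list(batched(range(2500), 1000))
print([len(b) for b in batches])
```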
  • r

    Raghuram M

    09/08/2022, 2:11 PM
    Hello Prefect Community, Great to be here! I am running a flow on prefect2 with a few tasks which have dependencies on local modules. We are running the tasks on a dask cluster and are using a s3 block storage. The issue I run into is recreating the local dependencies on the dask workers. Any suggestions on how you go about ensuring that the script environment is available to the dask worker?
    :watching: 1
    m
    b
    • 3
    • 11
  • f

    Felipe Fernandez

    09/08/2022, 2:54 PM
    Hello community, has anyone had issues implementing Prefect with Selenium? My case is really special and I can only use web scraping to get the data I need.
    ✅ 1
    r
    • 2
    • 3
  • d

    Dat Tran

    09/08/2022, 3:07 PM
    hey is there any “easy” tutorial on how to use Prefect 2.0 with AWS EKS, so far I haven’t found any and it’s kind of discovery mode for me.
    ✅ 1
    a
    • 2
    • 7
  • i

    Igor Morgunov

    09/08/2022, 3:28 PM
    Hi All, In the UI is it possible to somehow display the value that was mapped to the child task instead of
    Mapped Child X
    ? I know I can get it from logs, but would be cool if I could see it in this screen
    ✅ 1
    a
    • 2
    • 1
  • s

    Slackbot

    09/08/2022, 3:35 PM
    This message was deleted.
  • j

    Jeffery Newburn

    09/08/2022, 4:56 PM
    Howdy. We are using ORION and we have 2 agent servers running against the same worker queue. It appears they are both picking up the same jobs instead of delegating only 1. Anyone have something basic we might be missing?
    ✅ 1
    j
    • 2
    • 2
  • v

    Venkat Ramakrishnan

    09/08/2022, 5:07 PM
    Basic question about Prefect cloud. Please bear with me. So far have been using Prefect locally as follows:
    1. pip install prefect
    2. For the deployment, use a Python script inside which import Prefect and use the 'Deployment'
    3. Then run 'prefect orion start' and it shows the scheduled runs and status at 127.0.0.1:4200
    4. Then run 'prefect agent start' with queue name to serve the scheduled runs.
    Now looking at Prefect cloud, and have the following question: Should I do 'pip install prefect' anymore? If not and if prefect cloud takes care of it, how then would the Python program know about the 'Deployment', i.e. where do I import 'Deployment' from? Or should I stop the Python part altogether and schedule the deployment from Prefect Cloud? If yes, I presume that in the Prefect Cloud, I will have to specify the script that needs to be run at the scheduled time? In this case, my script will exist in an AWS EC2 instance, how do I give access to it to the Prefect Cloud? Thanks.
    ✅ 1
    j
    • 2
    • 4
  • r

    Roger Webb

    09/08/2022, 5:19 PM
    With Prefect 1.0, if I have multiple projects in my Prefect system... and I want to create a role that ... for instance, can only run flows based in Project A .. but not project B (Or even SEE Project B) where is that in the Customer Role section?
    ✅ 1
    j
    • 2
    • 1
  • m

    Mark Li

    09/08/2022, 5:59 PM
    Hi All, I deployed the helm chart prefecthq/prefect-orion. For some reason, my pod is going into the CrashLoopBackOff state. Looking into the logs, I get this:
    Error: No such command 'orion'.
    I’m assuming this failure is coming from when it’s calling ‘prefect orion start’ Does anyone know what could be causing prefect to not recognize the orion command?
    r
    • 2
    • 1
  • v

    Venkat Ramakrishnan

    09/08/2022, 6:02 PM
    I created an API key in Prefect Cloud, and then I tried logging to the cloud account using the following command, and I get this error:
    (venv) D:\final>prefect cloud login --key <key-value>
    Unable to authenticate with Prefect Cloud. Please ensure your credentials are correct.
    Apart from configuring the API Key in the cloud, do I need to do anything else (in Cloud, or in the client machine)? Thanks.
    ✅ 1
    j
    • 2
    • 1
  • a

    Andrew Pruchinski

    09/08/2022, 6:17 PM
    Good Afternoon! I have a question regarding using a snowflake query with wildcard
    %
    and pandas.read_sql. There seems to be an issue during compilation during a prefect flow run where it's complaining about the wildcard. We can run the queries outside of a prefect run no problem. When executing in prefect, we are getting formatting errors. Errors listed in threads
    ✅ 1
    j
    • 2
    • 8
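The actual errors are in the thread and not shown here. One common cause of formatting errors with `%` wildcards is that the query string passes through Python `%`-style parameter formatting somewhere along the way, where a lone `%` is read as the start of a placeholder. Assuming that is what happens in this case, doubling the literal `%` avoids it. `escape_percent` is a hypothetical helper.

```python
def escape_percent(sql: str) -> str:
    """Double every literal % so %-style formatting leaves it alone.

    Only apply this to fragments without real placeholders such as %s,
    because those would be doubled as well.
    """
    return sql.replace("%", "%%")


raw = "SELECT * FROM users WHERE name LIKE 'abc%'"

try:
    raw % ()  # the % before the closing quote is not a valid format code
    failed = False
except (ValueError, TypeError):
    failed = True

# After escaping, formatting succeeds and yields the original query text
rendered = escape_percent(raw) % ()
```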
  • m

    Marc Lipoff

    09/08/2022, 6:30 PM
    I'm looking to implement a real-time(ish) pipeline in v1. An external system drops S3 files every minute or so. I'd like to poll S3 at a fixed interval to see what files have been modified since the last poll. If there are new files, grab them and put them in the database. I used this as inspiration. What I did was first create "chunks" (or windows) to look at: a list of n windows, each a tuple of the window's start and end time. Then, mapping over each window, run a task that either (a) throws a RETRY signal with start_time=window_end_time if the end of the window hasn't passed yet, or (b) finds the S3 files that were modified within the window. This seems to work pretty well, EXCEPT... when running the flow, the tasks that are in a pending state (from (a) above) are using up the available slots (the flow is set up with num_threads=8). So essentially, my pipeline is not progressing to push data to a db until the last window has passed (not the intention).
    a
    • 2
    • 5
Excerpts of the code: tasks in the flow...
last_modified_begin, last_modified_end, partition = make_chunks(
    target_start_time,
    to_timedelta(poll_freq),
    to_timedelta(total_duration_to_run),
)

# lists the files in the chunk (based on modified timestamp)
files = S3ListGracefulAndWait(bucket=s3_bucket_name.run()).map(
    prefix=unmapped("fargate"),
    partition=partition,  # run() below requires it; it was missing from this call
    last_modified_begin=last_modified_begin,
    last_modified_end=last_modified_end,
)

df = read_files.map(files)
### and then take the dataframe and push to database, ...
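The `make_chunks` helper is not shown in the thread. Going only by the description (a list of windows, each with a start and end time) and the three values unpacked above, it might look roughly like the sketch below. This is a hypothetical reconstruction, and the `%Y-%m-%d` partition format is an assumption based on the `dt=` prefix used in the task class.

```python
import datetime


def make_chunks(start, poll_freq, total_duration):
    """Split [start, start + total_duration) into consecutive poll windows.

    Returns three parallel lists: window starts, window ends, and the
    partition string for each window (assumed to be the window's date).
    """
    if poll_freq <= datetime.timedelta(0):
        raise ValueError("poll_freq must be positive")
    begins, ends, partitions = [], [], []
    window_start = start
    stop = start + total_duration
    while window_start < stop:
        window_end = min(window_start + poll_freq, stop)
        begins.append(window_start)
        ends.append(window_end)
        partitions.append(window_start.strftime("%Y-%m-%d"))
        window_start = window_end
    return begins, ends, partitions


begins, ends, partitions = make_chunks(
    datetime.datetime(2022, 9, 8, 18, 0),
    datetime.timedelta(minutes=1),
    datetime.timedelta(minutes=5),
)
```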
and the s3 task class
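import datetime

from prefect.engine import signals
from prefect.tasks.aws.s3 import S3List

# log(), now_() and datetime_to_string() are helpers from my codebase (not shown)
class S3ListGracefulAndWait(S3List):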
    def run(
        self,
        prefix: str,
        partition: str,
        last_modified_begin: datetime.datetime,
        last_modified_end: datetime.datetime,
    ) -> list[str]:

        # using the partitions is important because of the number of files. without at least pre-filtering on dt, the s3list takes way too long
        prefix += "/dt=" + partition

        if last_modified_end < now_():  # has passed
            try:
                log().info("Starting to list s3 files...")
                res = super().run(
                    prefix=prefix,
                    last_modified_begin=datetime_to_string(last_modified_begin),
                    last_modified_end=datetime_to_string(last_modified_end),
                )
                log().info(
                    f"S3ListGracefulAndWait run prefix={prefix} last_modified_begin={last_modified_begin} last_modified_end={last_modified_end}. Result={res}"
                )
                if len(res) == 0:
                    raise signals.SKIP(
                        f"No files available for dt={partition} {last_modified_begin} to {last_modified_end}"
                    )
                return [f"s3://{self.bucket}/{x}" for x in res]
            except Exception as e:
                log().error(e, exc_info=True)
                raise signals.SKIP(
                    f"Failed to get s3 files for dt={partition} {last_modified_begin} to {last_modified_end}. {e}"
                )
        else:
            raise signals.RETRY(
                message=f"Going to retry at {last_modified_end}",
                start_time=last_modified_end,
            )
While this is the way I thought was best to do it, I'm open to other ideas. I want the flow to be able to poll regularly for new files and, if there is a new file, proceed asynchronously with processing it (i.e., don't wait until the previous file is processed to do the next poll).
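The requirement described here (poll on a fixed interval, and start processing each new file without holding up the next poll) can be sketched outside Prefect with plain `asyncio`. This is an illustration of the pattern, not a Prefect 1 solution; `poll_and_process`, `list_new_files`, and `process_file` are hypothetical names.

```python
import asyncio


async def poll_and_process(list_new_files, process_file, interval, polls):
    """Poll `polls` times, `interval` seconds apart, and process every new
    file concurrently with later polls."""
    running = []
    for _ in range(polls):
        for path in list_new_files():
            # Start processing right away; it is not awaited before the next poll
            running.append(asyncio.create_task(process_file(path)))
        await asyncio.sleep(interval)
    # Wait for whatever is still in progress after the last poll
    return await asyncio.gather(*running)


# Demo with fake listings: one file, then nothing, then two files
listings = iter([["a.csv"], [], ["b.csv", "c.csv"]])
processed = []


async def fake_process(path):
    await asyncio.sleep(0.01)  # takes longer than the poll interval
    processed.append(path)
    return path


done = asyncio.run(
    poll_and_process(lambda: next(listings, []), fake_process, 0.001, 3)
)
```

Because waiting windows are just a sleep in the polling loop, they do not occupy any of the worker slots that processing needs.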
a

Anna Geller

09/09/2022, 1:43 AM
Hi Marc, we have a couple of blog posts on that for Prefect 2, check if you're interested https://annageller.medium.com/serverless-real-time-data-pipelines-on-aws-with-prefect-ecs-and-github-actions-1737c80da3f5
And https://annageller.medium.com/event-driven-data-pipelines-with-aws-lambda-prefect-and-github-actions-b3d9f84b1309