prefect-community
  • s

    Shivam Bhatia

    07/04/2022, 5:24 AM
    Hi, I am trying to host a prefect 2.0 instance on google cloud platform compute engine. Could someone please share some documentation?
    ✅ 1
    a
    • 2
    • 2
  • k

    komal azram

    07/04/2022, 6:21 AM
    Hi, I am trying to run a sync connection task in Airbyte using Prefect, but I am getting this error: sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table block_schema_reference already exists. When I sync from Airbyte, it works.
    ✅ 1
    a
    • 2
    • 9
  • b

    Black Spy

    07/04/2022, 7:37 AM
    Hi, can you help me create a dynamic DAG flow based on a configuration file (YAML), and also show how to do parallel scheduling at the same time? @Anna Geller
    ✅ 1
    a
    • 2
    • 1
  • t

    Tom Matthews

    07/04/2022, 9:16 AM
    Context: I have a number of tasks that are essentially API calls which can be run in parallel and take ~1 second each, and every second of latency saved is very important. I’d like to use Prefect in production and take advantage of the native async support, but async support is only available in Prefect 2.0, which is not recommended in production. Are there any recommendations here? Would it be safe to use Prefect 2 in production, or is there any way to use asyncio in Prefect 1.0 (I couldn’t find any documentation for this), or would it be a good idea to just use the DaskExecutor in Prefect 1.0 with the
    threads
    scheduler? 🙏
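For reference, a minimal stdlib-only sketch of the pattern this question is about: running several I/O-bound calls concurrently with `asyncio.gather`. It is independent of Prefect and does not answer which Prefect version to use. `fake_api_call` is a hypothetical stand-in for a real API request, and the delay is shortened from ~1 second to keep the demo quick.

```python
import asyncio
import time


async def fake_api_call(item: int, delay: float = 0.1) -> int:
    """Hypothetical stand-in for an I/O-bound API call."""
    await asyncio.sleep(delay)  # simulates network latency without blocking the loop
    return item * 2


async def call_all(items: list[int], delay: float = 0.1) -> list[int]:
    # gather runs all coroutines concurrently and returns results in input order
    return await asyncio.gather(*(fake_api_call(i, delay) for i in items))


def run_demo() -> tuple[list[int], float]:
    """Run five simulated calls and report the results and elapsed time."""
    start = time.perf_counter()
    results = asyncio.run(call_all([1, 2, 3, 4, 5]))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run_demo()
    print(results, f"{elapsed:.2f}s")
```

Because the calls overlap, total wall time should be close to the slowest single call rather than the sum of all calls.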
    ✅ 1
    r
    a
    • 3
    • 12
  • m

    Michele Rossi

    07/04/2022, 9:33 AM
    Morning! I have a small installation problem. Prefect requires Python 3.7+, and that's installed on our AWS Linux machines. The problem is that the default Python associated with the 'python' command is Python 2. If you try to make Python 3 the default, incredibly, you end up breaking YUM, which relies on Python 2. What's the best way to solve this problem? In short, I need Prefect to rely on Python 3 without altering the rest of my machines. I am no Python expert, as you have probably guessed 🙂
    ✅ 1
    m
    t
    +2
    • 5
    • 10
  • j

    Joshua Greenhalgh

    07/04/2022, 9:36 AM
    Can I just say how much of a pleasure it is to work with the GQL endpoints - it all seems to be very well designed - I wonder if the code for how you set up the API is publicly available? I need to write my own and am looking for inspiration...
    ✅ 1
    a
    • 2
    • 4
  • r

    Robin Weiß

    07/04/2022, 1:20 PM
    Hey community! I am currently trying to deploy a K8s Orion setup. After quite some starting problems, I thought I had finally made it. Unfortunately, now I see very weird behaviour:
    • The agent pod keeps restarting in a CrashLoop. The error message is a very lengthy HTTP read timeout. The abbreviated message is:
    ...
    File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 834, in read_work_queue_by_name
    ...
    httpx.ReadTimeout
    An exception occurred.
    • The agent gives these weird log messages:
    MarkLateRuns took 26.306307 seconds to run, which is longer than its loop interval of 5.0 seconds.
    FlowRunNotifications took 30.444981 seconds to run, which is longer than its loop interval of 4 seconds.
    MarkLateRuns took 30.619028 seconds to run, which is longer than its loop interval of 5.0 seconds.
    My guess is that something is really slowing the container down so that it runs into connection timeout issues as it doesn’t reply in time. Does anyone have any idea where to look further? The error message unfortunately gives me zero insights on the matter 😞 Thanks!
    ✅ 1
    a
    j
    • 3
    • 8
  • r

    Rajvir Jhawar

    07/04/2022, 1:29 PM
    In Prefect 2.0 the current agent solution isn't very cost-effective for a large number of queues. For instance, if I use K8s Autopilot in GCP I get charged per hour, and since agents are always polling, that gets pricey. I can run the agents in a VM, but that is still expensive. Each running agent costs about 100-125 MB of memory. If I had 50 queues then I would need 50 agents running, which is about 50 * 150 MB, so we are looking at an 8 GB VM to run these agents in, and double that if I needed an additional 50 queues. I would be looking at a solution like Cloud Run (serverless, 60 min max run time) to run the agents on. I was wondering if Prefect would allow for a queue watcher or an alerting/webhook feature down the road?
    ✅ 1
    a
    • 2
    • 8
  • s

    Surya

    07/04/2022, 4:01 PM
    Hi all, we have installed Python 3.8, Prefect 1.2.2 (2022.01 Helm chart version) and Dask Gateway 2022.4.0 on a Google Kubernetes Engine cluster. We have registered the Prefect flow as well, and the flow appears in the Prefect UI. When we click "quick run", the flow run fails after a couple of minutes with the error below. Prefect is accessing Dask Gateway and running jobs. Could you please suggest what to check?
    ✅ 1
    j
    a
    • 3
    • 3
  • s

    Shivam Bhatia

    07/05/2022, 5:42 AM
    Hi, I am trying to self-host Prefect 2.0 on a GCP cloud engine VM. I have installed Prefect on the VM and set PREFECT_API_URL to "http://<external-ip>:4200/api". I tried to connect to the Prefect instance from my local machine by setting PREFECT_API_URL on my machine and running this code:
    $ # start python REPL with native await functionality
    $ python -m asyncio
    >>> from prefect.client import get_client
    >>> async with get_client() as client:
    ...     response = await client.hello()
    ...     print(response.json())
    👋
    and I got this error:
    Traceback (most recent call last):
      File "/usr/lib/python3.9/concurrent/futures/_base.py", line 440, in result
        return self.__get_result()
      File "/usr/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
        raise self._exception
      File "<console>", line 1, in <module>
      File "/home/shivam/.local/lib/python3.9/site-packages/prefect/client.py", line 111, in get_client
        ctx = prefect.context.get_settings_context()
      File "/home/shivam/.local/lib/python3.9/site-packages/prefect/context.py", line 272, in get_settings_context
        raise MissingContextError("No settings context found.")
    prefect.exceptions.MissingContextError: No settings context found.
    Am I missing something in the setup?
    ✅ 1
    m
    a
    • 3
    • 8
  • g

    Georgi Yanev

    07/05/2022, 7:38 AM
    Hey there, I'm looking for advice or examples of how to deal with secrets (passwords, keys, etc.) in Prefect 2.0. Can anyone give me a hint on where to look?
    ✅ 1
    r
    a
    • 3
    • 3
  • r

    Robin Weiß

    07/05/2022, 8:05 AM
    Hey there! I am starting to go crazy trying to get the K8s setup to work 😅 I switched from self-hosted API to Cloud 2.0 since I just couldn’t make persistence with K8s PVCs work. Unfortunately, now I get the following error when I start my agent in K8s:
    prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url '<https://api-beta.prefect.io/api/accounts/XXXXXXX/workspaces/XXXXXXXX/work_queues/name/k8s>'
    Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['path', 'account_id'], 'msg': 'value is not a valid uuid', 'type': 'type_error.uuid'}, {'loc': ['path', 'workspace_id'], 'msg': 'value is not a valid uuid', 'type': 'type_error.uuid'}, {'loc': ['path', 'account_id'], 'msg': 'value is not a valid uuid', 'type': 'type_error.uuid'}, {'loc': ['path', 'workspace_id'], 'msg': 'value is not a valid uuid', 'type': 'type_error.uuid'}, {'loc': ['path', 'workspace_id'], 'msg': 'value is not a valid uuid', 'type': 'type_error.uuid'}], 'request_body': None}
    For more information check: <https://httpstatuses.com/422>
    An exception occurred.
    That error is just so weird that I have no idea where to look. Any help would be greatly appreciated :) The agent is in a crashloop because of this. I have followed https://discourse.prefect.io/t/how-to-deploy-a-prefect-2-0-agent-to-a-local-kubernetes-cluster-and-connect-it-to-cloud-2-0-backend/979 exactly. The only difference is that I updated to
    prefecthq/prefect:2.0b7-python3.9
    because the b6 image gave me version incompatibility errors when using the CLI tool.
    ✅ 1
    a
    • 2
    • 1
  • r

    redsquare

    07/05/2022, 8:14 AM
    What is your PREFECT_API_URL? Are you sure it ends with a UUID?
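A rough way to run that check, as a stdlib-only sketch. It assumes the URL layout seen in the error message above (`.../accounts/<id>/workspaces/<id>`); `check_cloud_api_url` and `is_uuid` are hypothetical helper names, not part of Prefect.

```python
import uuid
from urllib.parse import urlparse


def is_uuid(value: str) -> bool:
    """Return True if the string parses as a UUID."""
    try:
        uuid.UUID(value)
        return True
    except ValueError:
        return False


def check_cloud_api_url(url: str) -> dict[str, bool]:
    """Report whether the account and workspace path segments parse as UUIDs."""
    parts = [p for p in urlparse(url).path.split("/") if p]
    report = {}
    for key in ("accounts", "workspaces"):
        # the ID is assumed to be the path segment right after the keyword
        idx = parts.index(key) + 1 if key in parts else None
        report[key] = idx is not None and idx < len(parts) and is_uuid(parts[idx])
    return report


if __name__ == "__main__":
    placeholder = "https://api-beta.prefect.io/api/accounts/XXXXXXX/workspaces/XXXXXXXX"
    print(check_cloud_api_url(placeholder))
```

A `False` for either key would be consistent with the "value is not a valid uuid" details in the 422 response.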
    🙌 1
    ✅ 1
    r
    • 2
    • 1
  • s

    Shivam Bhatia

    07/05/2022, 8:16 AM
    Hey, can I run my flows as a vertex ai custom job in prefect 2.0?
    ✅ 1
    a
    • 2
    • 1
  • f

    Florian Kühnlenz

    07/05/2022, 1:05 PM
    Is it possible to have a Kubernetes Agent run a Flow in a different namespace? Can this be set via the job template? (In 1.0)
    ✅ 1
    r
    a
    • 3
    • 14
  • u

    Urban Škudnik

    07/05/2022, 1:44 PM
    Hey everyone! 👋 We're doing a prototype to test out Prefect and plan to go with the cloud version for staging and production usage, but we need a local development story too, and we want to have everything managed by docker-compose locally (and potentially for CI). Unless I'm missing something, https://hub.docker.com/r/prefecthq/prefect assumes you're going to install Prefect on the host machine and then
    prefect
    would start its containers? Is that correct? If so, what's the best path to have everything as a docker-compose file that developers can spin up on request? Are there any samples for that use case?
    ✅ 1
    👋 2
    a
    • 2
    • 5
  • t

    Tom Klein

    07/05/2022, 2:27 PM
    Hello - We have a NodeJS script that we wish to execute as a task. The entire code (and all the NodeJS dependencies) is located on a docker image on ECR. We’re trying out an experimental alternative to running a Kubernetes namespaced job, which would be to run the image itself as the image of the entire flow (and have the shell command that runs the NodeJS script as one of the tasks of the flow). The docker image uses a docker base image of
    node:12-alpine
    (which doesn’t seem to have
    pip
    and possibly not
    python
    either). I tried to add:
    RUN apk update
    RUN apk add py-pip
    RUN pip install prefect[github,aws,kubernetes,snowflake]
    The first steps finished fine, but the Prefect installation seems to be taking forever (over 25 minutes already) and also prints a lot of weird warning messages like:
    Collecting snowflake-connector-python>=1.8.2                                                                                                                     
      Downloading snowflake_connector_python-1.8.7-py2.py3-none-any.whl (168 kB)                                                                                     
      Downloading snowflake_connector_python-1.8.6-py2.py3-none-any.whl (161 kB)                                                                                     
      Downloading snowflake_connector_python-1.8.5-py2.py3-none-any.whl (159 kB)                                                                                     
      Downloading snowflake_connector_python-1.8.4-py2.py3-none-any.whl (161 kB)                                                                                     
      Downloading snowflake_connector_python-1.8.3-py2.py3-none-any.whl (158 kB)                                                                                     
      Downloading snowflake_connector_python-1.8.2-py2.py3-none-any.whl (157 kB)                                                                                     
    INFO: pip is looking at multiple versions of six to determine which version is compatible with other requirements. This could take a while.
    or :
    INFO: This is taking longer than usual. You might need to provide the dependency resolver with stricter constraints to reduce runtime. If you want to abort this run, you can press Ctrl + C to do so. To improve how pip performs, tell us what happened here: <https://pip.pypa.io/surveys/backtracking>
    any ideas?
    ✅ 1
    k
    a
    n
    • 4
    • 45
  • d

    datamongus

    07/05/2022, 3:27 PM
    Does anyone know of any prefect tasks for singer taps ?
    ✅ 1
    k
    a
    • 3
    • 2
  • a

    Austin Anderson

    07/05/2022, 4:49 PM
    Quick note that there is a typo in the documentation - concurrency limit command examples should have dashes ("concurrency-limit") instead of underscores ("concurrency_limit")
    ✅ 1
    k
    m
    • 3
    • 2
  • k

    Kevin

    07/05/2022, 5:28 PM
    Hi! I am working on setting up a simple CI/CD process that registers new flows each time a push is made to main. My folder structure is inspired by the GitLab data team, so at the root of my project I have a folder orchestrate. Within that folder I have a folder tasks, and within this there are subdirectories that contain custom modules that store tasks we use throughout our flows. While registering flows via the GitHub action, the Prefect CI is complaining about not being able to find a module in the tasks folder. When I run this locally, I do not get any issues. I am pretty sure it is related to my PYTHONPATH setup within the GitHub flow, but I have not had any luck troubleshooting. Has anyone out there run into a similar issue?
    ✅ 1
    k
    k
    a
    • 4
    • 10
  • c

    Chris Reuter

    07/05/2022, 5:54 PM
    A couple of virtual events happening this week! https://prefect-community.slack.com/archives/C036FRC4KMW/p1657042607147229
    🙌 1
  • i

    Ifeanyi Okwuchi

    07/05/2022, 7:14 PM
    Hello everyone, I have a task which keeps failing because the
    .map()
    is trying to run the task more times than necessary, and the 4th time, the index is a string.
    product_categories
    is a list containing 3 elements,
    flow_config
    is a dictionary and
    product_category_variables
    is also a dictionary. When the task runs in prefect cloud there are three successful task runs indexed 0, 1, 2 but then it tries to do another run with the index as a string and it fails saying
    Task 'set_dynamic_config_settings['dataset_bucket_path']': Starting task run...
    TypeError: list indices must be integers or slices, not str
    The task
    set_dynamic_config_settings
    looks like this within the flow definition
    from prefect import Flow, unmapped

    # "flow" is an assumed name; the original snippet omitted the name after "as"
    with Flow(name="flow-name") as flow:

        product_category_variables = get_run_variables(
            is_zero_nyp=is_zero_nyp_param,
            bucket_base=flow_config["bucket_base"],
            ltv_product_categories=product_categories,
            return_type="vars",
        )

        # only product_category is mapped over; the other arguments are unmapped
        final_config = set_dynamic_config_settings.map(
            cfg=unmapped(flow_config),
            product_category_variables=unmapped(product_category_variables),
            product_category=product_categories,
            upstream_tasks=[unmapped(product_category_variables)],
        )
    ✅ 1
    k
    • 2
    • 7
  • m

    Mansour Zayer

    07/05/2022, 7:30 PM
    Can someone point me in the right direction please? I'm using Prefect 1.2.2. I have a flow (flow_A) that makes an API call, then flow_B will transform some of that data, and I want these 2 flows to remain independent while running flow_B as close to flow_A as possible. What is the correct way to ensure that flow_B only starts after flow_A is finished? (I don't want either of these flows to trigger the other, I just want flow_B to wait on flow_A.) I tried using
    wait_for_flow_run
    , but I get asked for a
    flow_run_id
    , while I don't have a
    flow_run_id
    , I only have a
    flow_id
    for the parent flow flow_A
    ✅ 1
    k
    • 2
    • 3
  • c

    Constantino Schillebeeckx

    07/05/2022, 9:17 PM
    will an agent query for flows across multiple tenants?
    ✅ 1
    k
    • 2
    • 3
  • m

    Mars

    07/05/2022, 9:33 PM
    Hi, how do I enable debug logging for boto+prefect in my flow when running locally with
    flow.run()
    ? I’ve tried adding this code from the boto3 docs to my flow:
    import boto3
    boto3.set_stream_logger()  # sets logging.DEBUG
    However, ^ that code hasn’t changed the
    flow.run()
    output. For background, I’m trying to use localstack as an AWS stand-in for local pipeline development. The
    S3Download
    task is giving me errors and I want to debug the boto3 connection to find out if the S3 service URL, bucket, and key are correct.
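One thing that may be worth trying, sketched with the standard `logging` module only. As far as I know, `boto3.set_stream_logger()` called with no arguments targets only the `"boto3"` logger, while the request and endpoint details are logged under `"botocore"`. Whether this output then shows up alongside the `flow.run()` logs is not confirmed here. `enable_debug_logging` is a hypothetical helper name.

```python
import logging
import sys


def enable_debug_logging(names=("boto3", "botocore")) -> list[logging.Logger]:
    """Attach a DEBUG-level stdout handler to each named logger."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
    )
    loggers = []
    for name in names:
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)  # child loggers such as botocore.endpoint inherit this
        logger.addHandler(handler)
        loggers.append(logger)
    return loggers


if __name__ == "__main__":
    enable_debug_logging()
    logging.getLogger("botocore.endpoint").debug("example debug record")
```

Calling the helper before the flow runs means later records from these libraries go to stdout, including the endpoint URL botocore resolves for each request.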
    ✅ 1
    k
    • 2
    • 29
  • j

    Jeff Kehler

    07/06/2022, 3:54 AM
    Is it possible to yield results from a task into another task? I have a task that pulls data from an API in pages, and rather than accumulating ALL of the data into a list before passing it to the next task, it would be much more memory efficient to yield each page at a time.
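To make the memory argument concrete, here is a plain-Python sketch of the pattern being described: a generator that yields one page at a time and a consumer that never holds more than one page. It does not show whether Prefect tasks can pass generators to each other. `fetch_pages` is a hypothetical stand-in for the paged API.

```python
from typing import Iterator


def fetch_pages(total_items: int, page_size: int) -> Iterator[list[int]]:
    """Hypothetical paged API: yields one page of items at a time."""
    for start in range(0, total_items, page_size):
        yield list(range(start, min(start + page_size, total_items)))


def process_pages(pages: Iterator[list[int]]) -> int:
    """Consume pages lazily; only the current page is held in memory."""
    total = 0
    for page in pages:
        total += sum(page)
    return total


if __name__ == "__main__":
    print(process_pages(fetch_pages(total_items=5, page_size=2)))
```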
    ✅ 1
    k
    • 2
    • 5
  • a

    Andreas

    07/06/2022, 10:38 AM
    Hi! In Prefect 2.0, when creating a flow_run for a deployment using API's
    prefect.client.create_flow_run_from_deployment()
    is it possible to set the name for the flow_run? I know that there is such an option when calling
    create_flow_run(name="My flow run name")
    that takes as input a flow_model, what about when creating a run from a deployment though?
    ✅ 1
    a
    m
    • 3
    • 3
  • s

    Surya

    07/06/2022, 12:16 PM
    Hi all, we have installed the Prefect Server Helm chart (version 2022) and Dask Gateway 2022.4.0 on a Google Kubernetes Engine cluster, registered the flow, logged into the Prefect UI and triggered the flow. Then we encountered the error below. Prefect is using the Dask Gateway Traefik proxy to spin up Dask workers and a Dask scheduler as part of the job run. The Prefect Apollo service (load balancer external IP) connects to the Dask Gateway Traefik proxy; please correct me if I am wrong. The Prefect UI is the backend service for the Google Kubernetes ingress component (IP), which leads to the Prefect Apollo service. We suspect the issue could be the source IP ranges in the firewall rule that controls access to the Dask Gateway Traefik LB service IP. Please suggest what the source IP range in that firewall rule should be, given the Prefect architecture. Can you please let us know which component of Prefect Server communicates with the Dask Traefik proxy service to deploy and run the flows, so that we can use it for the source IP? Please let us know if any inputs are required. Attaching the error screenshot below.
    ✅ 1
    a
    • 2
    • 7
  • t

    Tom Klein

    07/06/2022, 1:35 PM
    Hello, maybe this is kind of a silly question but - assuming we have a big CSV that we then split into multiple CSVs (as a task), and then we wanna run a set of tasks for each of these “mini-CSVs” (either in parallel or sequentially, probably sequentially though) - we’re trying to understand what’s the best approach here (continued in thread…)
    ✅ 1
    a
    s
    k
    • 4
    • 122
  • j

    Josh Paulin

    07/06/2022, 2:04 PM
    Hello. Is it possible to use
    prefect.context.parameters
    when setting a
    Result
    location?
    k
    • 2
    • 1
k

Kevin Kho

07/06/2022, 2:24 PM
Yes if you use a callable like this
🙏 1