prefect-community
  • m

    Milenko Beslic

    10/03/2022, 9:59 PM
    Hello, I'm running a dbt job with the agent on Kubernetes (Prefect 2.4.5, Python 3.10.5, prefect_dbt 0.2.2) and also using Prefect Cloud. The dbt job finishes, but the flow
    trigger_dbt_cloud_job_run_and_wait_for_completion
    fails with
    AttributeError: 'NoneType' object has no attribute 'call'
    . Here is the log:
    21:33:15.658 | INFO    | Flow run 'charcoal-harrier' - Created subflow run 'hot-civet' for flow 'emails-gold'
    21:33:15.958 | INFO    | Flow run 'hot-civet' - Created subflow run 'cocky-pony' for flow 'Trigger dbt Cloud job run and wait for completion'
    21:33:16.069 | ERROR   | Flow run 'cocky-pony' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 589, in orchestrate_flow_run
        result = await flow_call()
      File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
        run_id_future = get_run_id.submit(triggered_run_data_future)
      File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 492, in submit
        return enter_task_run_engine(
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 741, in enter_task_run_engine
        return flow_run_context.sync_portal.call(begin_run)
    AttributeError: 'NoneType' object has no attribute 'call'
    21:33:16.133 | INFO    | Flow run 'cocky-pony' - Created task run 'Trigger dbt Cloud job run-e4229069-0' for task 'Trigger dbt Cloud job run'
    21:33:16.134 | INFO    | Flow run 'cocky-pony' - Submitted task run 'Trigger dbt Cloud job run-e4229069-0' for execution.
    21:33:16.264 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Triggering run for job with ID 135917
    21:33:16.558 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Run successfully triggered for job with ID 135917. You can view the status of this run at <https://cloud.getdbt.com/#/accounts/xxxxx/projects/156718/runs/86440872/>
    21:33:16.683 | INFO    | Task run 'Trigger dbt Cloud job run-e4229069-0' - Finished in state Completed()
    21:33:16.812 | ERROR   | Flow run 'cocky-pony' - Finished in state Failed('Flow run encountered an exception.')
    21:33:16.812 | ERROR   | Flow run 'hot-civet' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
        result = await run_sync(flow_call)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/opt/prefect/flows/emails_deploy.py", line 35, in emails_gold
        run = trigger_dbt_cloud_job_run_and_wait_for_completion(
      File "/usr/local/lib/python3.10/site-packages/prefect/flows.py", line 390, in __call__
        return enter_flow_run_engine_from_flow_call(
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 163, in enter_flow_run_engine_from_flow_call
        return run_async_from_worker_thread(begin_run)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
        return self.__get_result()
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
        raise self._exception
      File "/usr/local/lib/python3.10/site-packages/prefect/client/orion.py", line 82, in with_injected_client
        return await fn(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 522, in create_and_begin_subflow_run
        return terminal_state.result()
      File "/usr/local/lib/python3.10/site-packages/prefect/orion/schemas/states.py", line 145, in result
        raise data
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 589, in orchestrate_flow_run
        result = await flow_call()
      File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cloud/jobs.py", line 287, in trigger_dbt_cloud_job_run_and_wait_for_completion
        run_id_future = get_run_id.submit(triggered_run_data_future)
      File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 492, in submit
        return enter_task_run_engine(
      File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 741, in enter_task_run_engine
        return flow_run_context.sync_portal.call(begin_run)
    AttributeError: 'NoneType' object has no attribute 'call'
    21:33:16.892 | ERROR   | Flow run 'hot-civet' - Finished in state Failed('Flow run encountered an exception.')
    ...
    ✅ 1
    m
    2 replies · 2 participants
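    For reference on the error above: trigger_dbt_cloud_job_run_and_wait_for_completion is an async flow, and the failure happens where the engine expects a sync portal on the flow run context. A minimal sketch of one way to call it (an assumption, not a confirmed fix; the credentials block name is a placeholder, the job ID is taken from the log) is to make the parent flow async and await the dbt flow directly:
    from prefect import flow
    from prefect_dbt.cloud import DbtCloudCredentials
    from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

    @flow
    async def emails_gold():
        # hypothetical block name holding the stored dbt Cloud credentials
        credentials = await DbtCloudCredentials.load("my-dbt-credentials")
        # await the async subflow instead of calling it from a synchronous flow
        return await trigger_dbt_cloud_job_run_and_wait_for_completion(
            dbt_cloud_credentials=credentials,
            job_id=135917,
        )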
  • m

    merlin

    10/03/2022, 11:58 PM
    Noob question. Sorry to post this here, but since it's part of the Prefect docs it seems appropriate. I'm working through the tutorial from the blog post Workflow without DAGs and am a little confused by an error from the first code snippet
    p102-13.py
    If I leave
    .result()
    in the code, it fails with error:
    letter = get_letter().result()
    AttributeError: 'str' object has no attribute 'result'
    But remove the
    .result()
    bit and I get success:
    poetry run python src/quickstart/p102-13.py
    16:48:31.385 | INFO    | prefect.engine - Created flow run 'jasper-uakari' for flow 'Result Flow'
    16:48:31.454 | INFO    | Flow run 'jasper-uakari' - Created task run 'Get a letter-c414aff6-0' for task 'Get a letter'
    16:48:31.454 | INFO    | Flow run 'jasper-uakari' - Executing 'Get a letter-c414aff6-0' immediately...
    16:48:31.468 | INFO    | Task run 'Get a letter-c414aff6-0' - A
    16:48:31.478 | INFO    | Task run 'Get a letter-c414aff6-0' - Finished in state Completed()
    16:48:31.478 | INFO    | Flow run 'jasper-uakari' - You got: A
    16:48:31.488 | INFO    | Flow run 'jasper-uakari' - Finished in state Completed('All states completed.')
    The poetry environment is:
    [tool.poetry.dependencies]
     python = "^3.10"
     prefect = "^2.4.5"
     greenlet = "^1.1.3"
    ❤️ 1
    ✅ 1
    m
    j
    3 replies · 3 participants
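    The behaviour above is expected in current Prefect 2 releases: calling a task directly inside a flow returns the task's return value (here a plain str), while .result() lives on the PrefectFuture returned by .submit(). A small sketch showing both forms, mirroring the names from the log above:
    from prefect import flow, task

    @task(name="Get a letter")
    def get_letter() -> str:
        return "A"

    @flow(name="Result Flow")
    def result_flow():
        letter = get_letter()            # direct call: returns the string itself
        future = get_letter.submit()     # submit(): returns a PrefectFuture
        also_letter = future.result()    # .result() belongs on the future
        print(f"You got: {letter} and {also_letter}")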
  • w

    wonsun

    10/04/2022, 4:33 AM
    Hi, I'm a Prefect 1.0 user.🤗 We built a program called 'Uploader' (hereinafter 'uploader') that runs outside the server; its job is to record meta information about the data in a database installed on the server. What I want is for the Prefect flow to run as soon as the uploader has finished writing to the database (i.e. once the program has completed all of its tasks). Until now, a person has started the Prefect flow manually after the uploader finished. I could put the flow on a schedule and have it run regardless of whether the uploader has finished, but I'd like to avoid scheduled execution because the uploader is not used periodically. What is the right way to run the Prefect flow automatically after the uploader's process ends? (Sorry, this got long.) To summarize: • There is a program (uploader) that runs outside the Prefect server. • The uploader sends data to the server on behalf of the user and records the program's log and meta information about the data in a specific database table. • When the uploader finishes, Prefect should run a flow to preprocess the data newly added by the user. • (As-is) When the uploader's work is complete, someone starts the Prefect flow manually. • If possible, I would like to avoid scheduling the flow, since the uploader is not used periodically. Is there any way to execute the flow automatically after the uploader's process ends?
    :marvin: 1
    ✅ 1
    r
    2 replies · 2 participants
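    For the question above, one common pattern (a sketch, assuming Prefect 1.x and that the uploader's environment can reach the Prefect API) is to have the uploader itself trigger the flow run as its final step, instead of relying on a schedule:
    from prefect import Client

    def on_uploader_finished():
        client = Client()  # uses the API endpoint/auth from the local Prefect config
        client.create_flow_run(
            flow_id="your-flow-id",     # hypothetical: the registered flow's ID
            run_name="post-uploader-run",
        )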
  • i

    Igor Kotua

    10/04/2022, 7:32 AM
    Hi, I have a question about concurrency. I have a queue with concurrency = 1. Yesterday I manually started a flow run via the UI, and I had also scheduled the same flow to run about 10 hours later. The manually triggered run didn't finish within those 10 hours, so the scheduled one shouldn't have started (because concurrency = 1), but it did start. Am I doing something wrong?
    c
    33 replies · 2 participants
  • t

    Thomas Fredriksen

    10/04/2022, 9:02 AM
    I was browsing the Prefect codebase trying to understand how notifications are implemented when I came across how services are implemented. Having injectable/modular services would be awesome, so I created a feature request: https://github.com/PrefectHQ/prefect/issues/7064
    🙌 4
    ✅ 1
    k
    1 reply · 2 participants
  • d

    Deepanshu Aggarwal

    10/04/2022, 9:54 AM
    Hi all. I was using GitHub Actions to deploy flows and create/update blocks on Prefect (v2). It was running fine as of about a week ago, but today (without any changes to API keys or the workflow definition) the step where the Prefect config is set has started failing. Attaching a screenshot of the error.
    c
    4 replies · 2 participants
  • b

    Bogdan Serban

    10/04/2022, 10:05 AM
    Hello everyone! I am trying to deploy prefect orion server behind an nginx reverse proxy, at a different location than
    /
    , for instance
    /prefect/
    . I am having an issue with loading the UI when I access
    <http://localhost/prefect/>
    as the UI cannot load the corresponding assets; it ignores the location prefix when requesting them. For instance, it requests
    <http://localhost/assets/index.1a4d60fb.js>
    even though the asset is actually available at
    <http://localhost/prefect/assets/index.1a4d60fb.js>
    . Is there a way to specify a prefix or some sort of redirect URL for the UI to use when doing requests?
    t
    t
    +1
    18 replies · 4 participants
  • c

    Connor Vaid

    10/04/2022, 11:10 AM
    Hi all, I’m running into an issue with Prefect GQL where I get the following error message
    {
      "errors": [
        {
          "path": [
            "flow_run",
            0,
            "id"
          ],
          "message": "Cannot return null for non-nullable field flow_run.id.",
          "extensions": {
            "code": "INTERNAL_SERVER_ERROR"
          }
        }
      ],
      "data": null
    }
    When I add the following “where” filter it seems to work (which would suggest I have some flows with null ids?); however, when I run a query to find flows with null ids, nothing is returned.
    {
      "where": {
        "flow": {
          "id": {
            "_is_null": false
          }
        }
      }
    }
    Please let me know if you have any ideas about why this may happen, thanks!
    🙌 1
    c
    j
    6 replies · 3 participants
  • h

    Hamza Naanani

    10/04/2022, 2:21 PM
    Hello. How can we deploy multiple agents on the same Kubernetes cluster, in the same namespace? Apparently, using the Helm charts, we can only deploy one agent; if I try to deploy another, I run into problems with the role / role binding (as I would be overwriting the current declaration by binding it to another agent).
    ✅ 1
    j
    3 replies · 2 participants
  • j

    Justin Trautmann

    10/04/2022, 3:15 PM
    Hello community, hello Prefect team. Is there any new rate limiting in place for the Cloud API? When using Prefect 2 Cloud, I'm recently seeing a lot of flow crashes with the error message:
    Crash detected! Request to <https://api.prefect.cloud/api/accounts/[...]/workspaces/[...]/task_runs/> failed. 
    ...
    RuntimeError: The connection pool was closed while 65 HTTP requests/responses were still in-flight.
    This is most likely not related to local network issues, as it is reproducible across different networks. When using a local Orion server instead, the flow succeeds without any issues, and a couple of days ago the same flow ran successfully on Prefect Cloud. I am submitting ~100 parallel tasks using the RayTaskRunner with a local cluster. Any help is much appreciated. Python 3.8.10, prefect 2.4.5, prefect-ray 0.2.0.post2
    t
    z
    +1
    7 replies · 4 participants
  • s

    Sean Turner

    10/04/2022, 3:30 PM
    Hi, I'm running Prefect (Orion and agent) on EKS with
    KubernetesJob
    infrastructure. Orion, the agent, and the Prefect CLI are all
    2.4.5
    . When I submit a simple task I get the following error in the agent logs:
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:anonymous\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
    Seems that for some reason the service account called
    agent
    isn't being observed?
    system:anonymous
    is being used instead. The
    role
    and
    roleBinding
    both match what is in the helm chart so permissions should be fine.
    $ kgp agent-5fbdcf4bbb-zrrg7 -o yaml | grep serviceAccount
      serviceAccount: agent
      serviceAccountName: agent
    I was not having these problems before upgrading from 2.4.0 -> 2.4.5 😞. I created a new KubernetesJob block for 2.4.5 to silence an error.
    ✅ 1
    j
    6 replies · 2 participants
  • i

    Igor Morgunov

    10/04/2022, 3:54 PM
    Hi all. I have an issue with failed flows not deleting their pods in Kubernetes. If I run a flow and then start another run of the same flow, the second (or any subsequent) run will fail with Kubernetes throwing a 409, but the pod it was running on doesn't get deleted and remains in a running state. Any suggestions on how to get around this?
    m
    m
    3 replies · 3 participants
  • a

    Andrei Tulbure

    10/04/2022, 4:14 PM
    Hi all, quick question: if I have several tasks inside a Prefect flow and I want the downstream tasks to wait for some upstream tasks (which are mapped), what can I do? These are not sequential flows ... where I could just use set_upstream and wait for the flow. Any help is appreciated.
    ✅ 1
    n
    m
    +1
    23 replies · 4 participants
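    A minimal sketch of one way to do this (assuming Prefect 2; the task names are made up): pass the mapped futures to the downstream call via wait_for, so it starts only after every mapped run has finished:
    from prefect import flow, task

    @task
    def extract(item):
        return item * 2

    @task
    def summarize():
        print("all mapped extracts finished")

    @flow
    def my_flow(items: list):
        mapped = extract.map(items)          # upstream tasks, mapped over the input
        summarize.submit(wait_for=mapped)    # downstream waits on all mapped futures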
  • g

    Garren Moore

    10/04/2022, 4:59 PM
    Is
    databricks
    currently supported as a connection/profile type when using prefect-dbt cli commands (like
    trigger_dbt_cli_command
    )?
    RuntimeError: Command failed with exit code 1:
        Could not find adapter type databricks!
    👀 1
    ✅ 1
    a
    10 replies · 2 participants
  • p

    Philip MacMenamin

    10/04/2022, 6:12 PM
    Is there any documentation on how to generate an API key for a self-hosted GraphQL server?
    m
    8 replies · 2 participants
  • s

    Schuyler Duveen

    10/04/2022, 6:38 PM
    Does anyone recommend any particular consulting groups/agencies around data engineering/data warehouses?
    ✅ 1
    a
    1 reply · 2 participants
  • s

    Sean Turner

    10/04/2022, 7:50 PM
    Is it possible to specify separate
    KubernetesJob
    infra blocks in a
    Deployment
    ? e.g.
    flow a
      task b (uses KubernetesJob with docker image foo)
      task c (uses KubernetesJob with docker image bar)
    It kind of seems like I'm limited to having one
    KubernetesJob
    per
    Deployment
    ?
    ✅ 1
    m
    a
    6 replies · 3 participants
  • b

    Bradley McLaughlin

    10/04/2022, 8:23 PM
    In Prefect 2.0, is it possible to schedule an ad hoc flow run at some time in the future?
    ✅ 1
    m
    1 reply · 2 participants
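    A rough sketch of how this could be done from code (an assumption, not a confirmed recipe; the deployment name and start time are placeholders): create a flow run from an existing deployment in a Scheduled state with a future start time.
    import asyncio
    from datetime import datetime, timedelta, timezone

    from prefect.client import get_client
    from prefect.orion.schemas.states import Scheduled

    async def schedule_adhoc_run():
        async with get_client() as client:
            deployment = await client.read_deployment_by_name("my-flow/my-deployment")
            await client.create_flow_run_from_deployment(
                deployment.id,
                # schedule the ad hoc run two hours from now
                state=Scheduled(scheduled_time=datetime.now(timezone.utc) + timedelta(hours=2)),
            )

    asyncio.run(schedule_adhoc_run())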
  • y

    YZ

    10/04/2022, 9:43 PM
    Hello, I am new to using Prefect. I have a task defined as below. I noticed that when it fails and retries, the input value gets lost. Original run:
    my_task(run_date="20221004")
    Retrying run:
    my_task(run_date=None)
    Any idea what I might have done wrong, or any workaround?
    @task(retry_delay=timedelta(minutes=30), max_retries=10)  # wait for up to 5 hours
    def my_task(run_date: str):
    k
    1 reply · 2 participants
  • s

    Sander

    10/04/2022, 9:47 PM
    Hi, is there a nice pod spec for setting up an ephemeral dask cluster using the latest prefect/prefect_dask versions?
    ✅ 1
    m
    6 replies · 2 participants
  • w

    Will Hayes

    10/04/2022, 10:25 PM
    Hi all, I'm having trouble trying to dynamically build flows from a list of tasks. For example, I have tasks 1, 2, and 3 that will all modify the same dataframe. However, they might need to run in different orders, or not at all, depending on user input. Ideally I'd like to build a list of these tasks to run later. For instance, if I want to run tasks 1, 3, and 2 in that order (with their associated parameters), I'd like to call a task runner with a list of tuples like: [(Task_1, Config_1), (Task_3, Config_3), (Task_2, Config_2)]. I'd appreciate it if anyone could point me in the right direction. Thanks!
    ✅ 1
    n
    r
    6 replies · 3 participants
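    A small sketch of one way to structure this (assuming Prefect 2; the step names and transforms are made up): keep a mapping from step names to tasks and let the flow walk a user-supplied (step, config) plan. Because each call consumes the previous result, the steps run sequentially in the given order:
    import pandas as pd
    from prefect import flow, task

    @task
    def drop_nulls(df: pd.DataFrame, config: dict) -> pd.DataFrame:
        return df.dropna(**config)

    @task
    def add_double(df: pd.DataFrame, config: dict) -> pd.DataFrame:
        df[config["target"]] = df[config["source"]] * 2
        return df

    STEPS = {"drop_nulls": drop_nulls, "add_double": add_double}

    @flow
    def dynamic_pipeline(plan: list):
        df = pd.DataFrame({"a": [1.0, None, 3.0]})
        for step_name, config in plan:
            df = STEPS[step_name](df, config)   # each call blocks on the previous result
        return df

    # dynamic_pipeline([("drop_nulls", {}), ("add_double", {"source": "a", "target": "b"})])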
  • t

    Tan Li

    10/04/2022, 11:50 PM
    Hi folks, first of all, I want to thank you for creating such a great tool! I recently realized that besides workflow orchestration, blocks are a really convenient feature for storing sensitive data. I am wondering: is it possible to use a Prefect block as a standalone module, e.g. using
    Secret.load("github-access-token").get()
    in normal Python scripts?
    ✅ 1
    m
    3 replies · 2 participants
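    Blocks do work outside of flows; loading one is just a Python call against the API. A tiny sketch (assuming PREFECT_API_URL / PREFECT_API_KEY are configured in the environment the script runs in):
    from prefect.blocks.system import Secret

    token = Secret.load("github-access-token").get()  # returns the plain secret value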
  • j

    Jelle Vegter

    10/05/2022, 7:32 AM
    Hi all, I’m running a Prefect (1.0) agent on a VM in Azure. After around a week it gets stuck with this error. Once I restart the process it works perfectly fine for another week until it gets stuck again. Anyone have a clue what is happening? Thanks!
    s
    1 reply · 2 participants
  • h

    Ha Pham

    10/05/2022, 8:02 AM
    Hi, I have a flow run where all of the child tasks completed, but the flow itself is still flagged as Running. What's the possible cause of this? How can I mark this run as finished?
    👀 1
    k
    4 replies · 2 participants
  • h

    Hieu Tran

    10/05/2022, 8:35 AM
    Hello all, is there a way to pass a flow's
    timeout_seconds
    as a parameter, to customize it when triggering a deployment of that flow manually from the UI?
    @flow(timeout_seconds = 15)
    def log_flow(name: str):
        log_task(name)
    Since timeout_seconds is set in the flow decorator, it won't be shown as a parameter of the flow run.
    k
    1 reply · 2 participants
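    A possible workaround (a sketch, not a confirmed pattern; the wrapper flow is hypothetical): expose the timeout as a parameter of a thin wrapper flow and apply it at run time with .with_options(), then deploy the wrapper so the timeout shows up as a regular flow run parameter:
    from prefect import flow, task

    @task
    def log_task(name: str):
        print(name)

    @flow
    def log_flow(name: str):
        log_task(name)

    @flow
    def log_flow_with_timeout(name: str, timeout_seconds: int = 15):
        # build a copy of log_flow with the requested timeout, then call it as a subflow
        return log_flow.with_options(timeout_seconds=timeout_seconds)(name)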
  • e

    Erik

    10/05/2022, 10:36 AM
    The options for the CLI command
    prefect agent
    are:
    Options:
      --work-queue  -q    TEXT  One or more work queue names for the agent to pull from. [default: None]
      --hide-welcome
      --api               TEXT  [default: (from PREFECT_API_URL)]
      --tag         -t    TEXT  DEPRECATED: One or more optional tags that will be used to create a work queue [default: None]
      --help                    Show this message and exit.
    What I would like to do is launch an agent once, have it grab the flow runs that are sitting in the queue (a single call), and then, once those are completed, have the agent shut itself down. Is there any kind of option for that?
    ✅ 1
    k
    p
    9 replies · 3 participants
  • s

    Steph Clacksman

    10/05/2022, 10:57 AM
    Is there a limit on data size for params for a subflow? I have a flow which passes two dataframes to a subflow, and one of the dataframes causes an error to be thrown:
    TypeError: Dict key must be str
    but the other doesn't. The one causing the error does have ~1 million rows though.
    k
    3 replies · 2 participants
  • q

    Q

    10/05/2022, 12:42 PM
    👋 My flow is a part of a python package and uses utility functions which are also a part of the package (e.g.
    projname/flows/myflow.py
    imports
    projname.utils.utilfunc
    ). This flow is scheduled from a deployment and uses
    Process
    as its infrastructure block. I would like to ensure that the latest version of
    projname
    is installed (to guarantee that
    utilfunc
    is up-to-date), so I can't install the package when setting up agent environment. I think I can install the package before running the flow if I set
    Process.command
    to something like
    ["pip", "install", ".", "&&", "python3", "-m", "prefect.engine"]
    , but maybe there's a better way to go about it?
    k
    5 replies · 2 participants
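    A sketch of that idea (based on the assumption that the Process block exec's the command list directly, so "&&" needs an explicit shell):
    from prefect.infrastructure import Process

    process = Process(
        # wrap in a shell so "&&" is interpreted rather than passed as an argument
        command=["bash", "-c", "pip install . && python3 -m prefect.engine"],
    )
    process.save("projname-process", overwrite=True)  # hypothetical block name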
  • a

    Aaron Lanzen

    10/05/2022, 1:07 PM
    @Will Raphaelson let me in
    ✅ 1
    :blob-attention-gif: 1
    👀 1
    r
    1 reply · 2 participants
  • g

    Garren Moore

    10/05/2022, 2:49 PM
    Running a very basic flow in Prefect Cloud with the Prefect agent deployed in EKS. In our logging infra, I'm getting
    ERROR prefect.infrastructure.process - Process 'turquoise-woodlouse' exited with status code: -9
    No further messages beyond that. In the Prefect Cloud UI, the flow runs indefinitely and doesn't show this log message (log level set to DEBUG). Anyone familiar with exit code -9? Can't find any docs.
    ✅ 1
    t
    r
    +1
    10 replies · 4 participants
cc: @Taylor Curran
t

Taylor Curran

10/05/2022, 3:02 PM
Hey Garren, could you share what docs you are working off of?
g

Garren Moore

10/05/2022, 3:10 PM
I can’t find any docs on this
r

Ryan Peden

10/05/2022, 3:10 PM
I believe exit -9 tells us a Python subprocess was killed by the operating system, usually due to running out of memory.
:upvote: 1
The Python subprocess docs explain a little more here: https://docs.python.org/3/library/subprocess.html#subprocess.CompletedProcess.returncode So -9 means the process was terminated by POSIX signal 9, i.e. SIGKILL.
I think that explains why you're seeing it logged via the agent but not seeing anything in the Cloud UI: SIGKILL terminates the flow process before it can report that anything is wrong.
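For anyone decoding these return codes by hand, a negative value is just the negated signal number (quick stdlib check):
import signal
print(signal.Signals(9).name)  # SIGKILL, i.e. a return code of -9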
g

Garren Moore

10/05/2022, 3:44 PM
🤦 Thank you @Ryan Peden - that does explain the disconnect. The pod itself, however, doesn't crash with OOM 🤔 - will look into it
r

Ryan Peden

10/05/2022, 3:55 PM
It might be because it's happening to a subprocess - IIRC you will only get an OOM event for a pod if a container's init process gets terminated by the kernel's OOM killer. So in this case, perhaps the subprocess running the flow gets terminated by the Linux kernel, after which the agent (and the pod it is part of) continues running.
m

Michael Adkins

10/05/2022, 4:43 PM
https://github.com/PrefectHQ/prefect/pull/7070 🙂
😀 2
g

Garren Moore

10/05/2022, 4:45 PM
@Michael Adkins 👌