Powered by Linen
prefect-community
  • t

    Tomás Emilio Silva Ebensperger

    09/28/2022, 6:17 PM
    I am running an agent through supervisor on a remote server. I am specifying the path for prefect to run.
    command=/home/ubuntu/project/env/bin/prefect agent start --work-queue "my queue"
    This works fine, but when I run flows from the UI the worker uses the global python runtime and not the one in the virtual environment. Please help.
    j
    1 reply · 2 participants
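One way to confirm which interpreter a flow run really uses is to report `sys.executable` from inside the flow. The sketch below uses only the standard library; `describe_interpreter` is a hypothetical helper name, not part of Prefect.

```python
import sys


def describe_interpreter() -> dict:
    """Report which Python interpreter is running and whether it is a venv."""
    return {
        "executable": sys.executable,
        "prefix": sys.prefix,
        # In an environment created by venv, sys.prefix differs from sys.base_prefix
        "in_virtualenv": sys.prefix != getattr(sys, "base_prefix", sys.prefix),
    }


if __name__ == "__main__":
    for key, value in describe_interpreter().items():
        print(f"{key}: {value}")
```

If this reports the global interpreter when called from a flow run, a possible cause (an assumption, not confirmed in this thread) is that the supervisor program's PATH does not include the virtual environment's bin directory, so a bare `python` resolves to the system one.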
  • c

    Chris Gunderson

    09/28/2022, 6:34 PM
    If I wanted to parameterize a flow, how would I do that? I'd like to pass an array of strings (email addresses). The flow below works, but I'm not sure it is the proper way to do things. Please ignore the class name, just trying to test.
    from prefect import flow
    from pydantic import BaseModel
    
    
    class CustomList(BaseModel):
        data: list[str]
    
    
    @flow(name="Fidelity Allocations")
    def FidelityAllocationsFlow(toEmail: CustomList):
        ...  # code....
    
    
    if __name__ == "__main__":
        emailAddress = CustomList(data=['cgunderson@spiderrockadvisors.com'])
        FidelityAllocationsFlow(toEmail=emailAddress)
    ✅ 1
    m
    7 replies · 2 participants
  • c

    Chris Reuter

    09/28/2022, 6:58 PM
    Hey all - having some Twitch API issues so we'll be live on YouTube only: https://www.youtube.com/c/PrefectIO
    :upvote: 2
    ✅ 2
  • a

    Amol Shirke

    09/28/2022, 7:23 PM
    Hi All, I'm running a prefect server that is to be accessed via a load balancer. Which URLs should be used in the config: the load balancer URL, or the host and port? Same question for the agent. Thanks
    ✅ 1
    j
    c
    11 replies · 3 participants
  • a

    Andreas Nigg

    09/28/2022, 7:59 PM
    Hey, on prefect 2 cloud I randomly encounter "internal server error"s when the agent wants to communicate with the server (see the exception in the thread). I have a flow which spawns up to 50 tasks (the tasks are all done within seconds). I currently run the flow every 5 minutes. Am I running into some sort of API limitation due to too many tasks? Is there something to prevent this internal server error from happening? The error happens approx. every 15th time; all other times, the flow runs just fine.
    File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 226, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'https://api.prefect.cloud/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/'
    Response: {'exception_message': 'Internal Server Error'}
    m
    a
    9 replies · 3 participants
  • r

    Rio McMahon

    09/28/2022, 8:34 PM
    Hello, with several long running flows I get an error like:
    prefect.infrastructure.process - Process 'meticulous-manatee' exited with status code: -9
    Any guidance?
    ✅ 1
    m
    14 replies · 2 participants
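A negative status such as -9 usually follows the `subprocess` convention where -N means the process was killed by signal N. The sketch below decodes such a status with the standard library; `describe_exit_status` is a hypothetical helper, and the assumption that Prefect reports the status in this convention is not confirmed in the thread.

```python
import signal


def describe_exit_status(status: int) -> str:
    """Translate a subprocess-style exit status into a readable description."""
    if status >= 0:
        return f"exited normally with code {status}"
    try:
        # Negative statuses follow the subprocess convention: -N means "killed by signal N"
        name = signal.Signals(-status).name
    except ValueError:
        name = f"signal {-status}"
    return f"terminated by {name}"


if __name__ == "__main__":
    print(describe_exit_status(-9))
```

On Linux, signal 9 is SIGKILL. For long-running processes a frequent source of SIGKILL is the kernel's out-of-memory killer or a container memory limit, so memory usage is a reasonable first thing to check.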
  • n

    Nick DeCraene

    09/28/2022, 8:45 PM
    hi there, in prefect 1.0 we could dynamically assign the task names with input params, something like:
    @task(name="extract(ctx.id)")
    def extract(ctx):
    These would then be displayed in the UI as
    extract(4)
    for example if our flow was running for ctx.id=4. I haven't been able to find anything like this for 2.0, am I just missing something?
    ✅ 1
    m
    3 replies · 2 participants
  • j

    James Constable

    09/28/2022, 10:09 PM
    What is the relationship between an agent running on kubernetes and a kubernetesJob defined in infrastructure...
    • Does the kubernetes agent start the kubernetes job?
      ◦ if so, do I need a kubeconfig in the infrastructure block when the agent is already on the cluster?
    • If I run the job in an infrastructure process block from a kubernetes agent, will it run on the kubernetes agent?
    ✅ 1
    m
    2 replies · 2 participants
  • a

    Andrew Reeve

    09/28/2022, 10:56 PM
    Hey team. Can someone help with releasing the latest version of prefect-gcp? https://github.com/PrefectHQ/prefect-gcp/pull/61 The current version has a bug with credentials that I think will be solved with the commit from yesterday by @Andrew Huang
    👀 1
    ✅ 1
    a
    6 replies · 2 participants
  • a

    Andrew Reeve

    09/28/2022, 10:57 PM
    I’m getting the following error:
    AttributeError: 'Credentials' object has no attribute 'project'. Did you mean: 'project_id'?
    It looks like your PR will fix this @Andrew Huang.
    ✅ 1
    a
    1 reply · 2 participants
  • d

    Deepak

    09/28/2022, 11:08 PM
    Has anyone here tried running pyspark inside a prefect task? I get
    RuntimeError: Java gateway process exited before sending its port number
    when I run pyspark methods inside a task.
    m
    3 replies · 2 participants
  • Issues when using DaskTaskRunner at scale - running out of available connections when connecting to Orion DB across distributed workers
    j

    Jean

    09/28/2022, 11:10 PM
    Hey guys, I’m running a flow with a DaskTaskRunner that spawns a task that takes around 20 seconds, and apparently it’s running sequentially?! My machine has 16 threads, and in the UI I see each task only being run after the previous one finishes. Any input?
    23:04:54.057 | INFO    | prefect.engine - Created flow run 'optimal-snail' for flow 'test1'
    23:04:54.057 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
    23:04:55.980 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
    23:05:00.371 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-0' for task 'Execute values of the query'
    23:05:00.372 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-0' immediately...
    23:05:24.612 | INFO    | Task run 'Execute values of the query-8165e3c8-0' - Finished in state Completed()
    23:05:26.370 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-1' for task 'Execute values of the query'
    23:05:26.370 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-1' immediately...
    23:05:42.533 | INFO    | Task run 'Execute values of the query-8165e3c8-1' - Finished in state Completed()
    23:05:44.295 | INFO    | Flow run 'optimal-snail' - Created task run 'Execute values of the query-8165e3c8-2' for task 'Execute values of the query'
    23:05:44.296 | INFO    | Flow run 'optimal-snail' - Executing 'Execute values of the query-8165e3c8-2' immediately...
    23:06:09.538 | INFO    | Task run 'Execute values of the query-8165e3c8-2' - Finished in state Completed()
    As you see in these logs it’s not really spawning more tasks. I was under the impression that the call to a function with the @task decorator would be non-blocking if made within a @flow which uses DaskTaskRunner
    ✅ 1
    m
    a
    64 replies · 3 participants
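The "Executing ... immediately" log lines above are consistent with each task being called directly, which blocks until it returns. In Prefect 2, concurrent execution through the task runner is normally requested with the task's `.submit()` method instead. The sketch below is not Prefect code: it uses `concurrent.futures` as a library-neutral illustration of the difference between a blocking call and a submitted one, and `run_query` is a hypothetical stand-in for the slow task.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_query(i: int) -> int:
    """Stand-in for a slow task (hypothetical; sleeps instead of querying)."""
    time.sleep(0.05)
    return i * 2


def run_blocking(n: int) -> list:
    # Each call finishes before the next one starts
    return [run_query(i) for i in range(n)]


def run_submitted(n: int) -> list:
    # submit() returns a future right away, so the calls overlap
    with ThreadPoolExecutor(max_workers=n) as pool:
        futures = [pool.submit(run_query, i) for i in range(n)]
        return [future.result() for future in futures]


if __name__ == "__main__":
    print(run_blocking(4))
    print(run_submitted(4))
```

Both functions return the same values; only the submitted version lets the slow calls run side by side.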
  • m

    Michael Holvey

    09/29/2022, 2:32 AM
    Hi everyone! Just getting started with orchestrating DBT (CLI) flows using Prefect. Having a weird error where the "trigger_dbt_cli_command" is telling me I'm specifying an invalid project directory because I'm missing a dbt_project.yml file (I'm not). Any idea what's going on? Sample code below:
    from prefect import flow
    from prefect_dbt.cli.commands import trigger_dbt_cli_command
    
    @flow
    def run_dbt() -> str:
        result = trigger_dbt_cli_command("dbt run", project_dir='C:\\Users\\36350admin\\arrow_dbt-1')
        return result
    
    run_dbt()
    It even tells me it's running this dbt command, and when I run that by itself in the same working directory, it works as expected!
    22:29:08.084 | INFO    | Task run 'trigger_dbt_cli_command-321ca940-0' - Running dbt command: dbt run --profiles-dir C:\Users\36350admin\.dbt --project-dir C:\Users\36350admin\arrow_dbt-1
    ✅ 1
    j
    a
    5 replies · 3 participants
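A quick way to rule out a path problem is to check, from the same Python process that runs the flow, whether the directory passed as `project_dir` really contains `dbt_project.yml`. This is a standard-library sketch; `has_dbt_project` is a hypothetical helper and the demo uses a temporary directory rather than the path from the message.

```python
import tempfile
from pathlib import Path


def has_dbt_project(project_dir: str) -> bool:
    """Return True if project_dir directly contains a dbt_project.yml file."""
    return (Path(project_dir) / "dbt_project.yml").is_file()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(has_dbt_project(tmp))
        (Path(tmp) / "dbt_project.yml").write_text("name: demo\n")
        print(has_dbt_project(tmp))
```

If the check passes for the real path but the task still fails, the cause is more likely in how the command is launched than in the directory itself.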
  • m

    marque

    09/29/2022, 5:56 AM
    Hi, it seems like DockerContainer block does not support ECR image yet? My agent is failing to authenticate with ECR. I'm basically doing something like this https://github.com/anna-geller/prefect-docker-deployment/blob/main/blocks/dockerhub_block.py but with ecr
    ✅ 1
    a
    8 replies · 2 participants
  • a

    Andreas Nord

    09/29/2022, 9:24 AM
    Hi! I have a flow which only calls two subflows (and nothing else). The subflows show up as completed but the main flow is stuck at running... I've had related problems that seemed to be because of memory issues, but my subflows don't return anything so I don't see how they can complete without the main flow completing
    r
    5 replies · 2 participants
  • v

    Vadym Dytyniak

    09/29/2022, 10:27 AM
    Hello. Is it possible to stream task runner initialisation logs to the Cloud UI logs? I am using DaskTaskRunner and users can't see the logs about cluster creation and the dashboard url.
    c
    30 replies · 2 participants
  • i

    Igor Kotua

    09/29/2022, 10:51 AM
    Hey there, I have a question about the "right way" to run prefect agents. I have the following setup: a local environment (where I quickly test flows and tasks), a github repo with CI which automatically redeploys new code, and a remote machine (which clones the code from github, runs prefect deployment build and then starts a prefect agent). And finally I am using Prefect Cloud for everything else. I am wondering what would happen if I redeploy my codebase (adding a new flow and deployment, for instance) while some flows are still running. Would these flows be able to resume at the "right point"? And in general, what is the best practice for redeploying the codebase with prefect if I use only one machine? Thanks a lot! P.S. I used Airflow heavily before Prefect and now I am migrating some jobs to Prefect.
    ✅ 1
    c
    6 replies · 2 participants
  • d

    David

    09/29/2022, 11:29 AM
    the prefect 2.x documentation regarding infrastructure such as for cloudrun and kubernetes job is way too scarce.
    :thank-you: 1
    a
    c
    3 replies · 3 participants
  • p

    Pedro Henrique

    09/29/2022, 1:08 PM
    Hello, I have a problem when migrating to prefect cloud 2.0. I have a virtual machine, and after running pip install -U prefect, pip doesn't work anymore: any lib I try to install fails with the following message: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK' I tried uninstalling prefect and installing the updated version again, and I still get the same error.
    ✅ 1
    j
    1 reply · 2 participants
  • e

    Emma Rizzi

    09/29/2022, 1:25 PM
    Hello! I use prefect 1 for flows that write data in a NFS, with Kubernetes and Docker runs, and docker storage for each flow. It seems that all flows run as root in their container as all files are owned by root when generated by the flow. I need to change the ownership of files generated, is it possible to change the user that flows execute as ? Thanks 🙂
    ✅ 1
    c
    9 replies · 2 participants
  • r

    Rudy García

    09/29/2022, 1:38 PM
    Hi! does anyone know if it is possible to have multiple users and workspaces using a self-hosted prefect orion server? and if so, how could i install this server? thaaanks
    ✅ 1
    c
    4 replies · 2 participants
  • n

    Nic

    09/29/2022, 1:42 PM
    When I host the ui locally (Orion) I don't get the opportunity to create new blocks - is this integration only supported in cloud version?
    ✅ 1
    c
    10 replies · 2 participants
  • c

    Ching

    09/29/2022, 2:06 PM
    I have a question and an idea with regards to caching:
    1. How can I forcefully clean up the cache without an expiry time?
    2. If I have a task dependency A->B->C and both A and B are cached, will any result be transferred from A to B? In theory I should be able to fetch only the cached return value of B and run C.
    3. It would be nice to have configurable cache locations by means of an env var for each flow run. Currently, when using the input hash and multiple users run the same task, prefect will try to fetch the cache from the first user, which can raise permission issues. As a workaround I can add the user to the hash function, but that feels complicated.
    Any thoughts?
    r
    2 replies · 2 participants
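The workaround of adding the user to the hash can be kept fairly small. Prefect 2 accepts a `cache_key_fn` callable that takes `(context, parameters)` and returns a string key; the sketch below follows that shape but uses only the standard library. `per_user_cache_key` and its optional `user` argument are hypothetical, and reading the user name from the `USER` or `USERNAME` environment variable is an assumption.

```python
import hashlib
import json
import os


def per_user_cache_key(context, parameters, user=None) -> str:
    """Build a cache key from the task inputs plus the current user name."""
    if user is None:
        # Assumption: the user name is available from the environment
        user = os.environ.get("USER") or os.environ.get("USERNAME") or "unknown"
    payload = json.dumps(
        {"user": user, "parameters": parameters}, sort_keys=True, default=str
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    print(per_user_cache_key(None, {"x": 1}, user="alice"))
```

Because the user name is part of the hashed payload, two users running the same task with the same inputs get different keys and never try to read each other's cached results.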
  • a

    Aniruddha Bharadwaj

    09/29/2022, 2:20 PM
    Hi All, Am trying to use the prefect-email collection to send a notification via office 365 and getting the below error. Would anyone know how to fix this issue?
    File "/usr/local/lib/python3.9/site-packages/prefect_email/credentials.py", line 138, in get_server
        server = SMTP_SSL(smtp_server, smtp_port, context=context)
      File "/usr/lib64/python3.9/smtplib.py", line 1050, in __init__
        SMTP.__init__(self, host, port, local_hostname, timeout,
      File "/usr/lib64/python3.9/smtplib.py", line 255, in __init__
        (code, msg) = self.connect(host, port)
      File "/usr/lib64/python3.9/smtplib.py", line 341, in connect
        self.sock = self._get_socket(host, port, self.timeout)
      File "/usr/lib64/python3.9/smtplib.py", line 1057, in _get_socket
        new_socket = self.context.wrap_socket(new_socket,
      File "/usr/lib64/python3.9/ssl.py", line 501, in wrap_socket
        return self.sslsocket_class._create(
      File "/usr/lib64/python3.9/ssl.py", line 1041, in _create
        self.do_handshake()
      File "/usr/lib64/python3.9/ssl.py", line 1310, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLError: [SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:1129)
    ✅ 1
    c
    2 replies · 2 participants
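`WRONG_VERSION_NUMBER` raised from `SMTP_SSL` typically means the server on that port is not speaking TLS from the first byte, for example because it expects a plain connection that is upgraded with STARTTLS. The sketch below shows both connection styles with the standard library `smtplib`. It is not the prefect-email API, `tls_mode_for_port` and `open_smtp` are hypothetical helpers, and the idea that the Office 365 endpoint expects STARTTLS on port 587 is an assumption to verify against your mail settings.

```python
import smtplib
import ssl


def tls_mode_for_port(port: int) -> str:
    """Guess the TLS style conventionally used on a mail submission port."""
    # 465 is conventionally implicit TLS; 587 conventionally expects STARTTLS
    return "implicit" if port == 465 else "starttls"


def open_smtp(host: str, port: int) -> smtplib.SMTP:
    """Open an SMTP connection using the TLS style that matches the port."""
    context = ssl.create_default_context()
    if tls_mode_for_port(port) == "implicit":
        # TLS handshake happens immediately on connect
        return smtplib.SMTP_SSL(host, port, context=context)
    # Plain connection first, then upgrade to TLS
    server = smtplib.SMTP(host, port)
    server.starttls(context=context)
    return server
```

If the collection offers a setting for the security type, choosing the STARTTLS variant for port 587 would be the equivalent of the second branch here.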
  • s

    Slackbot

    09/29/2022, 3:32 PM
    This message was deleted.
  • d

    David Cupp

    09/29/2022, 4:37 PM
    I'm seeing some odd behavior in the Cloud UI that is not happening locally. I have 2.4.0 installed locally. For example, if I create the deployments Foo(customer=1), Foo(customer=2) and Foo(customer=3) all under the flow name Foo, I can go to my local UI, click on "Flows" and then "Foo", and I see those deployments. But in the Prefect Cloud 2.0 UI, if I go to "Flows" and click on "Foo", no deployments appear! They do appear if I click on deployments. Is this the expected behavior of the UI? (i.e. are there some criteria for a flow deployment to appear on the flow page?)
    ✅ 1
    m
    j
    +1
    16 replies · 4 participants
  • j

    Joe

    09/29/2022, 5:34 PM
    Question regarding upgrading Prefect 1.0 -> 2.0 Container Execution: All of my Prefect 1.0 flows run Docker containers, they aren't flows in Docker containers so in the course of upgrading to Prefect 2.0, the Docker Infrastructure block seems counterproductive. Do I need to rewrite all of the Docker Client tasks from Prefect 1.0 to maintain their functionality?
    n
    4 replies · 2 participants
  • e

    Esdras Lopes Nani

    09/29/2022, 5:43 PM
    Hi all! I'm currently developing a flow with async and sync tasks. Tasks A and B run successfully, but then the execution freezes and I can't even see a log for task c; the last log is Task run 'b' - Completed(). If I force task c to be async, it runs without a problem. Is that a current bug? Using Prefect 2.3.1
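    import asyncio
    
    from prefect import flow, task
    
    @task
    async def a():
        ...  # do async
    
    @task
    async def b():
        ...  # do async
    
    @task
    def c():
        ...  # do sync
    
    @flow
    async def main():
        # distinct result names so the task functions are not shadowed
        a_result = await a()
        b_result = await b(wait_for=[a_result])
        c_result = c(wait_for=[b_result])
    
    asyncio.run(main())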
    Thanks!
    ✅ 1
    s
    1 reply · 2 participants
  • n

    Nic

    09/29/2022, 5:46 PM
    is it possible to connect and deploy to prefect through azure when hosted locally like it is with prefect cloud?
    ✅ 1
    c
    8 replies · 2 participants
  • n

    Nace Plesko

    09/29/2022, 5:51 PM
    Hi, is this a known issue that there are prefect jobs running, but "runs in progress" is showing 0? We are running into concurrency limits because some tasks are stuck running forever, but there is no easy way to find those runs except digging into each flow one by one. Is there a way I can stop all running flows (even though the dashboard is showing 0 running flows)?
    ✅ 1
    1️⃣ 1
    b
    19 replies · 2 participants
b

Bianca Hoch

09/29/2022, 7:27 PM
Hi Nace! Assuming this is for 1.0, can you try running the following query in the interactive api?
{
  task_run(
    where: {
      created: {_lt: "2022-09-29 00:00:00", _gte: "2022-09-01 00:00:00"},
      state: {_eq: "Running"}
    }
  ) {
    id
    name
    flow_run_id
    task {
      name
      id
    }
  }
}
This should get you all of the task runs stuck in a 'Running' state, as well as the corresponding flow run IDs. After that, you should be able to cancel those flow runs from the UI.
If you're unable to cancel them from the UI, you can run the following query to cancel the task runs
mutation setTaskRunStates($input: [set_task_run_state_input!]!) {
  set_task_run_states(input: {states: $input}) {
    states {
      id
      status
      message
      __typename
    }
    __typename
  }
}
When running the query above, the query variables will need to be set in the API like so
{
  "input": {
    "task_run_id": "task_runid",
    "version": 1,
    "state": {
      "type": "Cancelled",
      "message": "marked task run as Cancelled because \"test\""
    }
  }
}
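Since the mutation declares `$input` as a list (`[set_task_run_state_input!]!`), the variables can carry several task runs at once. The sketch below builds that variables document with `json.dumps`, which guarantees valid JSON quoting and escaping. `build_cancel_variables` is a hypothetical helper; the field names are copied from the example above, and whether `version` has to match the task run's current version is an assumption.

```python
import json


def build_cancel_variables(task_run_ids, version=1, reason="test"):
    """Build the query-variables JSON for the set_task_run_states mutation above."""
    states = [
        {
            "task_run_id": task_run_id,
            "version": version,
            "state": {
                "type": "Cancelled",
                "message": f'marked task run as Cancelled because "{reason}"',
            },
        }
        for task_run_id in task_run_ids
    ]
    return json.dumps({"input": states}, indent=2)


if __name__ == "__main__":
    print(build_cancel_variables(["task_runid"]))
```

The printed document can be pasted into the query variables pane of the interactive API.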
n

Nace Plesko

09/29/2022, 9:19 PM
@Bianca Hoch yes we are on prefect v1. Thank you!! Honestly I didn't even know that interactive api exists, this opens a whole new world to me, thanks!!
✨ 1
🚀 1
@Bianca Hoch I apologize for my lack of graphql knowledge, but I can't figure out why this is not valid JSON. It's complaining that it's expecting JSON, but I can't see how my input is not JSON. Is there something obvious that I'm doing wrong?
b

Bianca Hoch

09/30/2022, 2:52 PM
The same thing occurred when running the query in the screenshot, however, the task was still able to be set to "Cancelled". Would you mind sharing the error you receive when you run the query?
n

Nace Plesko

09/30/2022, 3:05 PM
You are right, the query actually worked for most of the cases! However, I was unable to cancel 4 task runs; they are all a few months old. I'm getting the error that is in the screenshot. Do you know how I could cancel those as well?
b

Bianca Hoch

09/30/2022, 3:06 PM
Hmmmm...I presume those flows may have been deleted
Can you provide those 4 task run IDs here? We can look into removing them on our end.
n

Nace Plesko

09/30/2022, 3:13 PM
They are still counting towards the concurrency limit though. Yes, that would be great if you could delete those (or if there was a way I could delete them). Turns out there are more than 4 in such a state, but for some reason just 4 count towards the concurrency limit: "0a6c4764-6cba-4061-8151-f6bbb5fe99a6", "68747f03-c8cd-494f-a991-c017fbfb71ba", "5fcb8a72-9572-49cf-ba05-3a60ed65f212", "16019956-cd19-41fa-94c9-e8c04bc032e9", "5e5e8c0f-eff0-4e2f-b111-de0a9c6f139c" and "73bb1530-3455-4400-af2a-8850d37f2e08". They are all version 2
:gratitude-thank-you: 1
Thank you for deleting those! Now I don't see any task in "running" state anymore, but the concurrency limit still says that there are 4 running tasks
I see 0 now, thank you so much!!
:marvin: 1
b

Bianca Hoch

09/30/2022, 4:41 PM
Woohoo!
You got it, Nace.
n

Nace Plesko

09/30/2022, 4:42 PM
It was all you! 🙌
I have another question related to these forever-running tasks. We have a shell task that maps to multiple tasks, and we set the timeout as shown on the screenshot below. I would expect each mapped task to time out after 3 hours, but just yesterday some of them were stuck and had been running for 13 hours until I manually cancelled them.
db_shell_task on the screenshot is ShellTask
Does anything on the screenshot below look obviously wrong? Again, I apologize for my lack of knowledge of Prefect
Rephrasing the same question with the knowledge I just learned. Setting a timeout on ShellTask doesn't seem to do anything. ShellTask doesn't seem to have Duration and the mapped child runs are also not affected by that timeout setting. Is there a way to set timeout on mapped children from ShellTask?