prefect-community
  • d

    Daniel Ross

    03/05/2022, 8:50 PM
    Hello Prefect community, I upgraded from 0.14.22 to 0.15.13 and containers are no longer launching. I'm deployed on ECS and I can see a ConnectTimeOutError that is preventing the tasks from coming up. This is the error in question:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f1911c36d90>: Failed to establish a new connection: [Errno 111] Connection refused'))
    If I look at the task itself, I can see that the environment variable for PREFECT__CLOUD__API is set to http://127.0.0.1:4200. So this seems like the problem. The host it's trying to connect to is clearly wrong (since the server itself is running on an EC2 instance). So I've adjusted my ~/.prefect/config.toml to look like this:
    host_ip = "my.ip.goes.here"
    host_port = "4200"
    host = "http://${server.host_ip}"
    port = "4200"
    endpoint = "${server.host}:${server.port}"
      [server.ui]
      apollo_url = "http://my.ip.goes.here:4200/graphql"
    [cloud]
    api = "${${backend}.endpoint}"
    endpoint = "https://api.prefect.io"
    graphql = "${cloud.api}/graphql"
    No luck. So I added the PREFECT__CLOUD__API definition to my environment variables in the container definition. Still no luck. However, when I look at the task definition, I can see the correct (or at least intended) PREFECT__CLOUD__API environment variable there. But the variable in the task is still set to http://127.0.0.1:4200, and the problem persists! I am pretty stuck on this, and hoping that someone here has a line of sight to the solution. (This all worked without much configuration previously ... which now seems weird.) All help appreciated!
    :discourse: 1
    k
    a
    36 replies · 3 participants
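One way to confirm what a running container actually sees is to inspect the variable from inside it. The sketch below is an illustration only: the helper names `points_at_loopback` and `report_api_setting` are hypothetical, and it assumes the problem is simply that `PREFECT__CLOUD__API` resolves to a loopback address inside the task.

```python
import os
from ipaddress import ip_address
from urllib.parse import urlparse


def points_at_loopback(api_url: str) -> bool:
    """Return True if the URL's host is localhost or a loopback IP."""
    host = urlparse(api_url).hostname or ""
    if host == "localhost":
        return True
    try:
        return ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (for example a DNS name), so not obviously loopback.
        return False


def report_api_setting(environ=os.environ) -> str:
    """Describe the PREFECT__CLOUD__API value seen by this process."""
    value = environ.get("PREFECT__CLOUD__API")
    if value is None:
        return "PREFECT__CLOUD__API is not set"
    if points_at_loopback(value):
        return f"PREFECT__CLOUD__API={value} points at the container itself"
    return f"PREFECT__CLOUD__API={value}"


# Print what this process sees; run it inside the task's container.
print(report_api_setting())
```

If the value printed inside the task differs from the one in the task definition, something between the two (for example the agent's own configuration) is likely overriding it.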
  • j

    Jacqueline Riley Garrahan

    03/06/2022, 6:06 AM
    I am testing Prefect deployed into a kind kubernetes cluster using the helm chart. I'd like to mount a folder on my local filesystem to a running job. I've configured this in the job template using:
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: prefect-job
      labels: {}
    spec:
      template:
        metadata:
          labels: {}
        spec:
          containers:
            - name: flow
              image: prefecthq/prefect:latest
              imagePullPolicy: IfNotPresent
              ...
              volumeMounts:
                - name: filesystem-dir
                  mountPath: /job-files
          volumes:
            - name: filesystem-dir
              hostPath:
                # directory location on host
                path: /path/to/my/dir
                # this field is optional
                type: Directory
    I'm unable to mount the path which I believe is a function of the volume not being available on the agent. Is there a straightforward way for me to mount a volume to the agent in the helm chart deployment?
    a
    6 replies · 2 participants
  • k

    Khen Price

    03/06/2022, 10:43 AM
    Hello Prefect Community! A question about mapped tasks and concurrency - we are not sure what is supposed to happen.
    Do mapped tasks run concurrently? Or does concurrency only happen when a dask executor is used?
    Thanks!
    a
    n
    10 replies · 3 participants
  • d

    Dekel R

    03/06/2022, 8:43 PM
    Hey everyone, I have a pretty complex flow with multiple tasks that reads data from multiple sources, then extracts the relevant parts and saves them to Google Storage. I have 2 questions: 1. This flow was originally written outside of Prefect and used pool.map to extract data from the objects I have (HTML pages). Is there any known issue with using pool.map inside a Prefect task context? 2. In another task that isn't using pool.map, I'm running this (valid) pandas line:
    comparable_items_df['tag_rank'] = comparable_items_df.groupby(['id', 'tag_name']).cumcount()
    And I get the following error (when running outside of a task context it works)
    TypeError: unhashable type: 'ResultSet'
    For now I’m running this locally on my mac but it will run eventually using Vertex AI. I’ll appreciate any help since I’m currently stuck. Thanks.
    k
    16 replies · 2 participants
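The `TypeError` above names `ResultSet`, which suggests (as an assumption, not something confirmed in this message) that one of the grouping columns holds list-like scraping results rather than plain strings when the code runs inside the task. Grouping requires hashable keys, so a list subclass fails. The sketch below reproduces the message with a stand-in class and shows one possible normalisation; `ResultSet` here is a hypothetical stand-in and `to_hashable` is an illustrative helper.

```python
class ResultSet(list):
    """Stand-in for a list-like scraping result; NOT the real library class."""


def to_hashable(value):
    """Convert list-like cell values into tuples of strings; pass others through."""
    if isinstance(value, list):
        return tuple(str(item) for item in value)
    return value


rows = [
    {"id": 1, "tag_name": ResultSet(["a"])},
    {"id": 1, "tag_name": ResultSet(["a"])},
    {"id": 2, "tag_name": "b"},
]

# Grouping needs hashable keys, so hashing a list-like value raises TypeError.
try:
    hash(rows[0]["tag_name"])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# After normalising, a running count per (id, tag_name) works, similar to cumcount().
seen = {}
tag_rank = []
for row in rows:
    key = (row["id"], to_hashable(row["tag_name"]))
    tag_rank.append(seen.get(key, 0))
    seen[key] = seen.get(key, 0) + 1
```

In a DataFrame, the equivalent step would be converting the offending column to strings before calling groupby.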
  • b

    Bihag Kashikar

    03/07/2022, 12:55 AM
    Hi guys, I have a few questions on Prefect Cloud. They come from viewing the architecture picture at https://docs.prefect.io/orchestration/ under the "architecture overview" section.
    Authentication question: can Prefect use Auth0 for authentication of users (real people, not service accounts/users)? Any reference diagrams would be helpful.
    Network and traffic filtering questions on Prefect Cloud hosted on GCP:
    1) How does Prefect Cloud connect to GCP? Looking at the picture at https://docs.prefect.io/orchestration/ under the architecture section, with agents 1 and 2 on GCP and the connection to the Cloud API: is it over the internet? Are there any references in the Prefect Cloud documentation stating this?
    2) On the same connection question, is traffic filtering possible using GCP's private access point option, https://cloud.google.com/vpc/docs/private-service-connect ? If yes, are there any references to this in the Prefect Cloud documentation?
    This is just for my knowledge, and it is more of a documentation question than one about the actual architecture :-) Similar hosted solutions like Snowflake and Elasticsearch do have documentation for all of the above 🙂 Thank you in advance.
    k
    18 replies · 2 participants
  • s

    Sen

    03/07/2022, 12:39 PM
    Hey everyone, I have got the Docker agent working with GPU based on the instructions from @Kevin Kho. But how do I run a flow locally in the agent rather than having it spin up a new Docker agent? The problem is that the new Docker agent created by Prefect when the flow is triggered does not use the Nvidia runtime and doesn't see the Nvidia drivers and devices, so it runs on CPU. So I would like to know whether it is possible to run the flow in the Docker agent itself without starting a new Docker agent for the Docker run. Thanks, Sen
    k
    7 replies · 2 participants
  • e

    Emma Rizzi

    03/07/2022, 1:08 PM
    Hi! I don't know if it's a known issue, but I experienced some strange behavior with the date selector when using the DateTimeParameter: selecting any date between April and October froze the UI for one minute, then it selected some later date (25 October for 2020 and 31 October for 2021, for the ones I tested). It seems like a timezone conflict. Prefect Cloud detected my time zone as Paris; I changed it to local and it's working for now 🙂
    k
    m
    +1
    8 replies · 4 participants
  • j

    Justin Martin

    03/07/2022, 2:34 PM
    Hello all, thanks for building such an amazing product! I do have a situation that I could use some help with: I have a task that does something very small; is there a way to define a FunctionTask on the fly and add it to the flow without having to build an actual function with the
    @task
    decorator? Here is an extremely simplified version of what I'm doing (see thread). Am I able to define the run_load_proc function as just a FunctionTask within the flow without an explicit function? Also, I'm not sure if defining the global SqlServerExecute is a total anti-pattern, so let me know. Thanks for all of your help.
    k
    4 replies · 2 participants
  • h

    Haseeb Ahmad

    03/07/2022, 2:35 PM
    Hey guys I have a prefect job that is running on a schedule. What I want to basically achieve is that the new flow does not get run until the old one is finished. For instance consider a flow is scheduled every hour and if it takes more than an hour for it to finish then the new scheduled one should not start until the old one has finished running. I was looking at
    Inspecting flow runs
    from prefect.backend import FlowRunView
    flow_run = FlowRunView.from_flow_run_id("4c0101af-c6bb-4b96-8661-63a5bbfb5596")
    Can you please guide me on the best approach to achieve the above use case. Really appreciate your time and help.
    k
    6 replies · 2 participants
  • s

    Sarah Floris

    03/07/2022, 2:42 PM
    okay so I am trying to translate a prefect core flow to prefect cloud flow. I want to be able to make changes locally and have it update my cloud flow. Is this possible?
    k
    22 replies · 2 participants
  • k

    Kevin Kho

    03/07/2022, 3:36 PM
    Join our events channel for Prefect-related events!
  • d

    Daniel Nilsen

    03/07/2022, 4:26 PM
    I am trying to register a flow to my helm deployed server with
    flow.register("myProject")
    . To connect to the server I change the
    server.endpoint
    in config.toml to the correct URL. This works, but I don't want to change the config manually like this. Is there a way to temporarily change it when registering? I have tried
    set_temporary_config({"server.endpoint": "123"})
    but it does not seem to work 🤔
    k
    3 replies · 2 participants
  • z

    Zhibin Dai

    03/07/2022, 4:34 PM
    when using conditional logic in a flow, is it necessary to have two
    with case
    statements, or can i use just one that does something if a condition is true?
    k
    5 replies · 2 participants
  • j

    Jason Motley

    03/07/2022, 4:59 PM
    What tends to cause this error for a task in the middle of a flow, when upstream tasks have run fine?
    : No heartbeat detected from the remote task;
    k
    29 replies · 2 participants
  • p

    Paul Butler

    03/07/2022, 5:01 PM
    Hi Prefect community!
    👋 2
    k
    c
    2 replies · 3 participants
  • p

    Paul Butler

    03/07/2022, 5:33 PM
    Question for the community: I am trying to use Prefect to schedule and monitor a dbt project/pipeline. I followed the example and passed in dbt_kwargs for connecting to Snowflake. My dbt project runs fine with the dbt run or dbt compile command, but the DbtShellTask fails without giving any reason for the error. Is more detailed logging recorded anywhere?
    
    Sample tasks, like the hello one included here, run OK. I'm using Visual Studio Code to debug this code, but I also tried running it in Python IDLE and got the same error output. I'm running this:
    
    from prefect import task, Flow, Parameter
    from prefect.tasks.shell import ShellTask
    from prefect.tasks.dbt import DbtShellTask
    
    @task(log_stdout=True)
    def say_hi(name):
        print("hello {}!".format(name))
    
    with Flow(name="dbt_flow") as f:
        name = Parameter('name')
        say_hi(name)
    
        task = DbtShellTask(
            profile_name='default',
            environment='dev',
            dbt_kwargs={
                'type': 'snowflake',
                'threads': 4,
                'account': 'mysnowflake.account',
                'user': 'myemal@myco.com',
                'authenticator': 'externalbrowser',
                'role': 'ROLENAME',
                'database': 'DBNAME',
                'warehouse': 'ENGINEERING_XS',
                'schema': 'DV_PROTO'
            },
            overwrite_profiles=False,
            profiles_dir=r'C:\Users\myDBTuser\.dbt'  # raw string so backslashes are not treated as escapes
        )(command='dbt compile')
    
    out = f.run(name='Paul') 
    
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | Task 'DbtShellTask': Starting task run...
    [2022-03-07 17:18:07+0000] ERROR - prefect.DbtShellTask | Command failed with exit code 1
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Command failed with exit code 1')
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | Task 'DbtShellTask': Finished task run for task with final state: 'Failed'
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | Task 'say_hi': Starting task run...
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | hello Paul!
    [2022-03-07 17:18:07+0000] INFO - prefect.TaskRunner | Task 'say_hi': Finished task run for task with final state: 'Success'
    k
    7 replies · 2 participants
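When a shell-based task reports only "Command failed with exit code 1", one generic way to see the underlying message is to run the same command in a subprocess and capture both output streams, since some tools write their errors to stdout rather than stderr. This is a minimal sketch and not part of Prefect: `run_and_capture` is a hypothetical helper, and the failing command below is a stand-in for the real `dbt compile` invocation.

```python
import subprocess
import sys


def run_and_capture(command):
    """Run a command and return (exit_code, stdout, stderr) without raising."""
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr


# Stand-in for a failing command; replace with e.g. ["dbt", "compile"]
# when running from the dbt project directory.
failing_command = [
    sys.executable,
    "-c",
    "import sys; print('partial output'); sys.stderr.write('boom'); sys.exit(1)",
]

exit_code, stdout_text, stderr_text = run_and_capture(failing_command)
print(f"exit code: {exit_code}")
print(f"stdout: {stdout_text.strip()}")
print(f"stderr: {stderr_text.strip()}")
```

Running the command this way, in the same environment and working directory the flow uses, usually shows whether the failure is a missing profile, a path problem, or an authentication prompt that cannot be answered non-interactively.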
  • a

    Adam Roderick

    03/07/2022, 6:06 PM
    Hi, after upgrading to prefect 1.0 we are seeing this warning during our deployments
    ...venv/lib/python3.8/site-packages/prefect/client/client.py:848: UserWarning: A flow with the same name is already contained in storage; if you changed your Flow since the last build, you might experience unexpected issues and should re-create your storage object.
    I don't understand why we see that, because, we create a new Docker storage object on every deployment
    flow.storage = Docker(....)
    Can anyone explain what this error means, or how to address it?
    k
    10 replies · 2 participants
  • k

    kevin

    03/07/2022, 7:40 PM
    Quick question about the documentation: https://docs.prefect.io/orchestration/ui/interactive-api.html On this page there’s an image that shows that you can query by
    flow_run.duration
    but I can’t seem to find that queryable value in the
    flow_run
    fields on the interactive API in Prefect Cloud. Could someone help me resolve this discrepancy?
    k
    3 replies · 2 participants
  • p

    Pedro Machado

    03/07/2022, 8:51 PM
    Hi there. We are running 0.15.13 and would like to upgrade to 1.0. We are using Kubernetes with an in-cluster agent. Will the 0.15.13 agent be able to run 1.0 flows? Alternatively, if we upgrade the agent first to 1.0, can it run 0.15.13 flows?
    k
    m
    6 replies · 3 participants
  • j

    Jean-Michel Provencher

    03/07/2022, 9:20 PM
    Hi, I’m using the docker image
    prefecthq/prefect:1.0.0
    and when scanning the docker image with Snyk I’m getting up to 114 security issues, as well as 8 critical vulnerabilities. Are you guys planning on fixing them in the base docker image?
    k
    m
    3 replies · 3 participants
  • d

    Dylan

    03/07/2022, 9:40 PM
    I haven’t done much digging on the topic, but I’m sure I’m not the first to ask or think about a data transform registry. Basically there’d be some k:p pairs used to assemble a tree of business stakeholders and the upstream / downstream dependencies based on what’s in a properties file, or can be generated from the code similar to how reflection works. Any thoughts on this? Would be cool to see a table or visualization of it. Maybe some Prefect workflow tags can be used for this too?
    k
    m
    20 replies · 3 participants
  • d

    Dylan

    03/07/2022, 9:41 PM
    That way when you wanna look up data pipelines or transforms by their business function, connector source, or their criticality, you could filter them etc.
  • k

    Kelly Huang

    03/07/2022, 11:27 PM
    Hi, I'm trying to run flows on the cloud using GitHub storage and a local agent. But I'm getting this error:
    in get_flow
      from github import UnknownObjectException
    ModuleNotFoundError: No module named 'github'
    I'm reading differing things about whether or not github storage can access package dependencies? Correct me if I'm wrong, but shouldn't it definitely be able to access my Pipfile and therefore dependencies? Otherwise what would the point of github storage be?
    :discourse: 1
    k
    9 replies · 2 participants
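The traceback shows that the `github` module (provided by the PyGithub package) cannot be imported in the environment where the agent loads the flow. A quick check, sketched below with a hypothetical helper `missing_modules`, is to ask the agent's own Python interpreter which modules it can find. The working assumption is that storage only holds the flow's code, so the packages themselves must already be installed wherever the flow runs.

```python
from importlib.util import find_spec


def missing_modules(module_names):
    """Return the names from module_names that cannot be imported here."""
    return [name for name in module_names if find_spec(name) is None]


# Run this with the same interpreter the local agent uses.
print(missing_modules(["github"]))
```

An empty list means the module is importable; otherwise, installing the missing package into that environment is the likely fix.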
  • a

    Andrew Moist

    03/08/2022, 9:37 AM
    Hi everyone. Our company is looking to get some help from a data engineering consultancy in Europe, ideally with some experience with Prefect/DBT/Redshift. Any recommendations would be appreciated. We’re mainly based in the UK. Thanks.
    👀 1
    k
    1 reply · 2 participants
  • t

    Tomer Cagan

    03/08/2022, 9:38 AM
    Is it possible to use introspection in the API? In the Hasura documentation (and on SO) I see it should be possible, but whatever introspection query I try to use, I get an error (details inside)
    :discourse: 1
    s
    k
    8 replies · 3 participants
  • v

    Vadym Dytyniak

    03/08/2022, 9:51 AM
    Hi. How does the default
    checkpointing
    work in Prefect 1.0.0? The documentation says it is enabled by default, but what is the default
    Result
    ?
    k
    33 replies · 2 participants
  • b

    Bruno Murino

    03/08/2022, 12:48 PM
    Hi everyone — I’m trying to start a new flow run by using the StartFlowRun prefect task, like in the screen grab, but I’m getting an error:
    TypeError: StartFlowRun.run() missing 1 required positional argument: 'self'
    . I don’t know if I’m doing something wrong, but it’s a bit unclear what to do
    k
    2 replies · 2 participants
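A `TypeError` saying that `run()` is missing `self` generally means the method was called on the class itself rather than on an instance. The screen grab is not available here, so this is only a guess at the cause. The sketch below reproduces the pattern with a stand-in class; `StartFlowRunStandIn` is hypothetical and is not Prefect's `StartFlowRun`.

```python
class StartFlowRunStandIn:
    """Hypothetical stand-in for a task class; NOT Prefect's StartFlowRun."""

    def __init__(self, flow_name=None):
        self.flow_name = flow_name

    def run(self, flow_name=None):
        return f"started {flow_name or self.flow_name}"


# Calling run() on the class leaves nothing to bind to `self`.
try:
    StartFlowRunStandIn.run(flow_name="child-flow")
    class_call_error = None
except TypeError as exc:
    class_call_error = str(exc)

# Creating an instance first binds `self`, so the call succeeds.
start_child = StartFlowRunStandIn(flow_name="child-flow")
result = start_child.run()

print(class_call_error)
print(result)
```

For a task class, the analogous fix would be to instantiate it first and then call the instance inside the flow, instead of calling `run` on the class.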
  • t

    Tom

    03/08/2022, 1:06 PM
    Hi all, we are evaluating Prefect and were wondering if it is a good idea to initiate flows within flows
    b
    s
    +2
    12 replies · 5 participants
  • t

    Tom

    03/08/2022, 1:57 PM
    I have another question regarding checkpoints, is it correct that results are not persisted when you run flows through
    flow.run()
    ? (https://stackoverflow.com/questions/66660927/prefect-workflow-how-to-persist-data-of-previous-every-schedule-run)
    :discourse: 1
    k
    3 replies · 2 participants
  • t

    Thomas Opsomer

    03/08/2022, 2:55 PM
    Hello 🙂 We have issues with tasks that require a manual approval (with
    manual_only
    trigger). After being approved using the UI on Prefect Cloud, it takes a very long time (several hours :/) before the flow is resumed. It's not the first time this has happened, but currently it's happening on every flow. Is anyone else having the same issue? Is it an issue on the Prefect backend side, or should we do something?
    k
    21 replies · 2 participants
t

Thomas Opsomer

03/08/2022, 2:55 PM
Hello 🙂 We have issues with tasks that require a manual approval (with
manual_only
trigger). After being approved using the UI on Prefect Cloud, it takes a very long time (several hours :/) before the flow is resumed. It's not the first time this has happened, but currently it's happening on every flow. Is anyone else having the same issue? Is it an issue on the Prefect backend side, or should we do something?
k

Kevin Kho

03/08/2022, 2:57 PM
Oh man that is the second report. We’ll look into it
👍 1
Could you give me flow run ids?
t

Thomas Opsomer

03/08/2022, 3:06 PM
this one should be stuck: 59b5eec1-5139-4dc4-b429-998eb852d7e5
k

Kevin Kho

03/08/2022, 3:06 PM
Thanks!
t

Thomas Opsomer

03/08/2022, 3:06 PM
another one 9f15dc61-e502-4267-8ba0-1633a882c73c
I'll have to resume these flows ^^ Do you want me to keep them stuck a bit longer for investigation?
k

Kevin Kho

03/08/2022, 5:05 PM
Can you resume them? We have a clue about what is wrong: a backend service is not working correctly and we are working on a fix. But do whatever you need to. I was able to replicate it and have flow runs stuck in the Resume state as well.
t

Thomas Opsomer

03/08/2022, 5:09 PM
Yes, in our case these tasks are just "waiting" tasks, they don't do anything, so I just set them as success and then restart the flow...
k

Kevin Kho

03/08/2022, 5:10 PM
Ah ok go for it. I was wondering if you had a hack to get it to execute
Hey @Thomas Opsomer, we found an API query that was timing out in Cloud for the service that handles resuming these tasks. We made edits to it and deployed it and the new deployment just went through but it is still timing out so this is not resolved for now and we are still working on a fix. Sorry about that. (edited)
This has been resolved so new flows will work and then the old ones are being processed now
t

Thomas Opsomer

03/11/2022, 4:49 PM
I confirm it works fine ! Thanks
k

Kevin Kho

03/11/2022, 4:55 PM
uhh the other guy said 15% of flows fail. how many do you have I am just curious?
t

Thomas Opsomer

03/11/2022, 4:59 PM
We don't have many flows running right now. One of my colleagues said it worked for the flow he was testing. Our production flows run Sunday/Monday, so I'll give you better feedback next week 🙂
k

Kevin Kho

03/11/2022, 5:01 PM
ok thanks!
t

Thomas Opsomer

03/11/2022, 5:16 PM
Is this feature used a lot by other users ? I would think that more users would report issues, but didn't find much here. Also I read somewhere that it may not be in the v2, so I'm curious
@Kevin Kho to follow up, I still have some flows stuck on tasks with manual validation 😕
here is a flow_id: "c31b77e9-af98-4ac3-a6df-987c9e667d03" if it's of any use 🙂
k

Kevin Kho

03/14/2022, 5:08 PM
Thanks will bring this up. What percent suffer from this and how long is it taking?
t

Thomas Opsomer

03/14/2022, 5:32 PM
ah sorry actually it's a flow that was started last week
k

Kevin Kho

03/14/2022, 5:36 PM
Ah ok