Powered by Linen
prefect-community
  • Michal Mucha

    06/02/2021, 3:54 PM
    Hi! Has anyone tried to use the dask.distributed performance_report with a Prefect DaskExecutor? Would this be a good place to inject it? https://github.com/PrefectHQ/prefect/blob/b45029d00579ef6ad9a9479e5eb03b3e21ee31b7/src/prefect/executors/dask.py#L232 Thanks!
    4 replies · 2 participants
  • John Ramirez

    06/02/2021, 4:51 PM
    Hey everyone! I’m trying to automate API calls using the map functionality. Are conditional tasks able to work with the mapped results, or do I need to reduce the results before evaluating the condition?
    8 replies · 4 participants
  • jcozar

    06/02/2021, 5:54 PM
    Hi! A few months ago I designed a Flow using an ECS run configuration, S3 for results, and Docker for storage with a registry_url in ECR. For the Prefect agent, I created a service in AWS ECS (EC2 infrastructure). It worked correctly, but now I am reproducing the same configuration for a new project and it does not work:
    • It requires the awsvpc configuration. I solved that using the run_task_kwargs arg in the ECSRun configuration definition.
    • The cpu and memory arguments in the ECSRun configuration definition do not seem to work. Even with valid values (cpu=256, memory=512), it creates the TaskDefinition with 1 vCPU and 2 GB of RAM (I think that is the default configuration). I also tried run_task_kwargs with the overrides keyword, but that does not work either.
    Do you know what I am doing wrong? Thank you very much!!
    16 replies · 4 participants
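The `run_task_kwargs` workaround jcozar mentions for the awsvpc requirement can be sketched as a plain dict builder. This is a minimal sketch, assuming `ECSRun` forwards `run_task_kwargs` to boto3's ECS `run_task` call, whose `networkConfiguration`/`awsvpcConfiguration` shape the dict mirrors; `awsvpc_run_task_kwargs` and the subnet and security-group IDs are hypothetical.

```python
from typing import Any, Dict, List


def awsvpc_run_task_kwargs(
    subnets: List[str],
    security_groups: List[str],
    assign_public_ip: bool = False,
) -> Dict[str, Any]:
    """Build run_task_kwargs carrying an awsvpc network configuration.

    Assumption: the nesting follows boto3's ECS run_task parameters
    (networkConfiguration -> awsvpcConfiguration).
    """
    return {
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": list(subnets),
                "securityGroups": list(security_groups),
                # ECS expects the strings "ENABLED" or "DISABLED" here.
                "assignPublicIp": "ENABLED" if assign_public_ip else "DISABLED",
            }
        }
    }


# Hypothetical IDs, for illustration only.
kwargs = awsvpc_run_task_kwargs(["subnet-0123456789abcdef0"], ["sg-0123456789abcdef0"])
print(kwargs)
```

The dict would then be passed as `ECSRun(run_task_kwargs=kwargs)`; it only covers the networking point and does not by itself explain the `cpu`/`memory` behavior.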
  • Jason Prado

    06/02/2021, 7:42 PM
    I’d like to trigger some flows whenever a new file is added to a Google Drive folder. My current plan is to run a small Python server that receives webhook calls from the Drive API and then uses the Prefect API to trigger the flows. Questions:
    1. Is the GraphQL API mutation createFlowRun the right Prefect API to use?
    2. Is there anything like this around already? I would love to not reinvent the wheel and deploy a new service.
    3 replies · 2 participants
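For the first question, here is a hedged sketch of the request a webhook receiver might send, using only the standard library. It assumes the Prefect 1.x GraphQL API names the mutation in snake_case (`create_flow_run`, taking a `create_flow_run_input` with `flow_id` and `parameters`) rather than camelCase, and that Prefect Cloud is reached at `https://api.prefect.io` with a bearer token; all of these should be checked against the API schema before use. `build_create_flow_run_request` is a hypothetical helper.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

# Assumption: Prefect Cloud (1.x) GraphQL endpoint; a Prefect Server
# deployment would use its own URL instead.
API_URL = "https://api.prefect.io"

# Assumption: snake_case mutation and input type names from the Prefect 1.x
# GraphQL schema; verify them in the UI's Interactive API before relying on them.
CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
"""


def build_create_flow_run_request(
    flow_id: str,
    token: str,
    parameters: Optional[Dict[str, Any]] = None,
    api_url: str = API_URL,
) -> urllib.request.Request:
    """Build (but do not send) a POST request that asks for one new flow run."""
    variables = {"input": {"flow_id": flow_id, "parameters": parameters or {}}}
    body = json.dumps({"query": CREATE_FLOW_RUN, "variables": variables}).encode("utf-8")
    return urllib.request.Request(
        api_url,
        data=body,
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )
```

A Drive webhook handler could then send it with `urllib.request.urlopen(request)`, passing, for example, the new file's ID as a (hypothetical) flow parameter. Prefect 1.x's Python `Client` also has a `create_flow_run` method, which avoids hand-writing the mutation.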
  • Zach Schumacher

    06/02/2021, 8:06 PM
    Hi all, I have a flow in Prefect Cloud that has been Scheduled for ~1 hour without starting. I know it’s not a label issue, because I can see that the k8s agent picked it up. It could very well be something in our k8s cluster, but I wanted to check whether something else was going on. The flow run ID is in the thread.
    19 replies · 2 participants
  • Chris Leber

    06/02/2021, 8:52 PM
    Greetings, friends! Does anyone have experience adding timeouts to the child tasks spawned by mapping? With a single task, I have no problem adding the timeout arg to the @task decorator, but when I do the same for a mapped task, it seems to have no effect.
    15 replies · 2 participants
  • Justin Chavez

    06/02/2021, 9:46 PM
    Hi all! I am having trouble understanding how to approach a specific Prefect mapping. Let's say I have a task that returns a list of dicts, where each key-value pair in a dict is a parameter to another Prefect task. The example flow below illustrates what I am trying to accomplish:
    from typing import Dict, List
    from prefect import task, Flow
    
    @task
    def generate_user_status_list() -> List[Dict]:
        return [
            {"user": "User 1", "status": "active"},
            {"user": "User 2", "status": "inactive"},
        ]
    
    @task(log_stdout=True)
    def print_user_status(user: str, status: str):
        print(f"Status of {user} is {status}")
    
    
    with Flow('check user status') as flow:
        user_status_list = generate_user_status_list()
        print_user_status.map(user_status_list) # <<<< Fails here as it doesn't recognize the parameters in the dictionaries
    
    flow.run()
    It's the second-to-last line that I am trying to figure out: I want each element of the list generated by the first task to supply the parameters of the second. It's like the parameterized flow run in the StartFlowRun section, except that I want to run a task instead of a flow with the parameters: https://docs.prefect.io/core/idioms/flow-to-flow.html#running-a-parametrized-flow I can get around this by creating separate tasks that each build one parameter as a list of str, but I was hoping for a simple all-in-one task that generates the params for some of my more complex downstream tasks.
    7 replies · 2 participants
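The per-parameter lists Justin describes as a workaround can come from a single task by transposing the list of dicts. A minimal, library-agnostic sketch, assuming every dict has the same keys as the first; `transpose_records` is a hypothetical helper name.

```python
from typing import Dict, List


def transpose_records(records: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Turn [{"user": ..., "status": ...}, ...] into {"user": [...], "status": [...]}.

    Assumes every record has the same keys as the first one.
    """
    if not records:
        return {}
    keys = records[0].keys()
    return {key: [record[key] for record in records] for key in keys}


records = [
    {"user": "User 1", "status": "active"},
    {"user": "User 2", "status": "inactive"},
]
print(transpose_records(records))
# {'user': ['User 1', 'User 2'], 'status': ['active', 'inactive']}
```

Wrapped in `@task` (say, as a hypothetical `transpose_task`), its output could be indexed inside the flow and mapped by keyword: `params = transpose_task(user_status_list)` followed by `print_user_status.map(user=params["user"], status=params["status"])`. This relies on Prefect 1.x turning `params["user"]` into a `GetItem` task, which is worth confirming for the version in use. A simpler alternative is to let `print_user_status` accept one dict and unpack it inside the task.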
  • Michael Law

    06/03/2021, 7:32 AM
    Hi folks, absolute newbie here, so be gentle. I am looking to onboard Prefect onto our current solution, which uses Databricks on Azure. I am aware of the Prefect Databricks methods for submitting jobs etc., and am reasonably comfortable with how they can work in our process. Where I am unsure is the local development story. For example, if I use the methods provided by Prefect, I guess I will have to submit a job to a Databricks cluster. Without any sort of ‘if this then that’ condition, would it be possible to trigger a flow using Prefect and submit the jobs to a local Spark cluster, or do the nice methods provided by Prefect only allow me to submit jobs to a remote cluster? Thanks
    3 replies · 2 participants
  • Stéphan Taljaard

    06/03/2021, 8:24 AM
    Is it possible to set a flow's readme inside the Python flow file? (If a flow gets deleted, its readme will be gone, but if it's in the file, the readme is "saved permanently".)
    3 replies · 2 participants
  • Ranu Goldan

    06/03/2021, 1:22 PM
    Hi everyone, I want to ask about flow concurrency. I’ve set flow_concurrency for the label agent:k8s-prod to 100, but the maximum number of concurrent runs I’ve seen is only 22-24; the other runs stay Scheduled. I have one Kubernetes agent deployment on my k8s cluster. How can I get flow run concurrency above 24? Is there anything I’m missing? Thanks in advance
    7 replies · 3 participants
  • Julio Venegas

    06/03/2021, 1:45 PM
    Hi everyone! I’m working on task state handlers to trigger a notification. One piece of information I’d like to send is the flow name and flow run, but from my research, neither is available in the task or the state. Is there any way to get the flow details in a task state handler?
    3 replies · 2 participants
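One way to approach Julio's question, sketched under assumptions: in Prefect 1.x a task state handler receives `(task, old_state, new_state)`, and run metadata would be read from `prefect.context` rather than from the task or state. The formatter below takes that context as a plain mapping so it can be tested without Prefect; `format_task_notification` is hypothetical, and the key names `flow_name`, `flow_run_name`, and `flow_run_id` are assumptions to verify against the context documentation.

```python
from typing import Any, Mapping


def format_task_notification(ctx: Mapping[str, Any], task_name: str, state_name: str) -> str:
    """Compose a notification line from run-context values.

    Assumption: in Prefect 1.x, `ctx` would be `prefect.context` read inside the
    state handler, and the key names below are available during a flow run.
    """
    flow_name = ctx.get("flow_name", "<unknown flow>")
    # Prefer the human-readable run name, falling back to the run ID.
    flow_run = ctx.get("flow_run_name") or ctx.get("flow_run_id", "<unknown run>")
    return f"Task {task_name!r} in flow {flow_name!r} (run {flow_run}) entered state {state_name}"


print(format_task_notification({"flow_name": "etl", "flow_run_id": "1234"}, "extract", "Failed"))
```

Inside a handler this might be called as `format_task_notification(prefect.context, task.name, type(new_state).__name__)`, with the handler returning `new_state` unchanged after sending the message.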
  • Zach Schumacher

    06/03/2021, 3:40 PM
    Maybe I’m doing something wrong, but I’ve noticed differences in terminal state handler behavior between running in Cloud and running locally. Locally, the terminal state handler always does what I would expect (it runs once the flow is in its final state). However, in Cloud I’ve seen flow runs with long-running tasks where the terminal state handler is called while the flow is still in a Running state.
    23 replies · 3 participants
  • bral

    06/03/2021, 5:46 PM
    Hi folks! How can I store secrets in config.toml? Can anybody provide a minimal example? Thanks!
    2 replies · 2 participants
  • Charles Liu

    06/03/2021, 6:53 PM
    Does the Prefect CLI support passing variables dynamically for one-off flow runs? Sometimes our use case requires ad hoc execution with custom parameters (running a whole pipeline is sometimes not time-efficient or necessary, e.g. when a smaller date range needs to be backfilled).
    9 replies · 3 participants
  • Jason Prado

    06/03/2021, 7:33 PM
    I’m confused about how to use Secrets to specify things like a DB host for a PostgresFetch task:
    from prefect.tasks.postgres import PostgresFetch
    from prefect.tasks.secrets import PrefectSecret

    db_host = PrefectSecret('db_host')
    fetch = PostgresFetch(
        host=db_host,
        ...,
    )
    I can’t do this because I get:
    UserWarning: A Task was passed as an argument to PostgresFetch, you likely want to first initialize PostgresFetch with any static (non-Task)...
    Does this mean I need to either put the DB host in my code or get it some other way, such as from an environment variable? Why is host one of the static args to PostgresFetch if I’m likely to extract it from a Secret?
    7 replies · 2 participants
  • Pedro Machado

    06/03/2021, 7:49 PM
    Hi everyone. I have a flow that uses Docker storage. When I trigger it from Prefect Cloud, the flow starts, but it does not update its progress in Prefect Cloud: the flow status is "Submitted" and all tasks show as "Pending". However, when I start the agent with --show-flow-logs, I can see that the flow is running fine. It looks like the flow is not communicating with Prefect Cloud. The flow runs an R script. I am using the rocker/r-ver:4.1.0 image as a base and installing Python 3.7 via apt-get, and I suspect the issue is related to this; maybe I am not installing Python correctly. My Dockerfile is in the thread. Thanks!
    10 replies · 2 participants
  • Hugo Kitano

    06/03/2021, 10:17 PM
    Hello all, I am trying to reimagine an AWS Step Function, consisting of Lambda Functions and Fargate tasks, as a single flow. Lambda Functions can easily be reconfigured as Prefect tasks, but Fargate tasks, which require their own Docker containers, are a bit more complicated. The ECSRun functionality could work, but it is designed for running flows, so this would require flows within flows, which could be tricky. What would be the best way to reconfigure a Fargate task as a Prefect task?
    14 replies · 2 participants
  • DK

    06/04/2021, 1:49 AM
    Hi everyone, I'm having an issue with registering flows. Whenever I register the same flow, the version is bumped, even if the metadata is unchanged. I've tried both the CLI and flow.register(). I've also tried flow.register('project_1', idempotency_key=flow.serialized_hash()), and it still bumps the version. I am using a local agent, local storage, and the Cloud backend. Is there another configuration that needs to be set so this doesn't happen? Or should this all be done in Docker?
    13 replies · 6 participants
  • Wai Kiat Tan

    06/04/2021, 6:30 AM
    Hi, I have a flow in Prefect Cloud that invokes AWS Lambda to perform file aggregation over different S3 bucket paths. The agent is a local agent in a Docker container hosted on AWS ECS. Problem: when the Lambda runs for more than 6-7 minutes, the agent is not able to detect the response returned by the Lambda, and the task keeps running forever. I followed the recommendation here https://docs.prefect.io/orchestration/agents/local.html#requirements and use the official Prefect Docker image. Is there any way I can solve or improve this? Thanks!
    52 replies · 3 participants
  • ash

    06/04/2021, 9:04 AM
    Hi, my Prefect dashboard is working, but no data is coming through; it keeps loading flows. This is due to an error that came up when Prefect Server was launched, but I am unable to debug it.
    36 replies · 2 participants
  • Prabin Mehta

    06/04/2021, 9:05 AM
    Hi, I am running Prefect locally, and I have flows that I want to run through a REST API. I could not find anything about running flows through a REST API. I want to run a flow and pass config with the request call. Can anyone help me with this?
    2 replies · 2 participants
  • Dali

    06/04/2021, 10:37 AM
    Hey folks 👋 I'm looking at Prefect and have a (user) config related question. We have a config library that loads config based on the execution environment (dev/stage/prod, etc.) and merges it all into a dict. It's familiar to and liked by our developers, so having to use a different way to configure Prefect is seen as less than ideal. I see that the location of the user config is configurable (pun not intended 😉) via an environment variable. It would be great if there were an option to provide the user config as a dictionary, which Prefect would then merge with its defaults and environment variables. Is something like this possible and I missed it? If not, is there a reason it couldn't be done? Cheers
    2 replies · 2 participants
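One workaround for Dali's case, sketched here as an assumption rather than a Prefect feature, is to flatten the in-house config dict into Prefect 1.x's `PREFECT__SECTION__KEY` environment variables before Prefect is imported, since Prefect merges environment variables over its defaults. `to_prefect_env` and the example config are hypothetical; the upper-case names follow the convention used in Prefect's docs.

```python
import os
from collections.abc import Mapping
from typing import Dict


def to_prefect_env(config: Mapping, prefix: str = "PREFECT") -> Dict[str, str]:
    """Flatten a nested dict into PREFECT__SECTION__KEY environment variables.

    Nested mappings become double-underscore-separated names; leaf values are
    converted with str(), since environment variables can only hold strings.
    """
    env: Dict[str, str] = {}
    for key, value in config.items():
        name = f"{prefix}__{str(key).upper()}"
        if isinstance(value, Mapping):
            env.update(to_prefect_env(value, prefix=name))
        else:
            env[name] = str(value)
    return env


# Hypothetical output of an in-house config loader.
company_config = {"logging": {"level": "DEBUG"}, "flows": {"checkpointing": "true"}}

# Assumption: Prefect 1.x reads its configuration when first imported, so the
# variables must be in place before `import prefect` runs.
os.environ.update(to_prefect_env(company_config))
```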
  • Marwan Sarieddine

    06/04/2021, 2:12 PM
    Hi folks, we have been getting unexpected flow run failures (see the thread for more details)
    20 replies · 3 participants
  • Hugo Kitano

    06/04/2021, 5:18 PM
    Is there a Prefect way to trigger a flow run when a file is uploaded to S3? Or will I have to start the flow from a Lambda function triggered by the upload?
    3 replies · 2 participants
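For the Lambda route Hugo mentions, here is a minimal sketch of the handler's first step: pulling the bucket and object key out of the S3 event notification (`Records[].s3.bucket.name` and `Records[].s3.object.key`). Object keys arrive URL-encoded, hence `unquote_plus`. Actually starting the flow run is left as a hypothetical comment, since it would go through the Prefect API; the function names are illustrative.

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import unquote_plus


def s3_objects_from_event(event: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs from an S3 event notification payload.

    Object keys arrive URL-encoded (spaces as '+'), so they are decoded here.
    """
    pairs = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        pairs.append((s3["bucket"]["name"], unquote_plus(s3["object"]["key"])))
    return pairs


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda entry point; starting the flow run itself is not shown."""
    objects = s3_objects_from_event(event)
    # Hypothetical next step: create one flow run per object through the
    # Prefect API, passing bucket and key as flow parameters.
    return {"objects": objects}
```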
  • Fina Silva-Santisteban

    06/04/2021, 5:34 PM
    Hi everybody! We’d like to have flow registration done automatically as part of our CI pipeline, and I’m curious to hear how others have approached this. I’m trying out a GitHub Action that, on push, runs an AWS ECS task, which runs a container in a Fargate cluster (our flows all run on Fargate using Docker storage), which then runs the registration method. But the AWS setup is not as straightforward as I hoped. 😅 Please let me know your approaches! 🙏
    5 replies · 3 participants
  • Hamza Ahmed

    06/04/2021, 5:45 PM
    What is the correct way to set the imagePullSecrets value in the prefecthq/prefect-server Helm chart? I tried multiple ways, but none seem to work (examples in the comments).
    3 replies · 2 participants
  • Julio Venegas

    06/04/2021, 11:23 PM
    Hi community! Lately I’m getting a lot of timeout errors from tasks running on a LocalDaskExecutor. The error is in the thread; any hints on what the issue is?
    16 replies · 2 participants
  • Michael Law

    06/04/2021, 11:58 PM
    Just a note to say props to @Kevin Kho for giving me some pro tips on setting up Prefect in combination with Databricks and Delta Lake. I have now orchestrated a bunch of our jobs through our data layers and set up a strategy for our developer loop, using a local cluster for unit tests and databricks-connect for our on-cluster runs, either from local machines or deployed to AKS. Not production ready yet, but huge strides. Couldn't have done it without your help ❤️
    1 reply · 2 participants
  • Oussama Louati

    06/05/2021, 9:49 AM
    Hello, I am using Prefect to run a machine learning workload. Each run fails with an OOM error related to Prefect caching results. I am not using output caching, so I suspect it is related to the automatic input caching (I am using a 500 GiB disk). Can anyone help me with this?
    17 replies · 3 participants
  • Colin

    06/05/2021, 3:45 PM
    Hi all, I am after some help. I am new to Prefect, and I am using Docker storage with Kubernetes as my hosting environment. Everything is working beautifully except... sometimes, just sometimes, a flow doesn't start at the required time and skips 15 minutes. I am really struggling to find out why.
    2 replies · 2 participants
Zach Schumacher

06/05/2021, 9:29 PM
Likely due to some configuration in your k8s cluster: spot instances? Resource constraints? Something along those lines.
I assume you're using a k8s agent?