prefect-community
  • r

    Robin

    09/16/2020, 8:22 AM
    Hey folks, we are finally scaling up our Dask Kubernetes setup on EKS and having quite some fun with it! ☺️ While investigating some (probably external) performance issues, I was wondering whether there is a way to show task duration histograms in the Prefect Cloud UI. I found the Dask dashboard super helpful for understanding where the bottlenecks are. Give me a handshake 👋 if you would be interested in such a feature. 🙂 Right now, even a simple histogram of number of tasks vs. task duration (maybe with a binning parameter) combined with a task filter would already be super helpful, both to see which children of a mapped task take longer than others and to see which mapped tasks take longer than others... Anyway, we already appreciate the UI quite a bit, also because the mobile version lets us check on flow state pretty seamlessly! 🙏
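A minimal sketch of the requested view, computed outside the UI. It assumes task-run durations have already been exported as (task name, seconds) pairs; the export step, the task names, and both helper functions are hypothetical illustrations, not a Prefect Cloud feature.

```python
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple


def duration_histogram(
    runs: Iterable[Tuple[str, float]],
    bin_width: float,
    name_filter: Optional[str] = None,
) -> Dict[float, int]:
    """Count task runs per duration bin, keyed by each bin's lower edge (seconds).

    runs: (task_name, duration_seconds) pairs, assumed exported from run history.
    bin_width: bin size in seconds (the "binning parameter").
    name_filter: if given, keep only tasks whose name contains this substring.
    """
    if bin_width <= 0:
        raise ValueError("bin_width must be positive")
    counts: Counter = Counter()
    for name, duration in runs:
        if name_filter is not None and name_filter not in name:
            continue
        # Floor-divide to find the bin, then convert back to its lower edge.
        counts[(duration // bin_width) * bin_width] += 1
    return dict(sorted(counts.items()))


def render(hist: Dict[float, int], bin_width: float) -> str:
    """Draw the histogram as text, one '#' per task run."""
    return "\n".join(
        f"{lower:6.1f}-{lower + bin_width:6.1f}s | {'#' * n}" for lower, n in hist.items()
    )


# Hypothetical mapped children of an "extract" task plus one "load" task.
runs = [("extract[0]", 1.2), ("extract[1]", 3.9), ("extract[2]", 4.1), ("load", 10.0)]
print(render(duration_histogram(runs, 2.0, name_filter="extract"), 2.0))
```

Filtering on a name substring covers both views from the message: children within one mapped task, or one mapped task against another.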
  • d

    Dinu Gherman

    09/16/2020, 11:59 AM
    Hi @Marvin!
  • m

    Marvin

    09/16/2020, 11:59 AM
    Incredible. The contest is even worse than I thought it would be. I'll still enter you to win, @Dinu Gherman
  • s

    sebastian.clanzett

    09/16/2020, 12:40 PM
    Hi folks. I have another short question. Is it possible to get more than one return value from a task inside a flow context, e.g.:
    with Flow("Flow1") as flow:
        config = get_config('local')
        value1, value2 = task1()
    If I try this, I get: TypeError: 'FunctionTask' object is not iterable
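Why unpacking fails can be shown with a toy model: inside the flow context, `task1()` returns a placeholder rather than the task's real return value, so Python has no way to know how many items to unpack. The `Deferred` class below is a hypothetical stand-in, not Prefect's code. In Prefect itself the analogous routes, to my understanding, are indexing the result (`result[0]`, `result[1]`) or, in newer releases, declaring the output count with `@task(nout=2)`; check the docs for the version you run.

```python
from typing import Iterator, Optional


class Deferred:
    """Toy stand-in for what a task call returns inside a flow context.

    This is NOT Prefect's Task class; it only models why unpacking fails.
    """

    def __init__(self, label: str, nout: Optional[int] = None) -> None:
        self.label = label
        self.nout = nout  # number of outputs, if declared up front

    def __getitem__(self, index: int) -> "Deferred":
        # Indexing produces another placeholder that picks one element later.
        return Deferred(f"{self.label}[{index}]")

    def __iter__(self) -> Iterator["Deferred"]:
        # Tuple unpacking calls iter(); with no declared output count there is
        # no way to know how many placeholders to hand back, so refuse.
        if self.nout is None:
            raise TypeError(f"'{type(self).__name__}' object is not iterable")
        return iter([self[i] for i in range(self.nout)])


# Explicit indexing always works:
result = Deferred("task1")
value1, value2 = result[0], result[1]

# Unpacking works once the number of outputs is declared:
value1, value2 = Deferred("task1", nout=2)
```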
  • d

    Dinu Gherman

    09/16/2020, 1:29 PM
    Hello! I’m excited to be getting my feet wet with Prefect. I just installed it locally on macOS, ran `prefect backend server` and `prefect server start`, and got the web UI up. Now I wonder how I can make the standard hello world example below appear in that UI. Do I need a “project”? If so, how do I wire that up with the example code?
    from prefect import task, Flow, Parameter
    
    @task(log_stdout=True)
    def say_hello(name):
        print("Hello, {}!".format(name))
    
    with Flow("My First Flow") as flow:
        name = Parameter('name')
        say_hello(name)
    
    flow.run(name='world') # "Hello, world!" 
    flow.run(name='Marvin') # "Hello, Marvin!"
  • c

    Charlie Cahoon

    09/16/2020, 4:02 PM
    Hi all, we are using the Kubernetes agent with the `KubernetesJobEnvironment`, and some of our flows are failing with the error below. I'm sure it's something minor... if we wait a few minutes and retry from the UI, it does eventually run.
    Failed to create Kubernetes job: (409)
    Reason: Conflict
    HTTP response headers: HTTPHeaderDict({'Audit-Id': '8b3f7073-0520-49a3-9d38-60d761af5fbe', 'Content-Type': 'application/json', 'Date': 'Wed, 16 Sep 2020 15:55:38 GMT', 'Content-Length': '242'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"workflows-prefect-job\" already exists","reason":"AlreadyExists","details":{"name":"workflows-prefect-job","group":"batch","kind":"jobs"},"code":409}
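The response body says a Job named `workflows-prefect-job` already exists, which suggests a fixed Job name is being reused while an earlier Job object is still around. One common way around a 409 `AlreadyExists` is to give every Job a unique name. The helper below is a hypothetical sketch, not part of Prefect's `KubernetesJobEnvironment`; it assumes you control the name (for example in a custom job spec) and that names should stay within the 63-character, lowercase alphanumeric-and-dash limit Kubernetes applies to label values such as `job-name`.

```python
import re
import uuid

MAX_LEN = 63  # assumed limit: Kubernetes label values (e.g. job-name) max out at 63 chars


def unique_job_name(base: str, suffix_len: int = 8) -> str:
    """Return '<sanitized base>-<random hex>' that is safe to reuse as a Job name."""
    # Lowercase, replace runs of disallowed characters with '-', trim edge dashes.
    cleaned = re.sub(r"[^a-z0-9-]+", "-", base.lower()).strip("-") or "job"
    suffix = uuid.uuid4().hex[:suffix_len]
    # Leave room for "-" + suffix and avoid ending the head on a dash.
    head = cleaned[: MAX_LEN - suffix_len - 1].rstrip("-")
    return f"{head}-{suffix}"


print(unique_job_name("workflows-prefect-job"))
```

A random suffix trades readability for uniqueness; deleting finished Jobs (or a TTL on them) is the other side of the same fix.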
  • g

    Glen Trudeau

    09/16/2020, 4:08 PM
    Hi guys. I’ve been working on a Prefect Server deployment on EC2 using the Fargate agent, and I’m running into errors when executing flows on Fargate. The agent deploys the flow to the Fargate cluster, but then returns "flow not found". From the logs, it looks like it was trying to find the flow inside the Fargate node instead of on the EC2 instance where it is registered and running. I then thought it might be better to run this with S3 storage, but that returns the error:
    Failed to load and execute Flow's environment: TypeError('an integer is required (got type bytes)')
  • v

    Vipul

    09/16/2020, 4:28 PM
    Hi all, I created two flows with the same name but different version_group_ids. I can see both flow names in the UI but can't figure out which version_group_id each one has. Is there any way to identify them?
  • k

    kevin

    09/16/2020, 4:33 PM
    Hey guys, is there a way to render just the flow schematic seen in the cloud UI on a local machine?
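As far as I know, Prefect 1.x offers `flow.visualize()` for drawing the schematic locally, which needs Graphviz (the system package and the Python `graphviz` library) installed. As a dependency-free alternative, the sketch below emits Graphviz DOT text from (upstream, downstream) task-name pairs; the helper and the example edges are hypothetical. With a real flow the pairs could presumably be built from `flow.edges` (each edge exposing `upstream_task` and `downstream_task`), but treat that as an assumption.

```python
from typing import Iterable, Tuple


def to_dot(edges: Iterable[Tuple[str, str]], name: str = "flow") -> str:
    """Render (upstream, downstream) task pairs as Graphviz DOT text."""

    def quote(text: str) -> str:
        # DOT identifiers are double-quoted here; escape embedded quotes.
        return '"' + text.replace('"', '\\"') + '"'

    lines = [f"digraph {quote(name)} {{"]
    for upstream, downstream in edges:
        lines.append(f"    {quote(upstream)} -> {quote(downstream)};")
    lines.append("}")
    return "\n".join(lines)


print(to_dot([("get_config", "task1"), ("task1", "task2")], name="Flow1"))
```

Saved to `flow.dot`, the output can be rendered with `dot -Tpng flow.dot -o flow.png`.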
  • l

    Leonard Marcq

    09/16/2020, 7:50 PM
    Hi there, I managed to deploy a flow that spawns a bunch of mapped tasks that run on a Dask cluster on Fargate with `DaskCloudProviderEnvironment`. It works pretty well and was much easier to set up than I expected. However, the cluster that gets spawned seems to run the Dask tasks in the default VPC in AWS (with its associated security group), and I would prefer those tasks to run in a VPC other than the default one. I did not find much information on network configuration for this in the docs or in Dask Cloud Provider's documentation; ideally, I would like to pass specific security groups, subnets, and so on to the Dask cluster. (1) Is there a way to pass a network configuration to the Fargate Dask cluster through the `DaskCloudProviderEnvironment` interface (or some other way)? (2) Does it matter what kind of agent picks up the flow? From what I understand, the only thing the agent does in this case is spawn a cluster with the scheduler and worker nodes in it and check that the tasks completed at the end (and maybe tear down the Dask cluster?), so I guess any agent could do the job. Or does it specifically have to be a Fargate agent for some reason?
  • h

    Hui Zheng

    09/16/2020, 11:02 PM
    Hello Prefect support, I need some help. One of our Prefect flows [1] has been unable to start flow runs for the past 3 hours. The flow runs get held up in the `Scheduled to start` state, but there are no run logs and no GKE job is created. All runs scheduled in the past 3 hours have had the same issue, and manually started flow runs run into it too. The project's agent shows a healthy status. These delayed runs don’t even show up in the flow runs tab. It seems that the flow scheduled the runs but somehow could not start them when the time came. [1] https://cloud.prefect.io/semios/flow/dcf941bd-6365-45e8-aafd-71daae6c29f0?version=7
  • v

    Valentin Willscher

    09/17/2020, 7:58 AM
    Hey guys, is there a way to list all registered flows? I couldn’t find one in the API docs.
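A sketch of listing flows through the GraphQL API, under stated assumptions: the query below assumes the Hasura-style `flow` table exposed by Prefect Server/Cloud with `name`, `version`, and `version_group_id` fields, so verify the field names in your server's GraphQL explorer first. The same result also shows which `version_group_id` each same-named flow carries. The helper names are hypothetical; with Prefect 1.x installed, the query string could presumably be passed to `prefect.Client().graphql(...)` instead of posting the JSON body yourself.

```python
import json
from collections import defaultdict
from typing import Dict, List

# Assumed query; check field names against your server's schema before relying on it.
LIST_FLOWS_QUERY = """
query {
  flow(order_by: [{name: asc}, {version: desc}]) {
    id
    name
    version
    version_group_id
  }
}
"""


def request_body() -> bytes:
    """JSON body to POST to the GraphQL endpoint."""
    return json.dumps({"query": LIST_FLOWS_QUERY}).encode("utf-8")


def group_versions(response: dict) -> Dict[str, Dict[str, List[int]]]:
    """Map flow name -> version_group_id -> versions, from a GraphQL response."""
    grouped: Dict[str, Dict[str, List[int]]] = defaultdict(lambda: defaultdict(list))
    for flow in response["data"]["flow"]:
        grouped[flow["name"]][flow["version_group_id"]].append(flow["version"])
    # Convert the nested defaultdicts to plain dicts for printing/comparison.
    return {name: dict(groups) for name, groups in grouped.items()}
```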
  • a

    Adam

    09/17/2020, 9:32 AM
    Hello everyone! Hope you’re having a great day so far! I need some quick help with the API for `PostgresFetch`. It seems to require the `user` and `host` arguments when declaring the task, while `password` can be supplied when instantiating it. How can I supply all of them when I instantiate it instead? I’m trying to achieve something like this:
    get_customers = PostgresFetch(
        name="Get customers ready for migration",
        db_name="postgres",
        user="CAN_I_USE_ENV_SECRET_HERE?",
        host="CAN_I_USE_ENV_SECRET_HERE?",
        query="""
            SELECT DISTINCT customer_id
            FROM customer_migration_request
        """,
        fetch="all",
    )
    
    
    with Flow("migrate customers") as flow:
        postgres_user = EnvVarSecret("POSTGRES_USER", raise_if_missing=True)
        postgres_password = EnvVarSecret("POSTGRES_PASSWORD", raise_if_missing=True)
        postgres_host = EnvVarSecret("POSTGRES_HOST", raise_if_missing=True)
    
        customers = get_customers(
            password=postgres_password, user=postgres_user, host=postgres_host
        )
  • i

    Iain Dillingham

    09/17/2020, 1:53 PM
    Hi #prefect-community 👋 Could anyone point me to where the context object is populated? I've found that `prefect.context.scheduled_start_time` contains slightly different timezone information depending on whether I run the flow with `my_flow.run()` or it is run by the Docker agent:
    • Run with `my_flow.run()`, the object is `DateTime(2020, 9, 17, 13, 28, 18, 669528, tzinfo=Timezone('UTC'))`.
    • Run by the Docker agent, the object is `DateTime(2020, 9, 17, 13, 45, 0, tzinfo=Timezone('+00:00'))`.
    The distinction matters because, although both are instances of `pendulum.DateTime`, calling `dt.to_iso8601_string()` returns different strings: if the timezone is UTC, the method replaces the trailing `+00:00` with `Z`. Whether or not that is a good idea, it would be good for `prefect.context.scheduled_start_time` to return consistent timezone information.
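Until the context is consistent, one workaround is to normalize the value before formatting it: convert to UTC, then render it one fixed way. The sketch below uses only the standard library (it does not call pendulum) and simulates the two cases with a named UTC singleton and an anonymous `+00:00` offset; the helper name is hypothetical. With pendulum, the analogous step would presumably be converting with `in_timezone("UTC")` before calling `to_iso8601_string()`.

```python
from datetime import datetime, timedelta, timezone


def to_consistent_iso(dt: datetime, use_z: bool = False) -> str:
    """Render an aware datetime in UTC the same way regardless of its tzinfo object."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    text = dt.astimezone(timezone.utc).isoformat()
    # isoformat() writes "+00:00" for UTC; optionally switch to the "Z" form.
    if use_z and text.endswith("+00:00"):
        text = text[: -len("+00:00")] + "Z"
    return text


named_utc = datetime(2020, 9, 17, 13, 45, tzinfo=timezone.utc)
fixed_offset = datetime(2020, 9, 17, 13, 45, tzinfo=timezone(timedelta(0), "+00:00"))
print(to_consistent_iso(named_utc), to_consistent_iso(fixed_offset))
```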
  • s

    simone

    09/17/2020, 2:02 PM
    Hi #prefect-community, I am mapping a task over a list of images (~12000):
    out = func.map([A1, A2, A3, A4, A5, B1, B2, B3, B4, B5])
    In the next step, I would like to partially reduce the output and combine only the matching subgroups, e.g. combining
    A = out[0:5], B = out[5::]
    and then process `A` and `B` in parallel. I have three questions: (1) If I understood correctly, mapping in Prefect preserves order, so the input and output have the same order, correct? (2) I am running the code on an HPC cluster. If I proceed this way, will the entire `out` be collected in memory, or will the different output groups be dispatched to the specific workers where the reduce happens? (3) Is there a more efficient way to do this? Thanks a lot!
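A sketch of the partial reduce, assuming mapped outputs come back in input order (question 1): grouping by each input's label rather than slicing fixed positions keeps working if the number of images per group changes. The function and labels are hypothetical. In a Prefect flow this logic would run as one reduce task that, as far as I know, receives the whole mapped output list, and its grouped lists could then be mapped over for the parallel step.

```python
from typing import Dict, List, Sequence, TypeVar

T = TypeVar("T")


def group_by_prefix(labels: Sequence[str], outputs: Sequence[T]) -> Dict[str, List[T]]:
    """Group mapped outputs by the letter prefix of their input label.

    Relies on outputs[i] corresponding to labels[i] (mapped results in input order).
    """
    if len(labels) != len(outputs):
        raise ValueError("labels and outputs must have the same length")
    groups: Dict[str, List[T]] = {}
    for label, out in zip(labels, outputs):
        key = label.rstrip("0123456789")  # e.g. "A12" -> "A"
        groups.setdefault(key, []).append(out)
    return groups


labels = ["A1", "A2", "A3", "A4", "A5", "B1", "B2", "B3", "B4", "B5"]
groups = group_by_prefix(labels, list(range(10)))  # stand-in for the mapped outputs
print(groups)
```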
  • p

    Pedro Machado

    09/17/2020, 4:18 PM
    Hi there. I am looking for suggestions on the best way to structure a repo to store Prefect flows. We expect to have several flows that use some shared functionality (for example, code to run queries and get data from Presto or to interact with an API). Could you suggest a folder structure that allows us to create flows and import shared code? Also, I anticipate having several SQL files that would be run by the different flows. This is one of those "I don't know what I don't know yet" questions; I'm just trying to set things up right from the beginning. Although we won't do CI/CD from the start, I'd like a setup that will allow us to implement CI/CD in the near future. This client uses GitLab. A little more background...
    • I am planning to use Prefect Cloud.
    • We'll probably start with a Docker agent running on an AWS instance, but I am open to suggestions. The workflows will primarily be pulling data, running queries, sending files, etc. (we won't be training ML models).
    • The group I am working with has limited DevOps support, and any additional infrastructure takes a while to request, get approved, and provision.
    • Most of the code will be written in Python, but they have some legacy R code that we have been running inside a container.
    Thanks!
  • a

    ale

    09/17/2020, 4:25 PM
    Is there any chance of using Bitbucket instead of GitHub for storage?
  • s

    sundaram

    09/17/2020, 4:26 PM
    @Marvin
  • j

    jars

    09/17/2020, 6:08 PM
    Prefect Cloud gives our flow runs cute names like beautiful-cricket, industrious-turkey, etc. Is this name accessible inside the flow's execution? We'd like to tag some external resources with this name so we can easily tell which flow run created those resources when debugging.
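A sketch of building resource tags from the run context. It assumes the run name is exposed under a `flow_run_name` key in `prefect.context` during backend (Cloud/Server) runs and that `flow_name` is present too; both keys are assumptions to verify for your Prefect version. The helper is hypothetical and takes any mapping, so inside a task you would pass `prefect.context`, while a plain dict stands in for it here.

```python
from typing import Dict, Mapping


def resource_tags(ctx: Mapping[str, object], default: str = "local-run") -> Dict[str, str]:
    """Build tags for external resources from a Prefect-like context mapping."""
    return {
        # "flow_run_name" is assumed to be set only for backend (Cloud/Server) runs,
        # so local flow.run() calls fall back to the default.
        "prefect-flow-run-name": str(ctx.get("flow_run_name", default)),
        "prefect-flow-name": str(ctx.get("flow_name", "unknown")),
    }


print(resource_tags({"flow_run_name": "beautiful-cricket", "flow_name": "etl"}))
```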
  • h

    Hamza Ahmed

    09/17/2020, 7:30 PM
    @Marvin
  • m

    Marvin

    09/17/2020, 7:30 PM
    Why should I want to make anything up? The contest is bad enough as it is without wanting to invent any more of it. I'll still enter you in it, @Hamza Ahmed
  • s

    Steven Kalt

    09/17/2020, 8:26 PM
    @Marvin
  • m

    Marvin

    09/17/2020, 8:26 PM
    This contest will all end in tears, I just know it. You're entered anyway, @Steven Kalt.
  • g

    Glen Trudeau

    09/17/2020, 9:10 PM
    @Marvin
  • m

    Marvin

    09/17/2020, 9:10 PM
    This contest will all end in tears, I just know it. You're entered anyway, @Glen Trudeau.
  • s

    Sam

    09/17/2020, 9:26 PM
    @Marvin am I too late?
  • m

    Marvin

    09/17/2020, 9:26 PM
    Don’t pretend you want to talk to me, I know you hate me. I'll still enter you in the contest, @Sam.
  • j

    Jacob Blanco

    09/18/2020, 4:46 AM
    One of my flows is currently 5 minutes late for execution. Any ideas how to debug?
  • s

    sark

    09/18/2020, 4:55 AM
    Hi, what’s the best way to express a job that has to be explicitly started and stopped at specific times, but with other downstream jobs that should start only once that job has stopped? Schedules seem to apply to flows, not tasks, but if I separate the starting and stopping into separate flows, I lose the ability to express dependencies.
  • l

    Lewis Bails

    09/18/2020, 6:15 AM
    @Marvin