prefect-community
  • c

    Constantino Schillebeeckx

    08/03/2021, 5:22 PM
    hi gang! what's the best-practice way of registering flows that aren't contained in just one
    .py
    file? we've split out commonly shared functionality between flows into an e.g.
    utils.py
    file which is referenced in the flow. Given all the storage documentation, this design doesn't seem to fit the intended use of storage.
    k
    m
    • 3
    • 39
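    One common pattern (a minimal sketch, assuming Prefect 1.x Docker storage; the registry, image and paths are placeholders) is to bake the shared module into the flow's image so that "import utils" still resolves when the flow is unpickled at run time:

    from prefect import Flow
    from prefect.storage import Docker

    storage = Docker(
        registry_url="registry.example.com",
        image_name="my-flows",
        files={
            # copy the shared module into the image next to the flow
            "/repo/flows/utils.py": "/pipeline/utils.py",
        },
        env_vars={"PYTHONPATH": "/pipeline"},  # so "import utils" resolves at run time
    )

    with Flow("my-flow", storage=storage) as flow:
        ...

    Installing the shared code as a proper Python package in the image (python_dependencies or a custom Dockerfile) works the same way.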
  • d

    David Elliott

    08/03/2021, 6:32 PM
    Hey all, I've been looking at upgrading my prefect version and also my distributed, dask & dask-kubernetes versions for our production pipeline, and just wanted to clarify a change in behaviour that I've noticed: • Previously when I ran a flow, the k8s agent would create a job which was in effect the dask scheduler, creating and retiring pods as it needed to. In my case that
    prefect-job-xxxxx
    would create 4 ephemeral dask workers (named something like
    dask-root-xxxx
    ) • Now the behaviour I'm seeing is: ◦ K8s agent creates the
    prefect-job-xxx
    ◦ In the
    prefect-job
    logs, it gives me: prefect.DaskExecutor | Creating a new Dask cluster with
    __main__.make_cluster
    . Creating scheduler pod on cluster. This may take some time. ◦ there are then 5x
    dask-root-xxx pods
    created, where 1 of them is a dask scheduler - i.e. the scheduler no longer sits within the
    prefect-job-xx
    ? Just wanted to check if this was expected/intended behaviour - I couldn't see any reference to it in the prefect release notes • In addition (and this is more of a side note - I think the prefect k8s RBAC docs need updating), I've had to add 2x more rulesets to my k8s RBAC to make it work - see these docs for what's now required. Here is specifically what's changed vs the prefect docs. Thanks!
    k
    m
    • 3
    • 13
  • b

    Billy McMonagle

    08/03/2021, 8:24 PM
    Hi all, I'd like to add a "create project if not exists" script into my build pipeline (for git branch based work). I know I can do
    prefect create project "My Project"
    ... is it safe to run this command multiple times?
    k
    • 2
    • 3
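    A check-then-create sketch with the Prefect 1.x Client, in case the CLI call isn't idempotent on your version (the GraphQL shape below is assumed - worth verifying against your server):

    from prefect import Client

    client = Client()
    project_name = "My Project"

    # look for an existing project with this name before creating it
    result = client.graphql(
        """
        query($name: String!) {
          project(where: { name: { _eq: $name } }) { id }
        }
        """,
        variables={"name": project_name},
    )

    if not result.data.project:
        client.create_project(project_name=project_name)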
  • j

    Jeff Baatz

    08/03/2021, 9:00 PM
    For some reason on Cloud, I'm noticing that most of the UI elements aren't loading properly. Is anyone else experiencing this? I've tried it across a few different browsers. It's unfortunately bad enough that I can't really use the UI at all since almost none of the text loads. I tried clearing caches and all that good stuff already. EDIT: Oh wow, you know what... Dark mode was checked, but it doesn't seem to be working well. I forgot that toggle existed.
    🙌 1
    a
    • 2
    • 1
  • n

    Nadav

    08/03/2021, 9:22 PM
    Hi, when running with ECSRun and setting env variables, they are being auto-cast from strings to ints, which causes a boto3 exception. Is there a way to avoid the auto-casting?
    k
    • 2
    • 10
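    If the values are getting coerced somewhere upstream (e.g. by YAML parsing), one blunt workaround - a hedged sketch, not a fix for the root cause - is to stringify everything before it reaches ECSRun, since boto3 expects container env values as strings:

    from prefect.run_configs import ECSRun

    raw_env = {"BATCH_SIZE": 500, "THREADS": 4, "REGION": "us-east-1"}

    # force every value to a string so the ECS task definition registration accepts it
    flow.run_config = ECSRun(env={k: str(v) for k, v in raw_env.items()})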
  • a

    Aiden Price

    08/04/2021, 2:32 AM
    Hi folks, if I return a dictionary from a task, is there a way to unpack that result into
    kwargs
    for the next task downstream? Something like:
    # returns {"path": "some/restapi/path", "start_time": datetime...}
    tasks = generate_tasks(first_set)
    
    # Needs a path and start_time argument
    histories = fetch_history.map(**tasks)
    But this gives me the error
    expression after ** must be a mapping with a "str" key
    k
    • 2
    • 4
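    The ** unpacking happens at flow-build time, before the upstream task has produced its dict, so it can't work directly; one workaround (a sketch, assuming fetch_history takes path and start_time) is a small extractor task that pulls each key out and passes it explicitly:

    from prefect import task

    @task
    def get_key(d, key):
        # pull a single value out of the dict returned upstream
        return d[key]

    histories = fetch_history(
        path=get_key(tasks, "path"),
        start_time=get_key(tasks, "start_time"),
    )

    If generate_tasks actually returns a list of such dicts, the same idea works with .map by having the extractor return a list of values per key.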
  • s

    st dabu

    08/04/2021, 3:37 AM
    Hi folks, was just looking at a prefect example like this.. However, let's say instead of running the steps as python code, I have some CLI commands.
    with Flow("NLP") as nlp_flow:
        aws s3 cp blah blah
        sometool /data/a.csv
    Is it possible to add the steps as CLI commands instead?
    k
    • 2
    • 2
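    Prefect 1.x ships a ShellTask for exactly this; a minimal sketch (the bucket name and tool below are placeholders):

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    shell = ShellTask(return_all=True, log_stderr=True)

    with Flow("NLP") as nlp_flow:
        download = shell(command="aws s3 cp s3://my-bucket/a.csv /data/a.csv")
        # run the second command only after the copy has finished
        process = shell(command="sometool /data/a.csv", upstream_tasks=[download])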
  • r

    Ranu Goldan

    08/04/2021, 4:01 AM
    Hi everyone. I have been encountering this error for weeks, and it keeps getting more frequent. Prefect Cloud keeps showing a 404 on Flow pages - previously only for certain flows, but now for almost every newly registered Flow. I took a look in the network tools: after it gets the 404, it keeps hitting the API and getting 200 in return, forever. How do I resolve this?
    n
    • 2
    • 5
  • h

    haven

    08/04/2021, 6:16 AM
    Hi guys, I'm looking to build a
    class MyFlowRunner(prefect.engine.flow_runner.FlowRunner)
    (or subclassing another component - open to suggestions!) inside a custom package
    my-prefect
    to send some HTTP requests before a
    Flow
    starts, and after it ends. My questions are: • where would be the best place for me to implement such functionality? along with these considerations: • keep the Prefect API as pure as possible • allow the implementation of the
    MyFlowRunner
    (or some other custom component) to be uncoupled, i.e. if a project upgrades
    my-prefect
    , the flow configuration doesn't have to change at all. Would appreciate comments/suggestions/discussion from whoever is interested or has experience doing so!
    k
    • 2
    • 6
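    One way to keep this decoupled from the runner entirely is a flow-level state handler shipped inside my-prefect; a minimal sketch (the webhook URLs are placeholders):

    import requests
    from prefect import Flow

    def notify_http(flow, old_state, new_state):
        # Prefect calls this on every flow state change
        if new_state.is_running():
            requests.post("https://example.com/hooks/flow-started", json={"flow": flow.name})
        if new_state.is_finished():
            requests.post(
                "https://example.com/hooks/flow-finished",
                json={"flow": flow.name, "state": type(new_state).__name__},
            )
        return new_state

    with Flow("my-flow", state_handlers=[notify_http]) as flow:
        ...

    Because the handler is just a function in your package, upgrading my-prefect doesn't require touching the flow definition beyond the state_handlers argument.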
  • b

    Ben Muller

    08/04/2021, 6:50 AM
    Hi Prefect, why is this an invalid schedule in the UI? As you can see it is valid cron syntax 🤷
    k
    m
    • 3
    • 4
  • s

    Samuel Tober

    08/04/2021, 8:05 AM
    Quick question. I know that .map() can be used as a single for-loop in a flow. Is there a way to use .map() to create a nested for-loop though?
    k
    • 2
    • 3
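    One simple pattern is to compute the cartesian product inside an ordinary task and map once over the pairs; a minimal sketch:

    from itertools import product
    from prefect import Flow, task

    @task
    def cross(xs, ys):
        # build every (x, y) combination up front
        return list(product(xs, ys))

    @task
    def work(pair):
        x, y = pair
        return x * y

    with Flow("nested-map") as flow:
        pairs = cross([1, 2, 3], [10, 20])
        results = work.map(pairs)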
  • m

    Michael Fichtinger

    08/04/2021, 8:36 AM
    Quick question from a newbie to Prefect: what is the easiest way to set up prefect and the agents to distribute flows to different machines on the local network (see picture)? Maybe you can give me some advice and share some reading on this. Thanks
    k
    • 2
    • 13
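    The usual Prefect 1.x pattern is one agent per machine, each started with its own label, and flows pinned to a machine by putting a matching label on the run config; a minimal sketch (the label names are just examples):

    from prefect import Flow
    from prefect.run_configs import LocalRun

    # on each machine, start an agent with that machine's label, e.g.:
    #   prefect agent local start --label machine-a

    with Flow("runs-on-machine-a") as flow:
        ...

    # only agents carrying the "machine-a" label will pick this flow up
    flow.run_config = LocalRun(labels=["machine-a"])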
  • j

    Joseph Ellis

    08/04/2021, 10:05 AM
    Hey Prefect community, does anyone know some common causes for flow runs to get stuck in the ‘Submitted’ state? We’re running a docker agent in DEV & TEST, both are deployed from the same configuration, DEV works fine, but all flows to TEST get stuck in ‘Submitted’.
    k
    • 2
    • 6
  • j

    Jai Deo

    08/04/2021, 10:41 AM
    I am trying to import pandas. I have Prefect Cloud and an AKS agent. I have modified the yaml file for the pod to include pip install pandas, but when I run the flow it can't find the pandas package. When I log onto the pod where the agent is, I see that pandas is installed when the pod gets created by Kubernetes. When I run the flow I get: Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'pandas\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    k
    • 2
    • 41
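    The flow is unpickled inside the flow-run job's container, not on the agent pod, so pandas has to be in the image that job uses; one way (a sketch with placeholder registry/image names) is to bake it in via Docker storage, or point KubernetesRun at a custom image that already has it:

    from prefect import Flow
    from prefect.storage import Docker
    from prefect.run_configs import KubernetesRun

    with Flow("needs-pandas") as flow:
        ...

    # dependencies get installed into the image the flow run executes in
    flow.storage = Docker(
        registry_url="registry.example.com",
        image_name="needs-pandas",
        python_dependencies=["pandas"],
    )
    flow.run_config = KubernetesRun()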
  • s

    Sumit Kumar Rai

    08/04/2021, 11:47 AM
    Please help me to clear up my confusion on instantiating prefect flows. I have three dbt commands to run:
    dbt deps
    dbt snapshot
    dbt run
    For now I'm running them by reusing the instance of DbtShellTask:
    dbt_task = DbtShellTask(name="Running dbt commands", ...)
    
    dbt_clean_task_invoke   = dbt_task("dbt deps")
    dbt_deps_task_invoke    = dbt_task("dbt snapshot")
    dbt_run_task_invoke     = dbt_task("dbt run")
    With this setup, my schematic will have three boxes (tasks) all named
    Running dbt commands
    , which makes it difficult to identify which is which. In order to fix that, do I have to instantiate a task for each command, like below?
    dbt_clean_task = DbtShellTask(name="Running dbt deps command", ...)
    dbt_deps_task  = DbtShellTask(name="Running dbt snapshot command", ...)
    dbt_run_task   = DbtShellTask(name="Running dbt run command", ...)
    
    dbt_clean_task_invoke   = dbt_clean_task("dbt deps")
    dbt_deps_task_invoke    = dbt_deps_task("dbt snapshot")
    dbt_run_task_invoke     = dbt_run_task("dbt run")
    k
    m
    • 3
    • 3
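    A middle ground (a sketch, assuming Prefect 1.x's DbtShellTask) is to keep a single task instance and override the display name per invocation with task_args, which copies the task under the new name so each box in the schematic is labelled distinctly:

    from prefect import Flow
    from prefect.tasks.dbt import DbtShellTask

    dbt_task = DbtShellTask(name="Running dbt commands")

    with Flow("dbt-flow") as flow:
        deps = dbt_task(command="dbt deps", task_args={"name": "dbt deps"})
        snapshot = dbt_task(command="dbt snapshot", task_args={"name": "dbt snapshot"},
                            upstream_tasks=[deps])
        run = dbt_task(command="dbt run", task_args={"name": "dbt run"},
                       upstream_tasks=[snapshot])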
  • i

    Italo Barros

    08/04/2021, 1:15 PM
    Does someone know how to solve this issue? I haven't been able to log in since yesterday, but it is possible to reset the password.
    k
    • 2
    • 6
  • g

    Gustavo de Paula

    08/04/2021, 2:31 PM
    Hi all! Is there a way I can run a flow with a custom CloudFlowRunner without having to modify the agent code?
    k
    • 2
    • 6
  • h

    Harry Baker

    08/04/2021, 4:00 PM
    Is there a way to create a task that's reused with different parameters, that for each use is given a different name that then appears in the schematic visualization? I thought that doing
    @task(task_run_name="{name_val}")
    and then passing in a name_val variable would do it, but in the dashboard everything is still showing up as the name of the function definition
    k
    • 2
    • 6
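    If I recall correctly, task_run_name only renames the task run shown on the run pages, while the schematic shows the Task's own name; overriding the name per call with task_args is one way to get distinct boxes (a minimal sketch):

    from prefect import Flow, task

    @task
    def load(source):
        return f"loaded {source}"

    with Flow("renamed-tasks") as flow:
        # task_args copies the task with a new name for each invocation
        users = load("users", task_args={"name": "load-users"})
        orders = load("orders", task_args={"name": "load-orders"})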
  • n

    Nacho Rodriguez

    08/04/2021, 4:04 PM
    Hi! I've been using Prefect for a while and I'm investigating how to scale it. My organization uses Azure and I was looking for something similar to ECS+Fargate (ECSAgent) integration but in Azure. The closest thing to Fargate is Azure Container Instances. I don't want to use Kubernetes because I want to pay per task, not to have a cluster almost always on idle. Is it possible that it is going to be supported in the future (aka ACIAgent)? Thank you very much in advance
    k
    • 2
    • 4
  • d

    Dan Zhao

    08/04/2021, 4:43 PM
    Hi, my company has recently tested Prefect - we love it and we are thinking of switching from Airflow! The caveat is we need a fully on-prem deployment. Is there an estimated cost (order of magnitude will do) for the enterprise option?
    k
    m
    • 3
    • 3
  • h

    Harry Baker

    08/04/2021, 5:58 PM
    What would be the best structure for when I have a task that, on success, should just return data normally that is appended to a list, but on failure should run another task that gets data from another source and appends that instead? Can you use "with case" blocks on a task's state, rather than on a task's output? Basically, if the original task fails, I want to replace its output with the output of another task/function instead, which is then appended the same way.
    k
    • 2
    • 22
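    If the fallback only needs to kick in when the primary source errors, one simple option that sidesteps state-based control flow entirely is to wrap both fetches in a single task (a sketch - fetch_primary and fetch_fallback are placeholder functions):

    from prefect import task

    def fetch_primary(item):
        ...  # placeholder: the normal data source

    def fetch_fallback(item):
        ...  # placeholder: the secondary data source

    @task
    def fetch_with_fallback(item):
        # try the primary source first, fall back to the secondary on any error
        try:
            return fetch_primary(item)
        except Exception:
            return fetch_fallback(item)

    If the two sources really need to stay as separate Prefect tasks, triggers (e.g. prefect.triggers.any_failed on the fallback) are the other direction to look at.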
  • d

    David Elliott

    08/04/2021, 6:36 PM
    Question about mapped tasks and custom state handlers for them - I've got a state handler for when a mapped task fails, however because the object passed to the state handler is a Task and not a TaskRun, the task.name is the generic task name, and not the mapped task name. Similarly in the state handler I don't have the task_run_id either. Is there any way to get the task_run_name in the state handler for a mapped task? Forgot to mention - if I try
    object.task_run_name
    it prints the format string used to generate the mapped task name, not the actual task name. I'm using the pattern here where my
    task_run_name = "{table_name}"
    - and I get
    {table_name}
    output in the state handler, because that's what it's set as at the Task level, rather than it being computed at the TaskRun level
    k
    • 2
    • 6
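    Heavily hedged, but worth checking: the state handler runs inside the task run's context, so prefect.context may carry the run-level details (map_index, task_run_id) that the Task object itself doesn't; a sketch:

    import prefect

    def on_failure(task, old_state, new_state):
        if new_state.is_failed():
            # run-level details live in context, not on the Task object
            map_index = prefect.context.get("map_index")
            task_run_id = prefect.context.get("task_run_id")
            prefect.context.get("logger").error(
                f"{task.name}[{map_index}] failed (task_run_id={task_run_id})"
            )
        return new_state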
  • m

    Michael Law

    08/04/2021, 6:51 PM
    Hey folks, looking for some assistance here if possible, as I’m a little confused. Given the flow below, which basically orchestrates a bunch of other flows, I’d expect this to run the dependent flows as per the dependencies defined, but it does not - it kicks them all off at the same time. * see thread It was my understanding, given the schematic that is drawn, that it would wait for the upstream tasks to be completed (maybe my upstream vs downstream is confused here). The ‘FlowRunner’ class here is just a wrapper around an environment variable to run locally or via Kubernetes. * see thread Can anyone see any obvious mistakes here?
    m
    k
    • 3
    • 32
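    For reference, with StartFlowRun the child runs only block downstream work when wait=True and the dependencies are set between the invocations inside the flow block; a minimal sketch (flow/project names are placeholders):

    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun

    run_child_a = StartFlowRun(flow_name="child-a", project_name="my-project", wait=True)
    run_child_b = StartFlowRun(flow_name="child-b", project_name="my-project", wait=True)

    with Flow("orchestrator") as flow:
        a = run_child_a()
        # child-b only starts once child-a's flow run has finished
        b = run_child_b(upstream_tasks=[a])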
  • s

    Samuel Kohlleffel

    08/04/2021, 6:55 PM
    What would be the approach to conditionally register a flow depending on both the flow itself and the tasks used by the flow? So, the
    idempotency_key
    used when calling
    flow.register()
    would take into account both flow and task changes. From what I can tell,
    flow.serialized_hash()
    only changes if changes are made to the flow's metadata. It does not take into account if a task within a flow changes.
    m
    • 2
    • 4
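    One workaround is to build your own key by hashing the serialized flow together with each task's source and passing that to flow.register; a sketch (falls back to the task name when the source can't be inspected):

    import hashlib
    import inspect

    def content_key(flow):
        parts = [flow.serialized_hash()]
        for t in sorted(flow.tasks, key=lambda t: t.name):
            try:
                parts.append(inspect.getsource(t.run))
            except (OSError, TypeError):
                parts.append(t.name)
        return hashlib.sha256("".join(parts).encode()).hexdigest()

    flow.register(project_name="my-project", idempotency_key=content_key(flow))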
  • h

    Harish

    08/04/2021, 7:17 PM
    Hi community, how can I pass parameters to a ShellTask? I can't seem to find an example for it in the docs.
    k
    • 2
    • 5
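    Since ShellTask's command is just a run-time argument, one way (a minimal sketch) is to build the command string from a Parameter with a small helper task:

    from prefect import Flow, Parameter, task
    from prefect.tasks.shell import ShellTask

    shell = ShellTask(return_all=True)

    @task
    def build_command(path):
        # interpolate the parameter into the shell command at run time
        return f"ls -la {path}"

    with Flow("shell-with-params") as flow:
        path = Parameter("path", default="/tmp")
        output = shell(command=build_command(path))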
  • b

    Ben Muller

    08/04/2021, 11:03 PM
    Hey community, is there a way to loop over a group of tasks in sequential order? Looping, ironically, doesn't seem to do this. I don't want to transfer any data between loops; I just kind of want to hack a flow into running multiple times within the minute.
    k
    • 2
    • 4
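    If the number of repetitions is known when the flow is built, one hack is to lay the copies out in a plain Python for loop and chain them with set_upstream so they run strictly one after another (a sketch):

    from prefect import Flow, task

    @task
    def do_the_work(iteration):
        return f"run {iteration}"

    with Flow("repeat-within-a-minute") as flow:
        previous = None
        for i in range(4):
            current = do_the_work(i, task_args={"name": f"do_the_work-{i}"})
            if previous is not None:
                # no data is passed between copies; this only forces ordering
                current.set_upstream(previous)
            previous = current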
  • h

    Hui Zheng

    08/04/2021, 11:35 PM
    Hello, how could we disable the flow run retry when the below happens?
    No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2.
    In most situations, we do want to retry the run. But we have a particular flow where we actually don't want any retrying when the run loses the heartbeat; we just want it to fail. How could we do that?
    k
    c
    • 3
    • 3
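    One knob that is sometimes suggested for this (a hedged sketch - verify it fits your case before relying on it) is turning heartbeats off for just that flow via an environment variable on its run config, so the no-heartbeat handling never triggers:

    from prefect.run_configs import UniversalRun

    # applies only to this flow's runs; heartbeat mode can be "thread", "process" or "off"
    flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "off"})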
  • b

    Ben Muller

    08/05/2021, 1:23 AM
    Hey community, trying to write a GraphQL query where I can specify the
    project
    and the
    flow name
    and get a list of currently active flow runs and their state (Running, Completed, etc.). Is this possible? Can't seem to find anything for this use case...
    k
    • 2
    • 7
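    A sketch of the kind of query that should work, wrapped in the Python Client (the schema shape is from memory - verify the field names in the Interactive API tab):

    from prefect import Client

    client = Client()

    result = client.graphql(
        """
        query($project: String!, $flow: String!) {
          flow_run(
            where: {
              flow: { name: { _eq: $flow }, project: { name: { _eq: $project } } }
              state: { _in: ["Running", "Submitted", "Scheduled"] }
            }
            order_by: { start_time: desc }
          ) {
            id
            name
            state
            start_time
          }
        }
        """,
        variables={"project": "my-project", "flow": "my-flow"},
    )

    for run in result.data.flow_run:
        print(run.name, run.state)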
  • d

    Dotan Asselmann

    08/05/2021, 8:05 AM
    Hi everyone, I noticed that when we’re submitting a flow run [create_flow_run] we get very long response times, usually several seconds [about 5s], sometimes up to 15s. Is that expected behaviour?
    k
    • 2
    • 3
  • s

    Samuel Hinton

    08/05/2021, 8:49 AM
    Hey all! Wondering if anyone knows whether Prefect has any Prometheus metrics exposed. We’re trying to figure out how to integrate our current AlertManager monitoring with Prefect Server and its flows. Have other people solved monitoring in an elegant way?
    d
    • 2
    • 2
d

davzucky

08/05/2021, 10:47 AM
There are multiple people interested in the same feature. We opened an issue about it, which you can find here: https://github.com/PrefectHQ/server/issues/266 - please add your requirements as well.
🚀 1
👍 1
s

Samuel Hinton

08/05/2021, 10:50 AM
Fantastic, thanks mate