prefect-community

    Leon Kozlowski

    02/08/2022, 9:47 PM
    Does anyone have agent deployments in multiple cloud providers pointing to the same Prefect Cloud account? I have a use case where I plan to access some data in GCP while my agents are deployed on AWS EKS clusters. I've considered going with some sort of peering approach, or simply standing up an agent on GCP GKE.
    8 replies · 2 participants

    Tony Yun

    02/08/2022, 9:51 PM
    Hi, do you know how to add Context from the UI? I have a variable defined under [context] in my local /.prefect/config.yaml, but when the flow is released to Cloud, it says the context does not exist. I can't find where to create that in the UI.
    5 replies · 2 participants

    Gaurang Katre

    02/08/2022, 10:25 PM
    Hi, I am looking for a Prefect task for doing a BigQuery export to GCS. Something similar to BigQueryTask, but instead of writing to a table_dest, writing to a GCS bucket. Is there a task or a way to do this?
    7 replies · 2 participants
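    No such task ships in Prefect 1.x's task library as far as I know, but a thin wrapper around the BigQuery client's extract_table call is enough. A minimal sketch (table and bucket names are hypothetical):
    from google.cloud import bigquery
    from prefect import task

    @task
    def bigquery_to_gcs(table: str, destination_uri: str) -> None:
        """Export a BigQuery table to a GCS bucket (CSV by default)."""
        client = bigquery.Client()
        # e.g. table="my-project.my_dataset.my_table"
        #      destination_uri="gs://my-bucket/exports/my_table-*.csv"
        extract_job = client.extract_table(table, destination_uri)
        extract_job.result()  # block until the export job finishes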

    Ifeanyi Okwuchi

    02/08/2022, 10:39 PM
    I'm running the following flow using create_flow_run to run a flow multiple times with different parameters. Here is my code and the error I'm getting.
    28 replies · 2 participants

    vinoth paari

    02/09/2022, 5:00 AM
    Hi, how do I call another flow from a task? I am getting an error.
    17 replies · 2 participants

    Kevin

    02/09/2022, 5:07 AM
    Is it possible to get Prefect to take in a boolean parameter? Or do all parameters need to be strings?
    8 replies · 4 participants
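    For reference, booleans do work in Prefect 1.x: Parameter values are JSON-typed, so a default can be a bool and overrides keep their type. A minimal sketch:
    from prefect import Flow, Parameter, task

    @task
    def report(flag: bool):
        print(f"flag={flag} ({type(flag).__name__})")

    with Flow("bool-param-demo") as flow:
        flag = Parameter("flag", default=True)
        report(flag)

    # Overriding at run time keeps the boolean type
    flow.run(parameters={"flag": False})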

    Romain

    02/09/2022, 7:10 AM
    Hello, it might be a silly question, but I can't really figure out the answer. One of the tasks of our flow was retried even though it had previously succeeded (see screenshot). And because this is a task calling a DELETE API endpoint, the retried task failed (the object had already been removed). Although I could easily handle such a thing, I would like to understand a bit more about what's going on. I suspect something failed somewhere in the flow, the flow was restarted from scratch, and all the tasks were restarted? I could not really find anything in the docs about that. What would cause a flow to retry a task that has previously succeeded? Versions: prefect core 0.15.12, prefect server 2022.01.12.
    5 replies · 2 participants

    vinoth paari

    02/09/2022, 7:50 AM
    Hi, the flow-run value is the same when I call the task from another task. I want to generate a separate flow run for each call to the task.
    6 replies · 2 participants

    Muddassir Shaikh

    02/09/2022, 8:59 AM
    Hi, I have a Flow_1 which runs on Agent_A (machine A). After completing a few tasks, Flow_1 has to start/trigger a task of another flow, Flow_2, on another Agent_B (machine B). The code for Flow_1 and Flow_2 lives on different machines (Agents A and B). Basically, Flow_2 has tasks whose upstream tasks are in Flow_1. I have read about flow-to-flow orchestration in Prefect, but that works for one agent and cannot be mapped as Flow_1.task_A => Flow_2.task_B. How can I achieve this?
    1 reply · 2 participants
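    One common pattern here (a sketch, assuming both flows are registered to the same backend under hypothetical names): give Flow_2 a run_config whose labels match Agent_B, then have Flow_1 trigger it through the API instead of importing its code, so each flow runs on its own agent.
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("flow-1") as flow_1:
        # ... Flow_1's own tasks here ...
        # Flow_2's run_config labels determine that it lands on Agent_B.
        child_run_id = create_flow_run(flow_name="flow-2", project_name="my-project")
        wait_for_flow_run(child_run_id, raise_final_state=True)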

    Michail Melonas

    02/09/2022, 10:33 AM
    @Kevin Kho When using the KubernetesRun run configuration, I want to specify a custom Kubernetes Job spec in order to update the envFrom: value. Is there an appropriate template spec that I can use as a starting point?
    1 reply · 2 participants
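    KubernetesRun does accept a job_template dict (or a job_template_path), and Prefect merges its own image, command, and environment into the container named "flow". A minimal sketch, with a hypothetical secret supplying envFrom:
    from prefect.run_configs import KubernetesRun

    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            # Must be named "flow" so Prefect can patch it
                            "name": "flow",
                            "envFrom": [{"secretRef": {"name": "my-env-secret"}}],
                        }
                    ]
                }
            }
        },
    }

    flow.run_config = KubernetesRun(job_template=job_template)  # `flow` is your Flow object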

    Alexander Melkoff

    02/09/2022, 10:57 AM
    Hello! I'm trying to figure out how to organize the code in my flow. I'm using a Kubernetes agent and GitLab storage, so my flow code is pulled from GitLab into the Kubernetes pod. The problem is that while Prefect clones the whole repo, other Python modules from the same repo are not available for import. It is not an option to store the whole flow code in a single file, as it can get enormously large. How do you handle code organization within your flows?
    1 reply · 2 participants

    Peter Peter

    02/09/2022, 2:28 PM
    Hello, I have a task that I am calling map on using the Dask executor, which works fine until there is an error. Sometimes I get an "Unexpected error: KilledWorker", which in my case seems to be because a Dask worker has run out of memory. There are only a handful of cases where this happens, but when it does, it kills the whole flow run. Is there any way to handle this without killing the whole flow? I want to have two downstream tasks that would take the filtered results and act on them. Any help would be appreciated.
    15 replies · 2 participants

    Andreas

    02/09/2022, 2:53 PM
    Hi! When writing DWH tables from Prefect, are there any recommended metadata fields that should be attached to the rows? I'm thinking about things like flow label and version, as well as created/updated timestamps. Are there ready-to-use functions in Prefect for this?
    2 replies · 2 participants

    Alex Furrier

    02/09/2022, 4:11 PM
    Is there an example repo or something that has a nice setup for production-grade Prefect code (particularly flows, tasks, etc.)? We use Prefect on my team, but right now we're somewhat struggling to find a good setup for reusable tasks and flows that are well tested. We also use Prefect Server at the moment, self-hosted on K8s.
    1 reply · 2 participants

    Niels Prins

    02/09/2022, 4:25 PM
    Hi all, I'm running into an issue with RunGreatExpectationsValidation; I hope you can help. When I run it directly with flow.run() it works fine. When I register the same flow to a local server and run it with a local Prefect agent, I get the following error.
    File "/home/prinsn/Code/prefect/repo/gima-prefect-cicd/.env/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
        return Pickler.dump(self, obj)
    TypeError: 'NoneType' object is not callable
    I'm using the GE API V3, prefect==0.15.13, and great_expectations==0.14.5. Any pointers? =D
    8 replies · 2 participants

    Sean Talia

    02/09/2022, 4:33 PM
    Hi all, does anyone here have experience with running Prefect flows using AWS ECS tasks on EC2? My org currently runs a lot of flows using ECS tasks on Fargate, but Fargate has some constraints that EC2 doesn't (e.g. you can't use GPU-enabled instances with Fargate). In order to accommodate some of these requests, I was considering either:
    1. Launching a new EC2 instance (with all the hardware the requesting teams need) on which we'd have a DockerAgent running, and then using labels to manage which DockerRun flows will or won't run on that beefed-up EC2 instance
    2. Using ECS on EC2, and then simply managing the execution of the flows on the EC2 instance by configuring an ECSRun flow to use an ECS task that executes on EC2
    The former option seems a little more straightforward, and we could get it up and running pretty quickly, but it would involve the overhead of managing labels, new API keys for the agents, etc. The latter is probably more flexible in the end, but there's more up-front work for us, since ECS on EC2 is not a workflow that we currently support. Has anyone ever deliberated over this issue or experimented with it?
    13 replies · 2 participants

    Chris Reuter

    02/09/2022, 7:55 PM
    Hey all 👋 @Michael Adkins is our primary guest this week on PrefectLive. We're going live in 5 minutes, come hang out in the chat and ask Michael the most complex question you can think of. We also have a special guest joining us - our very own @justabill! See you at twitch.tv/prefectlive!
    :marvin: 4

    Chris White

    02/09/2022, 8:09 PM
    @Michael Adkins has been leading the effort in making Orion async-first, parallel by default, and more memory efficient so if you attend one Twitch stream this is the one ^^
    :upvote: 5
    :themoreyouknow: 3

    Tony Yun

    02/09/2022, 8:21 PM
    Hi, I see a weird issue when registering the flow. Do I have to change anything to make it work? (please see exception logs in this thread)
    7 replies · 2 participants

    E Li

    02/09/2022, 8:33 PM
    Hi, I have task A and task B, both mapped, and A is the upstream task of B. Is there a way of having the successful child tasks in A move on to B regardless of the state of the other child tasks in A?
    21 replies · 2 participants
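    If I remember the 1.x mapping semantics right, this is the default: mapped pipelines create pairwise dependencies, so each child of B depends only on the corresponding child of A; successful children flow through, and only the counterparts of failed children end up TriggerFailed. A minimal sketch:
    from prefect import Flow, task

    @task
    def a(x):
        if x == 2:
            raise ValueError("child 2 fails")
        return x * 10

    @task
    def b(y):
        return y + 1

    with Flow("pairwise-map") as flow:
        results = b.map(a.map([1, 2, 3]))

    # b[0] and b[2] succeed; only b[1] is TriggerFailed.
    state = flow.run()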

    FuETL

    02/09/2022, 8:42 PM
    Hey guys, I'm getting this error while running my flow. Is there something I can look at to verify this error? I'm using the correct images from Docker.
     Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'testing_flow\'")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - cloudpickle: (flow built with \'2.0.0\', currently running with \'1.6.0\')\n  - python: (flow built with \'3.9.7\', currently running with \'3.7.10\')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    9 replies · 2 participants

    Farid

    02/09/2022, 8:58 PM
    Hi, I’m using GitLab as storage and Kubernetes as the agent to run Prefect flows. I noticed I get dependency errors when a custom pip package is used inside the flow, in this case Soda:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sodasql'")
    Is there a way to address these dependency issues without using Docker images to store the flows?
    22 replies · 3 participants
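    One option that avoids baking custom images: Prefect 1.x's official Docker images install whatever is listed in the EXTRA_PIP_PACKAGES environment variable at container start-up. A sketch on the flow's run config (assuming the PyPI package name is soda-sql):
    from prefect.run_configs import KubernetesRun

    flow.run_config = KubernetesRun(
        # The prefect base image runs `pip install $EXTRA_PIP_PACKAGES`
        # before starting the flow run.
        env={"EXTRA_PIP_PACKAGES": "soda-sql"},
    )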

    Kevin Mullins

    02/09/2022, 9:53 PM
    I’m starting to use create_flow_run, wait_for_flow_run, and map to fan out via sub-flows. Is there any way to get friendlier task names using these tasks, so I can more easily identify which child task is still running? Right now I have to go into the parent flow, look under the logs for the URL of the created child flow run, and then navigate separately. Ideally (maybe in Orion some day) it would be nice if the lineage between parent and child flows could be visualized; however, I realize this may be a difficult thing to do. I’m just trying to find ways to make it user-friendly for an engineer to go in and understand which sub-flows failed or are running, etc.
    15 replies · 2 participants
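    create_flow_run does take a run_name argument, which can be mapped alongside the parameters to give each child run a readable name. A sketch with hypothetical flow, project, and run names:
    from prefect import Flow, unmapped
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    params = [{"region": "us"}, {"region": "eu"}]

    with Flow("parent") as flow:
        run_ids = create_flow_run.map(
            parameters=params,
            run_name=["child-us", "child-eu"],  # shown on the child flow runs
            flow_name=unmapped("child-flow"),
            project_name=unmapped("my-project"),
        )
        wait_for_flow_run.map(run_ids)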

    Joseph Mathes

    02/10/2022, 1:12 AM
    Does "storage" mean the storage used to store the definitions of flows? The caching of task state? Eventual results? If it's more than one of the above, in particular if it's both the eventual results store and the flow definition store, then how can I separate those?
    k
    1 reply · 2 participants

    Joseph Mathes

    02/10/2022, 1:12 AM
    I'm trying to get Prefect running with Dask, using Prefect Cloud. I'm assuming here that Prefect Cloud is whatever runs, or helps run, the flows shown in the prefect.io UI. I have some questions about terminology, which could mostly be summarized by asking: "What exactly do flow runners, executors, and agents do? What is the timeline of their communication with each other? Where do they run?" The docs say:

    FlowRunners handle the execution of Flows and determine the State of a Flow before, during and after the Flow is run.

    A flow's Executor is responsible for running tasks in a flow. During execution of a flow run, a flow's executor will be initialized, used to execute all tasks in the flow, then shutdown.

    Agents orchestrate flow runs. Agents start and monitor flow runs. During operation the agent process queries the Prefect API for any scheduled flow runs, and allocates resources for them on their respective deployment platforms.

    So, when I call flow.run() with a flow that’s configured to run using Prefect Cloud resources and Coiled/Dask, where is the FlowRunner? Is it my local machine, the process that called flow.run()? Or the CLI, with which I launched a flow? In Prefect Cloud, there will be a flow visible in the UI, waiting for an agent with the right flags to connect. What do you call the process that is waiting for an agent to connect? If it’s the flow runner, then what do I call the local process that called flow.run()?

    If an agent does connect to Prefect Cloud and starts orchestrating the flow, what does it launch? An executor? A flow runner? Where does it run them? How does the flow runner learn the location of the executor? According to the documentation, the flow runner loops over tasks and sends them to an executor. I’m guessing the executor then finds resources on which to launch TaskRunners. If it’s a DaskExecutor, I’m guessing it looks for Dask workers I’ve provided. However, it seems as though it does not launch Dask workers, because that’s what an Agent does. If Agents allocate resources, and Dask is a recommended source of resources, why is there no DaskAgent? Why only Docker, Kubernetes, Vertex, and ECS?
    3 replies · 3 participants

    Marcos Lopes Britto

    02/10/2022, 2:20 PM
    Hi guys, my name is Marcos Lopes. I’m from Brazil, and I work as Operations Director at IntuitiveCare. I love programming, and I’m now studying Prefect to help me and my team create some automated processes for IC.
    👋 5
    1 reply · 2 participants

    Thomas Hoeck

    02/10/2022, 3:34 PM
    Hi. What is the logic around retries and resubmitting of flows? I have a flow with some short retry delays (30 secs); this flow stays running and waits for the retry. I have another flow where the retry delay is longer (15 min), and there the flow gets resubmitted. What is the difference? It raises some issues, as there is no need for storing results in the first scenario (they stay in memory), while in the second scenario they must be stored so they can be used when the flow run is re-run.
    9 replies · 2 participants

    Chris Arderne

    02/10/2022, 4:02 PM
    I need some help understanding memory limits in DaskExecutor! If I run the code below without the DaskExecutor, it gets to 40-50 GB before being killed because of OOM. However, if I run it as-is (i.e. on DaskExecutor), it fails at 10 GB, tries three times, and then gives a distributed.scheduler.KilledWorker. Does Prefect do anything to the Dask worker memory limits? I'm facing this issue running with KubernetesRun and a DaskExecutor with a KubeCluster backend, where the worker spec has 50 GB of memory for both the k8s limit and the Dask worker limit. It still fails around the 10 GB mark. (In practice this is while loading PyTorch models and predicting.)
    import numpy as np
    import prefect
    from prefect.executors import DaskExecutor
    from prefect import Flow, task
    
    @task
    def test_memory_limits():
        logger = prefect.context.get("logger")
        for size in [1, 2, 5, 10, 20, 30, 40, 50]:
            logger.warning(f"Creating array with {size=} GB")
            a = np.ones((size * 1000, 1000, 1000), dtype=np.uint8)
            logger.warning(f"Created with size {a.nbytes/1e9=} GB")
            del a
            logger.warning("Deleted")
    
    
    with Flow(
        "test-memory",
        executor=DaskExecutor(),
    ) as flow:
        _ = test_memory_limits()
    5 replies · 2 participants
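    One thing worth checking: Prefect itself doesn't cap Dask worker memory, but a bare DaskExecutor() spins up a temporary distributed.LocalCluster, which by default splits the host's memory across several workers; on a large box that can produce a per-worker ceiling far below total RAM. A sketch of pinning the limits explicitly (numbers are illustrative; for KubeCluster the equivalent lives in the worker spec):
    from prefect.executors import DaskExecutor

    executor = DaskExecutor(
        # Forwarded to distributed.LocalCluster: one worker allowed to
        # use 50GB instead of an even share of host memory.
        cluster_kwargs={"n_workers": 1, "threads_per_worker": 1, "memory_limit": "50GB"},
    )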

    Andrew Lawlor

    02/10/2022, 4:25 PM
    Does anyone have good patterns for registering flows via CI/CD? Ideally I'd like to register only the flows that changed, automatically on a git push, and be able to use different run configs for each, while also handling env vars that may differ between prod and dev. Currently I have a script that loops through all the files in my flows directory and registers all flows using extract_flow_from_file. It runs on a push, but the way I have it set up it:
    • always registers all flows, regardless of whether they have changed or not
    • has to pass the same env values to all flows, regardless of what the flow actually needs
    • passes the same run config to all flows, not allowing me to customize memory requests per flow
    Any guidance on what to do here?
    4 replies · 2 participants
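    A rough sketch of the "only what changed" part (assumptions: flows live under flows/, one flow per file, and registration shells out to the prefect register CLI):
    import subprocess

    # Files touched by the push; in CI you might diff against the
    # target branch instead of HEAD~1.
    changed = subprocess.run(
        ["git", "diff", "--name-only", "HEAD~1", "HEAD"],
        capture_output=True, text=True, check=True,
    ).stdout.splitlines()

    for path in changed:
        if path.startswith("flows/") and path.endswith(".py"):
            subprocess.run(
                ["prefect", "register", "--project", "my-project", "-p", path],
                check=True,
            )
    Per-flow run configs and env vars can then live next to each flow definition rather than in one shared registration script.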

    Christopher

    02/10/2022, 5:58 PM
    I've got an ECSAgent running (in ECS, using the base Prefect image with prefect agent ecs start), but the tasks it spins up are in the wrong subnet, so I want to customise the task definition. It looks like I can pass in a task definition path, but that's a bit troublesome, because now I need to store the file somewhere accessible to the agent. Is there a way to pass a task definition ARN instead? It looks like I can pass that to ECSRun, but the subnet ID is generated by Terraform, so I can't figure out how to get it into the Python...
    13 replies · 2 participants

Kevin Kho

02/10/2022, 6:14 PM
Have you seen this? You can pass it to the agent. Or am I missing something?

Christopher

02/10/2022, 6:28 PM
I think that's the ARN for the task role, not the task definition? Or there's the --task-definition param, but that takes a path, and I was trying to avoid uploading a file separately

Kevin Kho

02/10/2022, 6:37 PM
Ah, my bad. You are right. Let me think about this.
Yes, you are right: you cannot pass the task definition ARN, only a file.
Is the subnet generated on the fly?

Christopher

02/10/2022, 9:32 PM
It's not totally dynamic, but we have multiple environments with different subnets, so I can't just bake it into the ECSRun.
I've actually realised that the subnet isn't specified as part of the task definition anyway, so now I have no idea how to get the task launched in the correct subnet.

Kevin Kho

02/10/2022, 10:25 PM
Oh yeah, that is true. You need the subnet to go in run_task_kwargs, either on the flow's ECSRun or in the agent's run_task_kwargs. Like this.
You might have to separate out the agents into different subnets and then register the flows with different labels to handle it that way?
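For context, run_task_kwargs is forwarded to ECS's run_task API call, so the network configuration can be set there. A sketch of the flow-side version (the subnet ID is a placeholder):
from prefect.run_configs import ECSRun

flow.run_config = ECSRun(
    run_task_kwargs={
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": ["subnet-0123456789abcdef0"],  # placeholder
                "assignPublicIp": "DISABLED",
            }
        }
    },
)
The agent-side equivalent is prefect agent ecs start --run-task-kwargs <path>, and I believe the path may also be an s3:// URL, which matches the S3 approach that comes up below.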

Christopher

02/11/2022, 8:51 AM
I've got it to work by putting the run_task_kwargs in S3 and reading it from there. I'm happy to share my solution in case it helps future readers - is there a good place for me to do that?

Kevin Kho

02/11/2022, 1:52 PM
Yes! Slack is too temporary, so I’d appreciate a post with your setup here so that I can link it for future people.
Also, if you ever make a blog post, we’d publicize it on our end.