Powered by Linen
prefect-community
  • j

    Jake

    07/07/2022, 10:14 PM
    We want to be able to turn any function into a single-task flow. I’ve created a wrapper to do this (as pictured). Calling `.run()` on the flow that gets returned works fine, and registering the flow seems to be fine too (using k8s run). However, when we try to actually run it, we get the following error:
    Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'build_index\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    Am I missing something obvious?
    k
    7 replies · 2 participants
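One possible explanation for the `ModuleNotFoundError` above, offered as a sketch rather than a diagnosis: pickle-style serializers usually store a module-level function as a reference (module name plus function name) instead of its code, so that module must be importable wherever the flow is unpickled. The snippet uses the standard library's `pickle` and `json.dumps` purely as stand-ins; how the wrapper in the screenshot and Prefect's flow storage actually behave is an assumption here.

```python
import json
import pickle

# Pickling a module-level function stores only a reference to it
# (module name + qualified name), not the function's code.
payload = pickle.dumps(json.dumps)

# Both names are embedded in the serialized bytes.
has_module_name = b"json" in payload
has_function_name = b"dumps" in payload

# Unpickling re-imports the module by name and looks the function up,
# so it only succeeds where `json` is importable.
restored = pickle.loads(payload)
same_object = restored is json.dumps
```

If the same mechanism applies to the wrapped function, the `build_index` module would need to be installed in the image the k8s run uses, or the flow would need to be stored as a script rather than as a pickle.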
  • c

    Cole Murray

    07/08/2022, 3:27 AM
    Hey Prefect community, I’m working with Orion and trying to create a deployment from the API client. I’m able to create it from the CLI, but for my use case I would prefer calling it with the API client for ease of use / response parsing. It appears we haven’t updated this yet on the docs page: https://orion-docs.prefect.io/concepts/deployments/#with-the-api Playing around, I have something like this:
    from typing import List
    from uuid import uuid4
    
    from prefect.client import get_client
    from prefect.flow_runners import SubprocessFlowRunner
    from prefect.flows import Flow
    from prefect.orion.schemas.data import DataDocument
    from prefect.orion.schemas.schedules import CronSchedule
    from workflow_etl.flows.flow import hello_world
    
    # `await` is only valid inside a coroutine, so main() must be `async def`
    async def main():
        prefect_client = get_client()
    
        schedules = [] # TODO
    
        for schedule in schedules:
            flow_id = await prefect_client.create_flow(hello_world)
            # the client methods are coroutines, so this call is awaited too
            await prefect_client.create_deployment(
                flow_id=flow_id,
                name=schedule.id,
                schedule=CronSchedule(cron="0 * * * * *"),
                parameters={
                    **schedule.workflow_params
                },
                tags={
                    'owner_id': schedule.owner_id,
                },
                flow_runner=SubprocessFlowRunner(),
                flow_data=# UNKNOWN WHAT TO PUT HERE
            )
    I’m a bit stuck on what to put in the flow_data argument. Anyone tried this / have a link to sample?
    a
    3 replies · 2 participants
  • a

    Arnas

    07/08/2022, 8:06 AM
    Hello, a bit of an odd one, as I am not sure what I am doing wrong here. Effectively, a very simple task seems to be running forever when run via Prefect Cloud (using Local Agent, Prefect 1.0). The same flow when run on the machine that runs the local agent seems to work fine. Code:
    from prefect import task, Flow, Parameter
    
    @task
    def task_function(in_list_size=10, out_list_size=4):
    
        in_list = list(range(in_list_size))
        print(f">>> Input list: {in_list}")
        
        out_list = list(range(out_list_size))
        print(f">>> Output list: {out_list}")
        
        diff_list = [i for i in in_list if i not in out_list]
        print(f">>> Result list: {diff_list}")
        
        return diff_list
    
    
    with Flow("Test Flow") as flow:
    
        in_list_size = Parameter("input_list_size", default=10)
        out_list_size = Parameter("output_list_size", default=4)
        result = task_function(in_list_size=in_list_size, out_list_size=out_list_size)
    
    
    flow.register(project_name="tutorial")
    
    if __name__ == "__main__":
        flow.run()
    Trying to figure out where the problem is - guessing something is wrong on the Local Agent side?
    a
    m
    +1
    12 replies · 4 participants
  • p

    Pierre-Edouard

    07/08/2022, 9:15 AM
    Hello, community. I’ve been stuck on my code for days. It’s really annoying because it was supposed to be the easiest flow I’ve developed with Prefect. The algorithm: I want to read the content of a big Redis database (redis.scan_iter()), filter to keep the keys that don’t match a pattern, and then remove those records (easy!). My problem: I don’t want to load all of my Redis keys into memory in the first task (my Dask workers won’t handle it) and only start filtering afterwards. I want to generate batches of keys and process each batch directly (hence the map feature). If I run my code without .map(), it works, but it’s sequential (I load all my keys before filtering). When I run it with .map(), the filtering task immediately returns a Failed status without printing any error. I have no idea why it fails! Does anyone have a clue?
    r
    a
    14 replies · 3 participants
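The batching idea described above can be sketched in plain Python, independent of Prefect and Redis. The helper names `batched` and `keys_to_delete`, the `session:*` pattern, and the fake key generator are all hypothetical; any lazy iterator such as `redis.scan_iter()` could take the generator's place.

```python
from fnmatch import fnmatchcase
from itertools import islice
from typing import Iterable, Iterator, List


def batched(keys: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield lists of at most `size` keys without reading the whole input first."""
    iterator = iter(keys)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def keys_to_delete(batch: List[str], keep_pattern: str) -> List[str]:
    """Return the keys in `batch` that do NOT match the glob-style `keep_pattern`."""
    return [key for key in batch if not fnmatchcase(key, keep_pattern)]


# Stand-in for a lazy key scan: a generator of seven fake key names.
fake_scan = (f"{'session' if i % 2 else 'cache'}:{i}" for i in range(7))

# list() is used here only to make the demo output easy to inspect;
# a real pipeline would consume the batches one at a time.
batches = list(batched(fake_scan, size=3))
doomed = [keys_to_delete(batch, keep_pattern="session:*") for batch in batches]
```

Each batch is a small list, so only one batch of keys needs to be in memory when the filtering step runs on it.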
  • t

    Tom Klein

    07/08/2022, 9:34 AM
    Hello 🙋 If we wanna use a `LocalDaskExecutor` for our flow, but be able to limit the parallelisation (due to each task requiring a lot of resources when being run) to - for example - only two tasks at a time, is that possible? I read the docs but still don’t fully understand if we have to use a `DaskExecutor` for this, and if so - would it by default run locally (if we don’t give it any other config)? And kind of tangential (and not directly related to Prefect): is there some advantage of something like the AWS Fargate cluster for Dask over a k8s Dask cluster? Is the former just easier to set up or something?
    p
    a
    9 replies · 3 participants
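What "only two tasks at a time" means in practice can be illustrated with the standard library alone. This is a conceptual sketch of a worker cap, not Prefect's executor API; `MAX_PARALLEL`, `heavy_task`, and the sleep-based workload are made up for the demo.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_PARALLEL = 2  # illustrative cap: at most two tasks run at once

lock = threading.Lock()
running = 0  # tasks currently executing
peak = 0     # highest number of tasks seen executing together


def heavy_task(n: int) -> int:
    """Pretend to be resource-hungry and record how many tasks overlap."""
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # simulated work
    with lock:
        running -= 1
    return n * n


# The pool never starts more than MAX_PARALLEL tasks at the same time;
# the remaining tasks wait in a queue until a worker is free.
with ThreadPoolExecutor(max_workers=MAX_PARALLEL) as pool:
    results = list(pool.map(heavy_task, range(6)))
```

To my understanding, Prefect 1.x's `LocalDaskExecutor` accepts a comparable `num_workers` argument, but that is worth confirming against the executor docs for the installed version.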
  • t

    Tarek

    07/08/2022, 9:50 AM
    Hello, our cloud colleagues want to deploy Prefect 2.0/Orion to Kubernetes, and they need the Helm chart repository and URL. Where could I find this information?
    ✅ 1
    a
    g
    13 replies · 3 participants
  • a

    Andreas

    07/08/2022, 9:51 AM
    Hi! Prefect 2.0 has the ability to enforce pydantic checks for flow input which is a really nice feature. So, when we pass invalid parameters to a flow it goes straight from a pending state to a failed state as expected and we get an error in the form of:
    09:47:51.189 | INFO    | prefect.engine - Flow run 'shiny-falcon' received invalid parameters and is marked as failed
    However, this information is quite limited. Is there a way to get more information about which of the parameters failed to pass the pydantic check?
    ✅ 1
    👀 1
    a
    m
    3 replies · 3 participants
  • k

    Keith

    07/08/2022, 2:00 PM
    Hi All! I have been loving my experience with Prefect but now that I am deploying to Kubernetes (GKE) I am having the all too common problem of it running fine locally but not in the cloud. Initially I was running into `OOMError` but got around that by increasing the memory requested from k8s. Now I am stuck on having the ZombieKiller stop tasks that should be reporting back status since they continue to produce logs with the message `No heartbeat detected from the remote task; marking the run as failed.` I have attempted to set the `HEARTBEAT_MODE` to `thread` and `off` via the `config.toml` file as well as in the `KubernetesRun` environment variables, but no matter what combination I set up I still run into heartbeat errors killing a process. I am curious if there is a way to send a heartbeat from within the code base or if there is another approach I should take to get around my timeout issue. Cheers!
    ✅ 1
    a
    r
    8 replies · 3 participants
  • b

    Bogdan Serban

    07/08/2022, 2:04 PM
    Hello everyone! I am planning to build an image processing pipeline which consists of both ML and non-ML processing. I am planning to build each processing step as an independent function (Prefect Task) and successively apply these functions to each image. I will be pulling the images from a cloud storage container. The ML processing will require some GPU acceleration. My question here is twofold: 1. How do I load and share the ML models to be used in running the inference? I have some PyTorch models right now. 2. Is it possible to specify what type of node (GPU/non-GPU) each task will run on? I want the ML inference functions to run on GPU nodes, and the non-ML ones on CPU nodes. I would really appreciate your answers! Thanks!
    👀 1
    ✅ 1
    a
    3 replies · 2 participants
  • m

    Marcin Grzybowski

    07/08/2022, 2:42 PM
    Maybe someone will need this: after some investigation, my conclusion is that Docker integration with WSL2 is the root cause. I had some success after reinstalling WSL, but then it was failing again 😕 The strange thing is that it sometimes works and sometimes doesn't. Even stranger, when I try to run it in WSL2 Docker, it breaks WSL and I cannot even run Prefect as "pure python" (without Docker, just a local Python script). Disabling WSL2 Docker integration helps, but then I cannot run Docker in WSL 😞 So for now I'll stick to running it in Windows Docker ...
    r
    4 replies · 2 participants
  • a

    Adam

    07/08/2022, 2:47 PM
    Hey folks. Hopefully a simple question: I’m testing out Prefect as a replacement for my org’s current tools. I want the ability to restart a failed flow manually, AFTER I make changes in the code base. Basically, I’m anticipating flows to fail and I’ll need to make tweaks to our credentials or logic in the code. But I don’t want to re-run the entire flow, just pick up at the task where it failed after making changes. I can’t seem to register the new version of my edited flow under the same version id as the prior one. I’d need to do this because I need to restart the prior version that failed. The only thing I can do is publish the new version (say, version id 15), and then version 14 that failed gets archived and I can’t restart it with the new code base from version id 15.
    k
    28 replies · 2 participants
  • j

    Jelle Vegter

    07/08/2022, 2:59 PM
    Hi all, I’m wondering where most people have the prefect agent running? Do you use a virtual machine or something like Azure Container Instance? Thanks!
    ✅ 1
    a
    t
    4 replies · 3 participants
  • j

    Jan Domanski

    07/08/2022, 5:40 PM
    Is prefect2 beta down for everyone or just for me?
    👀 1
    ✅ 1
    k
    2 replies · 2 participants
  • e

    Erick House

    07/08/2022, 6:12 PM
    When will secrets be available in the Prefect Cloud beta? Will the mechanisms be different from Prefect 1.0? I'm not seeing anything in the docs that is specific to Prefect 2.0. I do not have experience with Prefect 1.0. I'm creating a prototype to see if our existing workflows can be ported over to Prefect 2.0 when it's ready.
    ✅ 1
    k
    a
    5 replies · 3 participants
  • t

    Tim Helfensdörfer

    07/08/2022, 6:30 PM
    In the prefect 2b8:
      File "/Users/****/Library/Caches/pypoetry/virtualenvs/equation-kGZ4A1K9-py3.8/lib/python3.8/site-packages/prefect/cli/cloud.py", line 231, in login
        exit_with_success(
      File "/Users/****/Library/Caches/pypoetry/virtualenvs/equation-kGZ4A1K9-py3.8/lib/python3.8/site-packages/prefect/cli/_utilities.py", line 36, in exit_with_success
        app.console.print(message, **kwargs)
    AttributeError: 'PrefectTyper' object has no attribute 'console'
    When trying to run (which worked in 2b7):
    await login(
        key=os.environ.get("PREFECT_API_KEY"),
        workspace_handle=os.environ.get("PREFECT_WORKSPACE_HANDLE"),
    )
    k
    m
    6 replies · 3 participants
  • s

    Scott Henley

    07/08/2022, 6:30 PM
    Greetings, I am having an issue registering a Flow with a simple ShellTask (Prefect/Cloud 1.0). The RunConfig for the flow is pointing to a custom image in ECR that contains the binary that we are attempting to run via the shell task (using the ECSRun type). The path to the binary in the shell command does not exist locally. When I attempt to register the flow, it fails with a “no such file or directory” error. Does the register cli command attempt to verify the shell task locally? How can I get around this when I only want it to reference the location in the container image?
    k
    16 replies · 2 participants
  • b

    Binoy Shah

    07/08/2022, 6:33 PM
    Regarding the recent changes in Prefect Orion (Blocks): are Blocks needed if our ecosystem is Kubernetes + ConfigMap for non-secure values and Kubernetes + Vault/Opaque secrets for secure values?
    k
    3 replies · 2 participants
  • k

    Kevin Kho

    07/08/2022, 6:48 PM
    Thank you to everyone here! Just letting you all know July 22 will be my last day at Prefect.
    😮 1
    🖖 1
    t
    r
    3 replies · 3 participants
  • o

    Omar Sultan

    07/08/2022, 9:10 PM
    Hi guys, we are currently relying on Prefect 1.2 for some critical production ETLs. We are starting to discuss whether we should consider moving to Prefect 2.0. However, I can't find anything in the documentation related to an Imperative API. Do we still have one in Prefect 2.0?
    k
    5 replies · 2 participants
  • w

    Walter Cavinaw

    07/08/2022, 9:37 PM
    we have a schedule with two clocks: every friday, and the first of every month. Sometimes these overlap. Will prefect generate two events, or just one?
    k
    1 reply · 2 participants
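When the two clocks above actually collide can be worked out with the standard library. The sketch below only computes the dates for 2022 and shows that a set union keeps one entry per date; it makes no claim about how Prefect itself handles overlapping clocks. The helper name `fridays` is hypothetical.

```python
from datetime import date, timedelta
from typing import Iterator


def fridays(year: int) -> Iterator[date]:
    """Yield every Friday of `year` in order."""
    day = date(year, 1, 1)
    # weekday() is 4 for Friday; move forward to the first one
    day += timedelta(days=(4 - day.weekday()) % 7)
    while day.year == year:
        yield day
        day += timedelta(days=7)


friday_clock = set(fridays(2022))
monthly_clock = {date(2022, month, 1) for month in range(1, 13)}

# Dates on which both clocks fire
overlap = sorted(friday_clock & monthly_clock)

# A set union keeps each date once, so overlapping dates are not doubled
combined = friday_clock | monthly_clock
```

In 2022 the clocks coincide on April 1 and July 1, so deduplicating by date yields two fewer events than firing both clocks independently.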
  • a

    Apoorva Desai

    07/08/2022, 11:13 PM
    Can we schedule flows so a new run is created as soon as the previous one finishes?
    ✅ 1
    a
    n
    +1
    7 replies · 4 participants
  • j

    Jack Sundberg

    07/09/2022, 7:09 PM
    Hello! The Orion beta is amazing. Thanks for putting together such an awesome package. Could I get some feedback on the client's use with filters? I'm not sure if there's a better way to achieve what I'm doing:
    # Minimal example of my current use, where I need to build filter objects
    
    from prefect.client import get_client
    from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter
    
    async with get_client() as client:
        response = await client.read_flow_runs(
            flow_filter=FlowFilter(
                name={"any_": ["example-flow"]},
            ),
            flow_run_filter=FlowRunFilter(
                state={"name": {"any_": ["Completed", "Running"]}}
            ),
        )
    Is there a way to query with the client like this instead:
    # Avoids the need to import and manually initialize FlowFilter+FlowRunFilter
    
    from prefect.client import get_client
    
    async with get_client() as client:
        response = await client.read_flow_runs(
            flow_filter={"name": {"any_": ["example-flow"]}},
            flow_run_filter={"state": {"any_": ["Completed", "Running"]}},
        )
    Or even:
    # Allows exact matching without use of {"any_": [...]}
    
    from prefect.client import get_client
    
    async with get_client() as client:
        response = await client.read_flow_runs(
            flow_filter={"name": "example-flow"},
            flow_run_filter={"state": "Completed"},
        )
    🚀 1
    ✅ 1
    a
    17 replies · 2 participants
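The shorthand proposed above could be approximated today with a small user-side helper that expands plain values into the `{"any_": [...]}` form before the filter objects are built. `expand_filter` is a hypothetical name, not part of the Prefect client, and the sketch only transforms dictionaries.

```python
from typing import Any, Dict


def expand_filter(shorthand: Dict[str, Any]) -> Dict[str, Any]:
    """Expand {"field": value} shorthand into {"field": {"any_": [...]}}."""
    expanded: Dict[str, Any] = {}
    for field, value in shorthand.items():
        if isinstance(value, dict):
            # Already written in operator form; pass it through unchanged
            expanded[field] = value
        elif isinstance(value, (list, tuple)):
            # Several accepted values
            expanded[field] = {"any_": list(value)}
        else:
            # A single exact value
            expanded[field] = {"any_": [value]}
    return expanded


flow_kwargs = expand_filter({"name": "example-flow"})
state_kwargs = expand_filter({"name": ["Completed", "Running"]})
untouched = expand_filter({"name": {"any_": ["example-flow"]}})
```

Assuming the filter classes accept keyword arguments as in the first snippet of the message, the results could be passed along as `FlowFilter(**flow_kwargs)` and `FlowRunFilter(state=state_kwargs)`.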
  • t

    Tom Klein

    07/10/2022, 9:54 AM
    Hey again 🙋 We’re trying to test out the (ad-hoc) Dask cluster option for execution but we’re getting errors and it seems like we’re missing something (code in the thread)
    ✅ 1
    a
    o
    75 replies · 3 participants
  • t

    Tom Klein

    07/11/2022, 12:29 AM
    Aaand -- me again 😳 with a related but completely different issue 🤷‍♂️ Question: when using a `DaskExecutor` (that relies on a Dask `KubeCluster`), are task Results handled in the flow, or is the Result handling delegated to the Dask workers? I'm asking because when we swap from a `LocalExecutor` or a `LocalDaskExecutor` to a `DaskExecutor`, suddenly our S3Results (which all our tasks are configured with) seem to fail with `AccessDenied` errors (for `PutObject` attempts). So logically it seems like they are being run from somewhere else that doesn't have the proper permissions (whereas the k8s job running the flow itself does). Am I missing something?
    ✅ 1
    a
    27 replies · 2 participants
  • j

    Jack Sundberg

    07/11/2022, 3:41 AM
    Can you use orion on MacOS? I don't see any mention of macs in the installation docs and it looks like orion's CI only tests linux+windows. I'm asking because my test suite is passing on linux+windows but failing on mac -- just want to know if the issue is on my end or not.
    ✅ 1
    a
    4 replies · 2 participants
  • s

    Shivam Bhatia

    07/11/2022, 6:20 AM
    Hey, I am trying to run a flow using DockerDeployment with Prefect 2.0. I have created a Docker image of my flow environment and uploaded it to Docker Hub. I am getting this error:
    docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.41/containers/ab511ceee07d320a6e458ba7a063b9cf92ba177e49bf84657e85b0f613ae6f4d/wait>: Not Found ("No such container: ab511ceee07d320a6e458ba7a063b9cf92ba177e49bf84657e85b0f613ae6f4d")
    What am i missing?
    ✅ 1
    a
    3 replies · 2 participants
  • x

    xyzz

    07/11/2022, 7:51 AM
    Hello, regarding Prefect 2.0: just wondering... are blocks supposed to be the final answer for secrets or will there be further abstractions for supporting e.g. Hashicorp Vault?
    ✅ 1
    a
    a
    +3
    19 replies · 6 participants
  • a

    Andreas

    07/11/2022, 8:44 AM
    Hi! Thanks for the huge Prefect 2.0b8! I have a question regarding Deployments. Since Deployments based on `DeploymentSpec` are deprecated as of the Prefect 2.0b8 release, does that mean that defining Deployments in YAML is also going to be deprecated in the future? Can I still base my work on YAML?
    ✅ 1
    a
    1 reply · 2 participants
  • s

    Slackbot

    07/11/2022, 9:45 AM
    This message was deleted.
  • f

    Florian Guily

    07/11/2022, 10:04 AM
    Hey, prefect 1 here, just noticed that i have a flow in a "cancelling" status for 3 days now (i just noticed it now with the week end...) it is running on an eks fargate node also for 3 days and i don't know how to terminate it. Should i just kill the pod ?
    ✅ 1
    a
    5 replies · 2 participants
a

Anna Geller

07/11/2022, 10:56 AM
Yup, you're spot on, you can kill the pod, manually delete the Kubernetes flow run job if needed + set the flow run state to Failed in the UI
🙏 1
f

Florian Guily

07/11/2022, 12:32 PM
Thanks! How could I prevent this from happening again?
a

Anna Geller

07/11/2022, 12:49 PM
it depends on what was the root cause of this cancellation
f

Florian Guily

07/11/2022, 12:50 PM
if i remember correctly it was from the ui
a

Anna Geller

07/11/2022, 1:03 PM
ahh gotcha, this is hard to do since Prefect doesn't have access to your infra and doing sth like killing a pod is hard (if not impossible) to do without access to the infra