prefect-community

    Ben Muller

    11/12/2021, 12:51 AM
Hey Prefect, would someone please be able to get in touch with me about purchasing additional KV pairs for my KV storage on Prefect? I have hit my limit and was told that I can buy additional ones.

    Sergey

    11/12/2021, 6:43 AM
    Hello everyone, what could be the problem?
    Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'_unpickle_timestamp\' on <module \'pandas._libs.tslibs.timestamps\' from \'/usr/local/lib/python3.7/site-packages/pandas/_libs/tslibs/timestamps.cpython-37m-x86_64-linux-gnu.so\'>")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - prefect: (flow built with \'0.15.3\', currently running with \'0.14.19\')\n - python: (flow built with \'3.7.3\', currently running with \'3.7.4\')')
Could it be that the Prefect version in my execution environment is older than the one on the server?

    Adam Everington

    11/12/2021, 7:36 AM
Is it possible to get the flow name through the task in a state_handler? e.g.:
def my_state_handler(task, old_state, new_state) -> None:
  send_message(f'the following task failed: {task.name}, within this flow: {GET_FLOW_NAME_HERE}')

@task(state_handlers=[my_state_handler])
def my_task():
  ...
I had a look through the Task class on GitHub; I can see it's passed to various methods, but I couldn't find a property it's persisted in.
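(A hedged sketch of one approach: in Prefect 1.x the runtime context carries a `flow_name` key during a run, so the handler can read it from `prefect.context`; `send_message` stands in for a hypothetical notifier.)
import prefect

def my_state_handler(task, old_state, new_state):
    # prefect.context is populated during a flow run and includes
    # keys such as flow_name, flow_run_id, and task_name
    flow_name = prefect.context.get("flow_name", "unknown-flow")
    if new_state.is_failed():
        send_message(f"task {task.name} failed in flow {flow_name}")  # hypothetical notifier
    return new_state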

    Chris L.

    11/12/2021, 9:39 AM
    Hello all, I’m wondering if anyone has a solution to showing private images in Prefect artifacts?
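(One sketched workaround, assuming the images live in S3: have a task generate a short-lived presigned URL with boto3 and embed it in a markdown artifact. The bucket and key names are hypothetical.)
import boto3
from prefect import task
from prefect.artifacts import create_markdown

@task
def publish_image_artifact():
    s3 = boto3.client("s3")
    # A signed URL lets the private object render without making it public
    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": "my-private-bucket", "Key": "plots/result.png"},
        ExpiresIn=3600,
    )
    create_markdown(f"![result plot]({url})")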

    Vipul

    11/12/2021, 1:58 PM
Got an Orion-specific question: I'm trying to see how we can move from Prefect Server to Orion. I see it supports caching, but I was wondering if there is any plan to offer an equivalent of Prefect Server's "checkpointing", which ensures that every time a task runs successfully, its return value is written to persistent storage and cached for subsequent runs.

    Chris Arderne

    11/12/2021, 3:04 PM
Thanks to the support here I've got Prefect running on GCP with `VertexRun` and `GitHub` storage. I'm now trying to get a distributed `DaskExecutor` to run using `dask_cloudprovider.gcp.GCPCluster`, using the same Docker image that I already had working with `VertexRun`, with the Dask dependencies added. I also created a Packer image based on this. If I run the flow locally (with prefect flow run …, so Vertex is bypassed), it spins up a Dask cluster and completes successfully. But when I ran it from Prefect Cloud, via Vertex, it provisioned a scheduler which had some errors (failed to restart for crond, nscd, unscd) and then didn't do anything. Aside: after I cancelled the flow, I had to manually delete this scheduler. The VPC is set up to allow full access within the network, so it shouldn't be anything to do with that. Any ideas? Has anyone got this working well?
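(For comparison, a hedged sketch of wiring `DaskExecutor` to `GCPCluster`; all values are hypothetical. It won't explain the crond/nscd errors, which look like issues in the scheduler VM image itself.)
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.gcp.GCPCluster",
    cluster_kwargs={
        "projectid": "my-project",
        "zone": "us-central1-a",
        "docker_image": "gcr.io/my-project/my-flow:latest",  # same image as the run
        "n_workers": 2,
    },
)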

    Tim Enders

    11/12/2021, 3:41 PM
How do I access the Orion logger? It doesn't seem to run with `prefect.context`, and I can't do `prefect run` to get a context.
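(A sketch against the Prefect 2.x / Orion API as it later stabilized, which may postdate this thread: `get_run_logger` replaces the context-based logger and only works inside a flow or task run.)
from prefect import flow, task, get_run_logger

@task
def say_hello():
    # Only valid while a flow/task run is active
    get_run_logger().info("hello from a task")

@flow
def my_flow():
    say_hello()

my_flow()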

    Tom

    11/12/2021, 5:16 PM
Hey, I'm trying to load and transform a dataset of images with Prefect tasks. Below is a code snippet: first I load images with task `load_data_task`, which provides me `images_train` and `images_val`. Now I want to transform `images_train` with task `rescaling_images_task`: I need to map over `images_train` (for each image and label) and then apply `rescaling_images_task`. Outside of Prefect this works with the following code:
images_train_rescaled = images_train.map(lambda x, y: (preprocess.rescaling_images()(x, training=True), y))
But how can I run this in a Prefect flow?
preprocess = PreprocessingClass_Images(cfg)

with Flow("Preprocess") as flow:

        # 1. load images
        images_train, images_val = preprocess.load_data_task()

        # 2. rescaling
        # the code below works outside of a Prefect flow
        images_train_rescaled = images_train.map(lambda x, y: (preprocess.rescaling_images()(x, training=True), y))
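(A hedged sketch, assuming `images_train` is something like a `tf.data.Dataset`: inside the flow block, `images_train` is a Prefect task result, not the dataset itself, so `.map` there would be Prefect's task mapping. Wrapping the dataset transform in its own task defers it to runtime, where the real object is available.)
from prefect import Flow, task

@task
def rescale_dataset(images_train):
    # Runs at flow runtime, so this is the dataset's own .map, not Prefect's
    return images_train.map(
        lambda x, y: (preprocess.rescaling_images()(x, training=True), y)
    )

with Flow("Preprocess") as flow:
    images_train, images_val = preprocess.load_data_task()
    images_train_rescaled = rescale_dataset(images_train)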

    Vamsi Reddy

    11/12/2021, 5:24 PM
Hi everyone, we have been using Prefect for a while and experimenting with it. We want to set up CI/CD for our flows/automation project; has anyone set up something like this before? We usually use GitHub and CircleCI. We are thinking of having two projects, prod and dev: we want to register the flows on our master branch to our prod project and the develop branch flows to our dev project, so whenever we push updated flow code to GitHub it registers the flows to the corresponding Prefect Cloud project. Not sure if this would be a good approach; would like to hear your thoughts on this.
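(One hedged sketch of the registration step such a pipeline could run; `CIRCLE_BRANCH` is CircleCI's built-in branch variable, and `my_flows` is a hypothetical module.)
import os

from my_flows import flow  # hypothetical module exposing the flow object

# master -> prod project, everything else -> dev project
branch = os.environ.get("CIRCLE_BRANCH", "develop")
project = "prod" if branch == "master" else "dev"
flow.register(project_name=project)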

    Brett Naul

    11/12/2021, 7:40 PM
I've noticed that the new(er) `prefect run --watch` (and similarly `watch_flow_run`) don't show any logs until my run has finished (at which point they all show up in one deluge), whereas `prefect run flow --logs` would stream them continuously. @Michael Adkins is that the intended behavior or might something be amiss? In this case it was about 5 minutes of no logs; haven't tried with something even longer yet.

    Xinchi He

    11/12/2021, 8:31 PM
Hey, dear community, I have a quick question on the capability to run Docker tasks as sidecars on k8s (see the official example here: https://docs.prefect.io/orchestration/recipes/k8s_docker_sidecar.html). For our use case, we are going to have two Docker images pulled and executed one after another. Should we define two flow container entries in the job template (flow-container1, flow-container2), or just keep it as it is in the example code? Much appreciated!
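(A hedged sketch of the alternative the recipe implies: keep the single flow container and drive both images from Docker tasks against the sidecar daemon. Image names are hypothetical; the server URL follows the recipe's Docker-in-Docker sidecar setup.)
from prefect import Flow
from prefect.tasks.docker import PullImage, CreateContainer, StartContainer, WaitOnContainer

server = "tcp://localhost:2375"  # Docker daemon exposed by the sidecar in the recipe

pull = PullImage(docker_server_url=server)
create_a = CreateContainer(image_name="org/image-a:latest", docker_server_url=server)
create_b = CreateContainer(image_name="org/image-b:latest", docker_server_url=server)
start = StartContainer(docker_server_url=server)
wait = WaitOnContainer(docker_server_url=server)

with Flow("two-images") as flow:
    a_pulled = pull(repository="org/image-a", tag="latest")
    a = create_a(upstream_tasks=[a_pulled])
    a_started = start(container_id=a)
    a_done = wait(container_id=a, upstream_tasks=[a_started])
    # Second image is pulled and started only after the first finishes
    b_pulled = pull(repository="org/image-b", tag="latest", upstream_tasks=[a_done])
    b = create_b(upstream_tasks=[b_pulled])
    start(container_id=b)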

    Santiago Gonzalez

    11/12/2021, 8:44 PM
Hey. How can I create a custom implementation of `PrefectSecret` that accepts a `Parameter` as an argument? Is that possible?
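(A hedged sketch that sidesteps subclassing: since secrets resolve at task runtime anyway, a plain task can take the `Parameter` and look the secret up with the lower-level `Secret` client.)
from prefect import Flow, Parameter, task
from prefect.client import Secret

@task
def get_secret(secret_name: str):
    # Resolved at runtime, so the name can come from a Parameter
    return Secret(secret_name).get()

with Flow("param-secret") as flow:
    name = Parameter("secret_name", default="MY_SECRET")  # hypothetical default
    value = get_secret(name)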

    Max Mose

    11/12/2021, 10:47 PM
    Hi, is there any way to configure the number of retries that lazarus goes through before marking the run as failed in prefect cloud? Specifically, we do not want to keep submitting kubernetes jobs if they are unschedulable for any reason. If there’s no way to change the config values for this process, is there a recommended alternative approach?

    Kathryn Klarich

    11/12/2021, 11:15 PM
    Is the return order of mapped tasks guaranteed to be the same as the input order?

    Aqib Fayyaz

    11/13/2021, 2:44 PM
Hi, I have a Prefect agent running on Google GKE, my Prefect flow on GitHub, and custom modules in a Docker image deployed on GCR, and everything is running fine. I am orchestrating our feature-engineering pipeline using Prefect, and our data layer, which is deployed on the same cluster, needs to use it. So my question is: can I invoke a Prefect flow using some CLI command or Python API instead of the Prefect Cloud UI, so that I can directly invoke the flow from my data layer (Python code) and have it return the features to the data layer?
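(One hedged sketch using the client bundled with Prefect 1.x; the flow id and parameters are hypothetical. Note the run starts asynchronously, so reading features back would still go through your results/storage layer.)
from prefect import Client

client = Client()  # picks up the API key configured for this environment
flow_run_id = client.create_flow_run(
    flow_id="your-flow-id",              # hypothetical
    parameters={"date": "2021-11-15"},   # hypothetical
)
print(flow_run_id)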

    Cooper Marcus

    11/13/2021, 2:45 PM
We regularly run Prefect flows to test code in branches that have open pull requests in our GitHub repo. Any pointers on configuring things so that the flow posts updates to the pull requests? (Maybe at the start of the flow a pull request comment is added, and that comment is updated at the end of the flow? Or perhaps the flow updates a Check on the PR? Not picky, just looking for general pointers on getting Prefect to update GitHub. I did check out https://docs.prefect.io/api/latest/tasks/github.html and none of those tasks are useful to us.)
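(A general pointer as a hedged sketch: GitHub's REST API treats PRs as issues for commenting purposes, so a small helper called from a task or state handler can post a comment. `GITHUB_TOKEN`, the repo, and the PR number are hypothetical.)
import os
import requests

def comment_on_pr(repo: str, pr_number: int, body: str) -> None:
    # POST /repos/{owner}/{repo}/issues/{number}/comments
    resp = requests.post(
        f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments",
        headers={"Authorization": f"token {os.environ['GITHUB_TOKEN']}"},
        json={"body": body},
    )
    resp.raise_for_status()

comment_on_pr("my-org/my-repo", 123, "Prefect flow run started...")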

    Carlos Paiva

    11/13/2021, 9:25 PM
Hi, I am getting started with custom Prefect tasks, and I'd like to know if someone has a kind of tutorial for pushing my task repository to my Prefect server.

    Mike Lev

    11/14/2021, 11:01 AM
Hey, so thus far we are successfully running flows with our own base image (1.5 GB in size) on EKS with the Kubernetes agent. Is there any way to speed up the pod spin-up? (We are using S3 storage, stored as script, for flows.)

    Aqib Fayyaz

    11/14/2021, 3:36 PM
Can we forcefully register a flow? I have not made changes to the flow itself, only to the functions inside the flow, so when I run the command prefect register it does not register the flow; it skips it.

    Aqib Fayyaz

    11/14/2021, 6:04 PM
import requests

query = """
 mutation {
  create_flow_run(input: { flow_id: "9479ea34-f558-4616-8e64-50c7a508787d" }) {
    id
  }
}
"""

url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer your-api-key"}
)
print(response.status_code)
print(response.text)
I am using the above Python code to invoke a flow in Prefect Cloud. I have filled in the API key and changed the flow_id as well, but when I run this file I get the following error: 200 {"errors":[{"path":["create_flow_run"],"message":"Unauthenticated","extensions":{"code":"UNAUTHENTICATED"}}],"data":{"create_flow_run":null}}

    Aqib Fayyaz

    11/15/2021, 8:32 AM
Has anyone tried deploying a Prefect agent on GKE using a skaffold.yaml file?

    Adam Everington

    11/15/2021, 8:55 AM
Morning @Anna Geller! Quick one this morning... can you force a flow to run in the UI when it has a schedule defined within the flow? My flow runs at 8pm each night, but for some reason it failed on Friday with an error saying Lazarus couldn't reschedule it, so I want to run it now. I press "Quick Run" and "Start Now" but nothing happens.

    Adam Everington

    11/15/2021, 10:26 AM
I get a Failed to load and execute Flow's environment: TypeError("'type' object is not subscriptable") only on my Azure deployment of Prefect. Locally it runs absolutely fine. Any ideas?
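(A common cause, shown as a minimal reproduction: subscripting built-in generics like `list[str]` only works on Python 3.9+, so a flow annotated that way loads fine locally but fails on an older interpreter in the deployment image. A hedged guess, not a confirmed diagnosis.)
# On Python 3.8 and earlier this def raises:
#   TypeError: 'type' object is not subscriptable
def load_rows() -> list[str]:
    return ["a", "b"]

# Portable spelling for older interpreters:
from typing import List

def load_rows_compat() -> List[str]:
    return ["a", "b"]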

    Joseph Oladokun

    11/15/2021, 11:34 AM
I'm trying to deploy my flow backend to EC2. Does anyone have any resources to guide me?

    Greg Adams

    11/15/2021, 3:01 PM
Question about the prefect.io cloud: are there any plans for something like an "agent/compute hosting service"? DevOps/infrastructure is kind of my weak point (and therefore my organization's weak point), so managing a Docker instance on a VM is kind of a pain. I'd totally pay for someone to do that for me so I can just focus on the pipeline code.

    Martin Goldman

    11/15/2021, 3:47 PM
Hi all. I'm trying to figure out whether any of the Prefect products would be suitable to help me support a multi-tenant model. What I mean by that is:
• I want to have a single orchestrator that's owned/managed by me.
• Each customer would have their own agent(s), which they would self-host.
• Each customer would have one or more flows that only they would be able to see or run.
I have 2 specific questions about this:
• I see that the Cloud product offers RBAC, but I can't figure out if that actually does what I want. I see you can create custom roles with specific permissions, but is it possible to prevent one user from seeing another user's flows?
• I understand that you could use labels to tell Customer A's agent to only run Customer A's flows, for example. But I don't know if you can actually prevent Customer A from applying Customer B's label, and then having Customer B's flow runs go to Customer A's agent. Is there any mechanism to lock that down?

    John Jacoby

    11/15/2021, 3:58 PM
    Hi all. I'm getting an error when I try to use task mapping with multiple return values. Is this possible? This is the error: 'TypeError: Task is not iterable. If your task returns multiple results, pass 'nout' to the task decorator/constructor, or provide a 'Tuple' return-type annotation to your task.' I have done both of the suggested steps and it works for non-mapped tasks, but not on mapped tasks. Is this feature supported?
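(If the limitation is that `nout`/tuple unpacking isn't supported for mapped results, one hedged workaround is to keep each mapped result as a tuple and split it with small indexing tasks.)
from prefect import Flow, task

@task
def produce(x):
    return x, x * 2  # one tuple per mapped input

@task
def first(pair):
    return pair[0]

@task
def second(pair):
    return pair[1]

with Flow("mapped-nout-workaround") as flow:
    pairs = produce.map([1, 2, 3])
    firsts = first.map(pairs)    # [1, 2, 3]
    seconds = second.map(pairs)  # [2, 4, 6]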

    Kostas Chalikias

    11/15/2021, 4:12 PM
Some questions about moving off a completely local agent & execution model on Heroku, along with the cloud scheduler, to a more scalable GCP-based execution model. Here is what we would like to achieve:
• Flow definitions are read directly from GitHub.
• We build a single Docker container from our Python monorepo which can be used both to resolve flow/task structure and to run the tasks.
• We don't really need parallelisation inside a specific flow, although if it's free it wouldn't hurt.
• However, we would like to be able to scale up execution for our flows in general (on GCP), ideally without just increasing the size of a single VM. So either scale up to as many VMs as flows running at the same time, or use a fixed pool of them.
• We don't re-register/restart everything on every release like we do today to be safe that changes are picked up.
We've been through the docs a few times and have a few questions:
• If we declare flows in our codebase, using the GitHub storage, what causes the definition to be re-read or a new flow to be discovered?
• What is the right run_config for our flows so they just run as containers with controllable CPU/memory requirements and reasonable defaults?
• What is the right type of agent? Presumably it depends on the choice of run_config.
• What is the recommended way to get secrets/config env vars to the flows & running tasks?
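(A hedged sketch of one plausible combination for the GCP questions above: GitHub storage plus a `KubernetesRun` run_config with resource requests, paired with a Kubernetes agent on GKE. Repo, path, and image are hypothetical; re-registration is still what makes Prefect pick up changed flow structure.)
from prefect.storage import GitHub
from prefect.run_configs import KubernetesRun

flow.storage = GitHub(repo="my-org/monorepo", path="flows/my_flow.py")
flow.run_config = KubernetesRun(
    image="gcr.io/my-project/monorepo:latest",  # the single container with all deps
    cpu_request="500m",
    memory_request="1Gi",
    labels=["gcp"],  # matched by the agent's labels
)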

    Tao Bian

    11/15/2021, 4:28 PM
    Hi, how can I use "prefect.tasks.gcp.storage.GCSUpload" to directly upload a .jpg file?
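(A hedged sketch, assuming `GCSUpload` hands its data to `upload_from_string`, which also accepts bytes: read the file in a task and pass the raw bytes through. The bucket and paths are hypothetical.)
from prefect import Flow, task
from prefect.tasks.gcp.storage import GCSUpload

upload = GCSUpload(bucket="my-bucket")

@task
def read_bytes(path: str) -> bytes:
    with open(path, "rb") as f:
        return f.read()

with Flow("upload-jpg") as flow:
    data = read_bytes("photo.jpg")
    upload(data=data, blob="photos/photo.jpg")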

    John Jacoby

    11/15/2021, 4:47 PM
    So I have a bunch of tasks which create and read from files. The downstream tasks in my flow don't really depend on the actual Python return value of the upstream tasks, only that the files have been created. Is there any way to explicitly declare that one task relies on another? Ideally I'd have one parameter for my flow that would be passed to each task. But if I do that out-of-the-box, then using a Dask executor each function will try to run immediately despite the necessary files not being created.

Austen Bouza

11/15/2021, 4:50 PM
@John Jacoby you can declare these as upstream tasks by declaring them explicitly with the `upstream_tasks` keyword argument, e.g.
with Flow('my_flow') as flow:
    unrelated = unrelated_task()
    other = other_task(upstream_tasks=[unrelated])
This guarantees that `unrelated` finishes first even if `other` doesn't need any values from it.

John Jacoby

11/15/2021, 4:52 PM
Thank you! I was looking for something like that in the docs.
Does this work with mapped tasks? During debugging it seems that a task that shouldn't run based on a failed earlier task is still running.