prefect-community
  • Patrick Tan

    02/18/2022, 4:37 PM
    Hello, I am new to Prefect. I want to structure my code so it can be run as a Prefect workflow and, optionally, as a normal Python program (not as a Prefect workflow). All my functions have the task decorator, so they can't be called like normal Python functions. Please advise.
    3 replies · 3 participants
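    A minimal sketch of one common answer (assuming Prefect 1.x; all names here are illustrative): keep the logic in plain functions and wrap them with task(), so the same code runs with or without the Prefect engine. An already-decorated task can also be invoked directly through its .run() method.
    from prefect import Flow, task

    def transform(x):
        # plain Python: callable from any script or test
        return x * 2

    transform_task = task(transform)  # Prefect wrapper for flow use

    with Flow("dual-use") as flow:
        result = transform_task(21)

    if __name__ == "__main__":
        print(transform(21))  # run as a normal Python program
        # flow.run()          # or run as a Prefect workflow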
  • Daniel Nilsen

    02/18/2022, 4:44 PM
    Hi, is there any best practice for looping through tasks?
    t1 = task1()
    t2 = task2(t1)
    while condition:
      t3 = task3(a)
      t4 = task4(t3)
      t5 = task5(t4)
    1 reply · 2 participants
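    A hedged sketch of the usual Prefect 1.x answer: a flow graph cannot contain a Python while loop across tasks, but a single task can iterate with the LOOP signal (condition and values illustrative).
    import prefect
    from prefect import Flow, task
    from prefect.engine.signals import LOOP

    @task
    def iterate_until_done():
        value = prefect.context.get("task_loop_result", 0)
        if value >= 5:  # the stopping condition
            return value
        raise LOOP(result=value + 1)  # re-run this task with the new state

    with Flow("loop-example") as flow:
        iterate_until_done()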
  • Kevin Kho

    02/18/2022, 4:46 PM
    Join us on February 24, 2022 at 11AM PST (2PM EST) to learn how to generate end-to-end lineage graphs of your data pipelines using Monte Carlo and Prefect, so you can reduce the time to detection and resolution for critical data incidents: https://www.montecarlodata.com/how-to-build-more-reliable-data-pipelines-with-monte-carlo-and-prefect/
    👍 4
    3 replies · 2 participants
  • luther1337

    02/18/2022, 6:08 PM
    hey guys, i'm trying to get prefect running on kubernetes. i've packaged my entire python package into a docker container, and i'm able to run the flows in the container just fine locally using docker run. however, when i deploy the flows to kube and use prefect cloud to trigger a run, i get the following error:
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  ModuleNotFoundError("No module named \'my_package\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    i'm using GCS as the storage. i'm also wondering if/why i need to use GCS as storage -- doesn't the execution environment have access to the flows? why do they even need to be pickled? thanks in advance! 🙂
    18 replies · 3 participants
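    A common fix, sketched under the assumption that Prefect 1.x pickle-based storage is in play (bucket, image and paths illustrative): store the flow as a script and run it in an image that already contains my_package, so nothing has to be unpickled against a missing module.
    from prefect import Flow
    from prefect.run_configs import KubernetesRun
    from prefect.storage import GCS

    with Flow("my-flow") as flow:
        ...

    # stored_as_script uploads the source file instead of a pickle
    flow.storage = GCS(bucket="my-flow-storage", stored_as_script=True,
                       local_script_path="flows/my_flow.py")
    flow.run_config = KubernetesRun(image="gcr.io/my-project/my_package:latest")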
  • Florentino Bexiga

    02/18/2022, 6:44 PM
    hello everyone! any chance that it is possible to use the DockerRun run_config with basic auth credentials?
    16 replies · 2 participants
  • Jacqueline Riley Garrahan

    02/18/2022, 6:52 PM
    Hi 👋, I am using prefect.tasks.prefect.create_flow_run to kick off some tasks. I've noticed that this isn't returning an id for the flow run as documented, and is instead returning a Task object. Any advice on how to access the ids of created runs?
    21 replies · 2 participants
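    A minimal sketch of the usual resolution (Prefect 1.x assumed; flow and project names illustrative): inside a with Flow(...) block, create_flow_run builds a task whose runtime result is the new flow-run id, so the id is captured like any other task output rather than at flow-build time.
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("parent") as flow:
        flow_run_id = create_flow_run(flow_name="child", project_name="my-project")
        wait_for_flow_run(flow_run_id)  # downstream tasks receive the id at runtime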
  • Chris Reuter

    02/18/2022, 7:45 PM
    Hi all! 👋 Going live for a fireside chat with @Jeremiah and @Chris White in 15 minutes. We'll be covering: 🕐 The Prefect 1.0 Release Candidate 🌠 Latest Prefect 2.0 (aka Orion) features including k8s support, :dask: support, filters and more (a ton has happened since our last Fireside Chat) ☁️ What's next for 2.0. You can join us on YouTube 📺, and feel free to ask any questions in the chat while you're there!
    https://youtu.be/4hIqYhRf6JY
    :kubernetes-party: 1
    🚀 2
    :upvote: 2
    :marvin: 3
    :kubernetes: 1
  • Richard Hughes

    02/18/2022, 8:28 PM
    Hi all - if I want to use a python main.py outside of my prefect.py file, what is the best way to implement this type of configuration based on usage and deployment? Is there an example somewhere of how this ideally should be constructed?
    4 replies · 2 participants
  • William Grim

    02/18/2022, 8:39 PM
    Has anyone seen this kind of error? Everything was fine in testing with local storage, and now that we've pushed to prod (which uses s3 storage), we are seeing this almost immediately when we run our flows:
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'create_params_file\' on <module \'our_filename.py\' from \'/our_filename.py\'>")')
    The signature of create_params_file, which is not a task but a plain method that can be called, looks like:
    def create_params_file(base_filename: str, **kwargs) -> str:
    52 replies · 2 participants
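    A hedged sketch of one common remedy (assuming Prefect 1.x pickle storage is the culprit; bucket and path illustrative): pickle-based storage requires every helper to be importable at unpickle time, whereas script-based storage re-executes the source file instead.
    from prefect.storage import S3

    flow.storage = S3(bucket="prod-flows", stored_as_script=True,
                      local_script_path="flows/our_filename.py")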
  • Henning Holgersen

    02/18/2022, 9:15 PM
    How does your team manage flow storage and run types? I have tried local storage, github and docker; I know there are a few others but they seem peculiar. Personally I'm a big fan of docker, but we are debating the functionality/learning-curve trade-off. Are local agents running on VMs stable enough for production use? A lot of developers have their hands full learning dbt, python and git - throwing docker at them too might be a little much. Happy to hear any experiences around this.
    👀 1
    1 reply · 2 participants
  • Dexter Antonio

    02/18/2022, 9:36 PM
    Hi, I'm trying to store the Results from each Task in a Flow on S3, but I am having some trouble with it. When I set the results object to be an S3Result, nothing ends up being stored in S3. I am able to directly write files with the S3Result object, but the Results from a task are not automatically stored there. I have tried to set checkpointing to True, so I don't think that is the issue. Here is some example code.
    MY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix', location='my_output_folder')
    prefect.config.flows.checkpointing = True
    !export PREFECT__FLOWS__CHECKPOINTING=true
    with Flow("please work", result=MY_RESULTS) as f:
        t1 = my_task()
    state = f.run()
    !aws s3 ls s3://my_bucket_without_s3_prefix/my_output_folder  # nothing is here
    Is there something obvious that I am missing?
    14 replies · 2 participants
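    A sketch of how this usually gets resolved (Prefect 1.x assumed; bucket name illustrative): the checkpointing flag must be in the environment before Prefect loads its config, and location is a template evaluated per task run.
    import os
    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"  # before importing prefect

    from prefect import Flow, task
    from prefect.engine.results import S3Result

    MY_RESULTS = S3Result(bucket="my-bucket",
                          location="my_output_folder/{task_name}.prefect")

    @task(checkpoint=True)
    def my_task():
        return 42

    with Flow("please work", result=MY_RESULTS) as f:
        t1 = my_task()

    state = f.run()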
  • Brian Lorenz

    02/19/2022, 12:27 AM
    Hello! I'm testing out orion and have an issue with the client. When I try to run the sample for the python client it gives me an error:
    line 282, in get_profile_context raise MissingContextError("No profile context found.")
    Any suggestions on how to fix this?
    6 replies · 2 participants
  • Heeje Cho

    02/19/2022, 1:07 AM
    hey guys, is it possible to use create_flow_run to create a persistent scheduled flow run? Not a flow that runs only once at a scheduled time, but a flow that runs at intervals?
    4 replies · 2 participants
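    A minimal sketch of the usual answer (Prefect 1.x assumed): create_flow_run creates a single run, while recurring runs come from a schedule attached to the flow itself.
    from datetime import timedelta

    from prefect import Flow
    from prefect.schedules import IntervalSchedule

    with Flow("recurring", schedule=IntervalSchedule(interval=timedelta(minutes=30))) as flow:
        ...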
  • Aric Huang

    02/19/2022, 1:38 AM
    Have a question about expected behavior for map - with the following sample flow, I was expecting the mapped task f to run concurrently with wait, because there are no dependencies and LocalDaskExecutor is being used. However, the behavior I see is that only `wait`'s mapped tasks get executed, so f is not executed until all the wait tasks return.
    from prefect import Flow, task
    import time
    from prefect.executors import LocalDaskExecutor

    @task
    def f(x):
        return x*2

    @task
    def wait(x):
        time.sleep(x)

    with Flow("test") as flow:
        a = list(range(4))
        wait.map(a)
        result = f.map(a)

    flow.executor = LocalDaskExecutor()
    8 replies · 2 participants
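    One hedged experiment worth trying (a guess, not a confirmed fix): LocalDaskExecutor defaults to a thread pool sized to the machine, so the sleeping wait tasks can occupy every worker before f is scheduled; a larger pool lets the two mapped pipelines overlap.
    from prefect.executors import LocalDaskExecutor

    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)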
  • Kivanc Yuksel

    02/19/2022, 3:18 PM
    Hi! I have some long-running tasks whose output I cache with target; however, from time to time I want to re-run these tasks without manually deleting the target files. Is there a way to "force" a re-run for such tasks?
    11 replies · 3 participants
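    A hedged sketch of one common trick (assuming Prefect 1.x, where targets are templated from context, including parameters; names illustrative): fold a parameter into the target template, so bumping the parameter value forces a fresh run without deleting files.
    from prefect import Flow, Parameter, task
    from prefect.engine.results import LocalResult

    @task(target="long_running-{parameters[cache_key]}.out",
          checkpoint=True, result=LocalResult(dir="cache"))
    def long_running(x):
        return x * 2

    with Flow("cache-bust") as flow:
        cache_key = Parameter("cache_key", default="v1")  # bump "v1" -> "v2" to re-run
        long_running(1)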
  • Dexter Antonio

    02/19/2022, 5:58 PM
    I currently have a prefect flow that operates on a single row of a pandas dataframe. Is there a straightforward way to map this flow over all of the rows in a pandas dataframe? In other words, can I create a flow and then map it? If I cannot map each row of a dataframe to a flow, is there a straightforward way of nesting different tasks into each other and then mapping that "super" task over a series of inputs?
    10 replies · 2 participants
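    A minimal sketch of the second option (Prefect 1.x assumed; dataframe and helpers illustrative): flows themselves cannot be mapped, but the per-row logic can be collapsed into one "super" task that is then mapped over the rows.
    import pandas as pd
    from prefect import Flow, task

    def clean(row):
        return row.fillna(0)

    def score(row):
        return float(row.sum())

    @task
    def process_row(row):
        # the former per-row flow, chained inside one mappable task
        return score(clean(row))

    @task
    def to_rows(df):
        return [row for _, row in df.iterrows()]

    with Flow("per-row") as flow:
        rows = to_rows(pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, None]}))
        scores = process_row.map(rows)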
  • Omar Sultan

    02/19/2022, 8:55 PM
    Hello, I was wondering if there is a way to prevent a flow that is scheduled to run every 30 mins from starting if the previous run has not finished. Any ideas how to do that?
    2 replies · 2 participants
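    A heavily hedged sketch of one do-it-yourself guard (Prefect 1.x Client assumed; the GraphQL schema may differ between Cloud and Server, and Prefect Cloud also offers built-in flow-run concurrency limits): a first task checks for another Running run of the same flow and skips the rest.
    import prefect
    from prefect import task
    from prefect.client import Client
    from prefect.engine.signals import SKIP

    @task
    def guard():
        # look for Running runs of this flow other than the current one
        result = Client().graphql(
            """query($flow_id: uuid, $run_id: uuid) {
                 flow_run(where: {flow_id: {_eq: $flow_id},
                                  id: {_neq: $run_id},
                                  state: {_eq: "Running"}}) { id }
               }""",
            variables={"flow_id": prefect.context.flow_id,
                       "run_id": prefect.context.flow_run_id},
        )
        if result["data"]["flow_run"]:
            raise SKIP("previous run still in progress")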
  • Samay Kapadia

    02/20/2022, 2:35 PM
    I still can't pip install prefect[azure] on my M1 mac 😞
    7 replies · 3 participants
  • Brian Lorenz

    02/21/2022, 1:09 AM
    How do you stop a deployment with a scheduled interval?
    5 replies · 3 participants
  • Max Lei

    02/21/2022, 4:39 AM
    If I want to run my flows on ECS Fargate, do I set up a DaskExecutor with a Fargate cluster class? Is it fine if I have a local agent instead of using the ECS agent?
    11 replies · 2 participants
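    A hedged sketch of the pattern this points at (requires the dask-cloudprovider package; image and sizing illustrative): the executor spins up a temporary Dask cluster on Fargate for each flow run, and any agent that can reach AWS and your Prefect backend can trigger it, a local agent included.
    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.FargateCluster",
        cluster_kwargs={"image": "prefecthq/prefect:latest", "n_workers": 4},
    )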
  • Antonio Manuel BR

    02/21/2022, 7:47 AM
    Hello, does it make sense to distribute a Prefect task using Dask (e.g. predicting a large dataframe with an ML model) in a Prefect Flow that already uses a remote DaskExecutor?
    2 replies · 3 participants
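    A hedged sketch of what distributing inside a task can look like (assumes the distributed package and that tasks run on Dask workers; helpers illustrative): worker_client lets a task fan sub-work out to the same cluster the flow already uses.
    from distributed import worker_client
    from prefect import task

    def score_partition(df):
        return df  # placeholder for model.predict(df)

    @task
    def predict(partitions):
        with worker_client() as client:
            futures = client.map(score_partition, partitions)
            return client.gather(futures)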
  • Guillaume Latour

    02/21/2022, 9:20 AM
    Hello everyone, I see on this issue (https://github.com/PrefectHQ/prefect/issues/1545) that in 2019 there was no easy way to retrieve logs from distributed dask workers. Is there any update that I missed? Have you found a new way to deal with this? Is the creation of a service still the recommended way to achieve log retrieval?
    🙌 1
    2 replies · 2 participants
  • Michael Hadorn

    02/21/2022, 11:15 AM
    Hi there, I'm not able to get orion to run with docker run (more info in the thread).
    49 replies · 3 participants
  • Dotan Asselmann

    02/21/2022, 12:09 PM
    Hey! How can I use a GraphQL mutation to delete prefect flow run logs by flow run id? An example would be appreciated!
    4 replies · 2 participants
  • iñigo

    02/21/2022, 12:15 PM
    Hello, I'm trying to do some sort of switch scenario where, depending on an input parameter, the flow will get data from a DB into a DataFrame and then go to a common task to transform the data and so on. I've attached a description image.
    ✅ 1
    3 replies · 2 participants
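    A minimal sketch of how this branching usually looks (Prefect 1.x assumed; sources illustrative): case picks a branch based on the parameter, and merge feeds whichever branch ran into the common transform.
    from prefect import Flow, Parameter, case, task
    from prefect.tasks.control_flow import merge

    @task
    def from_postgres():
        return "pg-data"

    @task
    def from_mysql():
        return "mysql-data"

    @task
    def transform(df):
        return df

    with Flow("switch") as flow:
        source = Parameter("source", default="postgres")
        with case(source, "postgres"):
            a = from_postgres()
        with case(source, "mysql"):
            b = from_mysql()
        transform(merge(a, b))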
  • Lucas Hosoya

    02/21/2022, 12:54 PM
    Hi, I'm trying to get logs from the GraphQL API but there is a limit on the query. Is there a way to paginate the query so I can get all of the content?
    10 replies · 3 participants
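    A heavily hedged sketch of offset pagination (Prefect 1.x Client assumed; table and field names may differ between Server and Cloud, and the flow-run id is a placeholder):
    from prefect.client import Client

    client = Client()
    logs, offset, page_size = [], 0, 500
    while True:
        result = client.graphql(
            """query($limit: Int, $offset: Int) {
                 log(where: {flow_run_id: {_eq: "<flow-run-id>"}},
                     order_by: {timestamp: asc}, limit: $limit, offset: $offset) {
                   timestamp
                   message
                 }
               }""",
            variables={"limit": page_size, "offset": offset},
        )
        batch = result["data"]["log"]
        logs.extend(batch)
        if len(batch) < page_size:  # last page
            break
        offset += page_size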
  • Arnaldo Russo

    02/21/2022, 2:02 PM
    Hi there! Could anyone explain where I set the 'config' while running StartFlowRun? I'm using it with new_flow_context=prefect.context.get('config')
    10 replies · 2 participants
  • Tomek Florek

    02/21/2022, 2:29 PM
    Hey guys 🙂 Got a question on flow scheduling. I'm using the basic IntervalClock for my flows, with all of them starting at pretty much the same time, which worked fine until now. The number of flows has increased to 30+ and they have started stalling, never finishing. It makes sense, since we're querying the same DBs in them and it's all run on a single EC2. I'd like to adjust the scheduling so that they start in small groups, a few minutes apart. My question is - what's the best way to do that? My first thought is CronSchedule, but as the number of flows grows into the hundreds, maintaining those individual schedules might be problematic. Is there another way?
    ✅ 1
    4 replies · 3 participants
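    A hedged sketch of one way to avoid per-flow cron strings (Prefect 1.x assumed; anchor time and sizes illustrative): derive each flow's IntervalClock start_date from an index, so flows start in groups a few minutes apart.
    from datetime import timedelta

    import pendulum
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import IntervalClock

    def staggered_schedule(i, group_size=5, gap=timedelta(minutes=3)):
        # flows 0-4 share the anchor, 5-9 start three minutes later, and so on
        anchor = pendulum.datetime(2022, 3, 1, 6, 0, tz="UTC")
        return Schedule(clocks=[IntervalClock(
            interval=timedelta(minutes=30),
            start_date=anchor + (i // group_size) * gap,
        )])

    flow.schedule = staggered_schedule(i=12)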
  • Marwan Sarieddine

    02/21/2022, 3:19 PM
    Hi folks, since last week we have been encountering an issue with prefect cloud, version locking and heartbeat failures - more details in the thread.
    18 replies · 2 participants
  • Aqib Fayyaz

    02/21/2022, 3:28 PM
    Hi, I have kind of a silly question: if I want to run the agent, flow and server on the same gke cluster, can I have a local agent instead of the kubernetes agent?
    49 replies · 2 participants
Aqib Fayyaz

02/21/2022, 3:28 PM
Hi, I have kind of a silly question: if I want to run the agent, flow and server on the same gke cluster, can I have a local agent instead of the kubernetes agent?
https://github.com/flavienbwk/prefect-docker-compose - I am trying the above example using docker-compose and it works locally, so I want to have the same behaviour on gke.

Anna Geller

02/21/2022, 4:14 PM
I wouldn't recommend it, as you will likely face issues when you need to scale up or redeploy some components. For Server deployment on Kubernetes, I would recommend the helm chart. But @Aqib Fayyaz I remember we went together through both: • setting up a KubernetesAgent on GKE • as well as setting up Server with Helm on GKE, and I remember we managed to do both, right? Did something happen with your setup, so that you have to start from scratch?

Aqib Fayyaz

02/21/2022, 4:15 PM
yeah @Anna Geller, I remember all of that worked, but now my job requires having a docker-compose setup working, and then deploying it on gke the same way it works with docker-compose.
but the main thing is: is this going to work?
a local agent deployed on gke and the server on the same cluster, instead of a kubernetes agent
our prefect code will also be on the same cluster

Anna Geller

02/21/2022, 4:21 PM
Again, I wouldn't recommend that, since even if this works, you will 100% face issues with scale - I recommend using either Prefect Cloud or, when you want to self-host, the Helm chart, which is the recommended setup for Kubernetes deployments. If you have very small workloads that fit into a single machine, you can deploy a single VM and self-host Server using docker-compose. Docker-compose is meant for single-machine container deployments, not for something to be run on a Kubernetes cluster.

Aqib Fayyaz

02/21/2022, 4:26 PM
ok got it, but the thing is, half of it has already been deployed on gke: our prefect code that runs the pipeline is already deployed there and it works. Now I only need to deploy the server with this approach, so that we can run the deployed pipeline when we want, rather than it running automatically when deployed for the first time.

Anna Geller

02/21/2022, 4:31 PM
My recommendation is as follows: • if you want to use docker-compose, deploy your Server on a single VM, not GKE • if you want to deploy Server on a GKE Kubernetes cluster, use the helm chart, not docker-compose.
✅ 1

Aqib Fayyaz

02/22/2022, 11:29 AM
Hi @Anna Geller, so now I am using helm for the server deployment and I have deployed it on gke following this https://github.com/PrefectHQ/server/tree/master/helm/prefect-server and this awesome video
https://www.youtube.com/watch?v=EwsMecjSYEU&t=2792s
We are using google file store as a shared volume, mounted on a vm instance, for all our gke services. Now the main thing is that our prefect pipeline also needs to access that shared volume for storing the results, and I am confused about how we can do that. For all other services we defined the shared volume in their manifest files, like in the attached image.
And where should I store the flow and its dependencies so that it can access the shared volume?

Anna Geller

02/22/2022, 11:42 AM
You can store both your flow and results in GCS (mounting cloud block storage volumes is more involved and I wouldn't do it unless you're a Kubernetes pro, especially given that you need it for object storage and GCS is made for that). As long as you have service account permissions in your cluster, your flow run pods should be able to interact with GCS.
But I don't fully understand why you're going through the entire process again - we already did that twice: once when you were setting up a GKE KubernetesAgent with Prefect Cloud, and once when you were setting up Server on GKE with the helm chart, and I remember you got it working.

Aqib Fayyaz

02/22/2022, 11:45 AM
yes, even now the server is up on gke using the helm chart. The only thing that has been added is the shared volumes, and I need to access them in my flow.
I have run the flow as a service on gke with the shared volume mounted in it, and it worked. Now the server part has been added, and we want to orchestrate the flow using the server on gke, because without server or cloud we cannot trigger the flow when needed - it just runs automatically when deployed on gke.
and I can get any permission I want

Anna Geller

02/22/2022, 11:52 AM
you would need a persistent volume to mount a drive - check out these docs: https://cloud.google.com/kubernetes-engine/docs/concepts/persistent-volumes#persistentvolumeclaims That's correct: you need either a Prefect Cloud or Server backend to run flows on a schedule on GKE.

Aqib Fayyaz

02/22/2022, 11:53 AM
i have them already
for all the other services, and it works
now the question is how I can use it for our flow - I mean, how can the flow access the volume, and where should the flow be stored so that it can access it?

Anna Geller

02/22/2022, 12:04 PM
afaik you can't use a persistent volume as flow storage, but you can probably use it for results if you specify the path. Again, I would recommend using GCS rather than a persistent volume, since you just need to store objects (both flow storage and results) and GCS is object storage, while a persistent volume is block storage and is used more for stateful applications like a database API backend.

Aqib Fayyaz

02/22/2022, 12:10 PM
Exactly, I don't want to use the persistent volume for the flow but for the results, and this is what I want to know: where should the flow be stored so that it can access the persistent volume, using the path specified in the flow, for storing the results in the persistent volume?
And I need to use the persistent volume for storing results because all the other services are using it, and they need the result of the prefect pipeline from the persistent volume for further work.

Anna Geller

02/22/2022, 12:19 PM
You can decide where you store it; there are no restrictions from the Prefect side.

Aqib Fayyaz

02/22/2022, 12:29 PM
Can you please tell me how things work when the flow is stored in a docker image on gcr and the server and agent are on gke? When we run the flow from the server, how do things work - I mean, who gets the flow, and where is it run?

Anna Geller

02/22/2022, 12:32 PM
This thread provides a detailed explanation

Aqib Fayyaz

02/22/2022, 12:50 PM
can I place my flow on a vm instance when the server and agents are on gke?

Anna Geller

02/22/2022, 12:54 PM
You are using a Kubernetes agent, right? If so, Prefect deploys your flow runs as Kubernetes jobs, and those jobs must be able to pull your flow. So your Kubernetes job template would be the right place to configure that. But you need to use one of the existing storage mechanisms. If you really don't wanna use GCS, you may try using Local storage and store the flow on this PV, but it's at your own risk - I would really recommend using GCS for that (it's more reliable, scalable and even cheaper).

Aqib Fayyaz

02/22/2022, 12:58 PM
I really appreciate the recommendation, but I only have the option of a PV, as the other services need to get the result of the flow and they are already looking into the PV for that.

Anna Geller

02/22/2022, 12:59 PM
I was speaking of Storage, not results - for results you can use whatever stateful mechanism you want (provided it's configured properly).
✅ 1

Aqib Fayyaz

02/22/2022, 1:14 PM
hmm ok, so I will store the flow on gcp. Then how can I access the pv in the flow - got any idea?

Anna Geller

02/22/2022, 1:24 PM
When you specify the PV claim, you specify the mount path, and in theory you could use the same path for your flow results. Check out this for more info. But again, I would strongly encourage you to use GCS for that instead - your custom applications can use the same storage bucket and paths to retrieve the flow results, in the same way you would do it with a PV; there is really no difference, apart from the fact that GCS is significantly less complex and more reliable/a better fit for your use case. Is it you or someone else I need to persuade to GCS? 🙂 And if you still need to use a PV, do you have some DevOps folks on your team who can help you with that? This is hard to support via Slack. Not sure if you saw it, but we do provide paid support for such infrastructure issues.
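
A hedged sketch of the combination discussed here (Prefect 1.x assumed; bucket, mount path and template path are illustrative): flow code lives in GCS, results are written to the Filestore mount, and the PVC mount itself comes from a custom Kubernetes job template.
from prefect import Flow
from prefect.engine.results import LocalResult
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

with Flow(
    "gke-flow",
    storage=GCS(bucket="my-flow-storage"),
    result=LocalResult(dir="/mnt/filestore/prefect-results"),
) as flow:
    ...

# the job template is where the PVC gets mounted into flow-run pods
flow.run_config = KubernetesRun(job_template_path="templates/job-with-pvc.yaml")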

Aqib Fayyaz

02/22/2022, 3:32 PM
@Anna Geller I have one last question. I deployed my flow on gke as a service, and inside the docker file I gave the command to run the flow as soon as it is deployed on gke as a service:
CMD  ["python3", "/usr/app/feat_post_flow_local.py"]
and it works - I mean, it runs the flow, the flow does its job and it sends the result to the shared storage as well. The server is also deployed on gke using the helm chart, so my question is: can this server interact with the flow deployed on gke as a service?
I know that for this I need to register the flow with the server, but for that I also need to tell it where the storage is, and I did not find any option for kubernetes as storage.

Anna Geller

02/23/2022, 9:41 AM
Exactly - if you want to run a flow with a Server backend, you need to register (and probably also schedule) your flow. Prefect doesn't support running flows as a long-running service.
You could schedule this flow and run it even forever, but what you are doing right now is just executing a local script, which is not tracked in the backend and thus can't communicate via the API. To communicate with the backend, the flow must be registered.
💯 1

Aqib Fayyaz

02/23/2022, 9:45 AM
hmm, what if I use docker storage for my flow? That way my flow would be able to run both as a service (so that I can mount the pvc) and I would also be able to register the flow with the server?

Anna Geller

02/23/2022, 9:46 AM
I don't know what you're trying to do. Can you explain the problem you're trying to solve?

Aqib Fayyaz

02/23/2022, 9:47 AM
ok, let's get it straight: I just need to mount the file store instance to my flow. No matter where my flow runs, it should just be able to communicate with the filestore instance (which is used as a shared volume for all the other services that we have on gke).

Anna Geller

02/23/2022, 9:52 AM
To access a file from a pod, you don't need to run the flow as a service; you need to mount the PV to the pod, as we discussed before. There are some ways to do it - I would ask your DevOps folks to help you set this up, and we also provide professional services you can book for such infrastructure issues. From the Prefect perspective, you can set it on your Kubernetes job template; that's all I know and can help with, tbh.
✅ 1

Aqib Fayyaz

02/23/2022, 9:59 AM
ok, thank you so much for the great help.
ok, one more thing: previously I had the flow stored on github and the custom modules in the docker file; now I want to store the flow on docker - I mean, docker as storage for my flow and all custom modules - so can you please provide some useful link on where to get started with that?

Anna Geller

02/23/2022, 10:05 AM
Sure, here is one example with AWS ECR, but you can use a similar logic with GCP GCR
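
A hedged sketch of Docker storage pointed at GCR instead of ECR (registry, image and module paths illustrative): custom modules are baked into the image and put on PYTHONPATH so the flow can import them.
from prefect.storage import Docker

flow.storage = Docker(
    registry_url="gcr.io/my-project",  # GCR registry URL in place of an ECR one
    image_name="my-flow",
    image_tag="latest",
    files={"/local/path/my_module.py": "/modules/my_module.py"},
    env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
)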

Aqib Fayyaz

02/23/2022, 11:20 AM
in the case of the aws account id, what do I need to use for gcp?

Anna Geller

02/23/2022, 11:22 AM
your GCR registry url
✅ 1

Aqib Fayyaz

02/23/2022, 11:25 AM
and this is the docker file https://github.com/anna-geller/packaging-prefect-flows/blob/master/Dockerfile

Anna Geller

02/23/2022, 11:45 AM
yup, correct 🙂