prefect-community
  • c

    Carlos Paiva

    12/01/2021, 4:27 PM
    Hi all, I'm working on a simple Flow but my Tasks keep getting stuck on "Pending". The logs say
    Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
    My setup is on an AWS EC2 machine and I am running the Flow with a Docker Agent, as per below.
    from prefect import Parameter, Flow, task
    from prefect.storage import GitHub
    from prefect.run_configs.docker import DockerRun
    
    
    @task(log_stdout=True)
    def test(e):
        return e.get("body")
    
    
    storage = GitHub(repo=repo,
                     path="flows/flow.py",
                     access_token_secret="GITHUB_ACCESS_TOKEN")
    
    run_config = DockerRun(labels=["L1", "L2", "L3"],
                           env={"EXTRA_PIP_PACKAGES": "zcrmsdk zohocrm-prefect-tasks"})
    
    with Flow(name="Name", storage=storage, run_config=run_config) as flow:
        # Pipeline parameters
        event = Parameter('event', required=True)
        test(event)
    Any idea about what I am doing wrong?
    k
    11 replies · 2 participants
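    A common cause of this error is a mismatch between the registered flow metadata and the file in the repo; a minimal sketch of the idea, assuming the flow is registered from the same file that is committed at flows/flow.py (the project name is illustrative):
    # Register from the very file committed at flows/flow.py, so the
    # metadata Prefect stores matches what the GitHub storage will load
    flow.register(project_name="my-project")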
  • t

    Tom Shaffner

    12/01/2021, 4:37 PM
    Short version: Is there a way to set retries to use exponential backoff? Long version: The documentation at https://docs.prefect.io/core/tutorial/04-handling-failure.html has retry options at set intervals, but network issues (often the cause of my failures) generally work better with exponential backoff (https://dzone.com/articles/understanding-retry-pattern-with-exponential-back). Is there a way to do this currently?
    k
    a
    3 replies · 3 participants
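    One possible workaround in Prefect 1.x (a minimal sketch, not an official backoff feature) is to raise the RETRY signal with a computed start_time, growing the delay with the task_run_count from context; the task and the retry cap below are illustrative:
    import pendulum
    import prefect
    from prefect import task
    from prefect.engine.signals import RETRY
    
    
    @task
    def call_flaky_api():
        attempt = prefect.context.get("task_run_count", 1)
        try:
            ...  # the network call that sometimes fails
        except Exception:
            if attempt >= 5:
                raise  # give up after five attempts
            # exponential backoff: 2, 4, 8, ... seconds
            raise RETRY(start_time=pendulum.now("utc").add(seconds=2 ** attempt))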
  • a

    Aleksandr Liadov

    12/01/2021, 4:38 PM
    Hi guys, my problem is the following: I'd like to pass the logs (standard Python logger) from submodules to the prefect logger (so they can be displayed on the server). I understood that this can be done with the run config of the flow (I do a local run with a local agent):
    run_config=UniversalRun(env={
        "PREFECT__LOGGING__EXTRA_LOGGERS": '["lib1", "lib2"]',
        "PREFECT__LOGGING__LEVEL": "DEBUG",
    })
    I can ONLY observe info level from my libs, even if I try to switch on debug level! (However, the prefect logger shows the information related to CloudFlowRunner at debug level without any problem.) What am I doing wrong?
    t
    k
    20 replies · 3 participants
  • j

    Jason Motley

    12/01/2021, 5:07 PM
    Can you manually specify an upstream task tied to a renamed task? For example, given
    @task(name="Renamed Transform", log_stdout=True, checkpoint=False)
    can I set upstream_tasks=["Renamed Transform"] in a downstream task?
    k
    2 replies · 2 participants
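    In Prefect 1.x, upstream dependencies are passed as task objects rather than name strings; a minimal sketch, with illustrative tasks:
    from prefect import Flow, task
    
    
    @task(name="Renamed Transform", log_stdout=True, checkpoint=False)
    def transform():
        print("transforming")
    
    
    @task
    def load():
        print("loading")
    
    
    with Flow("example") as flow:
        t = transform()
        # pass the task object (not its display name) as the upstream dependency
        load(upstream_tasks=[t])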
  • t

    Thomas Furmston

    12/01/2021, 6:59 PM
    Hi
  • t

    Thomas Furmston

    12/01/2021, 6:59 PM
    I have a question regarding resuming failed flows. I hope you can help me.
  • t

    Thomas Furmston

    12/01/2021, 7:00 PM
    Suppose I have a flow that consists of 4 tasks that run one after the other, e.g., A -> B -> C -> D. Now suppose I run the flow and tasks A and B complete, but C fails because of a bug. I therefore want to fix the code in task C and redeploy the flow.
    k
    16 replies · 2 participants
  • t

    Thomas Furmston

    12/01/2021, 7:00 PM
    Tasks A & B are expensive to run, so I would rather not re-run them from scratch. Is it possible to re-run the flow with the newly deployed code from task C onwards?
  • t

    Thomas Furmston

    12/01/2021, 7:01 PM
    For some reason I was expecting the answer to be obvious, but now that I try it I can't seem to find a solution.
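    For reference, restarting a flow run from a failed task generally requires the upstream results to have been persisted; a minimal sketch of result checkpointing in Prefect 1.x (the directory is illustrative):
    from prefect import task
    from prefect.engine.results import LocalResult
    
    
    # Persist the expensive outputs of A and B so a Restart from the UI
    # can load them instead of recomputing
    @task(checkpoint=True, result=LocalResult(dir="/tmp/prefect-results"))
    def task_a():
        return "expensive output of A"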
  • t

    Tilak Maddy

    12/01/2021, 8:48 PM
    Hey y'all, I just wanted to say that today I was getting '*Unauthorized. Invalid Prefect Cloud API key.*' when I ran
    prefect auth login --key <MY_KEY>
    even though my key was valid and authorized. I created many more keys and it still didn't work... In the end it turned out that I had already logged in with another key a couple of days ago, so I had to run
    prefect auth logout
    before logging in with the new key.
    k
    3 replies · 2 participants
  • t

    Tom Shaffner

    12/01/2021, 8:55 PM
    I feel like this is a noob question but I can't find an answer. Is there a way to make Prefect Server restart automatically after reboots? The documentation mentions using supervisord for agents, but there don't seem to be any equivalent instructions for the server. The closest I found was another thread on here that uses prefect server config to generate a docker-compose file, and I'm partway down the path of trying to set that up to auto-start (still working through https://github.com/flavienbwk/prefect-docker-compose, which was linked in it), but so far it's not working, and it seems like there should be a simpler answer. Docker already has restart policies (https://docs.docker.com/config/containers/start-containers-automatically/); is there not some flag I can pass to prefect server start to use them?
    a
    k
    22 replies · 3 participants
  • j

    Jason Motley

    12/01/2021, 9:37 PM
    What's the best way to see the first 10 rows of data I've transformed in prefect without running in the cloud? I can manually embed my credentials temporarily and want to do something like this and see the result in the terminal:
    flow = Flow("myflow")
    with flow:
        df = extract(stuff)
    print(df.head(10)) # This does not work
    k
    20 replies · 2 participants
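    One way to do this in Prefect 1.x is to run the flow in-process and pull the task's output off the final state; a minimal sketch, with illustrative task names (assumes pandas is installed):
    from prefect import Flow, task
    
    
    @task
    def extract():
        import pandas as pd
        return pd.DataFrame({"a": range(100)})
    
    
    with Flow("myflow") as flow:
        df_task = extract()
    
    state = flow.run()                 # local run, no backend needed
    df = state.result[df_task].result  # the actual DataFrame
    print(df.head(10))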
  • v

    volkerjaenisch

    12/01/2021, 10:54 PM
    Evaluation of Orion (Alpha 5 as well as the dev version from GitHub leads to the same findings). We have certain questions concerning Prefect Orion (Prefect Alpha 5):
    • There seems to be no .result() on flows any more. So how does one get the result of a subflow back?
    • (Sub)flow input parameters have to be JSON serializable, so object/instance parameters are no longer possible; not even a simple numpy array works. Why not cloudpickle?
    • Failed tasks are not displayed in the radar.
    • There is no start/stop/restart functionality for flows. Will there be checkpointing in Orion?
    • Each parallelized task instance produces a new box in the radar. In Prefect this was much better visualized.
    • The radar is an interesting new view on the graph and can be of use. But what we would really like is the ability to pin a given flow/task at a fixed coordinate. This is most interesting when production code runs the same flows 24/7 for years; the graph is then frozen.

    https://upload.wikimedia.org/wikipedia/commons/8/89/Kozloduy_Nuclear_Power_Plant_-_Control_Room_of_Unit_5.jpg

    • There seems to be no way in Orion to store/retrieve self-serialized results any more. The docs state that results are stored in the SQLite DB. Is this really the future approach? Usual payloads are in the GB range.
    • Even for trivial examples (the inc/dec example with a flow of flows) with local parallelism (DaskExecutor), we get concurrent access to the SQLite DB, resulting in failures. These failures seem to heal themselves (retries are made that conclude positively). The collisions are not easily reproduced, which hints at a runtime (probably async) problem.
    • In Prefect there are two APIs to address your problem: functional and imperative (base classes). The imperative API is not covered with a single word in the Orion documentation. We would really like to have an imperative API in Orion. What are your plans for the APIs in Orion?
    Sorry for so much criticism. Orion is quite impressive: the engine does a good job at auto-parallelism, and with a few decorators one can optimize one's code at no cost. But Orion is IMHO quite far from production. Volker
    👀 1
    k
    m
    +3
    78 replies · 6 participants
  • b

    brian

    12/01/2021, 11:03 PM
    Question about github storage
    k
    8 replies · 2 participants
  • w

    Wilhelm Su

    12/01/2021, 11:17 PM
    Hello everyone, in prefect, what's the idiom to flatten back the results of multiple parallel tasks from apply_map()? I've tried various combinations of syntax and unfortunately I can't seem to get it to work.
    k
    7 replies · 2 participants
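    Prefect 1.x ships a flatten utility for un-nesting mapped results; whether it covers the apply_map case may depend on the shape of the output, but a minimal sketch of the idea:
    from prefect import Flow, flatten, task
    
    
    @task
    def produce_batches():
        return [[1, 2], [3, 4]]
    
    
    @task
    def add_one(x):
        return x + 1
    
    
    with Flow("flatten-example") as flow:
        batches = produce_batches()
        # flatten() un-nests the list of lists, so add_one maps over 1, 2, 3, 4
        add_one.map(flatten(batches))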
  • a

    Arun Giridharan

    12/02/2021, 1:04 AM
    QQ: how can I assert that the state of a dependent flow is successful? I'm running the flow using StartFlowRun.
    k
    6 replies · 2 participants
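    One approach with StartFlowRun is wait=True, which makes the parent task take on the child flow run's final state (and fail if the child fails); a minimal sketch, with illustrative flow and project names:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun
    
    # wait=True blocks until the child flow run finishes and raises
    # if the child run ends in a failed state
    start_child = StartFlowRun(flow_name="child-flow", project_name="my-project", wait=True)
    
    with Flow("parent-flow") as flow:
        start_child()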
  • u

    张强

    12/02/2021, 6:20 AM
    Does prefect have a drag-and-drop-based UI? I want to use it for network operators; they can't code.
    a
    6 replies · 2 participants
  • b

    Bruno Murino

    12/02/2021, 9:43 AM
    Hi everyone — I'm struggling to understand how prefect behaves when I redeploy stuff. The way it works now is that 1 git repo = 1 prefect project containing multiple flows. We have a dockerfile whose entrypoint simply registers all the flows and then starts a LocalAgent. This is deployed as an ECS service. When we deploy, we simply kill the existing service and start a new one with a new docker image — this means there is a little "downtime" in terms of flow schedules etc., like:
    • Time is 4:59.
    • We start a new deployment, meaning that at 4:59 the service is killed.
    • There is a job scheduled to run at 5:00.
    • The service is only back up at 5:01.
    • This means the 5:00 run will be fully missed (please correct me if I'm wrong).
    My question, however, is about a second scenario, where:
    • We start a run at 4:58; the run takes about 5 minutes to finish.
    • We start a new deployment at 4:59 — meaning there is a run in progress.
    • What happens to that flow run if the entire service (local agent + code execution environment) is killed?
    • At 5:01, when the new deployment finishes, will prefect know to resume that flow run? How would that work?
    The reason I'm asking is that I plan on changing our deployment strategy to be blue/green, but I'm not sure how prefect will cope with a flow run being killed midway etc. Sorry if this is confusing! I appreciate any help.
    a
    57 replies · 2 participants
  • t

    Tilak Maddy

    12/02/2021, 10:29 AM
    Hey everyone, I'm confused by the meanings of the various flow run states: Failed, Cancelled, Finished, Skipped. Since I am migrating from Airflow, where we had the concept of "pausing" and "unpausing" a DAG, I'd like to know how we can replicate that behaviour over here. Ideally, when I hit pause, all I want to do is stop scheduling more flow runs until I hit "unpause", although the tasks in the current flow run should continue to run. How could I achieve that with Prefect? (By the way, I'm loving prefect: versioning, parameters, prefect cloud, all that 🤟)
    a
    4 replies · 2 participants
  • z

    Zheng Xie

    12/02/2021, 10:52 AM
    Can I get an answer to this problem related to building a docker image, please? I have put it on Stack Overflow, hoping it helps others searching. When using Docker as storage, in the call below:
    prefect register --project yourproject -p yourflow.py
    it seems that prefect creates a tmp{randomNumber} folder and puts healthcheck.py and a Dockerfile in it, which participate in the docker build. But I accidentally deleted the tmp folder, and even after I recovered it, it still throws the following error. What happened was that Prefect expected a tmp folder with a new random number as the suffix. How can I recover from it? https://stackoverflow.com/questions/70198040/error-in-prefect-when-build-docker-image-what-triggers-prefect-to-create-the-tm
    a
    18 replies · 2 participants
  • c

    Chris L.

    12/02/2021, 11:01 AM
    Hello Prefect community, I'm getting a strange ClientError when using Prefect Cloud for mapped tasks. The inputs to each task are identical, but some mapped tasks succeed while a few raise
    DeprecationWarning: Using 'method_whitelist' with Retry is deprecated and will be removed in v2.0. Use 'allowed_methods' instead
    (see screenshot for the trace). Has anybody experienced something similar? I'm using urllib3 v1.26.7 and prefect v0.15.9
    a
    16 replies · 2 participants
  • a

    Aqib Fayyaz

    12/02/2021, 11:33 AM
    Hi, I am trying to install prefect server on GKE using helm. Instead of doing
    helm repo add prefecthq https://prefecthq.github.io/server/
    I have cloned the repo https://github.com/PrefectHQ/server/tree/master/helm/prefect-server so that I have the prefect-server helm chart locally, and when I do helm install demo1 prefect-server I get the following error:
    Error: INSTALLATION FAILED: found in Chart.yaml, but missing in charts/ directory: postgresql
    a
    4 replies · 2 participants
  • t

    Tilak Maddy

    12/02/2021, 11:53 AM
    My question is regarding Storage. How does anything change if I add flow.storage = Local() to my hello-world flow? Everything seems to be the same.
    a
    27 replies · 2 participants
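    For reference, Local() is also what Prefect 1.x falls back to when no storage is set, which would explain why nothing visibly changes; a minimal sketch (by default the flow is pickled under ~/.prefect/flows on the registering machine):
    from prefect import Flow, task
    from prefect.storage import Local
    
    
    @task
    def say_hi():
        print("hi")
    
    
    with Flow("hello-world") as flow:
        say_hi()
    
    flow.storage = Local()  # explicit, but equivalent to the registration default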
  • p

    Pinakpani Mukherjee

    12/02/2021, 12:07 PM
    Can someone help me with some GraphQL? I need to find all the flows that have a running task matching a certain keyword.
    a
    4 replies · 2 participants
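    A minimal sketch of one way to run such a query from Python, assuming the Hasura-style filters exposed by the Prefect GraphQL API; the keyword and the exact filter fields are illustrative:
    from prefect import Client
    
    client = Client()
    result = client.graphql(
        """
        query {
          flow(where: {tasks: {name: {_ilike: "%keyword%"}}}) {
            id
            name
          }
        }
        """
    )
    print(result)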
  • j

    jcozar

    12/02/2021, 1:01 PM
    Hi all! I have a question about schedules: if I use two clocks and both of them trigger at the same time, is my flow executed just once? Thank you!
    a
    k
    15 replies · 3 participants
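    For reference, a two-clock schedule in Prefect 1.x looks like the sketch below; the intervals are illustrative and chosen so the two clocks can emit coincident events:
    from datetime import timedelta
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import IntervalClock
    
    schedule = Schedule(clocks=[
        IntervalClock(timedelta(minutes=60)),
        IntervalClock(timedelta(minutes=30)),
    ])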
  • a

    Arnon Kimhi

    12/02/2021, 1:25 PM
    Hey, is there a way I can get task results in flow state handlers?
  • a

    Arnon Kimhi

    12/02/2021, 1:27 PM
    I am trying
    import prefect
    from prefect.backend import FlowRunView
    
    def state_handler(flow, old_state, new_state):
        if new_state.is_successful():
            flow_run_id = prefect.context.flow_run_id
            flow_run_view = FlowRunView.from_flow_run_id(flow_run_id)
            b_task_run = flow_run_view.get_task_run("b-1")
    a
    10 replies · 2 participants
  • a

    Arnon Kimhi

    12/02/2021, 1:28 PM
    but this wouldn't work when I run it locally; is there another trick?
  • j

    Jacob Blanco

    12/02/2021, 1:30 PM
    Hey folks, we ran into some issues where task concurrency limits were being gobbled up by “ghost” tasks that were still marked running for Flows that were cancelled. We are able to retrieve those with the following GraphQL query:
    {
      task_run(
        where: {_and: [{state: {_eq: "Running"}}, {flow_run: {state: {_neq: "Running"}}}]}
      ) {
        id
        start_time
      }
    }
    Maybe unrelated but for tasks which have a lot of runs, the dashboard seems to hit some kind of query limit and crashes when you try to see the list of Task Runs. The stuck ghost tasks don’t always have a lot of task runs related to them.
    a
    3 replies · 2 participants
  • t

    Tom Klein

    12/02/2021, 2:20 PM
    Hey all, a question here about data-related tasks that are not Python-based, e.g. written in NodeJS. Today we have these wrapped as Docker images (using a Dockerfile we made) and deployed through our general CI/CD system to K8s as scheduled jobs. I started toying with the idea of breaking them into components and migrating them to Prefect for the various benefits it could offer. I saw that there's support for "docker" and "kubernetes" tasks, but since my DevOps knowledge is kind of limited, I was wondering if (by any chance) there are some examples of that kind of usage lying around somewhere, or if you could at least give your thoughts on whether what I'm thinking even makes sense?
    a
    a
    +1
    30 replies · 4 participants
a

alex

12/02/2021, 2:25 PM
Hey Tom, great question! Prefect does have the ability to run tasks that interact with Docker images and containers. There's a collection of Docker tasks in the Prefect Task library; the documentation for that is here: https://docs.prefect.io/api/latest/tasks/docker.html. In particular, you could use the StartContainer task to run a container from a pre-built Docker image.
:upvote: 1
a

Anna Geller

12/02/2021, 2:26 PM
@Tom Klein adding to Alex's answer: for Kubernetes, there is a RunNamespacedJob task, and here are two examples of how it can be used: https://github.com/anna-geller/packaging-prefect-flows/tree/master/flows_task_library
And here is an example for the Docker tasks Alex mentioned: https://docs.prefect.io/orchestration/recipes/k8s_docker_sidecar.html
t

Tom Klein

12/02/2021, 2:39 PM
Hmm, if I understand correctly - assuming our (Docker) images are already being built and stored automatically (using our CI/CD) in the image registry, we need to use this, no? (We are in fact using ECR for all our "official" service deployments in the company.) https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_no_build/docker_script_kubernetes_run_custom_ecr_image.py
Basically all I'm trying to do is move the orchestration (scheduling, execution context within a larger DAG, retries, etc.) from the built-in K8s/helm/whatever one to Prefect, but I'm fine with the build process being handled by a CI/CD system unrelated to Prefect and wired into our GitHub account.
But then I'm left with the problem of knowing where exactly the image is in ECR, right? So, from my understanding of the "philosophy" here, I would need to add (or rather, ask our DevOps to add) a step in our CI/CD system that reflects the image path/ID back to Prefect somehow, e.g. to the K/V store? Or am I thinking about it all wrong?
k

Kevin Kho

12/02/2021, 3:02 PM
I think you would know the location ahead of time because when you push it, you can specify the name and tag of that image in ECR
t

Tom Klein

12/02/2021, 3:07 PM
@Kevin Kho but it's done automatically in our CI/CD system (CodeFresh), which I'm not really involved in; i.e. our DevOps (manually) wire some GitHub repo to be built on every commit and automatically pushed to ECR on success. They then (manually) create (upon request from us) some K8s job (or long-running service, or whatever) based on said image; this is done once per job/service/etc., as part of its initial setup. And then when we "deploy" (using our deploy dashboard), the latest image is taken from ECR and pushed to K8s using their DevOps magic. At which point in this process is the "ahead of time" you're referring to?
k

Kevin Kho

12/02/2021, 3:09 PM
When DevOps pushes the image to ECR, they have to push it under a certain registry, image name, and tag, so you just need to find the location they are pushing to, right?
But yeah, if you really don't know it, the KV store is certainly an option for retrieving the image address
t

Tom Klein

12/02/2021, 3:10 PM
@Kevin Kho right - but it's done automatically, so we should "intervene" in the CI/CD process and add a 4th step (the first three being git clone, docker build, push to ECR) that somehow sends the image path somewhere that Prefect will know of, right?
There must be some best practice here that I'm missing - i.e., I would have had the same problem if I were using Airflow, no?
k

Kevin Kho

12/02/2021, 3:12 PM
I think even if it's done automatically, that push to ECR has the destination hardcoded? But yeah, if not, then you can send it to the KV store. You would need your CI/CD environment to be authenticated, and then you can use it.
t

Tom Klein

12/02/2021, 3:13 PM
@Kevin Kho I have no idea what naming policy our DevOps currently applies to images; for us (engineers) it's transparent - we push stuff to git, eventually it gets built, and later we can deploy it 🙂 the wiring is behind the scenes
k

Kevin Kho

12/02/2021, 3:14 PM
Uhh, I think the best practice would be for you to have more control over the image building and uploading, but I understand that it's another team in your organization. Persisting that somewhere really seems like the only option here to automate it
💯 1
t

Tom Klein

12/02/2021, 3:15 PM
We (the data team) can always do it ourselves, but I have a hunch that it's going to be a bad idea to split our CI/CD system in half into two separate ones.
Even with control over image building, I'm not sure I fully understand the best practice. How do big companies who use Airflow (or whatever) generally combine their CI/CD with an orchestration system that is distinct from it? Obviously at least one of them needs to know about the other; the question is whether you're supposed to make the CI/CD aware of the orchestration layer, or the orchestration layer aware of the CI/CD process (or whatever). Or are they like two separate universes, where you have a CI/CD for software and a parallel CI/CD process for data tasks?
k

Kevin Kho

12/02/2021, 3:22 PM
I think so too, because a simpler way to state this (ignoring CI/CD) is "there is some team that does some work and I need to know the result", which just feels wrong. For CI/CD, you can check this thread on how people deploy their flows with CI/CD.
🙏 1
t

Tom Klein

12/02/2021, 3:30 PM
@Kevin Kho this thread seems to be about a CI/CD process for the flows themselves, e.g. I wrote a new flow, how do I now push it to Prefect as part of an automatic process that maybe includes testing, code reviews, etc.? My question is about a slightly different level of operations: I'm fine with having no CI/CD around flow deployment (for now), but I still need some good way of executing non-Python code as part of DAGs, with the tasks being (for example) a custom Docker image.
I.e., I'm fine with ALL of these steps being manual (for now). It still leaves me with the problem of executing arbitrary images stored on ECR as part of Prefect DAGs (flows). This step (running a known pre-built image) cannot be manual, because I myself don't have access or visibility into ECR, and I'm not going to probe into it (and surely not ask our DevOps) every time I want to use some image as a task. I suppose that for simplicity's sake it could be replaced with an HTTP invocation of the image instead of a direct K8s invocation, but I'm not sure of all the implications of that... for example, it would mean the image would have to act as a service rather than as a stand-alone job.
a

Anna Geller

12/02/2021, 3:59 PM
@Tom Klein if the image tag is generated during CI/CD, then you know this information at the time when CI/CD runs. You could then use the KV store, as Kevin suggested, to store this repository URL as a key-value pair that would be used by your flow. This way, you always get the latest version generated by CI. You would need to ask your DevOps to include this in your CI yaml file:
steps:
      - run: pip install prefect
      - run: prefect auth login --key $PREFECT_API_KEY
      - run: export PREFECT__CLOUD__USE_LOCAL_SECRETS=false && prefect kv set YOUR_FLOW_ECR_IMAGE_URL "XXXX.dkr.ecr.us-east-1.amazonaws.com/image_name:tag"
🙏 1
the key and value are just examples
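On the flow side, the stored value can be read back at runtime; a minimal sketch, assuming Prefect 1.x's KV store and the illustrative key above:
from prefect import task
from prefect.backend import get_key_value


@task
def get_image():
    # reads the value the CI step stored with `prefect kv set`
    return get_key_value(key="YOUR_FLOW_ECR_IMAGE_URL")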
t

Tom Klein

12/02/2021, 6:08 PM
@Anna Geller thanks, that's pretty clear. My question is then just whether this is considered "best practice" or not 😁 Based on your experience, is it more common for data orchestration to have its own separate CI/CD or image-building process, distinct from that of the rest of the product? I guess it's kind of unorthodox to have data processes that aren't in Python to begin with, but still..
Prefect itself executes Python tasks as Docker images, no? (Still trying to wrap my head around the architecture.)
a

Anna Geller

12/02/2021, 6:11 PM
@Tom Klein absolutely! Your data pipelines are important and they deserve their own CI/CD process! 🙂 Given that you don't know the image tags until the CI step, there is really no other way than injecting this value during CI. And the KV Store in Prefect happens to be an extremely convenient mechanism for passing that information along to your flows.
t

Tom Klein

12/02/2021, 6:13 PM
@Anna Geller hmm, I'm not sure our DevOps would agree with that statement, since we're still a small startup and they (for example) are in charge of matters like security, resource management, infrastructure budgeting, etc., so splitting it up in two possibly introduces a new array of concerns that would be hard for them to manage if it happens outside the (singular) CI/CD and ECR they work with. Not to mention that we ourselves don't want to add these concerns to our own process: we want to be concerned with which things run, when, and in what order, but not where or how...
a

Anna Geller

12/02/2021, 6:15 PM
Whatever works best for your team, do that. The same thing could be done from a mono-repo, right? There is no right or wrong here.
t

Tom Klein

12/02/2021, 6:18 PM
Ya, you're right... it's just that since we're really stepping into a new field (for us, and for the company itself), I'm trying to adhere to industry best practices, and so trying to understand what's more sane and what's less sane to do (in our particular case, which is probably similar to a lot of other companies our size) 😆
👍 1
Alright, I think I know what I'm missing: our deploy dashboard must already have (or rely on) some kind of mapping between service-name and its ECR image, so I guess we can talk to our DevOps about letting Prefect use the same mapping... and then we'll just need to know the name/type of the container we're interested in, not the actual ECR path.
k

Kevin Kho

12/02/2021, 6:51 PM
That sounds like a good approach