prefect-community
  • j

    Joseph

    01/13/2021, 8:24 PM
    Can I have multiple overlapping instances of a given flow run (let’s say with different parameters)? If so, can I have shared state between those runs (preferably in memory rather than on disk)?
    j
    5 replies · 2 participants
  • v

    Venkatesh Tadinada

    01/13/2021, 10:13 PM
    Hello, Happy to join this channel
    👋 6
  • m

    matta

    01/14/2021, 12:41 AM
    How many accounts would you generally need? Just 1 for each Data Engineer or would every Ops person also potentially need their own account?
  • r

    Riley Hun

    01/14/2021, 2:46 AM
    Would really appreciate some assistance on this issue I'm experiencing with my prefect deployment. I am able to register it fine, but it fails to run:
    AttributeError: 'FunctionTask' object has no attribute 'task_run_name'. Did you call this object within a function that should have been decorated with @prefect.task?
    I've encountered this error before, and usually it's due to version conflicts. This time, I can confirm that all my prefect versions are consistent between my local machine and docker images. I also submitted the flow using the same prefect version, 0.13.18. My prefect server is hosted on kubernetes, and I'm submitting my flow using docker storage from my local machine. When I bash into my docker container, I can see that the flow is indeed there. I'm able to run the flow against the Dask Cluster no problem.
    c
    3 replies · 2 participants
  • j

    Jan Rouš

    01/14/2021, 3:27 AM
    I'm working on using prefect to improve the structure and speed of our data pipeline. We are dealing with lots of data, O(10GBs), and I have noticed that when I use LocalExecutor, the implicit in-memory caching results in the ETL eventually getting oom-killed 😕 I'm using checkpointing, and some of the tasks explicitly write their results to disk where they are picked up by subsequent stages, so the in-memory caching is not strictly necessary. I have not found a way to tell prefect not to bother with this, or at least not to eat all available memory while doing so. Any pointers on how I could solve this problem would be really helpful!
    m
    12 replies · 2 participants
  • s

    Sasha Mikhailov

    01/14/2021, 10:12 AM
    Need some advice on how to use context in Cloud. Hey there! Glad to be here :-) We use Prefect via Cloud. There are some parameters in our flows, and I want to store the parameters' values in prefect.context. This code from the docs works fine locally:
    import prefect
    from prefect import Flow, Parameter, task

    @task(log_stdout=True)
    def print_context():
        print('print context from task:', prefect.context.get('db_env'))

    DB_ENV = Parameter("database to connect to", default='local')

    prefect.context['db_env'] = DB_ENV  # ← doesn't work in cloud

    with Flow('testing_context') as flow:
        print_context(upstream_tasks=[DB_ENV])

    flow.run()
    Then I register the flow in docker storage and push it to the cloud. But it doesn't work when I run the flow via the cloud UI: it shows None instead of the parameter.
    s
    m
    6 replies · 3 participants
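    For anyone hitting the same wall: prefect.context is assembled at runtime, so values assigned to it at build time never make it into the Cloud-run flow. The usual fix is to pass the Parameter into the task as an input. A minimal sketch, assuming Prefect 0.x APIs:
    from prefect import Flow, Parameter, task

    @task(log_stdout=True)
    def print_db_env(db_env):
        # the value arrives as a resolved task input at runtime,
        # so no build-time context assignment is needed
        print('print db_env from task:', db_env)

    with Flow('testing_context') as flow:
        db_env = Parameter('db_env', default='local')
        print_db_env(db_env)

    flow.run()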
  • r

    Rolf Schick

    01/14/2021, 12:55 PM
    Hi, has anyone had this error before?
    Failed to load and execute Flow's environment: UnpicklingError("invalid load key, '{'.")
    g
    s
    +2
    11 replies · 5 participants
  • d

    Dolor Oculus

    01/14/2021, 2:21 PM
    Is there a way to prevent the prefect docker flow storage from pip installing the latest and greatest prefect? My build setup/team is aghast. (We prefer to explicitly manage dependencies ourselves.)
    s
    b
    +2
    9 replies · 5 participants
  • l

    Linnea Sahlberg

    01/14/2021, 2:51 PM
    Good morning! I’m seeing that for CPU intensive tasks, memory usage is ~4 times (or more) higher when I run a task in a prefect flow context vs not in a prefect flow context. I’m working on putting together a simpler example to post here, but in the meantime was just curious if this is something that is known or has been seen before? We are currently using Prefect version 0.12.0, if relevant. Thanks so much!
    m
    1 reply · 2 participants
  • c

    Charles Leung

    01/14/2021, 3:08 PM
    Hey prefect team, just a heads up on installing GCP extras with pip 🙂 I ran into this issue where it kept saying it needed to be installed as a prefect extra, but realized, after I removed the try/except in the __init__, that it's due to this issue: https://github.com/googleapis/google-cloud-python/pull/9826. After upgrading my six package it's all good now, but maybe it's worth rewording the ImportError?
    ➕ 1
    m
    8 replies · 2 participants
  • m

    Mitchell Bregman

    01/14/2021, 4:19 PM
    Hey there, is this possible? I want to default one parameter based on another parameter. In this example, let’s say x is a string and I want to .lower() it.
    flow = Flow("some flow")
    with flow:
        x = Parameter("x")
        y = Parameter("y", default=x.foo())
    The use case I will be doing this for is date specification and building dynamic dates based on a main driving parameter.
    m
    8 replies · 2 participants
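    For later readers: Parameter defaults are fixed at registration time, so one Parameter cannot default to a transformation of another. The usual workaround is to derive the value in a downstream task. A minimal sketch, assuming Prefect 0.x APIs:
    from prefect import Flow, Parameter, task

    @task
    def lowercase(value):
        # parameters only resolve at runtime, so the derived value
        # has to be computed in a task
        return value.lower()

    with Flow('some flow') as flow:
        x = Parameter('x', default='FOO')
        y = lowercase(x)  # y is an ordinary task result, not a Parameter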
  • j

    Josh

    01/14/2021, 4:39 PM
    Is there a concept of a task of tasks? I have a few ETL flows that all start with similar steps of
    1. List S3 files
    2. Filter to S3 files I want to copy
    3. Download S3 files
    4. Upload to GCP
    I’m wondering if I can make a compound task that can parameterize the following steps. Or is the prefect idiom to make a dependent flow? Part of the motivation is laziness, but if we ever change where/how we are starting our data ingestion, it’d be great not to have to change it in multiple places.
    m
    j
    21 replies · 3 participants
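    For later readers: there is no compound-task primitive in Prefect 0.x, but flows are built with ordinary Python, so a plain function that wires the shared steps together gives the same reuse. A sketch, with hypothetical task names and placeholder bodies:
    from prefect import Flow, task

    @task
    def list_s3_files(bucket):
        return ['a.csv', 'b.txt']  # placeholder for a real boto3 listing

    @task
    def filter_files(files, suffix):
        return [f for f in files if f.endswith(suffix)]

    @task
    def download_files(files):
        return files  # placeholder for the real download

    @task
    def upload_to_gcs(local_paths, dest):
        print('uploading', local_paths, 'to', dest)

    def s3_to_gcs(bucket, suffix, dest):
        # an ordinary function; calling it inside a Flow context
        # adds this group of tasks to whichever flow is being built
        files = list_s3_files(bucket)
        wanted = filter_files(files, suffix)
        local = download_files(wanted)
        return upload_to_gcs(local, dest)

    with Flow('etl-a') as flow_a:
        s3_to_gcs('bucket-a', '.csv', 'gs://dest-a')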
  • k

    Kyle Flanagan

    01/14/2021, 5:21 PM
    I think there's something I'm missing about states that I just can't figure out. I want to send a notification of a Task failure but ONLY if the task has exceeded its retries. I can't use is_failed(), because I'd get a notification for each failure, same for is_retrying(). I could look at the run_count and if it's on its very last retry go ahead and send a failure notice, but that's not necessarily true (the final attempt may succeed). I thought maybe is_finished() would only be True on the final execution, but that's not the case. Is there a way to know if a task failed and exceeded its retries? (I'm using a task state handler for this)
    👀 1
    m
    4 replies · 2 participants
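    For later readers, the detail that unlocks this (as I understand the 0.x state model): a failing task with retries remaining transitions to Retrying, not Failed, so a handler that checks for a Failed state fires exactly once, after retries are exhausted. A sketch, with a stand-in notifier:
    from datetime import timedelta

    from prefect import task
    from prefect.engine.state import Failed

    def send_alert(message):
        print(message)  # stand-in for a real notifier

    def alert_on_final_failure(task, old_state, new_state):
        # with retries remaining the task enters Retrying instead,
        # so Failed here means all retries were used up
        if isinstance(new_state, Failed):
            send_alert(f'{task.name} failed permanently')
        return new_state

    @task(max_retries=3, retry_delay=timedelta(minutes=5),
          state_handlers=[alert_on_final_failure])
    def fragile():
        raise ValueError('boom')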
  • r

    Raphaël Riel

    01/14/2021, 5:33 PM
    Hello Prefect Community! I have a Flow that needs some post-run clean-up. The clean-up task is configured to always run, no matter what happened beforehand. Unfortunately, that seems to also make my Flow always succeed. Any way to make the flow fail even if the last task succeeded?
    m
    3 replies · 2 participants
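    For later readers: a flow's final state follows its terminal tasks by default, which here is the always-run cleanup. flow.set_reference_tasks lets the flow's state track the real work instead. A minimal sketch, assuming Prefect 0.x APIs:
    from prefect import Flow, task
    from prefect.triggers import always_run

    @task
    def do_work():
        return 1

    @task(trigger=always_run)
    def cleanup():
        print('cleaning up')

    with Flow('with-cleanup') as flow:
        result = do_work()
        cleanup(upstream_tasks=[result])

    # the flow now fails whenever do_work fails, even though
    # the terminal cleanup task succeeded
    flow.set_reference_tasks([result])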
  • j

    jcozar

    01/14/2021, 5:53 PM
    Hello Prefect Community! I’ve been testing Prefect and it is amazing 🙂 I am simulating different task flows. I love versioning the flows, but it occurs to me that when a flow has a bug (i.e. discovered because new input data raises a new exception) and causes an error in a flow run, I need to deploy a new flow version and restart the failed flow run. In that case, if I restart the failed flow run, it uses the previous version. What is the recommended workflow for this use case? To make a manual flow run with the correct parameters? Thank you in advance!
    n
    2 replies · 2 participants
  • j

    Joseph

    01/14/2021, 7:41 PM
    Is there any precedent to creating a custom trigger? For example could I create a trigger that would cause a specific task (not flow) to not be run until a particular time?
  • j

    Jeremy Phelps

    01/14/2021, 8:18 PM
    Hello Prefect community! Is there a way to make Prefect stop trying to execute a flow? There’s a pod in my Kubernetes cluster that is labelled with a particular flow run ID. It will never succeed because I took down the Docker registry it depends on. If I delete this pod, Prefect will create another one. The button in the Prefect UI to cancel the flow run was greyed out, so I simply deleted the flow run with the delete_flow_run GraphQL mutation, thinking that without the flow run's DB entry, Prefect would stop trying to execute it. That removed the flow run from the UI, but if I delete the associated Kubernetes pod, Prefect still recreates it. I found no subcommand in the prefect CLI tool that can solve this problem, either. Is there any way to stop Prefect from doing this?
    j
    m
    8 replies · 3 participants
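    For later readers: setting the run's state to Cancelled (rather than deleting its row) is what stops the system resubmitting work. A sketch against the 0.x Client, with the run ID hypothetical and the signature to the best of my recollection:
    from prefect import Client
    from prefect.engine.state import Cancelled

    client = Client()
    run_id = 'your-flow-run-id'  # hypothetical
    info = client.get_flow_run_info(run_id)
    client.set_flow_run_state(
        flow_run_id=run_id,
        version=info.version,  # optimistic-lock version for the update
        state=Cancelled('cancelled manually; registry is gone'),
    )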
  • a

    Aiden Price

    01/14/2021, 10:59 PM
    Hi everyone, I ran into the Could not serialize object of type Success problem that I've seen a few times on this Slack. From my reading of https://github.com/PrefectHQ/prefect/issues/3625 I suspect it's because I'm using resource_manager classes to hold database connection and HTTP connection pools while I map over everything in my Dask cluster. Is it not possible to share a connection pool across a Dask mapped pipeline (thinking about it now, it would be difficult)? If not, how should I go about mapped reading & writing? Thanks.
    1 reply · 1 participant
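    For later readers, the usual dodge when pools won't pickle: build the connection inside the mapped task, so nothing stateful crosses the Dask serialization boundary and pooling happens per worker process rather than per flow. A sketch, with a stand-in connection class:
    from prefect import task

    class StubConnection:
        # stand-in for a real database or HTTP client
        def write(self, record):
            print('wrote', record)
        def close(self):
            pass

    @task
    def write_record(record):
        conn = StubConnection()  # created per task run, never pickled
        try:
            conn.write(record)
        finally:
            conn.close()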
  • m

    Marwan Sarieddine

    01/15/2021, 12:23 AM
    Hi folks, I am running a graphql mutation to delete a flow, but I can still see the flow in the UI even after refreshing - is this because there are older versions of the flow? I see there is an option to delete all versions of a flow from the UI; is there an equivalent approach using graphql?
    n
    11 replies · 2 participants
  • m

    matta

    01/15/2021, 1:06 AM
    Does Depth-First Execution work if you need to use a flatten at some point? Like if you pull a bunch of IDs and it takes the form of a list of lists, and then use those as the basis for a bunch of other calls?
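    For reference, flat-mapping looks like the sketch below (flatten landed around Prefect 0.12). As I understand the executor, flatten is effectively a reduce step, so depth-first execution pauses there while the nested results are gathered, then resumes on the flattened map:
    from prefect import Flow, task, flatten

    @task
    def get_ids(group):
        return [f'{group}-{i}' for i in range(3)]  # a list per group

    @task
    def fetch(record_id):
        return record_id.upper()

    with Flow('flat-map') as flow:
        nested = get_ids.map(['a', 'b'])      # produces a list of lists
        results = fetch.map(flatten(nested))  # maps over the flattened ids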
  • s

    Sonny

    01/15/2021, 3:11 AM
    Hi all, new to prefect - quick question. In the past our team used Kylo (a NiFi abstraction) for basic workflows and Airflow for complex workflows. One key requirement is to have a way to define a "template" pipeline, let's say pull from blob storage and dump to a staging database. Things like the blob storage location and database table names may change and should be tied to a particular run. Kylo did a good job of this: a self-service for end users to create flows from NiFi templates - users could create the entire pipeline from the UI using the template as a base. The Kylo project hasn't been maintained much. Airflow doesn't fit this model, as dags have to be developed for each new run instance unless you get into dynamic dags, etc. Does Prefect offer something like this in the prefect UI? Let me know if this makes sense or if a more detailed example is needed.
    j
    3 replies · 2 participants
  • j

    Joël Luijmes

    01/15/2021, 10:33 AM
    I’m running Prefect in Kubernetes. I have a flow which spawns 15 CPU-intensive Kubernetes jobs. In order to get some parallelism, I have a DaskExecutor configured in the flow (6 workers, 1 thread). What I see is that the prefect-job created by the Kubernetes agent uses quite some resources (400m CPU, ~850 MiB). Is there a better alternative to deploy this, one which uses fewer resources? The alternative is a static dask cluster, but as multiple flows can be run at the same time, this doesn’t seem like a better option. I’m open to suggestions 🙂
    n
    16 replies · 2 participants
  • v

    Vipul

    01/15/2021, 2:10 PM
    Hi everyone, was wondering if there is an easy way to find the results of all the tasks inside a flow through GraphQL. The rationale being that we trigger the flow from Flask and want to show the result of each task in the Flask UI.
    n
    3 replies · 2 participants
  • r

    Raphaël Riel

    01/15/2021, 2:29 PM
    Hello all! Is setting/updating a Secret from within a Task something that's possible with Prefect Cloud (in Cloud context)? I see I can easily do this via the Client/GraphQL, but didn’t manage to find a way to access the task’s current Client/GraphQLClient. Any hints on how I could achieve writing to a secret from within a Task?
    n
    14 replies · 2 participants
  • j

    J. Martins

    01/15/2021, 3:35 PM
    Hi! I would like to increase the retry_delay of a failed task each time the task fails. Is there any way to achieve this?
    a
    j
    2 replies · 3 participants
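    For later readers: retry_delay is a fixed timedelta in 0.x, but a task can schedule its own next attempt by raising the RETRY signal with a start_time. A sketch; note the explicit guard, since, if I remember right, manually raised retries don't necessarily respect max_retries:
    import pendulum
    import prefect
    from prefect import task
    from prefect.engine.signals import RETRY

    @task
    def fragile():
        try:
            raise ValueError('transient failure')  # stand-in for real work
        except ValueError:
            run_count = prefect.context.get('task_run_count', 1)
            if run_count > 5:
                raise  # give up after five attempts
            # back off exponentially: 2, 4, 8, ... seconds
            raise RETRY(start_time=pendulum.now('UTC').add(seconds=2 ** run_count))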
  • l

    liren zhang

    01/15/2021, 4:00 PM
    Hi Prefect experts, I am wondering what web server serves the prefect UI. I do not see one as part of the prefect core server architecture, unless the web server is incorporated as one of the components like Apollo, etc.
    n
    4 replies · 2 participants
  • m

    Matthew Blau

    01/15/2021, 4:17 PM
    Hello all, I have a more conceptual question today: My company has existing ETL tasks that are all in their own docker containers. I have been successful in having prefect run the containers as-is. My question, however, is how we would get the most out of prefect. Right now each ETL task lives in its own docker container, with its own sort of logging handled by python's logging class. We want to move everything over to being handled by prefect, and I am looking for input and advice on how best to utilize the capabilities of Prefect for Flow deployment. I know that flows can be restarted at any stage at which they fail, but how would we be able to utilize this if the task the flow is running is in a docker container? Are there any docs that I could read, as well as examples? We want to be able to get alerts at any point in the process chain upon failure, e.g. if the API call times out or the task of loading the data into our databases fails. Thank you in advance for your help and advice!
    👀 1
    n
    45 replies · 2 participants
  • n

    Nate Lambeth

    01/15/2021, 5:20 PM
    Hello all! I am in the process of trying to hack together an ETL flow. I've successfully gotten prefect server running on a remote host in a docker container, and it can run a simple flow with no problem as long as the flow is built into the container's image. What I don't quite follow is how to register another flow without stopping and rebuilding the container - it looks like there used to be a tutorial on using prefect with Docker, but it was removed? https://docs.prefect.io/orchestration/tutorial/docker.html Is there an example or tutorial somewhere that explains this? I can't figure out how to run the python script containing the flow registration code in the running prefect environment.
    n
    12 replies · 2 participants
  • l

    Loic M

    01/15/2021, 5:36 PM
    Hello everyone! Sorry if this has already been answered here, but my search did not return anything. I have trouble understanding authentication through Google Cloud to store my results with the help of prefect cloud secrets. Following this part of the documentation, I have set up a secret GCP_CREDENTIALS containing the JSON service account key. From what I understood, this should be enough to automatically authenticate my flows with the right credentials; however, when I run my docker Agent, my task fails with the following traceback:
    google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started
    Is there something I am missing here? Should I include the google credentials when building my flow's docker image?
    j
    n
    3 replies · 3 participants
  • j

    Josh

    01/15/2021, 5:38 PM
    Is it possible to run multiple flows from the same docker image? I’m using a docker agent and have multiple flows with the same set of dependencies, so I wanted all the flows to use the same docker image with different tasks and parameters per flow. When I register the flows, each register call builds and pushes a docker image for that flow, which will start to add up once I have more than a few flows. Is there a way to ask the flow/storage to check if a docker image exists already, and only create it if it doesn’t exist?
    j
    n
    5 replies · 3 participants
j

Joël Luijmes

01/15/2021, 6:29 PM
Yes, this is possible! You can build the image with multiple flows and push it once, after which you can register the flows. I originally found this example: https://gist.github.com/mousetree/8e58096ae3ad742407cd4042597984d6
:upvote: 1
🙏 1
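The gist's pattern, roughly, for later readers - one image, many flows (assuming the 0.14-era storage API; registry URL and names hypothetical):
from prefect import Flow
from prefect.storage import Docker

flow_a = Flow('flow-a')
flow_b = Flow('flow-b')

storage = Docker(registry_url='registry.example.com', image_name='shared-flows')
for f in (flow_a, flow_b):
    storage.add_flow(f)
storage = storage.build()  # one build and push covers every flow

for f in (flow_a, flow_b):
    f.storage = storage
    f.register(project_name='prod', build=False)  # skip per-flow image builds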
n

nicholas

01/15/2021, 9:36 PM
Agreed with @Joël Luijmes here, and I'd add that you could also extend Docker images: build yourself a base image that includes the assets that change less frequently, then extend that image with each of your flows.
💯 1
j

Josh

01/15/2021, 10:10 PM
thanks for the sample! I’m definitely going to build a base image with the dependencies and then a second layer image with the individual tasks I need.
@Joël Luijmes How do you do this with run_configs? I saw that environments are deprecated, and I’m getting this error
Failed to load and execute Flow's environment: KeyError('<FlowName>')
I’m trying to use docker storage on a vm if that helps.
Was able to just use run_configs instead of environments and got it running
🚀 2