prefect-community
  • Jeffery Newburn
    04/14/2021, 4:32 PM
    Tacking onto the last post: are there any creative ways to limit the number of tasks any given agent takes? Our agent is beefy, but it will just keep taking flows until it runs out of memory. We have limited flow concurrency, but task concurrency doesn't seem to fit right when we go with multiple agents. Personally, I would love to be able to configure the agent itself not to bite off more than it can chew.
    4 replies · 2 participants
  • Varun Joshi
    04/14/2021, 5:22 PM
    The new automation feature is really helpful! Kudos to the Prefect team on that! 🙌
    😍 4 🎉 5
    1 reply · 2 participants
  • Justin Chavez
    04/14/2021, 8:12 PM
    Hi! Are Dask executors the only way to achieve parallelization for mapped tasks? For example, I have a custom task called `run_command`, and inside it launches the command on a `RunNamespacedJob` to use Kubernetes. I have multiple commands that take a while to complete, so I would like multiple namespaced jobs to run at the same time. I tried using a mapping like:
    with Flow("example") as flow:
        run_command.map([cmd1, cmd2, ...])
    But Prefect is running each namespaced job in serial. Would switching to a Dask executor be the key? Or could I adjust the map call to achieve parallelization?
    5 replies · 3 participants
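    For reference, a minimal sketch of the usual fix (Prefect 0.14-style imports assumed; the task body here is a placeholder): mapped children only run in parallel under a parallel executor such as LocalDaskExecutor, while the default executor runs them one at a time.
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def run_command(cmd):
        # Placeholder body; imagine the RunNamespacedJob launch here.
        print(f"running {cmd}")

    with Flow("example") as flow:
        run_command.map(["cmd1", "cmd2", "cmd3"])

    # Without this line, mapped tasks execute serially.
    flow.executor = LocalDaskExecutor(scheduler="threads")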
  • Riley Hun
    04/14/2021, 10:51 PM
    Hello! For the Prefect Server helm chart, I'm trying to expose the UI through an nginx ingress controller, but it returns a 404 or 503 error. I can confirm that the UI was deployed successfully and that it works when using a standard external load balancer. I also don't think the issue is the nginx controller itself, because I have exposed other applications on it. Also note that I have the UI working in tandem with the nginx controller in a different environment (I think using a different version of the helm chart).
    prefect-ui-ingress.yaml
    13 replies · 4 participants
  • Ranu Goldan
    04/15/2021, 3:13 AM
    Hi everyone, my team is using Prefect Cloud and we've found that secrets are attached to the team, not the project. So to separate secrets between environments (dev/stg/prod), we want to create a new team for each env. But I can't find the button in the team settings. How do I create a new team?
    ➕ 2
    7 replies · 3 participants
  • Jeremy Tee
    04/15/2021, 6:58 AM
    Hi everybody, I am trying to retrieve results from my child flow in my parent flow. I am currently storing all my child flow results in S3, and I am finding it hard to retrieve their locations from the states returned by `client.get_flow_run_info("xxxxx")`. Is there another way for me to find where the task results are stored?
    👀 1
    2 replies · 3 participants
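    A rough sketch of one way to dig the locations out (an assumption, not confirmed by this thread: that the task-run states returned by get_flow_run_info carry their Result metadata in a private _result attribute, as 0.14-era states did):
    from prefect import Client

    client = Client()
    info = client.get_flow_run_info("xxxxx")  # child flow run id

    # When checkpointing was on, each task run's state holds a Result
    # whose `location` is the S3 key the output was written to.
    for task_run in info.task_runs:
        result = getattr(task_run.state, "_result", None)
        if result is not None and getattr(result, "location", None):
            print(task_run.task_slug, result.location)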
  • Lukas N.
    04/15/2021, 11:24 AM
    Hello 👋 We're using Prefect Server and running our flows with the KubernetesAgent. Sometimes a flow run ends up running twice in parallel. After a bit of investigation I found this: the first flow run fails its heartbeat, so the ZombieKiller retries the flow run (starting the parallel execution). But the first one is still running; it's not dead, it just didn't send a heartbeat because of a long blocking operation. Any ideas how to prevent this? I don't even know how the heartbeat system works.
    No heartbeat detected from the remote task; retrying the run.
    8 replies · 3 participants
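    One workaround sometimes suggested, sketched here with the caveat that the exact setting name is an assumption about this Prefect version: run the heartbeat in a thread instead of a subprocess, so long blocking calls don't starve it.
    from prefect.run_configs import UniversalRun

    # Assumption: PREFECT__CLOUD__HEARTBEAT_MODE accepts "thread" in
    # this release; a threaded heartbeat keeps beating through long
    # blocking operations.
    flow.run_config = UniversalRun(
        env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
    )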
  • Hawkar Mahmod
    04/15/2021, 12:15 PM
    Hey everyone, I am getting the following error:
    TypeError: can't pickle generator objects
    on a task that returns a generator. Now, this task is not persisted using a `Result`; there is no Result or checkpointing enabled on the task. When I run locally, the flow works just fine. However, when I trigger it via the Prefect UI and use S3 storage, it seems to try to persist all tasks. This is what this line in the documentation refers to, I believe (see image). How can I get this task to not be persisted by default, if that is what is causing this?
    2 replies · 2 participants
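    For what it's worth, a minimal sketch of opting a single task out of checkpointing (checkpointing defaults on when running against a backend):
    from prefect import task

    # checkpoint=False keeps this task's return value away from any
    # Result serializer, so the generator is never pickled.
    @task(checkpoint=False)
    def yield_rows():
        return (row for row in range(10))  # a generator, not picklable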
  • Mickael Riani
    04/15/2021, 1:51 PM
    Hello everyone, I'm trying to find a way to distribute task execution across different servers (batch and front). I would like my tasks to run with priority on my front server, and if that one is not available, to run on the batch server. Do you know how I could do this?
    1 reply · 2 participants
  • Jérémy Trudel
    04/15/2021, 2:05 PM
    Hey everyone! I'm trying to log a parameter in a Prefect flow. After launching a quick run, I can't find any mention of my log in Cloud. It looks a bit like this:
    import prefect
    from prefect import task

    @task
    def extract_copy_history(cursor, schema_table):
        logger = prefect.context.get("logger")
        logger.info(f"Schema table name is {schema_table}.")
    Now when I do a quick run on Prefect Cloud, no mention of it appears in my logs, despite the view being set to "Showing logs for all log levels". I see the logs for the task itself (extract_copy_history) and all other tasks. Just not my custom log.
    17 replies · 2 participants
  • Satheesh K
    04/15/2021, 2:38 PM
    Hello everyone! What is the best way to access actual mapped task results? I want to get only part of the result, e.g.:
    intermediate_result = 0
    with Flow("flow") as flow:
        param1 = Parameter("list1")
        mapped_list = create_map(param1)
        results = task1.map(mapped_list)
        intermediate_result = results[1]
        results2 = task2(results[1])
    23 replies · 2 participants
  • Greg Roche
    04/15/2021, 4:07 PM
    Hi folks, a question about re-registering flows after the flow logic has changed. We have a local agent (running inside a Docker container) which executes flows that are stored in S3. Each flow is usually split across multiple Python files, usually with a `main.py` file that imports code from these other files and defines the actual flow logic. If `main.py` is updated, a simple re-registration of the flow seems to be enough to allow the agent to execute the updated flow, because the updated logic is stored on S3 and is then downloaded by the agent during the next execution. However, if one of the other files (which `main.py` imports) is changed, re-registration alone isn't enough, seemingly because only the content of `main.py` is stored on S3 at registration. Practically, this means that almost every time we make any change to any of our flows, we need to rebuild our Docker image with the updated logic, redeploy it, and replace the old agent with the new one before re-registering the flow. Is there some way for us to register a flow so that all of the flow's code, not just the code in the file that defines the flow, is stored in S3, so we don't need to constantly rebuild and redeploy the agent's image for almost every change? Or is there a cleaner approach that has worked for anybody here? Thanks in advance.
    5 replies · 2 participants
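    One pattern that gets at this (a sketch only; the registry, image, paths, and module names are made up for illustration): Docker storage can copy the helper modules into the flow's image and put them on the PYTHONPATH, so a single re-registration rebuilds everything the flow imports.
    from prefect.storage import Docker

    # helpers.py stands in for any local module that main.py imports.
    flow.storage = Docker(
        registry_url="registry.example.com",
        image_name="my-flow",
        files={"/repo/flows/helpers.py": "/modules/helpers.py"},
        env_vars={"PYTHONPATH": "$PYTHONPATH:/modules"},
    )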
  • Robin
    04/15/2021, 4:48 PM
    Dear Prefect people, we have some trouble running our dbt model with Prefect. Using dbt run, we are able to run the model locally; in Prefect, however, we only get the following error:
    ERROR - prefect.DbtShellTask | Command failed with exit code 1
    Any thoughts on what could be wrong, or how to get further information and debug the flow, are appreciated! 🙂
    60 replies · 3 participants
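    A debugging sketch, hedged: it assumes DbtShellTask inherits ShellTask's log_stderr and return_all options in this version, and the profiles_dir is a placeholder. Streaming dbt's stderr into the flow logs usually surfaces the real failure hiding behind the exit code.
    from prefect.tasks.dbt import DbtShellTask

    dbt_run = DbtShellTask(
        command="dbt run",
        profiles_dir=".",   # placeholder for wherever profiles.yml lives
        log_stderr=True,    # surface dbt's error output in the flow logs
        return_all=True,    # return every output line, not just the last
    )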
  • Joseph Loss
    04/15/2021, 7:17 PM
    Quick question: has anyone implemented .NET tasks using Prefect?
    4 replies · 2 participants
  • Carter Kwon
    04/15/2021, 7:28 PM
    Hello, I have a question about the registration and execution of flows. I have an ETL flow that looks something like this:
    <task functions... >

    with Flow("ETL Flow", schedule=schedule, storage=Docker(registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME")), run_config=ECSRun(task_role_arn=os.getenv("TASK_ROLE_ARN"), execution_role_arn=os.getenv("EXECUTION_ROLE_ARN"))) as flow:
        DAYS_AGO = 5
        TARGET_DATE = (datetime.now() - timedelta(days=DAYS_AGO)).strftime('%Y-%m-%d')

        <use TARGET_DATE to make API calls inside tasks... >
    We have a CI/CD process in place that registers our flows after they've been pushed to git. For this particular flow, TARGET_DATE should equal today's date minus 5 days, because the API needs a few days for the analytics to be available. I've noticed that TARGET_DATE actually ends up being the date of flow registration minus 5 days. Is there a way to have this code executed every time the flow is run, instead of once at registration, so TARGET_DATE changes every day?
    2 replies · 2 participants
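    One standard fix, sketched without the storage/run-config details: code in the with Flow(...) block runs once at registration, so the date has to be computed inside a task (or derived from prefect.context.scheduled_start_time) to be evaluated on every run.
    from datetime import datetime, timedelta
    from prefect import Flow, task

    @task
    def target_date(days_ago: int = 5) -> str:
        # Runs at flow-run time, not at registration time.
        return (datetime.now() - timedelta(days=days_ago)).strftime("%Y-%m-%d")

    with Flow("ETL Flow") as flow:
        TARGET_DATE = target_date(5)
        # ...downstream tasks receive TARGET_DATE as an input...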
  • Julio Venegas
    04/15/2021, 7:36 PM
    Hi community! I'm creating my own instance of a Task class that returns multiple values. I added the Tuple return-type annotation, but I'm still getting the error `TypeError: Task is not iterable. If your task returns multiple results, pass nout to the task decorator/constructor, or provide a Tuple return-type annotation to your task.` when I instantiate the Task with nout=2. Class in the thread. Any suggestions?
    26 replies · 2 participants
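    For comparison, a minimal class-based task that unpacks into two results (a sketch; the class and names are made up, and it assumes nout reaches the base Task constructor rather than being set anywhere else):
    from typing import Tuple
    from prefect import Flow, Task

    class SplitTask(Task):
        def run(self, x: int) -> Tuple[int, int]:
            return x // 2, x % 2

    # nout must reach Task.__init__ for the instance to be unpackable.
    split = SplitTask(nout=2)

    with Flow("split") as flow:
        quotient, remainder = split(10)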
  • Ryan Baker
    04/15/2021, 9:45 PM
    If I build my own Docker image for use with a Prefect flow in Prefect Cloud, my experience has been that if I run the flow, then update the Docker image, then run the flow again, it does not pull the new image; it caches the previous image and uses that. Is there a way I can clear the cache? Or am I forced to specify a new Docker image in the run configuration, such as with a git-hash tag on the image?
    9 replies · 2 participants
  • jack
    04/15/2021, 10:25 PM
    Hey all! We were testing out our local Prefect agent spun up on our EC2 instance, and we were able to run most flows except for one type: flows registered with the Docker storage type. I believe Docker flows aren't supported by local agents. Are there any good alternatives to Docker storage for containerized flow storage that could be used for more sophisticated flows? We would like to use the local agent for most things and don't want to move to a Fargate agent or something anytime soon before trying other flow storage methods.
    5 replies · 2 participants
  • Vincent
    04/16/2021, 1:44 AM
    Hi all, I was wondering if someone could help me identify why some of my tasks are pending. I have the following flow running on Prefect Cloud with a Dask backend. For some reason, the task scheduler has not started 2/4 of the tasks. Thanks for any advice.
    10 replies · 3 participants
  • Jeremy Tee
    04/16/2021, 4:57 AM
    Hi people, I am wondering how everyone organizes their code when defining a flow. Initially my intention was to split tasks and flows each into their own file; however, when I save my flow to S3 and run it from an agent, it is not able to find the location of the "task" file! Thanks in advance!
    1 reply · 2 participants
  • Matthew Alhonte
    04/16/2021, 5:10 AM
    Would this affect Prefect? It uses type annotations at runtime, right? https://github.com/samuelcolvin/pydantic/issues/2678
    👀 2
    2 replies · 3 participants
  • James Gibbard
    04/16/2021, 2:11 PM
    When registering a flow that is stored in an S3 bucket, is it possible to use a different AWS_PROFILE for deploying than the one used when the flow is executed by Cloud? My machine uses a "production" profile to access the AWS account, but inside AWS the profile is "default". Any ideas? Thanks.
    10 replies · 2 participants
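    A sketch of one approach (hedged: it relies on boto3 honoring the AWS_PROFILE environment variable, and the project name is a placeholder): S3 storage uploads the flow with the registering machine's credentials, so selecting the profile just before registering affects only the upload, while the runtime environment falls back to its own credential chain.
    import os

    os.environ["AWS_PROFILE"] = "production"  # used only for the upload
    flow.register(project_name="my-project")  # placeholder project name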
  • Joseph Loss
    04/16/2021, 3:39 PM
    Can someone please explain the changes implemented with service accounts? Do I auth login under my user 'joe' or under a user 'scheduler' that I will have deployed on multiple servers? Does each of these need a separate runner token?
    1 reply · 1 participant
  • Julio Venegas
    04/16/2021, 5:25 PM
    Hi community! I have a question about how and when environment variables are pulled, and whether I need to pass environment variables to an agent or not. Say I have the following Python script for a flow:
    import os
    from prefect import Flow, task

    @task
    def get_env():
        return os.environ.get("CURRENT_ENV")

    with Flow(name="get-env") as flow:
        env = get_env()

    flow.register(project_name="get_env")
    If I have “CURRENT_ENV” in my bash/zsh environment variables and run the flow with a LocalAgent, then it's not necessary to pass any environment variables when I execute
    prefect agent local start
    because the environment variable is already in the local system. But if I wanted to run the flow in a non-local environment, say in a Dask cluster on Kubernetes, then I would need to pass environment variables to
    prefect agent kubernetes start
    ?
    3 replies · 2 participants
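    For reference, a sketch of the agent side (an assumption about the 0.14-era CLI: its --env flag forwards variables to every flow run the agent launches):
    prefect agent kubernetes start --env CURRENT_ENV=production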
  • Peter Peter
    04/16/2021, 5:37 PM
    Hello, I'm trying to work with the Great Expectations task and am having issues. What version of Great Expectations does Prefect work with? I'm wondering if it is a version issue, since people were getting the same error between versions of the GE tutorials. The error I am getting is:
    great_expectations.exceptions.exceptions.DataContextError: No validation operator action_list_operator was found in your project. Please verify this in your great_expectations.yml
    6 replies · 4 participants
  • Marc Lipoff
    04/16/2021, 5:41 PM
    I'm running into a bit of a catch-22. I am trying to set up a CI process that registers the flows. Here is an example:
    import pandas as pd
    from prefect import Flow
    from prefect.storage.docker import Docker

    # ... task definitions

    with Flow('test_flow', storage=Docker(
                registry_url=ecr_registry_url,
                image_name=a_repo_name,
                python_dependencies=["pandas==1.2"])
    ) as flow:

        # ... all the steps
    I then try to execute
    prefect build -p path/to/file.py
    and it throws an error that pandas is not installed (which it isn't):
    ModuleNotFoundError: No module named 'pandas'
    Is there a way to register a flow without having to install the flow's dependencies first?
    15 replies · 3 participants
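    One workaround, sketched with placeholder registry values: keep heavyweight imports inside task bodies, so the machine doing the registering never needs them installed; only the Docker image built by the storage does.
    from prefect import Flow, task
    from prefect.storage.docker import Docker

    @task
    def transform():
        import pandas as pd  # imported at run time, inside the container
        return pd.DataFrame({"a": [1, 2]})

    with Flow(
        "test_flow",
        storage=Docker(
            registry_url="registry.example.com",  # placeholder
            image_name="test-flow",
            python_dependencies=["pandas==1.2"],
        ),
    ) as flow:
        transform()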
  • Sean Talia
    04/16/2021, 7:07 PM
    Hi all, I have a question about best practices for using secrets. I'm migrating a script to Prefect that has depended on a sensitive value stored as an environment variable. Initially I was thinking of just using a `Secret` to set this sensitive value as one of the run config's environment variables, e.g.
    env = { 'SECRET_KEY' : Secret("SECRET_VALUE").get() }
    but I'm wondering if this raises some kind of security issue when registering the flow to my Cloud instance. In this setup, would this `Secret` value be retrieved at registration time and then sent to Prefect Cloud as part of the flow's metadata? Or would Prefect know that this env variable should be brought into the run config container only at flow runtime, and it's perfectly safe to do something like this?
    6 replies · 2 participants
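    For contrast, the runtime-side pattern (a sketch reusing the same secret name; the task is made up): a PrefectSecret task resolves inside the flow run, so the value never appears in registration metadata.
    from prefect import Flow, task
    from prefect.tasks.secrets import PrefectSecret

    @task
    def connect(secret_value):
        # The secret arrives as a runtime task input.
        print("got a secret of length", len(secret_value))

    with Flow("uses-secret") as flow:
        connect(PrefectSecret("SECRET_VALUE"))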
  • Cab Maddux
    04/16/2021, 7:49 PM
    Hi! I have a flow running on preemptible nodes on GKE, which looks to have been preempted and subsequently caught by the Zombie Killer. My flow was then marked as failed, but I expected the Lazarus process to pick it up and restart the flow after 10 minutes. As seen in the screenshot, the Zombie Killer fails the flow at 13:48, and then nothing happens until I manually restart the flow via the Prefect Cloud UI at 14:45, about an hour later. I confirmed that Lazarus is enabled for this flow. Is there anything else I need to do to have Lazarus pick this up?
    17 replies · 3 participants
  • Tihomir Dimov
    04/16/2021, 8:11 PM
    Hi all, we have a flow which accepts a string array as input, and for each item three tasks (get, map, save) are executed. Until now we have been using
    flow.environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="threads", num_workers=num_workers))
    to achieve the following execution order: get['string1'], map['string1'], save['string1'] -> get['string2'], map['string2'], save['string2'] -> get['string3']... But we've hit some issues with the LocalDaskExecutor, so we want to use DaskExecutor instead, and we're struggling to configure it to achieve the same result. Currently we use
    flow.executor = DaskExecutor()
    and the tasks run like this: get['string1'], get['string2'], get['string3'] -> map['string1'], map['string2'], map['string3'] -> save['string1']..., which is not resource-effective. How can we configure the DaskExecutor to achieve the first execution order (depth-first rather than breadth-first)?
    👍 1
    24 replies · 2 participants
  • Adam Lewis
    04/16/2021, 10:38 PM
    Hi, I'm using the `DaskExecutor` with `dask-kubernetes` to spin up a Dask cluster when a flow starts, and I'm using it to process 5,000 files via a mapped task with a few final aggregation tasks. I sometimes see (via the Prefect UI) that the Dask cluster appears to spin down near the end of the run, before it's completely done, leaving a few tasks stuck pending with no workers to process them. Has anyone seen this before? If so, how did you solve it?
    4 replies · 2 participants
Kevin Kho
04/17/2021, 2:13 AM
Hi @Adam Lewis! This only happens sometimes but works other times? Is there anything in the logs that would give more details? Is it the aggregation tasks that get stuck in pending, or the mapped tasks?
No clue if this will help, but this might be relevant. The thread also talks about potentially upping resources.
Adam Lewis
04/17/2021, 2:28 PM
Thanks for pointing to that thread. That could be a similar problem, since things stopped near the end of the large mapped run. Things I could try: 1) upping the resources of the Kubernetes job associated with this flow (or at least looking at its memory usage to see how much memory it uses as the mapped tasks are gathered), 2) exposing the Dask dashboard and watching it near the end of the mapped tasks, 3) changing from threads to processes.
Kevin Kho
04/17/2021, 2:32 PM
Yes, those are all the thoughts I have for now. Decreasing the resources may also do something.