Powered by Linen
prefect-community

    Shruti Hande

    12/09/2022, 9:13 AM
    I am trying to execute concurrent tasks of a flow from multiple agents on the same work queue, but only a single agent is picking up the tasks. Is there a way to run these tasks on multiple agents with the same queue? Using Prefect 2.6.4. #prefect-community #prefect-contributors

    Clovis

    12/09/2022, 10:11 AM
    Hi everyone! Is it possible to save/update only part of a JSON block? I didn't see anything like that in the documentation. For context, I have multiple concurrent tasks that depend on a common block, each updating a different part of it. Because of the concurrency, and since I don't want to lose data, I want to avoid loading the JSON block in each task and saving it right after with my updates, which risks overwriting it with missing values. So can I save only part of the JSON block? Or is there another solution (a lock or something similar) to prevent block data loss with parallel async tasks?
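One way to avoid losing updates, whether or not a partial save exists, is to hold a lock around the whole read-modify-write cycle so that each task merges only its own keys into the freshest copy. The sketch below assumes every task runs as a thread in one process; a threading.Lock does nothing across processes or machines, so a distributed task runner would need an external lock or a single writer task. load_value and save_value are hypothetical hooks; with a Prefect 2 JSON block they could wrap JSON.load(name).value and JSON(value=...).save(name, overwrite=True).

```python
import threading
from typing import Any, Callable, Dict

# One lock per shared document, held for the full read-modify-write cycle.
# Assumption: all writers are threads in this same process.
_block_lock = threading.Lock()


def update_shared_json(
    load_value: Callable[[], Dict[str, Any]],
    save_value: Callable[[Dict[str, Any]], None],
    updates: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge `updates` into the latest stored value without dropping other keys.

    load_value/save_value are hypothetical hooks around the storage
    (for example a Prefect JSON block); only this caller's keys change.
    """
    with _block_lock:
        current = dict(load_value())  # re-read the freshest copy inside the lock
        current.update(updates)       # touch only this task's part
        save_value(current)
        return current


# In-memory stand-in for the block so the sketch runs on its own
store: Dict[str, Any] = {"value": {}}


def load() -> Dict[str, Any]:
    return store["value"]


def save(value: Dict[str, Any]) -> None:
    store["value"] = value


threads = [
    threading.Thread(target=update_shared_json, args=(load, save, {f"part_{i}": i}))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(store["value"]))
```

All eight updates survive because no thread can save between another thread's load and save.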

    Vadym Dytyniak

    12/09/2022, 10:34 AM
    Hi. After upgrading the agent to 2.7.1, I'm getting the error posted in the thread.

    Thomas Fredriksen

    12/09/2022, 1:14 PM
    Hello everyone. I am experimenting a bit with the Dask task runner, and I am running into some issues related to priority. I have the following test flow:
    from typing import List, Tuple

    from prefect import flow, get_run_logger, task
    from prefect.context import get_run_context
    from prefect_dask import DaskTaskRunner, get_dask_client


    def is_prime(number: int) -> Tuple[int, bool]:
        if number == 2 or number == 3:
            return number, True
        if number % 2 == 0 or number < 2:
            return number, False

        for i in range(3, int(number ** 0.5) + 1, 2):
            if number % i == 0:
                return number, False

        return number, True


    @task
    def get_primes_from_split(min_number, max_number) -> List[int]:
        # Only odd candidates are tested
        if min_number % 2 == 0:
            min_number += 1

        # Submit the inner is_prime calls directly to the Dask cluster
        with get_dask_client() as client:
            futures = [client.submit(is_prime, n) for n in range(min_number, max_number, 2)]

            maybe_primes = [future.result() for future in futures]

        return [value for value, flag in maybe_primes if flag]


    # Run with the Dask task runner, as described below
    @flow(name="example_prime_number_search", task_runner=DaskTaskRunner())
    def main(max_number: int = 1_000_000, split_size=10_000):
        log = get_run_logger()
        context = get_run_context()

        log.info("Task Runner: %s", context.task_runner.name)
        log.info("Searching for primes up to %d", max_number)

        futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
        primes = [value for future in futures for value in future.result()]

        # Log the count and the ten largest primes found
        if len(primes) > 10:
            log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
        else:
            log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
    When running this with the DaskTaskRunner, the task get_primes_from_split is scheduled first, then the Dask future is_prime. Since get_primes_from_split is scheduled first, it gets higher priority, which causes Dask execution to lock up, as it waits for that task to complete before executing anything else. get_primes_from_split, naturally, is waiting for is_prime to complete, which unfortunately will not execute at this point. Toying around with priorities, I managed to get is_prime to execute:
    from typing import List, Tuple

    import dask
    from prefect import flow, get_run_logger, task
    from prefect.context import get_run_context
    from prefect_dask import DaskTaskRunner, get_dask_client


    def is_prime(number: int) -> Tuple[int, bool]:
        if number == 2 or number == 3:
            return number, True
        if number % 2 == 0 or number < 2:
            return number, False

        for i in range(3, int(number ** 0.5) + 1, 2):
            if number % i == 0:
                return number, False

        return number, True


    @task
    def get_primes_from_split(min_number, max_number) -> List[int]:
        # Only odd candidates are tested
        if min_number % 2 == 0:
            min_number += 1

        with get_dask_client() as client:
            # High priority so the inner is_prime calls are scheduled ahead of other work
            futures = [client.submit(is_prime, n, priority=100) for n in range(min_number, max_number, 2)]

            maybe_primes = [future.result() for future in futures]

        return [value for value, flag in maybe_primes if flag]


    @flow(name="example_prime_number_search", task_runner=DaskTaskRunner())
    def main(max_number: int = 1_000_000, split_size=10_000):
        log = get_run_logger()
        context = get_run_context()

        log.info("Task Runner: %s", context.task_runner.name)
        log.info("Searching for primes up to %d", max_number)

        # Low priority for the outer Prefect tasks submitted to Dask
        with dask.annotate(priority=0):
            futures = [get_primes_from_split.submit(x, x + split_size) for x in range(0, max_number + 1, split_size)]
            primes = [value for future in futures for value in future.result()]

        if len(primes) > 10:
            log.info("Found %d primes: %s, ...", len(primes), sorted(primes)[::-1][:10])
        else:
            log.info("Found %d primes: %s", len(primes), sorted(primes)[::-1][:10])
    This causes Dask to schedule a few instances of get_primes_from_split, which in turn schedule all of their is_prime calls. is_prime executes properly and starts returning its results, but get_primes_from_split doesn't seem to pick up execution. I really don't understand what is going on here. Can anyone provide some insight into how to do this kind of execution without reaching a deadlock like the one above?
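The symptom matches a classic nested-submission deadlock: every worker slot is held by an outer task that blocks on futures whose work can only run in a free slot. A minimal standard-library sketch of the pattern, assuming ThreadPoolExecutor pools stand in for Dask workers: the outer split tasks and the inner is_prime calls run in separate pools, so a blocked outer task never holds the slot its inner work needs. On the Dask side, the analogous tools are distributed's worker_client() context manager and secede()/rejoin(), which let a task leave the worker's thread pool while it waits; whether prefect_dask's get_dask_client does this for you depends on its implementation, so check the prefect-dask source for your version. find_primes and primes_in_split are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def is_prime(number: int) -> Tuple[int, bool]:
    """Same trial-division test as in the question."""
    if number in (2, 3):
        return number, True
    if number < 2 or number % 2 == 0:
        return number, False
    for i in range(3, int(number ** 0.5) + 1, 2):
        if number % i == 0:
            return number, False
    return number, True


def primes_in_split(inner: ThreadPoolExecutor, lo: int, hi: int) -> List[int]:
    """Outer task: fan out odd candidates in [lo, hi) and block on them.

    Blocking is safe only because the inner work runs in a different pool.
    If this function ran inside `inner` and every worker were busy here,
    no worker would be left to run is_prime: a deadlock.
    """
    start = lo + 1 if lo % 2 == 0 else lo
    futures = [inner.submit(is_prime, n) for n in range(start, hi, 2)]
    return [n for n, ok in (f.result() for f in futures) if ok]


def find_primes(max_number: int, split_size: int, workers: int = 4) -> List[int]:
    """Return all primes <= max_number, sorted ascending."""
    with ThreadPoolExecutor(workers) as inner, ThreadPoolExecutor(workers) as outer:
        splits = [
            # Clamp the upper bound so the last split does not overshoot max_number
            outer.submit(primes_in_split, inner, lo, min(lo + split_size, max_number + 1))
            for lo in range(0, max_number + 1, split_size)
        ]
        primes = [p for f in splits for p in f.result()]
    # Only odd numbers are scanned, so 2 is added separately
    if max_number >= 2:
        primes.append(2)
    return sorted(primes)


print(find_primes(100, 10)[-5:])  # [73, 79, 83, 89, 97]
```

Note that the question's version also never tests 2, since each split starts at an odd number.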

    Jelle Vegter

    12/09/2022, 2:42 PM
    Hi all, I’m looking at how to migrate to Prefect 2.0 and am designing my deployment pattern. Is there an equivalent to the extract_flow_from_file function in Prefect 2.0?

    Rio McMahon

    12/09/2022, 2:44 PM
    Within Prefect 1.0, is it possible to have multiple Slack webhook URLs for different Slack channels? For example, we want to separate data engineering and data science Prefect alerts (a different alerting channel for each). Looking at the docs: https://docs-v1.prefect.io/core/advanced_tutorials/slack-notifications.html#installation-instructions it seems like there is only one secret per account, called "SLACK_WEBHOOK_URL".

    Kendall Bailey

    12/09/2022, 4:00 PM
    Question regarding resource management in Prefect 2.x when all tasks are async (i.e. “submitted”) and some tasks are optional, in 🧵

    Chris Gunderson

    12/09/2022, 4:40 PM
    Hi all - Not sure why this error is occurring. I've included s3fs, gcsfs, and adlfs in the Poetry TOML file used to create the Docker image.

    Denis Sh

    12/09/2022, 5:24 PM
    Hi all, how do you check whether a Block exists inside a Flow? I tried wrapping the load in a try/except:
    from prefect.blocks.system import JSON

    storage_name = f"{self.account.name}-session"
    try:
        self.session = JSON.load(storage_name)
    except ValueError as e:
        # Block not found: log it and create an empty one
        self.logger.error(e)
        self.session = JSON(value={})
        self.session.save(name=storage_name)
        self.logger.info(f"created session storage ({storage_name=})")
    but the flow keeps failing on this exception. How can I handle it gracefully? ADDED: it seems the problem was capital letters in the block name! Otherwise the code works as intended. I fixed it by changing the type of account.name in the model definition to constr(to_lower=True).
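Per the fix above, the failure came from capital letters in the block name. A minimal sketch of normalizing an account name before building the block name, assuming block names may contain only lowercase letters, digits, and dashes (check the block-name validation in your Prefect version); session_block_name is a hypothetical helper.

```python
import re

# Assumption: valid block document names use only lowercase letters, digits and dashes
_INVALID = re.compile(r"[^a-z0-9-]+")


def session_block_name(account_name: str) -> str:
    """Hypothetical helper: derive a valid session block name from an account name."""
    # Lowercase, collapse every run of invalid characters into one dash, trim edge dashes
    slug = _INVALID.sub("-", account_name.strip().lower()).strip("-")
    return f"{slug}-session"


print(session_block_name("Acme Corp"))  # acme-corp-session
```

Normalizing at the point where the name is built has the same effect as constr(to_lower=True) on the model, and also covers spaces and punctuation.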

    Edmund Tian

    12/09/2022, 5:58 PM
    Could someone help me understand whether a Prefect Flow supports my use case? I’m building something similar to Plaid, and I’m planning to use a Prefect Flow to orchestrate my per-user data pipeline. Here’s a simplified version of what it looks like:
    1. Extract the user’s bank transactions. This involves logging into their bank with their submitted credentials, scraping their bank transactions, and saving this raw data to bucket storage.
    2. Transform the bank transactions. This involves downloading the raw data from bucket storage, transforming it into our standardized format, and saving it to a Postgres table.
    This Flow must be run on a per-user basis. There are two situations in which it runs:
    1. When a user is first created
    2. During a daily job to refresh all users’ data
    Thus, if I have 1M users, I would need something that supports 1M concurrent Flow Runs. Is this achievable with Prefect?
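One common way to handle this scale is bounded fan-out rather than a million simultaneous runs: keep a fixed number of per-user runs in flight and start the next one as each finishes. A minimal asyncio sketch of that pattern; fan_out, run_for_user, and fake_pipeline are hypothetical names. In Prefect 2, run_for_user might, for example, trigger a per-user deployment with run_deployment, and work queue concurrency limits offer a server-side cap; neither is shown here.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Tuple


async def fan_out(
    user_ids: Iterable[str],
    run_for_user: Callable[[str], Awaitable[str]],
    max_in_flight: int = 100,
) -> Dict[str, str]:
    """Run run_for_user once per user with at most max_in_flight runs at a time.

    Workers pull from one shared iterator, so ids are consumed lazily and a
    generator over a million users never has to be materialized.
    """
    ids = iter(user_ids)
    results: Dict[str, str] = {}

    async def worker() -> None:
        # next(ids) runs synchronously on the event loop, so each id goes to exactly one worker
        for user_id in ids:
            results[user_id] = await run_for_user(user_id)

    await asyncio.gather(*(worker() for _ in range(max_in_flight)))
    return results


async def demo() -> Tuple[int, int]:
    in_flight = 0
    peak = 0

    async def fake_pipeline(user_id: str) -> str:
        # Stand-in for extract + transform for one user
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return "done"

    results = await fan_out((f"user-{i}" for i in range(50)), fake_pipeline, max_in_flight=5)
    return len(results), peak


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (50, 5)
```

The cap (here 5, or 100 by default) is the knob to size against your scraping targets, database, and infrastructure.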

    Joshua Grant

    12/09/2022, 6:04 PM
    I'm having trouble registering my flows with S3. I saw on the Prefect Discourse that there is an undocumented upload_options argument to the S3 block, which is great because I need to specify ServerSideEncryption. However, I'm receiving AccessDenied:
    botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied

    Alex F

    12/09/2022, 6:40 PM
    Hello all, I have a weird problem with a Prefect 1.0 flow run. Other flows run fine on the same server and the same agent(s). I made a small change to this one and got a new version, but I'm still getting an error:
    17:06:30 INFO Agent1
    Submitted for execution: PID: 3327309
    17:06:31 ERROR execute flow-run
    Failed to load and execute flow run: PermissionError(13, 'Permission denied')

    John-Craig Borman

    12/09/2022, 6:40 PM
    Does anyone have links to docs/code for setting up a Prefect Orion webserver specifically for testing client interactions?

    Aniruddha Bharadwaj

    12/09/2022, 7:50 PM
    Hi all, I'm trying to deploy a flow, but instead of deploying the flow, the command runs it. I'm a little confused; could anyone help me? Here is the command:
    prefect deployment build ./data_source_flows/master_flow.py:master_flow -n master_flow -q test --storage-block github/flows --apply

    Dillan Smith

    12/09/2022, 8:15 PM
    Hi, I've looked through a couple of questions on the Helm/k8s setup for Orion but haven't found my answer. This is probably just me missing something about k8s itself. I'm just trying to test self-hosting Orion on my local machine using the Helm chart found here and then opening the locally running dashboard. The command I'm using is
    helm install prefect-orion-poc prefect/prefect-orion --values  helm-orion-values.yaml
    and my values file is
    namespaceOverride: helm-prefect
    postgresql:
      auth:
        password: password
    ingress:
      enabled: true
    It outputs
    Get the application URL by running these commands: http://prefect.local/
    but that comes back as an unknown host. I'm pretty new to K8s, so any help is appreciated.

    Lucas Cavalcanti Rodrigues

    12/10/2022, 10:48 PM
    Can someone help me with this question? https://stackoverflow.com/questions/74757050/how-to-deploy-locally-multiple-flows-using-prefect-2-0

    Edmondo Porcu

    12/11/2022, 2:23 PM
    Hello, does anyone use the Prefect Helm chart? https://github.com/PrefectHQ/prefect-helm#the-database-deploys-correctly-but-other-services-fail-with-bad-password I think I have this problem, but I am not sure I understood the solution.

    Nico Neumann

    12/11/2022, 9:45 PM
    Hey 🙂 I have shared code that is used both by Prefect flows and by systems where Prefect is not installed, so I need a logger that can log everywhere. I created my own logger but am having some problems getting it to work with Prefect. I only found documentation on how to set up a custom logger for v1: https://docs-v1.prefect.io/core/concepts/logging.html#extra-loggers but I think it should work the same way for v2. I also had a look here: https://discourse.prefect.io/t/prefect-logging-faq/1476
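A common pattern for code shared between Prefect and non-Prefect environments is to log through a plain standard-library logger and let each environment decide where records go. A minimal sketch; the module and logger name shared_utils and the function normalize are hypothetical. Under Prefect 2, the PREFECT_LOGGING_EXTRA_LOGGERS setting (e.g. PREFECT_LOGGING_EXTRA_LOGGERS=shared_utils) is meant to attach Prefect's handlers to such loggers; check that your version supports it.

```python
import logging
from typing import List

# shared_utils.py (hypothetical module): no Prefect import, so it runs anywhere.
# Under Prefect 2, PREFECT_LOGGING_EXTRA_LOGGERS=shared_utils is meant to route
# this logger's records to Prefect's handlers (assumption: supported by your version).
logger = logging.getLogger("shared_utils")


def normalize(values: List[float]) -> List[float]:
    """Scale values so they sum to 1, logging through the shared logger."""
    logger.info("normalizing %d values", len(values))
    total = sum(values)
    return [v / total for v in values]


if __name__ == "__main__":
    # Outside Prefect: plain stdlib configuration decides where records go
    logging.basicConfig(level=logging.INFO)
    print(normalize([1, 3]))
```

The shared module never configures handlers itself, which is what lets the same code log to the console in one system and to Prefect in another.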

    Lucas Cavalcanti Rodrigues

    12/11/2022, 10:15 PM
    Hey, I'm running Prefect Orion on Docker. The Prefect UI launched fine, but I keep getting this error. What am I doing wrong?

    Emma Rizzi

    12/12/2022, 9:39 AM
    Hi! I'm using Prefect 1.2.4 with Docker storage and the Kubernetes agent. I need to change the umask of the flows, as they are writing to shared storage. I tried setting it in .bashrc as I did for my users outside of flows, but apparently that is read only for interactive sessions, so not for flows! I see the recommended way is to set the umask in the entrypoint of the Docker image, for example:
    ENTRYPOINT ["./set_umask.sh"]
    https://codeyarns.com/tech/2017-07-21-how-to-set-umask-for-docker-container.html#gsc.tab=0 Is there a way to override the entrypoint of the flow? Or any other solution to set the umask?
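If the files are written by the flow's own Python process (for example with the default local executor, where tasks run in that same process), another option that leaves the image entrypoint alone is to set the umask at the start of the flow with os.umask. The umask is per process and inherited by child processes started afterwards. A minimal POSIX-only sketch; 0o002 (group-writable files) is an example value, and apply_shared_umask and write_file are hypothetical helpers.

```python
import os
import tempfile

# Example value (assumption): new files become rw-rw-r--, i.e. group-writable
SHARED_UMASK = 0o002


def apply_shared_umask(mask: int = SHARED_UMASK) -> int:
    """Set this process's umask and return the previous one.

    Call it at the very start of the flow run, before any files are written.
    """
    return os.umask(mask)


def write_file(directory: str, name: str, text: str) -> int:
    """Write a file and return its permission bits."""
    path = os.path.join(directory, name)
    with open(path, "w") as handle:
        handle.write(text)
    return os.stat(path).st_mode & 0o777


if __name__ == "__main__":
    previous = apply_shared_umask()
    try:
        with tempfile.TemporaryDirectory() as tmp:
            print(oct(write_file(tmp, "out.txt", "hello")))  # 0o664 on POSIX
    finally:
        os.umask(previous)  # restore; only needed in this demo
```

Tasks that run in other processes (for example on Dask workers) would each need the same call, which is where the entrypoint approach is simpler.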

    Anco

    12/12/2022, 12:23 PM
    Hi all. I'm using run_deployment on existing deployments (Prefect 2.7.1), but I notice a 5 to 15 second delay before the flow run starts. I don't use scheduled runs, so it should start right away. Should I look for parameters or config to remove the delay, or is it something environment-related?

    Joël Luijmes

    12/12/2022, 1:48 PM
    Hi, I was wondering if there are any guidelines/ideas on running subflows. As far as I know, we can do it either by referencing/calling them from code directly or by using run_deployment. Assume that all Prefect flows are contained in the same repository and are deployed to the same Orion server. Are there any benefits/disadvantages of one over the other?

    Jelle Vegter

    12/12/2022, 2:01 PM
    I’m attempting to set up Prefect 2.0, and I want to run flows in a Docker container. My Docker image is in an Azure registry, and I created a block referring to the image. I’m getting the following error: No module named prefect.engine.__main__; ‘prefect.engine’ is a package and cannot be directly executed. Any clue what I’m missing?

    Santhosh Solomon (Fluffy)

    12/12/2022, 2:14 PM
    I got the error below while running my code in Prefect Cloud. The flow generated a large number of tasks, so I am assuming there is a limit on the number of tasks per flow?
    12:48:12.659 | ERROR | prefect.infrastructure.process - Process 'optimal-toucanet' exited with status code: -9; This indicates that the process exited due to a SIGKILL signal. Typically, this is caused by high memory usage causing the operating system to terminate the process.

    Ashley Felber

    12/12/2022, 3:00 PM
    Hello, I’m reposting an issue I’m having, as I have not been able to resolve it. I have created a deployment using an ECSTask block. The ECSTask block is configured to run an existing task definition. When I try to run the deployment, I get the following error: Submission failed. KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'." This error shows up under the details of the flow run, but there are no other logs. I already tried registering the block again. cc: @Taylor Curran

    Adam Roderick

    12/12/2022, 3:53 PM
    Hello, I have 4 tasks that run successfully when running locally using FlowRunner. When running in Prefect Cloud, I get the following results:
    • TypeError("'generator' object is not callable")
    • SystemError('unknown opcode')
    • SystemError('Objects/dictobject.c:1527: bad argument to internal function')
    • The 4th task hangs indefinitely
    How can I troubleshoot this?

    Scott Chamberlain

    12/12/2022, 4:20 PM
    I’d like to follow a workflow in Prefect 2.0 of separate @task functions called from a @flow. All of our work is done outside of Python via make commands, for which I currently use shell_run_command. However, you can’t put shell_run_command within a function decorated with @task. I could just remove the @task decorator. Is there a workflow folks use to leverage the @task decorator for tasks that run shell commands?
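Since the work is done by make commands, one option is to call them through the standard library's subprocess module inside an ordinary function and put @task on that function. A minimal sketch; run_command is a hypothetical name, and check=True makes a non-zero exit code raise CalledProcessError, so a failing make target fails the task instead of being silently ignored.

```python
import subprocess
import sys
from typing import List, Optional


def run_command(args: List[str], cwd: Optional[str] = None) -> str:
    """Run a command, raise on a non-zero exit code, and return its stdout.

    Hypothetical helper: in a Prefect 2 flow this function itself can carry
    the @task decorator, e.g. to run ["make", "transform"].
    """
    completed = subprocess.run(
        args,
        cwd=cwd,
        capture_output=True,  # keep stdout/stderr so they can be logged
        text=True,
        check=True,  # non-zero exit -> subprocess.CalledProcessError
    )
    return completed.stdout


if __name__ == "__main__":
    # Portable stand-in for a make target
    print(run_command([sys.executable, "-c", "print('hello from a shell step')"]), end="")
```

For example, a @task-decorated wrapper could call run_command(["make", "transform"], cwd="path/to/repo"); the target name and path are hypothetical.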

    Sean Conroy

    12/12/2022, 5:18 PM
    Hi, I need help printing task results using Prefect 1. Printing the output seems to print only the task name, not the task result:
    from prefect import task, Flow
    
    @task
    def get_value():
        return 10
    
    with Flow("task-results") as flow:
        v = get_value()
        print(v)
    
    state = flow.run()
    I get <Task: get_value> instead of 10. What am I doing wrong?

    Scott Chamberlain

    12/12/2022, 6:16 PM
    I haven’t been able to find info on this yet, but I’m guessing it’s in the docs somewhere: file structure. AFAICT, prefect deployment build only allows pointing at one file. If you want to import from other files, like utility functions (not PyPI packages), how do you suggest doing that? I assume we don’t have to put all the code in the one file the deployment build command points at?

    Ashley Felber

    12/12/2022, 6:32 PM
    Hello, I have created a deployment using an ECStask block. The ECStask block is configured to run an existing task definition. When I try to run the deployment I get the following error: "Submission failed. botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User <> is not authorized to perform: ecs:RegisterTaskDefinition on resource: * because no identity-based policy allows the ecs:RegisterTaskDefinition action." Why does the agent need the ability to register a task definition if it already exists?

Michael Adkins

12/12/2022, 6:50 PM
We register a new task definition if you are using settings that require a new task definition 🙂 some fields can only be updated with a new task definition.
If you turn on DEBUG logs, we’ll display a diff between the task definition required to run the ECSTask as configured and your pre-registered one.
e.g. https://github.com/PrefectHQ/prefect-aws/pull/166

Ashley Felber

12/12/2022, 6:55 PM
Sorry if I'm missing this, but where/how do I turn on DEBUG logs?

Michael Adkins

12/12/2022, 7:02 PM
PREFECT_LOGGING_LEVEL=DEBUG on your agent

Ashley Felber

12/12/2022, 7:09 PM
Ok thanks and where do I view the logs?

Michael Adkins

12/12/2022, 7:12 PM
Wherever your agent is running

Ashley Felber

12/12/2022, 10:08 PM
Ok I see the logs but not sure how to read them. How do I know what's missing from the task definition using the logs?

Michael Adkins

12/12/2022, 10:20 PM
There’s an example in that issue; can you share the output?
Ah, you see all those “+” bits? That’s content that was added at runtime.
This is because you’ve enabled stream_output, which means we need to configure a log stream, and that can only be set per task definition, not per task run.

Ashley Felber

12/12/2022, 10:42 PM
Thanks. For the log configuration, these values are already set in my task definition: "logConfiguration": { "logDriver": "awslogs", "options": { "awslogs-group": "/ecs/analytics-ct-snowflake-load-historical-task", "awslogs-region": "us-west-2", "awslogs-stream-prefix": "ecs" } } Are you saying it needs to be set somewhere else? Also, are the volumesFrom and name components marked with a + required? What's the name for, and why does volumesFrom need to be set?

Michael Adkins

12/12/2022, 10:45 PM
I’m not sure what the volumes thing is; I’m not sure there’s a real difference there.
Do you have stream_output set on your ECSTask?

Ashley Felber

12/12/2022, 10:46 PM
Yes I do, I was told it was actually required

Michael Adkins

12/12/2022, 11:05 PM
Ah I see, nice! The difference seems to just be the container name
We require a container named “prefect” to exist — do you have the name set to that on your task definition?

Ashley Felber

12/12/2022, 11:18 PM
Nope the container has a different name, trying that now
Ok that resolved that issue, thanks!

Michael Adkins

12/12/2022, 11:45 PM
No problem! I opened an issue to clarify this
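The fix in this thread was renaming the container in the pre-registered task definition to "prefect". A minimal sketch of a pre-flight check over the containerDefinitions list in an ECS task definition (the shape of the taskDefinition object returned by describe_task_definition). The logConfiguration check is an assumption drawn from the stream_output discussion above, not a list of every field Prefect compares; check_task_definition is a hypothetical helper.

```python
from typing import Any, Dict, List

# Container name Prefect expects, per the thread above
REQUIRED_CONTAINER = "prefect"


def check_task_definition(task_definition: Dict[str, Any]) -> List[str]:
    """Return problems that would likely force a new task definition at run time."""
    problems: List[str] = []
    containers = task_definition.get("containerDefinitions", [])
    names = [c.get("name") for c in containers]
    if REQUIRED_CONTAINER not in names:
        problems.append(f"no container named {REQUIRED_CONTAINER!r} (found {names})")
    for container in containers:
        # Assumption: stream_output relies on a log configuration on this container
        if container.get("name") == REQUIRED_CONTAINER and "logConfiguration" not in container:
            problems.append("prefect container has no logConfiguration (needed for stream_output)")
    return problems


awslogs = {"logDriver": "awslogs", "options": {"awslogs-stream-prefix": "ecs"}}
print(check_task_definition({"containerDefinitions": [{"name": "app", "logConfiguration": awslogs}]}))
```

Running a check like this before creating the deployment surfaces the container-name mismatch without a failed submission; the DEBUG diff remains the authoritative view of what differs.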