Powered by Linen
prefect-community
  • m

    Miremad Aghili

    11/10/2022, 2:26 PM
    Hey guys, I wanted to know how I can get a flow to run on multiple schedules with different labels in Prefect 1. (This would solve my original problem: how can I make a flow run on every agent at the same time? If there is a better solution, please let me know.)
    r
    • 2
    • 6
  • v

    Vadym Dytyniak

    11/10/2022, 3:01 PM
    Hi. Is it possible in Prefect 2 to name the task based on input parameters? https://docs-v1.prefect.io/core/idioms/task-run-names.html
    ✅ 1
    b
    k
    +3
    • 6
    • 10
  • e

    Evgeny Ivanov

    11/10/2022, 3:34 PM
    Hi. There is the following suggestion in the prefect-dask documentation:
    Then, register to view the block on Prefect Cloud:
    prefect block register -m prefect_dask.credentials
    But there is no module
    credentials
    in
    prefect_dask
    . And I get the following error (not a surprise):
    ```Unable to load prefect_dask.credentials. Please make sure the module is installed in your current
    environment.```
    I wonder if this module has ever existed? Or maybe there are plans to create it?
    ✅ 1
    k
    n
    a
    • 4
    • 4
  • a

    Arthur Jacquemart

    11/10/2022, 3:51 PM
    Hi everyone. I am working on a flow on Prefect 1.4 and I am struggling to understand why my environment variables are not passed from one task to another. The flow is as follows:
    • In an upstream task, I add a couple of environment variables to the environment (the os.environ object). I log the object at the end of the task, and I can see that my new environment variables are loaded.
    • Then, in a downstream task, I use one of the environment variables and I get a KeyError. When logging the os.environ object, none of my new environment variables are there.
    Am I missing something? I will add a screenshot of the schematic in the thread.
    m
    • 2
    • 6
  • d

    David Beck

    11/10/2022, 4:39 PM
    Hi again, in Prefect Cloud, once you have registered a new block, how does one remove it? So far I can't find anything in the documentation.
    ✅ 1
    b
    k
    +4
    • 7
    • 26
  • k

    Kalise Richmond

    11/10/2022, 6:00 PM
    https://prefect-community.slack.com/archives/C036FRC4KMW/p1668103207784329
    :party-parrot: 1
    🎉 2
    🙌 1
    j
    • 2
    • 2
  • p

    Patrick Tan

    11/10/2022, 6:51 PM
    I am testing deploying a flow to S3 using a storage block in Prefect 2.0. When I start a flow run from the Prefect Cloud UI, the agent acknowledges that the flow run was submitted, but waits 3+ minutes before it starts executing the flow run. Does the agent download the code from S3 before it starts executing the flow run?
    ✅ 1
    m
    • 2
    • 10
  • b

    Boggdan Barrientos

    11/10/2022, 7:37 PM
    Hi all! What's the best way to implement ELT flows in Prefect 2? I'm done with EL: I have a sensor task that waits for an S3 file and then begins a sync with Airbyte in a flow. I have multiple files and each file has its own connection, so I made a deployment per file. Now I want to execute a dbt job after all EL deployments have finished. I'm not sure how to wait for all EL flows. I think I have to use two subflows, one for EL and the other for T, but I'm not sure how to pass the params to only one flow or how to reference the previously created deployment.
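The "run T only after every EL run has finished" requirement is a fan-out/fan-in shape: start all EL jobs, wait for all of them, then run the transform once. A minimal sketch of that shape follows, using plain asyncio as a stand-in for the orchestration. `run_el` and `run_dbt` are hypothetical placeholders, not Prefect, Airbyte, or dbt APIs.

```python
import asyncio
from typing import List


async def run_el(file_name: str) -> str:
    """Hypothetical stand-in for one extract-load run (e.g. one sync)."""
    await asyncio.sleep(0.01)  # simulate some work
    return f"{file_name}: loaded"


async def run_dbt(loaded: List[str]) -> str:
    """Hypothetical stand-in for the transform step."""
    return f"dbt ran after {len(loaded)} loads"


async def elt(files: List[str]) -> str:
    # Fan out: start every EL job concurrently and wait for all of them.
    loaded = await asyncio.gather(*(run_el(f) for f in files))
    # Fan in: the transform runs once, only after every EL job has finished.
    return await run_dbt(list(loaded))


summary = asyncio.run(elt(["a.csv", "b.csv", "c.csv"]))
```

If any EL job raises, `asyncio.gather` propagates the exception and the transform step is never reached, which is usually the desired behaviour for ELT.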
    n
    • 2
    • 9
  • a

    Ashley Felber

    11/10/2022, 10:23 PM
    Hello, I am trying to use the ECS block to deploy flows on Fargate. I have some questions on this:
    • Is there a reason to use Task Definition vs. Task Definition arn field in the ECS block? Is it just preference on providing name vs. arn?
    • Let's say I configure the ECS block with an existing task definition. This task definition includes the ECR image URI which was built with the flow script. But when building the deployment, it looks like I still need to include the path to the flow script. Referencing the example from this doc. So what role is the image playing here vs. reference to the flow script?
    prefect deployment build -n prod -q prod -a -ib ecs-task/prod -sb s3/prod flow_script.py:flow_function_name
    a
    • 2
    • 3
  • t

    Tim Ricablanca

    11/10/2022, 10:24 PM
    Hello again! I think I've tracked down an issue with MS Teams notifications, and the notification queue in general, where only certain flow run states were being sent. This logger message appears:
    prefect-orion-1  | 21:53:34.590 | DEBUG   | prefect.orion.services.flowrunnotifications - Got 3 notifications from queue.
    However, it looks like the code intends for only one notification to be read off the queue in the first place: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/orion/services/flow_run_notifications.py#L38-L42.
    m
    c
    • 3
    • 47
  • k

    Karlo

    11/10/2022, 10:40 PM
    Hey guys, I’m new to Prefect development. Our team is still on v1.3. How can I fail a task if the task log contains an error? We had a couple of flows marked “Success” that actually failed in a certain task. The actual errors are recorded in the task’s log at ERROR level, but they don’t seem to fail the task.
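One generic way to turn logged errors into a failure is to attach a handler that records ERROR-level messages and raise once the work is done, since an unhandled exception inside a task marks the task run as failed. The sketch below uses only the standard `logging` module. `ErrorRecorder` and `fail_on_error_logs` are hypothetical helper names, and it assumes the errors are emitted through a logger you can attach a handler to.

```python
import logging
from contextlib import contextmanager


class ErrorRecorder(logging.Handler):
    """Collects every message logged at ERROR level or above."""

    def __init__(self):
        super().__init__(level=logging.ERROR)
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


@contextmanager
def fail_on_error_logs(logger):
    """Raise RuntimeError on exit if the wrapped block logged any error."""
    recorder = ErrorRecorder()
    logger.addHandler(recorder)
    try:
        yield recorder
    finally:
        logger.removeHandler(recorder)
    # Reached only when the wrapped block itself did not raise.
    if recorder.messages:
        raise RuntimeError(f"{len(recorder.messages)} error(s) logged: {recorder.messages}")
```

Inside a task, the body would be wrapped in `with fail_on_error_logs(some_logger): ...`, where `some_logger` is whichever logger actually receives the error messages.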
    :upvote: 1
    m
    c
    k
    • 4
    • 5
  • t

    Tobias

    11/11/2022, 8:25 AM
    A question about working with environment variables in AWS/ECS and Python. I am working on your template. Background: I have a few environment variables that relate to Snowflake (username, password, etc). When I run the flow locally I use python dotenv to load up environment variables. This works fine when I run Prefect locally, as the variables are loaded from a local .env file. This won’t be the case when I deploy to ECS, so I have to define the secrets elsewhere. Aim: I want to be able to pass on secrets to the Prefect environment running on ECS. Question: What is the recommended way of working with environment variables in this case? 1. When I access the AWS ECS Block I can see that adding environment variables to the Task Run is an option (see screenshot). 2. It seems possible to load credentials from a specific block (also just secrets?) using
    from prefect_aws import MinIOCredentials
    3. I can also see that adding Secrets to Prefect Cloud is an option in 1.0, but this seems not to be possible in 2.0?
    n
    • 2
    • 2
  • t

    Thomas Fredriksen

    11/11/2022, 9:39 AM
    Hi there. I have a task where I want to limit concurrency, as it is quite memory intensive. I have been playing around with task concurrency limits, but so far I have not been able to limit my task concurrency. Before I go down the rabbit hole, will something like this work?
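    from contextlib import asynccontextmanager
    from typing import AsyncIterator

    from prefect import Task, get_client
    from prefect.context import get_run_context


    @asynccontextmanager
    async def concurrency_limit(task: Task, limit: int = 10) -> AsyncIterator[Task]:
        # Scope the tag to this flow run so limits from different runs don't collide.
        context = get_run_context()
        tag = f"{context.flow_run.id}-{task.name}"

        # Use the client as an async context manager so it is opened and closed cleanly.
        async with get_client() as client:
            await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)
            try:
                yield task.with_options(tags=[tag])
            finally:
                # Remove the temporary limit even if the wrapped block raises.
                await client.delete_concurrency_limit_by_tag(tag)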
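For comparison, here is a different technique from the tag-based limit above: an in-process `asyncio.Semaphore`. It only caps concurrency among coroutines in a single Python process, so it is a sketch of the idea rather than a replacement for orchestrator-level limits. `memory_heavy` and the `state` counter are hypothetical and exist only to make the cap observable.

```python
import asyncio


async def memory_heavy(i, limiter, state):
    # At most `limit` coroutines can be inside this block at once.
    async with limiter:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # simulate the expensive work
        state["running"] -= 1
        return i * i


async def main(n=20, limit=3):
    limiter = asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    results = await asyncio.gather(
        *(memory_heavy(i, limiter, state) for i in range(n))
    )
    return results, state["peak"]


results, peak = asyncio.run(main())
```

`peak` records the highest number of calls that were inside the guarded section simultaneously, so it should never exceed `limit`.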
    k
    m
    • 3
    • 11
  • m

    Manuel Garrido Peña

    11/11/2022, 10:38 AM
    Hey folks, I am trying to run a deployment, but some of the required files (a dbt project) live in a path above the flows directory. Is there any way I can add a different folder (not a subfolder) to the files included in the deployment?
    a
    • 2
    • 1
  • t

    Tibs

    11/11/2022, 11:33 AM
    Hi, we are migrating from Prefect 1 to Prefect 2. In our previous flows we use the SQLAlchemy ORM to run queries on our database. Would you recommend switching to the prefect-sqlalchemy library and using plain SQL?
    k
    • 2
    • 2
  • c

    Christian Juhl

    11/11/2022, 12:30 PM
    Hi guys, How do I store the result from submitted tasks that are run concurrently?
    from prefect import task, flow, get_run_logger
    
    @task
    def square_number(number):
    
        return number ** 2
    
    @flow
    def my_flow():
    
        squared_numbers = []
    
        for i in range(5):
            result = square_number.submit(i)
        
            squared_numbers.append(result.result())
    
        return squared_numbers
    
    if __name__ == '__main__':
    
        output = my_flow()
        print(output)
    In this example, the next task is not submitted until the previous completes (they don't run concurrently), but if I move append(result.result()) outside the for loop, only the result from the last task is returned. Thanks!
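The usual fix for this shape is to keep every future in a list and ask for results only after all work has been submitted. The sketch below shows the pattern with the standard library's `ThreadPoolExecutor` standing in for the task runner (an assumption made so the snippet runs without Prefect). The same two-step structure carries over to futures returned by `.submit()`.

```python
from concurrent.futures import ThreadPoolExecutor


def square_number(number):
    return number ** 2


def my_flow():
    with ThreadPoolExecutor(max_workers=5) as pool:
        # Step 1: submit everything first so the calls can overlap.
        futures = [pool.submit(square_number, i) for i in range(5)]
        # Step 2: only now block on each result, in submission order.
        return [future.result() for future in futures]


output = my_flow()
```

Calling `.result()` inside the submitting loop blocks before the next submission, which is why the original version runs sequentially. Moving a single `.result()` call outside the loop only sees the last future, which is why the list of futures is needed.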
    c
    • 2
    • 4
  • k

    Kelvin DeCosta

    11/11/2022, 1:32 PM
    Hey everyone. A bit of a story ahead. I discovered that our flow runs were failing almost instantly. The agent logs didn't provide an error message and just skipped to "Completed flow run submission". The
    ECSTask
    block uses the
    boto3
    ecs
    client to call
    run_task
    with some arguments that it derives based on the input attributes. The agent log, if set with
    DEBUG
    log level, shows the exact arguments that are passed to this function. I figured that since the failure happens almost instantly after this message, it had to be this call that was causing the fatal error. Fortunately, when I emulated the call via a python script, I ran into this error:
    botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Some tags contain invalid characters. Valid characters: UTF-8 letters, spaces, numbers and _ . / = + - : @.
    So, if you have deployments that use the
    ECSTask
    infrastructure block, make sure you only use the following characters in the flow and deployment names:
    UTF-8 letters, spaces, numbers and _ . / = + - : @
    I'd rather not restrict flow / deployment names, so if there are workarounds to this problem, I'd really appreciate it.
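A pre-flight check can at least surface offending names before the agent submits the run. The sketch below encodes the character set quoted in the error message as a regular expression. `is_valid_ecs_tag` and `sanitize_ecs_tag` are hypothetical helpers, and treating Python's `\w` as "UTF-8 letters, numbers and _" is an approximation.

```python
import re

# Matches any character outside the set quoted in the error message:
# letters, digits and underscore (covered by \w), space, and . / = + - : @
_INVALID = re.compile(r"[^\w ./=+\-:@]")


def is_valid_ecs_tag(value: str) -> bool:
    """True if every character is in the allowed set."""
    return _INVALID.search(value) is None


def sanitize_ecs_tag(value: str, replacement: str = "-") -> str:
    """Replace each disallowed character with `replacement`."""
    return _INVALID.sub(replacement, value)
```

For example, `sanitize_ecs_tag("etl (prod)!")` yields `"etl -prod--"`, while a name such as `"etl/daily-load: v1.2"` passes unchanged.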
    👍 1
    c
    • 2
    • 2
  • o

    Oscar Björhn

    11/11/2022, 1:48 PM
    I've run into an extremely strange docker-related issue where Prefect appears to be duplicating all the folders I've added to my docker image. In my docker files, I copy a bunch of folders to /opt/prefect/flows. Inspecting my created image, I can see that the folders are there, and nowhere else. However, when I inspect the container that is being run by Prefect using a deployment, all contents of /opt/prefect/flows have been duplicated to /opt/prefect. Is this behavior intended? I believe it might be related to the following settings in my deployment.yaml, but I can't figure out what I'd set them to in order to prevent this from happening:
    path: /opt/prefect/flows
    entrypoint: orchestration/flows/test_curated.py:default
    ✅ 1
    a
    • 2
    • 5
  • j

    Jorge Luis Tudela Gonzalez de Riancho

    11/11/2022, 2:25 PM
    Best way to run/call Apache Flink from prefect
    k
    • 2
    • 3
  • a

    Ashley Felber

    11/11/2022, 2:41 PM
    Hello, do you have any documentation on the AWS permissions required to 1) use s3 storage block 2) run flows using an ECStask infrastructure block?
    t
    c
    • 3
    • 9
  • m

    Manuel Garrido Peña

    11/11/2022, 3:09 PM
    Does anybody know why we get the error "Flow could not be retrieved from deployment" when running a deployment?
    m
    • 2
    • 2
  • s

    Stefano Cascavilla

    11/11/2022, 3:42 PM
    Hey everyone 👋 I'm experiencing an issue while starting Prefect through the
    prefect server start
    command. I'm using Prefect version 0.15.6 and, suddenly, when I try to start the server, an error occurs when Hasura tries to create the pgcrypto extension on PostgreSQL. This is the error:
    postgres_1  | 2022-11-11 15:40:49.418 UTC [84] ERROR:  duplicate key value violates unique constraint "pg_extension_name_index"
    postgres_1  | 2022-11-11 15:40:49.418 UTC [84] DETAIL:  Key (extname)=(pgcrypto) already exists.
    postgres_1  | 2022-11-11 15:40:49.418 UTC [84] STATEMENT:  CREATE EXTENSION IF NOT EXISTS pgcrypto SCHEMA public
    It's strange because I used Prefect until yesterday and never had this issue. Can anybody help me with this?
    m
    • 2
    • 7
  • d

    Dan Wise

    11/11/2022, 5:23 PM
    Hi, is there a way to clear work queues? I can see a method to delete but not to clear. We have had instances where the agents polling the queue were down for quite a while and runs built up. I would like to be able to clear any delayed runs rather than having them all play through.
    a
    n
    • 3
    • 5
  • j

    jack

    11/11/2022, 8:51 PM
    Using a self-hosted orion server with domain someserver.com, what value should be used for
    --api
    when starting the agent?
    ✅ 1
    r
    • 2
    • 1
  • k

    Kishan

    11/11/2022, 9:26 PM
    I'm trying to run a deployment as a KubernetesJob on my prefect agent hosted on GKE. It says the flow run is pending and I am getting the following error in the pod
    No module named prefect.engine.__main__; 'prefect.engine' is a package and cannot be directly executed
    . Anyone know what this means? Also, for additional context, here are the storage and infrastructure blocks I set up.
    gcs_block = GCS.load("gcs-block")
    infra_block = KubernetesJob(
    image="us-east1-docker.pkg.dev/{MY PROJECT}/prefect-agents/{MY IMAGE}",
    env=dict(PREFECT_LOGGING_LEVEL="DEBUG"),
    )
    ✅ 1
    m
    • 2
    • 9
  • t

    Tayyab Iqbal

    11/12/2022, 3:02 AM
    Hey all, can anyone help me identify the problem here? I am facing issues while deploying
    prefect-server
    on a CentOS 7 virtual machine with the following versions. Python and pip libraries:
    Python 3.9.10
    prefect 1.4.0
    prefect-server 2022.9.23+15.g356da2d
    docker 6.0.1
    docker-compose 1.29.2
    Docker CLI version: Docker version 20.10.21, build baeda1f. Docker Compose version: Docker Compose version v2.12.2. I am unable to run the Prefect server UI in this environment. The startup logs resulting from
    prefect server start --expose
    are attached. The following is the curl output, even when run from localhost:
    [root@prefect-etl-vm-7 ~]# curl "http://127.0.0.1:8080/"
    curl: (56) Recv failure: Connection reset by peer
    The funny thing is that I am running the same environment on a Mac, the only difference being Docker Desktop, and it works on my local system. Any kind of help is much appreciated.
    prefect_logs.txt
    a
    j
    • 3
    • 6
  • l

    Lee Mendelowitz

    11/12/2022, 2:28 PM
    Hey Everyone - I’ve started to migrate to Prefect 2 and enjoying the journey so far. I’m wondering if there’s a Prefect 2 equivalent of the StartFlowRun task that exists in Prefect 1. I’m looking to create a task in flow run deployment A which runs a flow run deployment B, and then waits for that flow run to finish. It’s similar to the subflow concept, except I want to use different deployments (in my case, different docker images for the subflow runs). I couldn’t find a built in task for this, but it seems like I may be able to roll my own using the OrionClient’s
    create_flow_run_from_deployment
    . Thanks!
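A roll-your-own version usually comes down to two steps: create the run, then poll its state until it reaches a terminal value. The sketch below covers only the polling half, in plain Python. `wait_for_run` is a hypothetical helper, `get_state` stands for whatever call returns the run's current state name, and the set of terminal state names is an assumption to adjust for your setup.

```python
import time
from typing import Callable

# Assumed terminal state names; adjust to whatever your server reports.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_for_run(
    get_state: Callable[[], str],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll `get_state` until it returns a terminal state, then return it."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

The caller decides what to do with the returned state, for example raising if it is anything other than `"COMPLETED"` so that the parent task fails along with the child run.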
    ✅ 1
    k
    a
    b
    • 4
    • 8
  • r

    Russell Brooks

    11/14/2022, 1:30 AM
    I am on prefect 1.0 core and would like to be able to set a max number of tasks running concurrently. I see that cloud has task run limits, but how can I get to the same goal with core?
    a
    • 2
    • 5
  • m

    merlin

    11/14/2022, 5:02 AM
    I don't understand the reason for Collections. For example, there is a SQLAlchemy Collection which can use
    prefect_sqlalchemy.database.sqlalchemy_execute
    to send a SQL statement to a DB. But I can do that with the sqlalchemy Python library directly and write flows around that. I must be missing the point?
    ✅ 1
    m
    a
    • 3
    • 4
  • r

    Rajeshwar Agrawal

    11/14/2022, 5:37 AM
    Hey @Prefect, we have started testing Prefect Orion (2.6.7) in order to migrate from our existing self-hosted Prefect community v1 edition. I was testing an edge case where the Prefect agent was terminated after a bunch of flows had been submitted to it. I observed that if the work queue is terminated with jobs already running on it: 1. The flow-run status still shows
    Running
    and never changes state. 2. If the work queue is started again, the flow-runs which were supposedly still running do not continue to run and are stuck in
    Running
    state. 3. The work queue does not accept any other pending flow-runs after restarting, and its status remains
    Unhealthy
    The expected behaviour is that once the agent is terminated, the associated
    Running
    flow-runs are marked as
    Cancelled
    or a similar state. Further, once the agent associated with the work queue comes back up, it should be able to pick up new flow-runs submitted with its label. Steps to reproduce: 1. Start a local Prefect Orion server 2. Create a work queue
    test
    with parallelism 1 3. Build and deploy the attached flow
    log_flow.py
    4. Submit 20 jobs at once
    repeat 20 prefect deployment run 'log-flow/log-flow'
    5. Terminate the work queue process (close the terminal which started the work queue). Observe that the work queue status changes to
    unhealthy
    however, the 1 flow run that had started running still shows
    Running
    state. Further, on restarting the queue, the interrupted flow-run does not resume and no new pending flow runs are submitted to the work queue.
    log_flow.py
    m
    m
    • 3
    • 3
r

Rajeshwar Agrawal

11/14/2022, 5:37 AM
FYI @David Haines
m

merlin

11/14/2022, 5:17 PM
I have also observed flows stuck in the Running state after a system restart. I'm only building a prototype in a local environment, so I don't have all the details. A deployed flow was running on an agent queue, during a long-running task that connects to a DB service. The machine restarted, and after restoring the Prefect agent and Orion DB the flow continued to show the "Running" state for many days. I can't confirm whether the agent picked up new scheduled flows or not.
m

Mason Menges

11/14/2022, 6:23 PM
Hey @Rajeshwar Agrawal @merlin we do have an open enhancement around this https://github.com/PrefectHQ/prefect/issues/7239