prefect-community

    Tim Enders

    01/21/2022, 9:04 PM
    My brain is fried (yay Friday) and I can't seem to find this in the documentation. But how do I specify an upstream task explicitly when using the
    with Flow() as flow:
    pattern?
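    (A minimal sketch of the usual Prefect 1.x answer, assuming the functional API; task and flow names here are made up. You can pass upstream_tasks when calling a task, or call set_upstream on the bound task.)

    from prefect import Flow, task

    @task
    def extract():
        return 1

    @task
    def notify():
        pass

    with Flow("example") as flow:
        e = extract()
        # Option 1: declare the dependency when calling the task
        n = notify(upstream_tasks=[e])
        # Option 2: set it explicitly on the bound task
        # n.set_upstream(e)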

    Michael Bell

    01/21/2022, 9:26 PM
    Hey folks. I am having a really hard time with dependency resolution between
    prefect
    and
    dask-cloudprovider
    right now. It seems
    dask-cloudprovider[aws]
    relies on
    aiobotocore
    which pins to a very specific
    botocore
    version and that's causing conflicts when trying to set up my environment. Anyone have any experience with this?

    Madison Schott

    01/21/2022, 9:33 PM
    Hi! Is it possible to set up a Slack notification or any kind of alert for checking the data volume of a Fivetran task?

    Alex To

    01/21/2022, 11:10 PM
    Hello Prefect community. I am in the process of evaluating various products to replace our in-house orchestration tool. Prefect stands out from the crowd with an intuitive UI and flexible workflow branching (conditional, loop, dynamic), which our tool does not do and the other contender does not do very well. (:cough:
    bareflow
    ) Our use case is slightly different: we will be using the tool for container execution orchestration, in which each task simply invokes a container (within our k8s cluster or ECS) or a job (Databricks job). Each container is an atomic unit of work written in any language. This architecture decouples orchestration from the actual functional task (container) and avoids recoding hundreds of existing tasks/containers. This has been working well for us with our internal tool. Our flow would simply be:
    task1: call container-A; task2: call Databricks-job-B; task3: call container-C after task1 and task2 are completed.
    My questions are: 1. Based on the documentation, my best option is to run a local agent on some EC2 instances with a LocalDaskExecutor. Using any other agent type would require additional resources and add more latency, e.g. with the Kubernetes agent, one pod for the flow run and another for the actual container; latency = double the spin-up time. The downside is the scaling problem with a local agent. Do I understand it correctly? Any other approach? 2. Any plan to add an ECSRunTask to the AWS tasks? This is to run any arbitrary task defined outside the Prefect context in ECS, similar to Airflow's ECS Operator. I am surprised it's not already on the list. Thanks

    Tim Enders

    01/22/2022, 3:09 PM
    If anybody can help, how would I limit the number of threads in a LocalDaskExecutor?
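    (A minimal sketch, assuming Prefect 1.x: LocalDaskExecutor accepts a num_workers argument that is forwarded to the underlying dask scheduler.)

    from prefect.executors import LocalDaskExecutor

    # Use the threaded scheduler and cap the pool at 4 threads
    flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)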

    Chris K.

    01/22/2022, 4:09 PM
    I have a similar question to the last one: I want to limit the number of parallel flow runs. I have a very computationally expensive ML pipeline and I don't want to run into any blocking issues with the Model API that I use within my flow. I'm registering flow runs via Docker (+ Docker agent). How can I make Prefect wait for a flow run to finish before a new one is started?

    Chris K.

    01/22/2022, 4:11 PM
    Current behavior: I submit two flow runs, and both start executing in parallel. Intended behavior: I submit two flow runs, the first starts, and the second waits until the first has finished.

    Chris K.

    01/22/2022, 4:14 PM
    Actually, from the docs I don't understand executors, since I don't define an executor at all in my file and Prefect still runs my flows perfectly fine and according to the docs.

    Chris K.

    01/22/2022, 4:26 PM
    It seems that by not setting an environment, the default environment is used. The default environment loads the default executor, which is the LocalExecutor, which the docs describe as:
    An executor that runs all functions synchronously and immediately in the main thread. To be used mainly for debugging purposes.

    Guy Thompson

    01/23/2022, 4:13 PM
    Howdy Prefect!

    Ashton

    01/23/2022, 8:29 PM
    Hey y'all, do tasks support returning generators? Currently, when I return a generator from a task, the results don't get returned or picked up in the next task. I couldn't find anything in the docs about it.
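    (A common workaround, sketched under the assumption that task results need to be concrete, picklable values: materialize the generator before returning it.)

    from prefect import task

    @task
    def produce_rows():
        gen = (n * n for n in range(10))
        # Materialize the generator so downstream tasks receive a
        # concrete list rather than an exhausted or unpicklable iterator
        return list(gen)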

    Yusuf Khan

    01/23/2022, 9:42 PM
    Hey, I'm reading about state handlers and I understand how to use them to send a Slack notification if a task fails. I'm wondering how I can do something like:
    task 1 - check if the network connection is healthy
    task 1.1 - if the network connection is healthy, go do the rest of the tasks
    task 1.2 - if the network connection is not healthy, reset the connection and return to task 1
    task 2 - something
    task 3 - something
    task 4 - something
    The issue I'm trying to work around is that I have a task running on a Raspberry Pi, and the network keeps giving out. I don't want to blindly reset it each time; I'd rather check whether it's up, then reset it only if needed. I saw tasks can be triggered based on "all_failed", "any_failed", etc., but in this scenario I only care whether the task that directly precedes it failed or not.
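    (One way to sketch this in Prefect 1.x: give the health-check task retries plus a state handler that resets the connection on failure. connection_is_healthy and reset_network_connection are hypothetical helpers, not Prefect APIs.)

    from datetime import timedelta
    from prefect import task

    def reset_on_failure(task_obj, old_state, new_state):
        # Reset the connection whenever the check fails, so the next
        # retry sees a fresh connection (hypothetical helper below)
        if new_state.is_failed():
            reset_network_connection()
        return new_state

    @task(max_retries=5, retry_delay=timedelta(seconds=30),
          state_handlers=[reset_on_failure])
    def check_network():
        if not connection_is_healthy():  # hypothetical helper
            raise RuntimeError("network unhealthy")

    # Downstream tasks then set check_network as their upstream task.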

    Chris L.

    01/24/2022, 3:52 AM
    Hello Prefecters! Wondering how one might use the resource manager (https://docs.prefect.io/api/latest/tasks/resources.html#functions) with the imperative API for flows (https://docs.prefect.io/core/concepts/flows.html#imperative-api). I'm using the imperative API with a for loop to add a list of tasks to the flow. But I would like to pass the result (a SQL connection) from
    ResourceManager().setup()
    to my tasks and also have cleanup. Thanks!
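    (For comparison, a sketch of the functional-API version, assuming Prefect 1.x; connect is a stand-in for a real connection factory. Mixing this with the imperative API may require wiring the setup/cleanup tasks via flow.set_dependencies by hand.)

    from prefect import Flow, task, resource_manager

    @resource_manager
    class SQLConnection:
        def __init__(self, dsn):
            self.dsn = dsn
        def setup(self):
            return connect(self.dsn)  # hypothetical connection factory
        def cleanup(self, conn):
            conn.close()

    @task
    def run_query(conn, query):
        return conn.execute(query)

    queries = ["SELECT 1", "SELECT 2", "SELECT 3"]

    with Flow("queries") as flow:
        with SQLConnection("postgresql://...") as conn:
            for q in queries:
                run_query(conn, q)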

    Adi Gandra

    01/24/2022, 6:21 AM
    Hey, I'm new to using Prefect, but I've been trying to follow along with this tutorial: https://towardsdatascience.com/distributed-data-pipelines-made-easy-with-aws-eks-and-prefect-106984923b30 I have most of it working; pods get spun up when I run a flow. However, they are unable to pull the image from ECR that I have pushed. I keep getting the following error (I have replaced the account number and repo name):
    Message: Failed to pull image "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": rpc error: code = NotFound desc = failed to pull and unpack image "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": failed to resolve reference "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": z.dkr.ecr.us-east-1.amazonaws.com/r:latest: not found
    I have followed the tutorial pretty closely, although some of it was slightly outdated, so I used the newer paradigms that were introduced. Any insight on how I could configure my EKS cluster to pull images from ECR would be appreciated. I also did all of this on my master admin account, so I would think that permissions shouldn't be an issue.

    Thomas Hoeck

    01/24/2022, 9:48 AM
    Hi, I have a problem where logs for a task run don't show up in the Logs tab, only under the Overview (see pics). Trying to bring them back from "glacier storage" doesn't help. Has anyone seen something similar? I'm using Prefect Cloud.

    Clovis

    01/24/2022, 10:58 AM
    Hi everyone! Relatively new to Prefect, we're currently using this (great!) tool to orchestrate our ELT process (with Airbyte & dbt). Everything had been working great for a few weeks, but last night an odd error occurred on my `AirbyteConnectionTask`: •
    JOB XXX failed
    •
    Final state: Success
    All my tasks were set as reference tasks, so I really don't know how to explain why this flow showed no trace of error (the task bar was green, etc.). Do you have any clues to explain this behaviour?

    Dekel R

    01/24/2022, 12:59 PM
    Hey all, I have a couple of Prefect flows, both local and Vertex runs. All of them use Docker storage. The flows work without any problem, but sometimes I get the following error:
    . 23 January 2022 11:00pm]: 500 Server Error for http+docker://localhost/v1.41/images/create?tag=2022-01-19t11-58-16-139521-00-00&fromImage=uxxx-flow: Internal Server Error ("Get "https://xxx": context deadline exceeded")
    It seems the Docker registry is unavailable for a short period, exactly when Prefect is trying to pull a flow image and run it. I have two questions I couldn't find answers to: 1. Is there any configuration option I can use to make Prefect retry the image pull on failure? 2. Can I configure Prefect to pull images only on change (a new flow version) and keep them locally? The way it works right now, each run triggers an image pull. Thanks!

    Muddassir Shaikh

    01/24/2022, 1:16 PM
    I get this error when I try to restart a failed task. How do I fix it?

    Xavier Babu

    01/24/2022, 2:51 PM
    In Prefect 2.0, is there any option via config to change the default port number 4200?

    David Yang

    01/24/2022, 3:02 PM
    Hi all, I have this ELT scenario and would appreciate your suggestions.
    1: EL runs as the first step: it pulls data from a SQL Server, saves the data into zipped CSV files, and then uploads them into Snowflake via PowerShell scripts.
    2: After EL is done, T runs through a dbt project.
    3: Then we need to process tabular cubes and Power BI datasets through PowerShell scripts and an API.
    I'm thinking of a dedicated image for each step, then using Prefect to create containers in a workflow, run each step as a task inside its container one by one, and shut down the containers in the flow's last step. Is that possible? Another way would be creating a flow for each step and a "parent" flow to manage the workflow.
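    (The "parent" flow option is often sketched with the built-in orchestrator tasks, assuming Prefect 1.x; the flow and project names here are made up.)

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("elt-parent") as parent:
        el = create_flow_run(flow_name="extract-load", project_name="prod")
        el_done = wait_for_flow_run(el, raise_final_state=True)

        t = create_flow_run(flow_name="dbt-transform", project_name="prod",
                            upstream_tasks=[el_done])
        t_done = wait_for_flow_run(t, raise_final_state=True)

        create_flow_run(flow_name="process-cubes", project_name="prod",
                        upstream_tasks=[t_done])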

    dammy arinde

    01/24/2022, 3:13 PM
    Hi all! We have a staging table in Snowflake where we want to update the data validation status of files. We are using Great Expectations to handle the validations, and we see the results under the artifacts in the UI. If a validation fails, the flow fails and the log shows failed in the UI. 1) Is there a way to get the status of a flow (FAILED or SUCCESS) in a Prefect task so we can update the Snowflake table from that task? 2) Is there a way to get the Great Expectations result in a Prefect task? Thanks!
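    (For question 1, one Prefect 1.x pattern is a pair of terminal tasks gated by triggers. update_validation_status and run_validation are stand-ins for the Snowflake update helper and the Great Expectations task.)

    from prefect import Flow, task
    from prefect.triggers import all_successful, any_failed

    @task(trigger=any_failed)
    def mark_failed():
        update_validation_status("FAILED")    # hypothetical Snowflake helper

    @task(trigger=all_successful)
    def mark_success():
        update_validation_status("SUCCESS")

    with Flow("validate") as flow:
        v = run_validation()                  # the Great Expectations task
        mark_failed(upstream_tasks=[v])
        mark_success(upstream_tasks=[v])

    # Tie the flow's final state to the validation itself, since one of
    # the two marker tasks always ends in a TriggerFailed state
    flow.set_reference_tasks([v])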

    Leon Kozlowski

    01/24/2022, 4:10 PM
    Is there a way to restart a flow run from the beginning rather than from failed/cancelled task state?

    Pedro Machado

    01/24/2022, 4:38 PM
    Hi there. I am getting this error in the
    wait_for_flow_run
    task when trying to implement a flow of flows:
    Unexpected error: TypeError('Object of type FlowRunView is not JSON serializable')
    Any ideas? Code in thread.

    Adi Gandra

    01/24/2022, 4:52 PM
    Is it possible to build the Docker image that Prefect uses to execute the code ourselves? For instance, I have:
    flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                          image_name="y",
                          image_tag="latest")
    flow.executor = DaskExecutor()
    flow.run_config = KubernetesRun(env={})

    # flow.run()
    flow.register(project_name="SA-Test", build=False)
    I want to build this manually as part of our CI/CD and push it to ECR. That part is done. Now when I run the Prefect flow, the Kubernetes pod pulls the image but then I get an error:
    Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage',)
    Is there anything special I need to do in my Dockerfile, or when I build this image, to allow it to be used successfully without building inside Prefect?
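    (A sketch of one likely fix, assuming Prefect 1.x Docker storage: when the image is built outside Prefect, tell the storage that the flow already lives in the image as a script. The path is a made-up example and must match wherever the Dockerfile COPYs the flow file.)

    from prefect.storage import Docker

    flow.storage = Docker(
        registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
        image_name="y",
        image_tag="latest",
        stored_as_script=True,
        path="/opt/prefect/flows/my_flow.py",  # hypothetical location in the image
    )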

    brian

    01/24/2022, 5:01 PM
    My team is using Git storage with Prefect Cloud. We plan to have 2 projects: one for dev and one for prod. We would like the flows in the dev project to all pull from a "dev" branch of their respective git repos, and the flows in the prod project to pull from a "prod" branch. What's the standard pattern for doing this? My understanding is that the storage is defined when we register the flow to Prefect Cloud. Is this correct? If so, is there a way to parameterize the branch_name argument when we initialize Git storage? Thanks in advance for any help!
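    (One sketch, assuming Prefect 1.x Git storage and registration from CI: read the branch from an environment variable at registration time. PREFECT_ENV and the repo/path values are made up.)

    import os
    from prefect.storage import Git

    env = os.environ.get("PREFECT_ENV", "dev")  # "dev" or "prod"

    flow.storage = Git(
        repo="my-org/my-repo",
        flow_path="flows/my_flow.py",
        branch_name=env,
    )
    flow.register(project_name=env)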

    Mehdi Nazari

    01/24/2022, 5:12 PM
    Hi Community, I have a Flow configured to take in a parameter; after some processing on the parameter, it sends it to a mapped task for further processing. The problem is that I'm facing a few issues trying to debug it locally on my computer. I have been following the debugging guides and I'm not sure what I'm doing wrong.
    with Flow("Reporting-Data-Import",
              storage=Local(path="/app/flow.py", stored_as_script=True),
              run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
        # load parameters
        with open("parameter.json", "r") as f:
            default_param = f.read()
        # 
        json_param = Parameter("data_sources", default=default_param if default_param else "").default
        flow_parameter = extract_parameters(json_param)
        is_valid = is_parameter_valid(flow_parameter)
        with case(is_valid, False):
            <http://logger.info|logger.info>(f"Invalid parameter! {flow_parameter}!")
        # copy table & content
        copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
    Please share your thoughts as I’m not sure where my issue is.
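    (Two things stand out, sketched here under the assumption that extract_parameters, is_parameter_valid, and copy_table_content are the tasks from the snippet above: calling .default on the Parameter bakes the value in at registration time instead of resolving it at run time, and logger.info at flow-build time never runs inside the flow. A possible restructuring:)

    from prefect import Flow, Parameter, case, task
    import prefect

    @task
    def log_invalid(param):
        # Log inside a task so it happens at run time, not at build time
        prefect.context.get("logger").warning(f"Invalid parameter! {param}!")

    @task
    def get_sources(param):
        # Attribute access like flow_parameter.sources doesn't work on a
        # task's build-time placeholder, so unwrap it inside a task
        return param.sources

    with Flow("Reporting-Data-Import") as flow:
        json_param = Parameter("data_sources", default="")
        flow_parameter = extract_parameters(json_param)
        is_valid = is_parameter_valid(flow_parameter)
        with case(is_valid, False):
            log_invalid(flow_parameter)
        with case(is_valid, True):
            copy_table_content.map(param=get_sources(flow_parameter))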

    Suresh R

    01/24/2022, 6:10 PM
    Hi, below is my state handler snippet. I am getting the error
    'Running' object has no attribute 'is_success'
    Can someone help?
    def post_to_cw(obj, old_state, new_state):
        if new_state.is_failed():
            cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=1)
        if new_state.is_success():
            cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=0)
        return new_state
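    (A likely cause, assuming Prefect 1.x: State objects expose is_successful() and is_failed(), but not is_success(). A minimal sketch of the fix:)

    def post_to_cw(obj, old_state, new_state):
        if new_state.is_failed():
            cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=1)
        elif new_state.is_successful():
            # is_successful() exists on Prefect 1.x states; is_success() does not
            cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=0)
        return new_state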

    Jason Motley

    01/24/2022, 6:11 PM
    I have what may be a dumb question. I've set my tasks to have multiple retries. I've noticed that if the first try fails, the others will always fail as well, no matter how long the delay. Example: I lose connection to our database during an extract task, 2 minutes pass before a retry, and it loses the connection again. A day later (the next run), things go fine. So, are the retries actually doing anything?
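    (For reference, a retry-configuration sketch, assuming Prefect 1.x; connect_to_db is a hypothetical helper. Retries only re-run the task body, so a connection opened outside the task is reused rather than re-established.)

    from datetime import timedelta
    from prefect import task

    @task(max_retries=3, retry_delay=timedelta(minutes=10))
    def extract():
        conn = connect_to_db()  # open a fresh connection on every attempt
        return conn.fetch_all()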

    Pedro Machado

    01/24/2022, 6:11 PM
    Hi again. Is there an idiom for running mapped tasks respecting the order of the items in the sequence? I believe I can use
    LocalExecutor
    and it will run them in order, but I was wondering if there is a way to explicitly define the dependencies. For example, in this case, I'd like to make sure that the mapped task for 2 runs after 1, and 3 runs after 2.
    items = [1, 2, 3]
    ny_task.map(items)
    thanks

    Kevin Kho

    01/24/2022, 6:14 PM
    If you are using a map, you can't. But if you know
    items
    beforehand and it's not a task output, you can do this:
    Use a for loop to construct your DAG.
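    (A sketch of that suggestion, assuming items is known at build time; my_task is a stand-in for the real task.)

    from prefect import Flow, task

    @task
    def my_task(item):
        ...

    with Flow("ordered") as flow:
        previous = None
        for item in [1, 2, 3]:
            # Chain each call after the previous one so runs stay in order
            kwargs = {"upstream_tasks": [previous]} if previous is not None else {}
            previous = my_task(item, **kwargs)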

    Pedro Machado

    01/25/2022, 1:15 PM
    Thanks!