https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • j

    James Phoenix

    01/22/2021, 7:25 PM
    What am I doing wrong with my state_handler?
  • j

    James Phoenix

    01/22/2021, 7:25 PM
    AttributeError: 'Failed' object has no attribute 'Retrying'
    [2021-01-22 19:24:18] ERROR - prefect.my_flow | Unexpected error occured in FlowRunner: AttributeError("'Failed' object has no attribute 'Retrying'")
  • s

    Sean Talia

    01/22/2021, 7:46 PM
    is there any documentation on using the
    agent_config_id
    argument to launch an agent? I'm trying to find out how I can configure the parameters that I'd like to set up for the various agents I intend to launch, but it's not clear to me where this is supposed to be managed from
    n
    • 2
    • 3
  • s

    Sean Talia

    01/22/2021, 8:07 PM
    also just an FYI, there's a link in the ECSRun documentation that links to the docs for the wrong boto3 function:
    n
    • 2
    • 1
  • m

    Mark McDonald

    01/22/2021, 9:46 PM
    is there a way to verify from the prefect cloud UI or from the graphql api which Executor a flow is using?
    n
    • 2
    • 2
  • j

    Josh

    01/22/2021, 10:25 PM
    I’m running a prefect flow that moves a large amount of files from s3 to google cloud storage. When there are a relatively small amount of files to transfer, it’s quick. But as the files to transfer grow and especially if file size is large, the prefect process takes up increasingly large amounts of memory. I am using the
    S3Download
    and
    GCSUpload
    tasks. My suspicion is that the flow is not releasing the memory of the files being transferred. Is there any way to ensure the file contents are being released from memory?
    n
    s
    • 3
    • 4
  • g

    Giovanni

    01/23/2021, 1:49 AM
    Hi Prefect team! I am running a prefect flow with a considerable amount of tasks (~1k backfilling that fetches some APIs and runs workloads on Kubernetes+Dask). At the beginning I had no issues consulting the logs; however, after a couple of hours, when clicking on Flow > Logs, the page reloads automatically and the following message is displayed: "Oh no! This page seems to have gone missing. Sorry for disrupting your flow. Back to dashboard". The only way for me to go back to the flow then, is to force refresh the page. Do you have any idea of what might cause this? Thanks a lot 🙂
    c
    • 2
    • 2
  • j

    Jacob Lester

    01/23/2021, 2:02 AM
    Hey Prefect community. Dropping in here to get some advice on contributing to the repo. Specifically, I made a few changes to server MySQL task and want to spin up a local instance to test. I tried building the docker image and running (with PYTHON_VERSION and PREFECT_VERSION build args) but the container immediately stops when I run it. Am I doing this right? I couldn't find any documentation about running the stack locally and wasn't sure where to start. Any help would be greatly appreciated
    c
    • 2
    • 2
  • a

    Ajith Kumara Beragala Acharige Lal

    01/23/2021, 2:23 AM
    Hello Experts, am trying to configure email notifications(alerts) , but am stuck due to this error
    c
    • 2
    • 11
  • g

    George Shishorin

    01/24/2021, 10:04 PM
    Dear Prefect Community, As it is known, there are some great features for caching tasks results, that make sometimes much convenience to reuse. But at the same time I faced with a problem related to how Prefect deal with RAM during the flow running. The main question: is it possible to remove Tasks results and thus make RAM free during the flow running? Most of flows contains quite difficult computing logic. There is no way to combine several functions into one Task (atomicity and readability will be lost). I would provide an example for prefect vs “nonprefect” flow and its RAM consumption...->
    s
    • 2
    • 15
  • b

    Borut Hafner

    01/25/2021, 1:46 PM
    Dear All, I want to use a fixed point in time in my tasks. An option would be to create a task whiche returns current datetime, but i suppose that flow can offer this information in form of a flow start time. Is a way to get a flow stard date time in a task? Thanks for your answers!
    ✅ 1
    l
    d
    • 3
    • 4
  • f

    Felix Schran

    01/25/2021, 1:59 PM
    Hi all, in prefect core how can I save all the aumatic logging messages to a file? First (before I run the flow), I create a logging file by calling loguru.logger via logger.add("logs.txt"). logger.info("Some info message before the flow is run") then writes "Some info message before the flow is run" to the file build.txt. Next, I run the flow and then get all these nice logging messages about the tasks. For instance: [2021-01-25 12:30] INFO - prefect.TaskRunner | Task 'xyz': Finished task run for task with final state: 'Success'... How can I add these logging messages to the logging file "logs.txt" from above? So ideally my build.txt file should include in the end: ... | Some info message before the flow is run. ... | Task 'xyz': Finished task run for task with final state: 'Success'...
    d
    m
    • 3
    • 5
  • j

    Jan Marais

    01/25/2021, 2:51 PM
    Hi All. I'm new to Prefect and I want to experiment with running a flow on AWS ECS. I'm using this example: https://github.com/lauralorenz/fourteen-oh/blob/main/14-fargate.py, only with S3 storage and the relevant AWS parameter changes. Also making use of a local backend. When running the above script, the flow gets registered successfully and the agent starts. When I run the flow in the UI, it gets submitted and I can also see a ECS task running in AWS console and then it disappears. But the prefect task never starts and the flow never finishes. Any idea what is wrong here? Let me know if I need to share anymore detail. Thanks!
    l
    m
    • 3
    • 21
  • c

    ciaran

    01/25/2021, 4:24 PM
    Hi Folks, is there a way for the Timeline view when running a flow to display each map iteration of a task in its own bar? It'd be nice to see the work actually happening in parallel visually. If it's easier to do with Dask (I'm using the
    LocalDaskExecutor
    ) - Does anyone know how I can visualise the Dask Diagnostics when running Prefect flows? I know I can look at the individual mapped tasks, but I just wanted to see if there's a similar visualisation to how Dask does it
    f
    • 2
    • 9
  • b

    Braun Reyes

    01/25/2021, 6:31 PM
    hey there wanted to ask about this snippet For Prefect Core server or Prefect Cloud users: ... • You can override the automatic result subclass at the global level, flow level, or task level Can you set the result subclass globally?
    d
    k
    • 3
    • 10
  • p

    Philip MacMenamin

    01/25/2021, 7:56 PM
    issues connecting to graphql:4200 on docker Hi, I'm trying to run the docker agent similar to https://github.com/PrefectHQ/server/issues/25 and am still unable to connect to the container it seems:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fd0e8943640>: Failed to establish a new connection: [Errno 101] Network is unreachable'))
    ^C[2021-01-25 19:55:03,534] INFO - agent | Keyboard Interrupt received: Agent is shutting down.
    k
    • 2
    • 4
  • f

    Fina Silva-Santisteban

    01/25/2021, 11:36 PM
    Hi Prefect community, I hope everyone’s doing well! I need help setting up Docker Storage and using the docker prefect agent. My Dockerfile looks like this:
    FROM python:3.7
    WORKDIR /app
    COPY requirements.txt .
    RUN pip install -r requirements.txt
    COPY . .
    My flow like this:
    (...)
    storage = Docker(dockerfile="../Dockerfile")
    
    with Flow("Generate Report", result=LocalResult(), storage=storage) as flow:
    (...)
    When I try to register the flow, it seems to have issues with finding the
    requirements.txt
    file needed to create the image:
    [2021-01-25 15:21:03-0800] INFO - prefect.Docker | Building the flow's Docker storage...
    Step 1/14 : FROM python:3.7
     ---> ca194d6afe58
    Step 2/14 : WORKDIR /app
     ---> Using cache
     ---> a729913b700e
    Step 3/14 : COPY requirements.txt .
    COPY failed: stat /var/lib/docker/tmp/docker-builder883681743/requirements.txt: no such file or directory
    
    Traceback (most recent call last):
      File "/Users/fina/Documents/github/concierge_iro_reporting/etl/register_all_flows.py", line 53, in <module>
        register_locally()
      File "/Users/fina/Documents/github/concierge_iro_reporting/etl/register_all_flows.py", line 21, in register_locally
        report_lt_target.register(project_name=project_name)
      File "/Users/fina/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/core/flow.py", line 1675, in register
        idempotency_key=idempotency_key,
      File "/Users/fina/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/client/client.py", line 783, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/Users/fina/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/core/flow.py", line 1450, in serialize
        storage = self.storage.build()  # type: Optional[Storage]
      File "/Users/fina/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/storage/docker.py", line 363, in build
        self._build_image(push=push)
      File "/Users/fina/.pyenv/versions/3.7.3/lib/python3.7/site-packages/prefect/storage/docker.py", line 431, in _build_image
        "Your docker image failed to build!  Your flow might have "
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    My folder is structure is like this:
    App
    -->prefect_flows
      -->my_flow.py
    requirements.txt
    Dockerfile
    Why is it looking for the requirements.txt inside
    /var/lib/docker/
    instead of the App’s directory? What am I missing? (The container runs normally when I use docker-compose to build and run the app)
    k
    a
    • 3
    • 4
  • a

    Adam Roderick

    01/26/2021, 2:45 AM
    Is this button not working?
    d
    • 2
    • 1
  • s

    Shyam Meduri

    01/26/2021, 8:34 AM
    Hi, Is it possible to upload a file to a Prefect Flow, say while initiating a Flow run, either as a parameter or context object, which can then be further read in a task?
    n
    • 2
    • 2
  • p

    psimakis

    01/26/2021, 9:16 AM
    Hello everyone! Using tasks with manual approval is a nice way to semi-automate the flows that could not be fully automated yet. A great feature about paused tasks could be to support a time-out functionality. When a time-out value is passed, paused task will stay paused only for a period of time and the user will always has the ability to resume/approve the task earlier that the time-out. A use case: Use prefect to start an ECS instance for N hours but allow user to stop the instance any time he thinks that the task in ECS instance is done. Always stop instance after N hours to prevent extra charges. I know that this feature is currently not supported but I would like to know if you find it is useful and also how this functionality could be implemented with prefect.
    j
    • 2
    • 1
  • m

    Marion

    01/26/2021, 12:11 PM
    Hi everyone nice to meet you all. I have to run Prefect's flows from my API, I was wondering if you had a REST endpoint to run a flow, instead of using the prefect pylib.
    j
    w
    • 3
    • 8
  • s

    Severin Ryberg [sevberg]

    01/26/2021, 2:00 PM
    Hi all! We've recently run into the issue that very short tasks which should run in less than a second (tested when using a LocalAgent) can take much longer than that when using a KubernetesAgent. Unsurprisingly, it appears the culprit is the overhead time it takes for the DaskKubernetesEnvironment to spin up and tear down a worker pod, which is done for each task invocation. • Is it possible to avoid this overhead (or at least only pay it once), by telling the KubernetesAgent and/or DaskKubernetesEnvironment to always keep X worker pods alive which continue to wait for and complete tasks without dying until the end of the flow? • Or, if the above isn't possible, then perhaps the only thing to do here is have less finely-resolved tasks so that the overhead is not so noticeable. In such a case, what is the suggested single-task execution time when using a KubernetesAgent / DaskKubernetesEnvironment? • Possibility for other routes?
    j
    r
    • 3
    • 14
  • m

    Marc Lipoff

    01/26/2021, 3:27 PM
    Using the functional api, how can i set a dependency when the outputs of a task dont depend on another. for example, i have 3 tasks: read, delete write. I want to read some data, then delete the old stuff, and then right the new stuff. "deleting" does not take any inputs; but we want to wait until "reading" is done before we do it.
    🙌 1
    e
    • 2
    • 12
  • b

    Ben Wolz

    01/26/2021, 4:18 PM
    Hi all, I'm currently running a simple flow with a Dask Executor that's spinning up a FargateCluster, but I'm getting a strange error that I can't find anywhere else online. When I run my flow with a DaskExecutor set up for a FargateCluster, I get a set of RunTimeErrors which say "IOLoop is closed". I've attached a .txt file of the code as well as the stack trace, if anyone has any advice it would be greatly appreciated! Thank you in advance
    code.txterrortrace.txt
    j
    • 2
    • 5
  • j

    Jeff Brainerd

    01/26/2021, 8:42 PM
    Hey 😛refect: team, a question about migrating to RunConfigs: We’re on Prefect 0.14.4 and looking to migrate off of LocalEnvironment (also using DaskExecutor + FargateAgent). We are currently relying on LocalEnvironment’s
    on_start
    arg in order to bootstrap django when our flow runs. I’m not seeing any replacement for that in the new RunConfig world. Did I miss something… or is there another pattern for this? Thanks!
    a
    j
    • 3
    • 4
  • a

    Amanda Wee

    01/26/2021, 10:01 PM
    Hi friends, I have flows using S3 storage with the local executor and local agent, all wrapped up in a Docker image that is deployed in an ECS service (as per my wider team requirements). It works, but I'm not satisfied with the fact that we're basically reserving a fair bit of CPU and memory on the EC2 instance(s) for a bunch of flows that collectively run across only two hours each day. One option might be to switch to Docker storage with the ECS agent to run the flows as Fargate tasks. The catch is that because of our ECS setup, this may mean some kind of Docker-in-Docker solution, which seems to be a mess. Another option might be to stick with S3 storage, but make use of the Dask executor (and
    dask_cloudprovider.aws.FargateCluster
    plus adaptive scaling) with the local agent. My thinking is that I could separately build a Docker image that has all the required flow dependencies (they're the same for all the flows) and upload that to ECR during CI/CD, and then the local agent can pull from ECR and initialise the Dask executor with this image, while also initialising it with the flows retrieved from S3 storage. Is this feasible?
    j
    s
    • 3
    • 21
  • h

    Henry

    01/26/2021, 10:23 PM
    can a prefect flow's results be piped to another prefect flow using flow-to-flow?
    m
    • 2
    • 1
  • j

    Jim Crist-Harif

    01/26/2021, 11:45 PM
    Prefect 0.14.5 has now been released!!! 🎉 This release: • Improves the quality of logs, warnings, and errors across several parts of the codebase, leading to a better user experience • Adds support for installing extra dependencies at runtime in our
    prefecthq/prefect
    images via setting a
    EXTRA_PIP_PACKAGES
    environment variable • Includes several improvements to agents, notably including a new
    execution_role_arn
    kwarg to both `ECSAgent`/`ECSRun` run config Big thanks to our community contributors this week (Loïc Macherel and Thomas Baldwin) 👏 The full release notes can be found here: https://github.com/PrefectHQ/prefect/releases/tag/0.14.5. Happy Engineering!
    🚀 11
    🎉 8
  • b

    Billy McMonagle

    01/27/2021, 2:21 AM
    I have a question regarding
    KubernetesRun
    ... is there a direct way to set metadata labels on the flow run job/pod, or is a custom job template necessary? I'm specifically interested in tagging my jobs so that datadog is able to collect metrics.
    j
    c
    m
    • 4
    • 5
  • j

    Joyce Xu

    01/27/2021, 3:03 AM
    Hi all. My team is using the
    StartFlowRun
    task to build "flow-within-a-flow" pipelines, i.e. Flow A includes the
    StartFlowRun
    task X, and task X starts Flow B. We would like to use the graphql API to track metadata on our flows. Is there a way to query flows such that for Flow B, we can see that it corresponds to task X, or at least, that it is originated from Flow A?
    c
    m
    • 3
    • 5
Powered by Linen
Title
j

Joyce Xu

01/27/2021, 3:03 AM
Hi all. My team is using the
StartFlowRun
task to build "flow-within-a-flow" pipelines, i.e. Flow A includes the
StartFlowRun
task X, and task X starts Flow B. We would like to use the graphql API to track metadata on our flows. Is there a way to query flows such that for Flow B, we can see that it corresponds to task X, or at least, that it is originated from Flow A?
c

Chris White

01/27/2021, 3:49 AM
Hi Joyce - assuming you are using a recent enough version of Prefect Core, you should be able to query for flow runs that have an idempotency key matching a task run ID of task X
:upvote: 1
🙌 1
j

Joyce Xu

01/27/2021, 10:09 PM
Thanks @Chris White, that worked!
c

Chris White

01/27/2021, 10:10 PM
Awesome - glad I could help!
@Marvin archive “How to identify the parent Flow Run / Task Run that created another Flow Run using the StartFlowRun Task”
m

Marvin

01/27/2021, 10:11 PM
https://github.com/PrefectHQ/prefect/issues/4026
View count: 1