prefect-community
  • m

    Mohit Singhal

    11/29/2022, 7:07 AM
    Hi everyone, I am having an issue printing logs. I am running tasks in parallel, and in each task I create a logger with logger = get_run_logger(). When I call logger.info("----"), the logs are not printed, but if I run the same tasks sequentially, logging works. Not sure why.
    k
    • 2
    • 1
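    (For reference, a minimal sketch of logging from tasks run in parallel, assuming Prefect 2's get_run_logger and .submit() API; the task and flow names here are made up:)

    from prefect import flow, task, get_run_logger

    @task
    def work(i: int) -> int:
        # get_run_logger() attaches the task run context, so records
        # are shipped to the Prefect API as well as printed locally
        logger = get_run_logger()
        logger.info("processing item %s", i)
        return i

    @flow
    def parallel_flow():
        # .submit() runs each task concurrently on the flow's task runner
        futures = [work.submit(i) for i in range(5)]
        return [f.result() for f in futures]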
  • m

    Mohit Singhal

    11/29/2022, 7:25 AM
    Any ideas on the above query?
  • a

    Andreas Nigg

    11/29/2022, 8:19 AM
    Hey 👋 Prefect 2.0 Cloud user here. For some time now, some individual flow runs keep getting stuck in the Pending state. This is a flow which runs 96 times per day, and about once or twice per week it gets stuck in the "Pending" run state and never leaves it (one got into the Pending state 2 days ago and was still there today). There are no logs in the agent about this flow either. Without me changing anything, the next flow run scheduled 15 minutes later runs successfully. (I was able to identify multiple different flows for which this happened, so it seems not to be related to a specific flow.) Any ideas what I can do to debug this issue? I fear it's not really in my hands, is it? And a follow-up: unfortunately, these stuck Pending runs block a work queue if concurrency is set to 1. I have a use case where it's of utmost importance that only one flow runs at a time, so I use the concurrency setting on the queue. However, is there a setting to stop flow runs if they are pending for too long (like a pending timeout)? Or would this be a reasonable feature request?
    :gratitude-thank-you: 1
    ✅ 1
    j
    s
    a
    • 4
    • 12
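    (A rough workaround sketch for clearing stuck runs, assuming the Prefect 2 client API of late 2022; the 30-minute cutoff and the use of a Cancelled state are assumptions, not an official pending-timeout feature:)

    import asyncio
    from datetime import timedelta

    import pendulum
    from prefect.client import get_client
    from prefect.states import Cancelled

    async def cancel_stale_pending_runs(max_age_minutes: int = 30):
        # Find flow runs stuck in Pending longer than the cutoff and
        # cancel them so they stop blocking the work queue. In real
        # use you would add filters/limits to read_flow_runs().
        async with get_client() as client:
            runs = await client.read_flow_runs()
            cutoff = pendulum.now("UTC") - timedelta(minutes=max_age_minutes)
            for run in runs:
                state = run.state
                if (
                    state is not None
                    and state.type.value == "PENDING"
                    and state.timestamp < cutoff
                ):
                    await client.set_flow_run_state(
                        run.id, state=Cancelled(), force=True
                    )

    asyncio.run(cancel_stale_pending_runs())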
  • j

    James Zhang

    11/29/2022, 9:04 AM
    hey guys, a quick question: how can I clean up finished KubernetesJob pods? I have a flow that runs every 5 minutes, and I have to delete hundreds of pods every day…
    ✅ 1
    r
    • 2
    • 2
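    (A sketch of one option, assuming a recent Prefect 2.x where the KubernetesJob infrastructure block exposes finished_job_ttl, which sets ttlSecondsAfterFinished on the Job so Kubernetes garbage-collects completed pods; the block name is made up:)

    from prefect.infrastructure import KubernetesJob

    # With finished_job_ttl set, Kubernetes deletes the Job (and its
    # pods) this many seconds after it finishes.
    job = KubernetesJob(finished_job_ttl=300)
    job.save("my-k8s-job", overwrite=True)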
  • j

    Javier Ruere

    11/29/2022, 9:26 AM
    Hello! I had a bunch of flows running for a while. These were v0.something. I began getting alerts that they are no longer running, and when I log into the dashboard, I have no idea where they are. It looks like an empty account and everything is very different. Where should I go for help on this?
    ✅ 1
    m
    • 2
    • 2
  • s

    Seif Harrathi

    11/29/2022, 9:38 AM
    Hey, I'm new to Prefect and I have a small issue. I was able to run Prefect inside Docker with MinIO as S3 and everything works fine; I was even able to trigger my flow run via an API call to /api/deployments/{id}/create_flow_run. My flow:
    @flow(name="Run Check", description="Flow that controls the data integrity")
    def run_check(layers_data):
        """
        1- Extract data from Arcgis databse
        2- Transform data
        3- Load result into Excel output
        @param layers_data:
        @return:
        """
        # 1- Extract data from  databse
        state = extract_data(layers_data, return_state=True)
        raw = state.result()
        # 2- Transform data
        state = DataTransformer.transform_results(raw, return_state=True)
        data = state.result()
        # 3- Load result into Excel output
        state = DataWriter.load_results(data, return_state=True)
        output = state.result()
        return {"Result": {
            "nb_errors": 12,
            "nb_lines_checked": 456
        },
            "output_path": "path_to_s3"
        }
    My question is: how do I get the results returned by my flow run? I want to get:
    {
      "Result": {
        "nb_errors": 12,
        "nb_lines_checked": 456
      },
      "output_path": "path_to_s3"
    }
    I used the endpoint /api/flow_runs/{id}, but I don't see my result in the response 😕 Any help? Any ideas? I thought about saving the result to the S3 bucket and then retrieving it, but I'm not sure if this is the best practice. Thanks in advance!
    ✅ 1
    s
    • 2
    • 4
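    (A sketch of the save-to-S3 idea mentioned above, assuming boto3 and a MinIO endpoint; the bucket, key, and endpoint URL are placeholders. The flow can then return just the path, and the JSON can be fetched later by whatever called the API:)

    import json

    import boto3

    def save_result_to_s3(result: dict, bucket: str, key: str) -> str:
        # MinIO speaks the S3 API, so boto3 works with endpoint_url
        # pointed at the MinIO service.
        s3 = boto3.client("s3", endpoint_url="http://minio:9000")
        s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(result).encode())
        return f"s3://{bucket}/{key}"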
  • a

    alvin goh

    11/29/2022, 10:23 AM
    Prefect 2.6.9. Hi all, for Prefect 2, how can I use asyncio with upstream task triggers/dependencies? I have an async task that takes as a parameter an array of upstream async task outputs (an array of coroutines which I have bunched together with asyncio.gather to feed the parameter). Does Prefect help ensure the task doesn't run if any upstream coroutines fail?
    k
    • 2
    • 3
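    (A sketch of one way to gate a downstream async task on upstream coroutines, assuming Prefect 2.6.x semantics; return_state=True turns failures into Failed states instead of raised exceptions, and the task names are made up:)

    import asyncio

    from prefect import flow, task

    @task
    async def upstream(i: int) -> int:
        return i * 2

    @task
    async def downstream(values: list[int]) -> int:
        return sum(values)

    @flow
    async def pipeline():
        # Gather upstream states; failures come back as Failed states
        # rather than raising out of the gather.
        states = await asyncio.gather(
            *[upstream(i, return_state=True) for i in range(5)]
        )
        if any(state.is_failed() for state in states):
            return None  # skip downstream if any upstream failed
        values = [await state.result(fetch=True) for state in states]
        return await downstream(values)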
  • m

    Mihai H

    11/29/2022, 1:30 PM
    I am having an issue while pushing lots of tasks via a Prefect deployment
  • m

    Mihai H

    11/29/2022, 1:30 PM
    I start to encounter the following exception:
  • m

    Mihai H

    11/29/2022, 1:30 PM
    t
    • 2
    • 5
  • m

    Mihai H

    11/29/2022, 1:31 PM
    Encountered exception during execution:
    Traceback (most recent call last):
      File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 612, in orchestrate_flow_run
        waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
      File "/home/et/projects/hiiper-heroes/polygon-venv/lib/python3.10/site-packages/prefect/engine.py", line 1325, in wait_for_task_runs_and_report_crashes
        if not state.type == StateType.CRASHED:
    AttributeError: 'coroutine' object has no attribute 'type'
    
    Crash detected! Execution was interrupted by an unexpected exception: AttributeError: 'coroutine' object has no attribute 'type'
    m
    • 2
    • 1
  • m

    Mihai H

    11/29/2022, 1:32 PM
    This doesn't happen if I limit the number of tasks to be executed (I am using DaskTaskRunner)
    k
    • 2
    • 2
  • k

    Kelvin Garcia

    11/29/2022, 1:51 PM
    hello! is there a way to retrieve a parameter value from within an on_failure function handler, either from the flow variable or the state variable that are passed to that function?
    k
    • 2
    • 1
  • m

    Mihai H

    11/29/2022, 2:41 PM
    I have tried a different task runner
  • m

    Mihai H

    11/29/2022, 2:42 PM
    same issue if I am using the ConcurrentTaskRunner
  • m

    Mihai H

    11/29/2022, 2:42 PM
    it just hangs
    k
    • 2
    • 1
  • s

    Slackbot

    11/29/2022, 2:42 PM
    This message was deleted.
  • f

    FuETL

    11/29/2022, 2:45 PM
    Hey everyone, I have a question regarding the scheduling of flow runs. I have one agent running at the moment, and I used:
    create_flow_run(flow_id=flow_id, parameters=parameters)
    But sometimes, when there's nothing running or queued, the flow is scheduled to run in ~20-30 minutes, even though I'm not providing the scheduled_start_time parameter. How can I make my flows always get scheduled immediately? (I thought that was the default behaviour.) Maybe it's the number of agents I'm running? Thanks.
    k
    • 2
    • 5
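    (A sketch with Prefect 1's create_flow_run task, passing scheduled_start_time explicitly so the run is scheduled for "now"; whether this avoids the 20-30 minute delay depends on why the scheduler is deferring. flow_id and parameters below are placeholders standing in for the values from the message above:)

    import pendulum
    from prefect.tasks.prefect import create_flow_run

    flow_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    parameters = {}  # placeholder

    # .run() invokes the task function directly, outside a Flow context;
    # scheduled_start_time asks for an immediate start explicitly.
    create_flow_run.run(
        flow_id=flow_id,
        parameters=parameters,
        scheduled_start_time=pendulum.now("UTC"),
    )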
  • s

    Sunjay

    11/29/2022, 3:04 PM
    I have a flow running with multiple functions which are defined in a custom block. Is there any way to create upstream and downstream dependencies between these functions? Since these are not tasks, I can't use the wait_for attribute. Any hints or patterns for handling this use case of orchestrating different functions built into a custom block?
    k
    • 2
    • 1
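    (One common pattern, as a sketch rather than an official recipe: wrap the block's functions in thin tasks, which makes wait_for available. MyCustomBlock and its methods are hypothetical:)

    from prefect import flow, task

    @task
    def run_step_one(block):
        return block.step_one()  # hypothetical block method

    @task
    def run_step_two(block):
        return block.step_two()  # hypothetical block method

    @flow
    def orchestrate():
        block = MyCustomBlock.load("my-block")  # hypothetical block
        first = run_step_one.submit(block)
        # wait_for enforces the upstream/downstream order even though
        # no data is passed between the two steps
        second = run_step_two.submit(block, wait_for=[first])
        return second.result()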
  • m

    Matt Delacour

    11/29/2022, 3:34 PM
    👋 How can I "batch" my pipeline? I would like to run the full pipeline in batches of ~10k rather than having massive updates where one failure would stop everything. So I would like to take batches of get_user_ids(), run everything in parallel (LocalDaskExecutor), save the results in Redshift, and start again. PS: Another approach would be to build "incremental" logic from the source data. PS2: I am using Prefect 1.0.
    Untitled.py
    s
    k
    • 3
    • 4
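    (A Prefect 1 sketch of the batching idea: chunk the ids in one task and map a processing task over the chunks, so each ~10k batch succeeds or fails independently of the others. get_user_ids and the Redshift load are placeholders standing in for the existing code:)

    from prefect import Flow, task

    def get_user_ids():
        # placeholder for the real source query
        return list(range(100_000))

    @task
    def get_user_id_batches(batch_size: int = 10_000):
        ids = get_user_ids()
        return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]

    @task
    def process_batch(batch):
        # transform and save this batch; a failure here only fails
        # this one mapped child, not the whole pipeline
        print(f"processed {len(batch)} rows")  # stand-in for the Redshift load

    with Flow("batched-pipeline") as flow:
        batches = get_user_id_batches()
        process_batch.map(batches)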
  • j

    Jean-Michel Provencher

    11/29/2022, 4:40 PM
    Hello. Any plans in the future to have a Terraform provider to manage organization resources? Things like workspaces, API keys, and so on? It would be pretty useful for enterprises that need to manage their infrastructure with fully automated IaC.
    ✅ 3
    ➕ 6
    💯 5
    a
    m
    m
    • 4
    • 14
  • p

    Patrick Tan

    11/29/2022, 4:59 PM
    I am testing rerunning a failed flow (Prefect 2.0) and get this message: prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    ✅ 1
    k
    m
    a
    • 4
    • 13
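    (That error usually points at result persistence being off; a minimal sketch of turning it on, assuming Prefect 2.6+, with a placeholder storage path:)

    from prefect import flow, task
    from prefect.filesystems import LocalFileSystem

    @task(persist_result=True)
    def compute(x: int) -> int:
        return x + 1

    # persist_result plus a result_storage block lets a rerun fetch
    # state data instead of raising MissingResult
    @flow(persist_result=True, result_storage=LocalFileSystem(basepath="/tmp/results"))
    def pipeline():
        return compute(41)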
  • k

    Kalise Richmond

    11/29/2022, 6:03 PM
    Streaming now :youtube:
    🙌 3
    :party-parrot: 1
    💯 1
    🚀 1
    :prefect: 1
    a
    • 2
    • 2
  • b

    Bo

    11/29/2022, 10:01 PM
    Hello, for ECSTask, how can I set up the block to use a specific task definition revision, rather than creating a new one every time? I'm running into a lot of "RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family." errors.
    m
    b
    • 3
    • 9
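    (A sketch assuming prefect-aws's ECSTask block, which accepts a task_definition_arn; pinning an existing revision should stop the block from registering a new one per run. The ARN and block name are placeholders:)

    from prefect_aws.ecs import ECSTask

    # Pin an existing task definition revision instead of letting the
    # block register a new revision on every flow run.
    ecs = ECSTask(
        task_definition_arn="arn:aws:ecs:us-east-1:123456789012:task-definition/my-family:3",
    )
    ecs.save("my-ecs-task", overwrite=True)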
  • b

    Bradley Hurley

    11/30/2022, 4:17 AM
    Hi Prefect folks - the documentation for artifacts indicates it supports GitHub-flavored Markdown, but I don't think <details> tags are actually supported. I searched for existing GitHub issues, but wasn't able to find anything.
    👀 1
    k
    • 2
    • 4
  • t

    Tim Galvin

    11/30/2022, 8:51 AM
    Has anyone noticed a recent change in the behavior of the PREFECT_LOGGING_EXTRA_LOGGERS mechanism? I am running a DaskTaskRunner with a SLURMCluster backend to create the set of dask-workers. I can see that the loggers in modules I am using in my Prefect 2 pipeline are being reported in the slurm stderr output that is written to disk through the sbatch --error log.err argument. These messages follow the Prefect log handler configuration ("Time | State | _name_ - Message"), but they are not being saved to the Orion database / presented in the Orion UI.
    ✅ 1
    • 1
    • 1
  • v

    Vadym Dytyniak

    11/30/2022, 9:43 AM
    Hi. What is the replacement for flatten in Prefect 2?
    j
    • 2
    • 4
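    (As far as I know, Prefect 2 has no flatten helper; since task results are plain Python values inside a flow, a list comprehension does the job. A sketch with made-up task names:)

    from prefect import flow, task

    @task
    def produce(n: int) -> list[int]:
        return list(range(n))

    @task
    def consume(items: list[int]) -> int:
        return len(items)

    @flow
    def pipeline() -> int:
        nested = [produce(n) for n in (2, 3, 4)]
        # plain Python flattening replaces Prefect 1's flatten()
        flat = [x for sub in nested for x in sub]
        return consume(flat)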
  • e

    eddy davies

    11/30/2022, 10:34 AM
    Is there a version of this diagram for Prefect V2?
    r
    j
    • 3
    • 2
  • r

    roady

    11/30/2022, 10:44 AM
    Would really appreciate some help with this 😁: https://prefect-community.slack.com/archives/CL09KU1K7/p1669371891611149
    p
    • 2
    • 5
  • c

    Christopher

    11/30/2022, 2:29 PM
    My general pipeline flow is "fetch data" -> do various operations on data -> persist results. My first instinct was to split those stages up into separate (potentially mapped) tasks, but I'm worried about the overhead of passing large data blobs between tasks (especially if I move to running things on a cluster of some kind in future, where those tasks could end up running on different machines). Am I right to be worried about that? Is there a best practice?
    ✅ 1
    b
    • 2
    • 1
  • b

    Bianca Hoch

    11/30/2022, 10:04 PM
    Hey Christopher, I'm going to defer to Ryan's post about this, since he provided some very useful information on passing large data objects between tasks with Prefect versions 2.6.0 and up. Please reach out here if you have any additional questions on best practices. 😄
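    (A sketch of the usual mitigation when blobs are large and tasks may land on different machines: write the data to shared storage inside the task and pass only a reference downstream. The paths and task bodies here are placeholders:)

    from prefect import flow, task

    @task
    def fetch_data() -> str:
        # write the large blob to shared storage (e.g. S3) and return
        # only its path, so Prefect never ships the bytes between tasks
        path = "s3://my-bucket/raw/data.parquet"  # placeholder
        ...  # upload happens here
        return path

    @task
    def transform(path: str) -> str:
        ...  # read from path, transform, write a new object
        return "s3://my-bucket/transformed/data.parquet"  # placeholder

    @flow
    def pipeline():
        raw_path = fetch_data()
        transform(raw_path)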