Powered by Linen
prefect-community
  • j

    John Shearer

    12/17/2021, 2:33 PM
    Hi, is there a way to delete a specific flow-run? or the logs associated with a specific flow-run? (for the case where logs may have non-redacted private data in, for example)
    a
    • 2
    • 1
  • p

    Pedro Machado

    12/17/2021, 3:30 PM
    Hi. I have a mapped task that pulls data from an API and passes it to a reduce task that saves the data to S3. Some of the API calls may fail and that is OK. I am using a `FilterTask` as explained here to allow the reduce task to save the data. It works OK with small results, but when I try it with the real API output, I get `State payload is too large`. I tried setting `result=None` for this task but that did not fix it. Any suggestions?
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
        result = self.graphql(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I also tried setting the trigger on the reduce task to `trigger=any_successful` and it worked, but I wonder why the filter task is not working. Thanks!
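The filtering idea described in this message can be sketched in plain Python. This is not Prefect's `FilterTask` itself and does not address the payload-size error. It only illustrates dropping failed items before a reduce step. Failed calls are modeled here as exception objects or `None`, and `keep_successful` and `reduce_payloads` are hypothetical names.

```python
from typing import Any, Iterable, List


def keep_successful(results: Iterable[Any]) -> List[Any]:
    """Drop failed items (modeled here as exception objects or None)."""
    return [r for r in results if r is not None and not isinstance(r, BaseException)]


def reduce_payloads(results: Iterable[Any]) -> int:
    """Stand-in for the reduce task: count the payloads that would be saved."""
    return len(keep_successful(results))


# Hypothetical mapped output: two good payloads, one failed call, one empty result.
mapped_results = [{"id": 1}, ValueError("API call failed"), {"id": 3}, None]
print(keep_successful(mapped_results))  # [{'id': 1}, {'id': 3}]
```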
    k
    a
    • 3
    • 2
  • a

    Alejandro Sanchez Losa

    12/17/2021, 3:44 PM
    hi there !!! nice to meet you all guys
    👋 4
    👍 3
    k
    • 2
    • 2
  • a

    Alejandro Sanchez Losa

    12/17/2021, 4:12 PM
    I am trying to get the flow ID after registering a flow. Is there any way?
    k
    a
    • 3
    • 36
  • j

    Jadei

    12/17/2021, 5:45 PM
    hello, is it possible to run dependent flows (flow of flows) like shown here https://docs.prefect.io/core/idioms/flow-to-flow.html without registering to the BE server. For example create the flow runs and call
    import pendulum
    from prefect import Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    
    weekday_schedule = CronSchedule(
        "30 9 * * 1-5", start_date=pendulum.now(tz="US/Eastern")
    )
    
    
    with Flow("parent-flow", schedule=weekday_schedule) as flow:
        
        # assumes you have registered the following flows in a project named "examples"
        flow_a = create_flow_run(flow_name="A", project_name="examples")
        wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)
        
        flow_b = create_flow_run(flow_name="B", project_name="examples")
        wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)
        
        flow_c = create_flow_run(flow_name="C", project_name="examples")
        wait_for_flow_c = wait_for_flow_run(flow_c, raise_final_state=True)
        
        flow_d = create_flow_run(flow_name="D", project_name="examples")
        wait_for_flow_d = wait_for_flow_run(flow_d, raise_final_state=True)
        
        b = wait_for_flow_b(upstream_tasks=[wait_for_flow_a])
        c = wait_for_flow_c(upstream_tasks=[wait_for_flow_a])
        d = wait_for_flow_d(upstream_tasks=[b, c])
    flow.run()
    m
    k
    • 3
    • 15
  • c

    Constantino Schillebeeckx

    12/17/2021, 5:52 PM
    I'm trying to do a conditional flow like the one shown below:
    import prefect
    from prefect import Flow, Parameter, case, task
    
    
    @task
    def check_cond():
        return False
    
    
    @task
    def run_task(anchor_date, days_from):
    
        logger = prefect.context.logger
        logger.critical(f"{days_from=}")
    
    
    with Flow('foo') as flow:
    
        anchor_date = Parameter(name="anchor_date", default=None)
        cond = check_cond()
    
        with case(cond, True):
            anchor_date = run_task(anchor_date, days_from=0)
    
        with case(cond, False):
            anchor_date = run_task(anchor_date, days_from=-1)
    However, the `False` condition never gets run. When `check_cond` returns `True`, the flow runs as expected. Am I missing something?
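One possible explanation, offered as an assumption rather than a confirmed diagnosis: `anchor_date` is rebound inside the first `case` block, so the second `run_task` call receives the first call's output instead of the original parameter. If skip states propagate to downstream tasks, the second call would be skipped whenever the first one is. The toy model below is plain Python, not Prefect. The graph names and the `simulate` helper are hypothetical and only mimic that skip rule.

```python
from typing import Dict, List, Tuple

# Each task: (branch value that must match `cond`, names of upstream tasks).
Graph = Dict[str, Tuple[bool, List[str]]]

# As written in the question: the second call consumes the first call's output,
# because `anchor_date` was rebound inside the first `case` block.
REBOUND: Graph = {
    "run_task_true": (True, []),
    "run_task_false": (False, ["run_task_true"]),
}

# Variant where each branch only consumes the original parameter.
INDEPENDENT: Graph = {
    "run_task_true": (True, []),
    "run_task_false": (False, []),
}


def simulate(graph: Graph, cond: bool) -> Dict[str, str]:
    """Toy rule: skip a task if its branch does not match or any upstream was skipped."""
    states: Dict[str, str] = {}
    for name, (branch, upstream) in graph.items():
        skipped = branch != cond or any(states[u] == "Skipped" for u in upstream)
        states[name] = "Skipped" if skipped else "Success"
    return states


print(simulate(REBOUND, cond=False))
print(simulate(INDEPENDENT, cond=False))
```

Under this model, giving each branch's result its own name (for example `anchor_true` and `anchor_false`, both hypothetical) would keep the second call independent of the first.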
    k
    • 2
    • 18
  • c

    Chris Reuter

    12/17/2021, 8:55 PM
    Join us on YouTube in 5 minutes for a FIRE~sale~side Chat with @Jeremiah and @Chris White! We'll be discussing 🔥 topics and, as promised, The Matrix 4 (I'm not joking).
    :upvote: 3
    🔥 1
  • d

    Danny Vilela

    12/17/2021, 9:51 PM
    Hi, when is the `today` template filled in a result `location` or task `target` string? I keep getting a `KeyError`:
    from prefect.engine.results import LocalResult
    
    # Create a new local result pointing to current working directory.
    result = LocalResult(dir=".", location="{today}.txt")
    assert result.location == "{today}.txt"
    
    # This raises `KeyError: 'today'`.
    result.write(value_="hello!")
    I'm following this doc: https://docs.prefect.io/core/concepts/results.html#choose-a-serializer (under "Templating `Result` locations")
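A likely cause, stated as an assumption: the template is filled with Python's `str.format` using values that a flow run supplies at run time. Calling `write` directly, outside a run and without passing `today`, would then leave the placeholder without a value. The plain-Python sketch below reproduces only that formatting step and does not call Prefect. The date string is a made-up example.

```python
template = "{today}.txt"

# With no values supplied, formatting fails with the same KeyError as in the question.
try:
    template.format()
except KeyError as exc:
    missing = exc.args[0]
    print(f"missing template key: {missing}")  # missing template key: today

# Supplying the value explicitly (a hypothetical date string) fills the template.
filled = template.format(today="2021-12-17")
print(filled)  # 2021-12-17.txt
```

If `write` forwards extra keyword arguments to the template (worth verifying against the installed version), passing `today` explicitly would be one thing to try when calling it by hand.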
    k
    • 2
    • 8
  • o

    Ovo Ojameruaye

    12/17/2021, 10:44 PM
    Hi, I am experiencing some weird behavior using Prefect server. I can run a flow on Prefect Core and it executes completely, but when I spin up the UI and run the exact same flow on a local agent (same machine and environment), I get a `ModuleNotFoundError`. I have modules I import as part of my flow.
    k
    a
    r
    • 4
    • 9
  • d

    Danny Vilela

    12/17/2021, 10:58 PM
    Is there a way to reference a task output in a result `location` or task `target`? I know the Prefect context gives you a bunch of variables, notably including `parameters`, which looks promising! But what if we have some data (the strings `"foo"` or `"bar"`) computed by some task `compute_foo_or_bar` and we'd like to reference that output in a subsequent task's output target?
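One approach worth checking, assuming the installed Prefect version makes a task's own inputs available when it fills the `target` template: pass the upstream output into the downstream task as a named input and reference that name in the template. The sketch below shows only the string-formatting idea in plain Python. `render_target`, the input name `kind`, and the context value are all hypothetical.

```python
from typing import Any, Dict


def render_target(template: str, context: Dict[str, Any], inputs: Dict[str, Any]) -> str:
    """Fill a target template from context values, letting task inputs take precedence."""
    return template.format(**{**context, **inputs})


def compute_foo_or_bar() -> str:
    """Stand-in for the upstream task from the question."""
    return "foo"


context = {"today": "2021-12-17"}        # hypothetical context value
inputs = {"kind": compute_foo_or_bar()}  # upstream output passed in as an input named `kind`
print(render_target("{today}/{kind}.txt", context, inputs))  # 2021-12-17/foo.txt
```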
    k
    • 2
    • 4
  • j

    Jelle Vegter

    12/18/2021, 12:40 PM
    Hey all, my docker agent stopped being able to auth (error 500) to my Gitlab storage. I don't think I changed anything to the Prefect Agent/Gitlab settings so I don't know what caused it. Any ideas?
    a
    • 2
    • 7
  • a

    Alejandro Sanchez Losa

    12/18/2021, 2:30 PM
    Hi guys, I am not sure I am using Prefect correctly. Does anyone know where I can buy a course to learn Prefect and its best practices more easily and in more depth?
    a
    • 2
    • 2
  • o

    Ovo Ojameruaye

    12/19/2021, 1:31 AM
    Hi everyone, I am running Prefect server on my Linux machine and I want someone else on my team to be able to view the UI on their own machine. Typically, I simply share <IP>:<port_number> and this works like a charm (I do this with Airflow and Streamlit while testing). We are on the same network. I tried the same for Prefect (<IP>:8080) but it can't be reached. I am still learning about Prefect and am not sure why this behavior is different. I would appreciate any help.
    Update: I tried to start a local agent from a different machine and I get `ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it`. I can't tell if this is related. I created a config.toml file:
    # base configuration directory 
    home_dir = "C:/Users/XXX/.prefect"
    
    backend = "server"
    
    [server]
    host = "http://XXXXXX"
    port = "4200"
    host_port = "4200"
    endpoint = "${server.host}:${server.port}"
    I am certain the server is running on the host ip address
    a
    • 2
    • 8
  • a

    Anh Nguyen

    12/19/2021, 12:47 PM
    Hi all! How do I kick off a flow that depends on another flow succeeding? Thanks
    a
    • 2
    • 1
  • y

    Yash

    12/19/2021, 1:19 PM
    Hi, are there any video tutorials or blog posts on the Prefect GraphQL API? Thanks!
    a
    • 2
    • 3
  • a

    ale

    12/20/2021, 9:47 AM
    Hey folks 🙂 We are using the ECS Agent to execute flows. Is there a way to apply custom tags to the ECS task that is executed?
    a
    • 2
    • 9
  • s

    Samuel Hinton

    12/20/2021, 1:14 PM
    Hi all! Is anyone aware of a way of timing out a flow itself and not just the tasks? I.e., Task has `timeout`, which we can pass in, but I'm currently experiencing some odd issues where tasks seem to get lost in Dask somewhere (they are submitted to Dask but never come back and never time out), and this means my flows never end. Ideally I'll dig into our environment, Dask, and Prefect to figure out what is causing the silent, untracked failure, but as an interim solution, does anyone know of a way I can say "Cancel the flow and all its tasks if it's been an hour since you started"?
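As an interim idea, sketched under assumptions and not a Prefect feature: if the flow is launched as its own process, the launcher can enforce an overall deadline with the standard library. `run_with_deadline` is a hypothetical helper, and the command below is a stand-in for a script that would call `flow.run()`. This only kills the local process. It would not by itself cancel work already submitted to a remote Dask cluster or update any state in the Prefect backend.

```python
import subprocess
import sys
from typing import List


def run_with_deadline(cmd: List[str], seconds: float) -> bool:
    """Run `cmd`; return True if it finished in time, False if it was killed at the deadline."""
    try:
        subprocess.run(cmd, timeout=seconds, check=False)
        return True
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child process before re-raising the timeout.
        return False


# Stand-in for a script that would call flow.run(); here it just sleeps.
slow_flow = [sys.executable, "-c", "import time; time.sleep(30)"]
print(run_with_deadline(slow_flow, seconds=1.0))  # False
```

For the one-hour case in the question, the call would use `seconds=3600`.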
    a
    s
    • 3
    • 9
  • b

    Brian Phillips

    12/20/2021, 1:52 PM
    Is anyone else having trouble logging into the UI? Status page is all green, but no one on my team can login
    :plus-one: 7
    j
    m
    k
    • 4
    • 6
  • j

    jars

    12/20/2021, 1:54 PM
    Yes
  • a

    Anna Geller

    12/20/2021, 1:59 PM
    @jars @Brian Phillips and everyone else: We are aware of the issue and investigating it. We’ll update in the announcements channel once we know more.
    🙏 8
    🖖 2
    j
    • 2
    • 3
  • a

    Anna Geller

    12/20/2021, 2:24 PM
    All services are available again - we’ve updated the status page.
    🙌 7
  • j

    jars

    12/20/2021, 2:29 PM
    Thank you!
    👍 2
  • d

    Daniel Komisar

    12/20/2021, 3:21 PM
    Hey everyone, what is the best way of finding out what labels a flow run was run with while executing inside the flow?
    a
    • 2
    • 2
  • d

    Dekel R

    12/20/2021, 3:25 PM
    Hey everyone, I have a couple of flows that are running just fine and now I get this new error when trying to register a new flow -
    raise InterruptedError(line.get("error"))
          InterruptedError: name invalid: Request contains an invalid argument.
    In order to register my flow I use the following command -
    prefect register --project "Project_name" --path path_to_flow_file.py --name "brands-recognition"
    `Project_name` is the project I have in Prefect Cloud, `path_to_flow_file` is the path to the file that contains my flow, and `brands-recognition` is the flow name. This is my flow snippet (for debugging purposes, just 2 tasks):
    schedule = IntervalSchedule(interval=timedelta(days=7))
    
    with Flow("brands-recognition",
              storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
                             dockerfile="./Dockerfile"), schedule=schedule) as flow:  # , schedule=schedule
        mode = Parameter('FULL_UPDATE', default=None)
        task_a(mode)
    
    flow.run_config = VertexRun(machine_type='e2-standard-8', labels=["ml"],
                                service_account='prefect-integration@xx')
    
    # flow.run()
    Now when running locally - the flow runs smoothly and generates a result. It seems like the problem is only at the pushing phase (when I register the flow). What am I missing? Thanks.
    a
    • 2
    • 8
  • c

    Constantino Schillebeeckx

    12/20/2021, 3:35 PM
    Hi, is this behavior normal in the UI? I find it pretty frustrating that the `Run History` keeps re-materializing as I scroll.
    a
    м
    • 3
    • 7
  • a

    Alejandro Sanchez Losa

    12/20/2021, 4:39 PM
    Hi everybody! Let me share an issue I have. First, here is my code:
    a
    • 2
    • 15
  • a

    Alejandro Sanchez Losa

    12/20/2021, 4:41 PM
    This is the Python that runs the flow afterwards …
    a
    • 2
    • 4
  • b

    Billy McMonagle

    12/20/2021, 4:51 PM
    Good morning, I have a question about the Prefect Cloud incident earlier today...
    m
    y
    • 3
    • 12
  • v

    Vipul

    12/20/2021, 5:00 PM
    Hello, Is it possible to get a run history for all the flows using the GraphQL query rather than looking at each flow?
    a
    • 2
    • 8
  • k

    Kyle Heath

    12/20/2021, 9:01 PM
    I have a question about Prefect Cloud billing. If I run a single mapped task on a list of 10K items, do I get charged for 1 task or for 10K tasks?
    a
    • 2
    • 2
a

Anna Geller

12/20/2021, 9:14 PM
@Kyle Heath A direct answer: you will be charged for 10K tasks (as long as those mapped tasks are successful and take longer than 1 second), because mapping generates first-class task run objects that work the same way as any other tasks and include their own retries, observability, etc.
An indirect answer: I wouldn't view this as being priced per task, really. You pay for the platform in general. Pricing is based on the number of task runs because this is a visible metric of Prefect adoption. As you run more and more tasks, you'll probably also gain more and more value out of Prefect.
btw if you contact sales@prefect.io we can figure out the best plan based on your use case
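The counting rule in the reply above (successful runs that take longer than 1 second) can be illustrated with a short sketch. It is based only on that description and is not an official billing formula. `TaskRun`, `count_billable`, and the mix of outcomes are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class TaskRun:
    succeeded: bool
    duration_seconds: float


def count_billable(runs: Iterable[TaskRun]) -> int:
    """Count runs that are successful and took longer than 1 second."""
    return sum(1 for r in runs if r.succeeded and r.duration_seconds > 1.0)


# 10,000 mapped children with a hypothetical mix of outcomes and durations.
runs = (
    [TaskRun(True, 2.5)] * 9_000     # successful, longer than 1 second
    + [TaskRun(True, 0.4)] * 600     # successful, but not longer than 1 second
    + [TaskRun(False, 3.0)] * 400    # failed
)
print(count_billable(runs))  # 9000
```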