prefect-community

    Michael

    09/10/2021, 8:08 AM
    Is it possible to know at runtime whether a Parameter has taken its default value or been overridden?
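    A rough sketch of one way to check this at runtime in Prefect 1.x (this assumes the values supplied for the run are available under the "parameters" key in prefect.context, and uses a hypothetical parameter named n):
    import prefect
    from prefect import Flow, Parameter, task

    @task
    def report_param(value):
        # Assumption: prefect.context["parameters"] holds the values supplied
        # for this flow run; if the key is absent, the default was presumably used.
        supplied = prefect.context.get("parameters") or {}
        logger = prefect.context.get("logger")
        if "n" in supplied:
            logger.info("Parameter 'n' was overridden: %s", value)
        else:
            logger.info("Parameter 'n' appears to be using its default: %s", value)
        return value

    with Flow("param-default-check") as flow:
        n = Parameter("n", default=10)
        report_param(n)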

    Terawat T

    09/10/2021, 8:50 AM
    Hi! Has anyone successfully submitted a flow on OpenShift? I got the Prefect agent up and running in the cluster and tried to execute a flow in it, but the run got stuck in 'Submitted for execution'. The agent's debug log didn't help much.
    DEBUG:agent:Querying for ready flow runs...
    [xx] DEBUG - agent | Creating namespaced job prefect-job-c4a440b5
    DEBUG:agent:Creating namespaced job prefect-job-c4a440b5
    [xx] DEBUG - agent | Job prefect-job-c4a440b5 created
    DEBUG:agent:Job prefect-job-c4a440b5 created
    [xx] INFO - agent | Completed deployment of flow run 549ced42-7840-43d6-89da-6f1a1f9cb378
    INFO:agent:Completed deployment of flow run 549ced42-7840-43d6-89da-6f1a1f9cb378

    Nacho Rodriguez

    09/10/2021, 8:53 AM
    Hi, today, after using our CI/CD pipeline to push some changes to our Prefect Cloud account, the flow run information from earlier versions vanished. The version numbers are right, but it's as if the older runs never existed. Is this the intended behaviour?

    Tomoyuki NAKAMURA

    09/10/2021, 8:55 AM
    Hi, I am running a flow using DaskExecutor. After the Dask cluster starts up successfully, the flow encounters an error and goes to FAILED, and when I rerun the flow I get the following error:
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services \"foobar\" already exists","reason":"AlreadyExists","details":{"name":"foobar","kind":"services"},"code":409}
    It seems that the error occurs when I try to create a service with the same name. How can I rerun the flow when using DaskExecutor?

    Abhishek

    09/10/2021, 9:58 AM
    Hello folks, is it fine/possible to use the Slack notifier utility (prefect.utilities.notifications.notifications.slack_notifier()) as a state handler for a Flow?
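    For reference, a minimal sketch of attaching slack_notifier as a flow-level state handler (assumes Prefect 1.x and a SLACK_WEBHOOK_URL secret configured for the notifier):
    from prefect import Flow, task
    from prefect.engine.state import Failed, Success
    from prefect.utilities.notifications import slack_notifier

    # slack_notifier is curried; partially applying it returns a state handler.
    handler = slack_notifier(only_states=[Success, Failed])

    @task
    def do_work():
        return 42

    with Flow("notify-example", state_handlers=[handler]) as flow:
        do_work()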

    Jan Vlčinský

    09/10/2021, 10:31 AM
    We need a ResourceManager `map`ped over a list of days. We have a task that archives data for one day. The task uses a temporary dir (mapped to shared memory), and we use a ResourceManager to clean up the tmp dir once a day is processed (there is a day subdir per processed day). Now we want to run the same for a list of days, and we are struggling with mapping days to the resource manager.
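    If mapping the ResourceManager itself proves awkward, one possible workaround (a rough sketch that skips ResourceManager entirely) is to let each mapped task create and clean up its own per-day temp dir:
    import shutil
    import tempfile

    from prefect import Flow, Parameter, task

    @task
    def archive_day(day: str) -> str:
        # One temp dir per mapped day (dir="/dev/shm" assumes a shared-memory tmpfs).
        tmp_dir = tempfile.mkdtemp(prefix=f"archive-{day}-", dir="/dev/shm")
        try:
            # ... archive data for `day` using tmp_dir ...
            return f"archived {day}"
        finally:
            # Cleanup runs whether archiving succeeded or failed.
            shutil.rmtree(tmp_dir, ignore_errors=True)

    with Flow("archive-days") as flow:
        days = Parameter("days", default=["2021-09-01", "2021-09-02"])
        archive_day.map(days)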

    Marc

    09/10/2021, 12:43 PM
    hey community 👋 🙂, Prefect is one of the options we're considering adopting inside the company, but we would like to solve our use case first: a simple batch process like running a job in an EMR cluster. What would the approach be using Prefect? Airflow has some good adapters, and I am curious whether we could do the same with Prefect.
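    One common pattern is to wrap the EMR call in a Prefect task using boto3 directly, rather than a dedicated adapter. A hedged sketch (cluster id, bucket, and region are placeholders):
    import boto3
    from prefect import Flow, task

    @task
    def submit_emr_step(cluster_id: str) -> str:
        # Submit a Spark step to an existing EMR cluster and return its step id.
        emr = boto3.client("emr", region_name="us-east-1")
        response = emr.add_job_flow_steps(
            JobFlowId=cluster_id,
            Steps=[
                {
                    "Name": "example-batch-step",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["spark-submit", "s3://my-bucket/job.py"],
                    },
                }
            ],
        )
        return response["StepIds"][0]

    with Flow("emr-batch") as flow:
        submit_emr_step("j-XXXXXXXXXXXX")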

    Brad I

    09/10/2021, 12:45 PM
    Hi, does anyone know how to set the service account for the Dask workers in an ephemeral cluster? This may be more of a Dask question, but the k8s pods are being set to default, and I was wondering if anyone has run into the same issue. Example executor code is in the thread.

    Filip Lindvall

    09/10/2021, 1:01 PM
    I'm trying to configure Run schedules for flows using the API. I can't find any documentation for it. Nothing in the CLI or anything. Any pointers on where to look?
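    In Prefect 1.x the usual route is to attach the schedule to the flow in code and let registration pick it up, rather than setting it through the GraphQL API. A minimal sketch (the project name is a placeholder):
    from datetime import timedelta

    from prefect import Flow, task
    from prefect.schedules import IntervalSchedule

    @task
    def say_hello():
        print("hello")

    # The schedule travels with the flow and takes effect on (re)registration.
    schedule = IntervalSchedule(interval=timedelta(hours=1))

    with Flow("scheduled-flow", schedule=schedule) as flow:
        say_hello()

    flow.register(project_name="my-project")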

    Nikolay

    09/10/2021, 1:32 PM
    Hi! Does anybody know the best way to initiate/run a Prefect job from a GitLab CI/CD pipeline?
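    One option is to have the CI job run a small script that creates a flow run via the Prefect Client. A rough sketch (assumes the API key is exposed as PREFECT__CLOUD__API_KEY in the CI environment and the flow id is passed as a CI variable; both names are placeholders):
    # trigger_flow_run.py, called from a GitLab CI job
    import os

    from prefect.client import Client

    # The Client picks up credentials from the PREFECT__CLOUD__* environment variables.
    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id=os.environ["PREFECT_FLOW_ID"],  # placeholder CI variable
        run_name="triggered-from-gitlab-ci",
    )
    print(f"Created flow run {flow_run_id}")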

    Kyle McChesney

    09/10/2021, 2:52 PM
    I am writing a small Lambda function to trigger a Prefect flow run on demand. I was wondering if anyone has done something similar and could let me know if these queries look right.
    • Given a flow name, get the latest version's id:
    LATEST_FLOW_BY_NAME = gql(
        '''
            query LatestFlowByName($name: String) {
              flow(
                where: {name: {_eq: $name}},
                order_by: {version: desc},
                limit: 1,
              )
              {
                id
              }
            }
        ''',
    )
    with:
    variable_values={'name': name}
    • trigger a flow run
    CREATE_FLOW_RUN = gql(
        '''
            mutation CreateFlowRun($input: create_flow_run_input!) {
                create_flow_run(input: $input) {id}
            }
        ''',
    )
    with:
    variable_values={
        'input': {
            'flow_id': flow_id,
            'parameters': parameters,
        },
    },
    My goal here is to specify as little as possible, to ensure the flow runs with the defaults configured for it, i.e. I don't want to muck with the RunConfig, etc. I am mostly concerned about my logic for getting the latest flow version. I tried to grok the flow groups / versions tables but did not have much luck.
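    For what it's worth, a rough sketch of wiring the two queries above together with the gql client (the endpoint URL and auth header are assumptions; adjust for Cloud vs. Server and your API key):
    from gql import Client, gql
    from gql.transport.requests import RequestsHTTPTransport

    transport = RequestsHTTPTransport(
        url="https://api.prefect.io",  # assumed Cloud endpoint; use your Server URL otherwise
        headers={"Authorization": "Bearer <API_KEY>"},
    )
    client = Client(transport=transport, fetch_schema_from_transport=False)

    # Look up the latest version's id for a given flow name.
    flows = client.execute(LATEST_FLOW_BY_NAME, variable_values={"name": "my-flow"})
    flow_id = flows["flow"][0]["id"]

    # Kick off a run with default parameters and run config.
    run = client.execute(
        CREATE_FLOW_RUN,
        variable_values={"input": {"flow_id": flow_id, "parameters": {}}},
    )
    print(run["create_flow_run"]["id"])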

    Ben Sack

    09/10/2021, 3:00 PM
    Hi, I was wondering if it is possible to grab/filter by a flow parameter's value via a GraphQL query. I've been messing around with the interactive API and can see you can filter flow_runs by parameter using something like:
    flow_runs(where: {parameters: {_has_key: "process_date"}})
    but what I would like to do is filter the query for any flows that ran on a specific process date, such as 8/17/2021, rather than filtering for flows that have the process_date parameter. Thanks!
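    A hedged suggestion: Hasura, which backs the Prefect GraphQL API, exposes a _contains operator on JSONB columns, so matching on the parameter's value rather than its key might look like the query below (assumes the operator is exposed, and the date string must match exactly how the parameter was stored):
    from prefect.client import Client

    query = """
    query {
      flow_run(where: {parameters: {_contains: {process_date: "8/17/2021"}}}) {
        id
        flow { name }
      }
    }
    """
    # Client.graphql sends a raw query to the same API the interactive API uses.
    result = Client().graphql(query)
    print(result)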

    Martim Lobao

    09/10/2021, 4:06 PM
    i’m not sure if this is a support question or a feature request: i’ve got a flow of flows set up, meaning that a top-level flow kicks off a DAG of other flows i have created. my issue is that i can’t easily access the flow runs of the subflows from within the page for the top-level flow run. i can click on person-build in the schematic view, but that just takes me to the local (“zoomed-in”) view of the task itself; i can’t see the DAG for the person-build flow run. the only workaround i’ve found is to inspect the logs and copy the link to the flow run from there. am i missing something obvious, or is there no easy way to access a subflow’s details? this is basically our setup:
    person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)
    release_flow = StartFlowRun(flow_name="release", project_name=get_stage(), wait=True)
    
    with Flow(
        "build-then-release",
        executor=LocalDaskExecutor(num_workers=8),
        result=PrefectResult(),
    ) as flow:
        release_flow(upstream_tasks=[person_build_flow])

    Pedro Machado

    09/10/2021, 4:12 PM
    Hi there. I just had something strange happen on Prefect Cloud and was wondering why. I have a flow that was running every night. Another person registered a new version of the flow (version 9). Until now, I had been the only person registering this flow. Before the new flow version was registered, I could see the run history but once the new version got registered, I cannot see the history anymore. Also, I don't see the prior versions in the version drop down.

    Charles Liu

    09/10/2021, 5:24 PM
    Hey all, this is more of a front end observation. So on an actual day-to-day basis, if the front end is used for monitoring, I'm worried it will get exceedingly hard to tell at a glance what is scheduled and what has run, as it all seems to be color coordinated in the same way and the jobs are all obfuscated behind run pseudonyms. I'm currently running different schedules for just one pipeline, and am already not sure which one was which without clicking down to the parameters in each individual flow. Guessing at which one was which based on the height of the time-taken-to-run bar isn't exactly helpful either. Any thoughts on how to make things more clear?

    David Jenkins

    09/10/2021, 7:08 PM
    I have a state handler that checks for is_failed() and sets the state to Skipped if that is the case. This is a requirement. Something like this:
    def handler(task, old_state, new_state):
        if new_state.is_failed():
            # do stuff here
            return Skipped()
        return new_state
    The consequence of this is that the flow finishes with a SUCCESS state, even though a task actually failed, but I need to show the flow as FAILED. To solve this, I created a new task that uses a different state handler and checks its state. If the state is skipped, I return Failed() from the handler. Something like this:
    @task(name="final check", state_handlers=[final_check_handler])
    def final_check():
        pass
    and the state handler
    def final_check_handler(task, old_state, new_state):
        if new_state.is_skipped():
            return Failed()
        return new_state  # leave all other states unchanged
    This accomplishes what I want, but surely there has to be a better way.

    Henry

    09/10/2021, 8:56 PM
    Is it possible to send PandasSerializer results to a flow triggered by StartFlowRun?

    An Hoang

    09/10/2021, 8:57 PM
    Hi I have a flow represented by the DAG below. I ran it locally with flow_result = flow.run() and visualized it with state. The log says "Succeed: all reference tasks succeeded" but I can see a failure right there in the stateful DAG. Also it doesn't show the full DAG with the downstream tasks!!! I have also attached the output of flow_result.result and it doesn't show the get_duplicated_pairs_info task. Any tips on what went wrong and how I can debug from here? (Full log in comment)

    Trevor Campbell

    09/11/2021, 12:33 AM
    Hi folks! I am trying to use Prefect and found that it is rather slow for my use case. I thought it might be a mistake I made, but I've created a simple reproducible example that illustrates the issue. Basically: it seems that context creation at the start of task_run is what is causing a huge amount of overhead. I would expect context creation to be very lightweight... more details in the thread below.

    Frederick Thomas

    09/11/2021, 12:51 AM
    Hi all, is there a way to configure the dashboard to use colors other than red & green? We have a color-blind individual who can't distinguish red from green. Thanks

    Ken Nguyen

    09/11/2021, 1:48 AM
    Hey! I'm currently following this guide by George Coyne from Slate to set up dbt orchestration with prefect. Does anyone have suggestions for other guides I can read into? My biggest roadblock right now is being able to pull the dbt repo on GitHub, would love any tips y'all have!!

    Abhishek

    09/11/2021, 7:50 AM
    Folks, I am triggering a flow run via a local Prefect server (through the UI "quick run" button), but it stays scheduled (shown as a yellow dot in the graph) and never runs. The task state always shows "Pending". Any idea what could be wrong?

    Abhishek

    09/11/2021, 7:51 AM
    I have registered the project as well.

    Abhishek

    09/11/2021, 8:09 AM
    screenshot:

    Abhishek

    09/11/2021, 8:11 AM
    The flow runs if I do flow.run() in code, but triggering it from the UI is not working 🧐

    Abhishek

    09/11/2021, 8:28 AM
    I tried the universal IT solution ("Did you turn it off and on again?") and it did not work :-)

    Abhishek

    09/11/2021, 9:09 AM
    On Cloud the behaviour is the same. This is my code snippet:
    with Flow(
        name=flow_name,
        # schedule=schedule,
        state_handlers=[slack_notifier(only_states=[Success]), flow_state_handler],
    ) as flow:
        ---some logic---
            for sync_type, cmd_parameter in sync_commands:
                ---some logic --
                sync_command = (
                    f"{command_prefix} {source_bucket} {destination_bucket} {options}"
                )
                s3_sync_task = ShellTask(
                    command=sync_command,
                    return_all=True,
                    name=task_id,
                    slug=task_id,
                    timeout=timedelta(minutes=task_execution_timeout),
                )()
    
                # Create a task to validate sync commands.
                validate_s3_sync_task = FunctionTask(
                    fn=validate_s3_sync,
                    name="".join(["validate_", task_id]),
                    slug="".join(["validate_", task_id]),
                )(
                    timeout=timedelta(minutes=task_execution_timeout),
                    s3_sync_command=sync_command,
                )
    
                # Define dependencies between tasks.
                s3_sync_task.set_downstream(validate_s3_sync_task)
                
    # flow.run()
    flow.register(project_name="demo-project", labels=tags)

    Martim Lobao

    09/12/2021, 9:52 AM
    hi, i’m trying to restart a failed flow of flows, but restarting it always results in a not-very-helpful error. [log in thread] i’ve tried restarting the flow several times (and usually have to hit the restart button twice before it actually submits the flow for execution) and i always get the same error. the logs for first dependent flow (person-build) don’t pick up anything regarding the restart for the parent flow. this is essentially the setup i have for the parent flow: [code in thread]

Kevin Kho

09/13/2021, 3:03 PM
Hey @Martim Lobao, regarding the delay when hitting the restart button: I chatted with the team about it. The "delay" may be from the API request plus waiting for the agent to pick it up. The agent polls on a 10-second loop, so it may just be taking a bit of time to pick the run up. Is it one of the StartFlowRun tasks that fails? Are there more logs on the page of the sub-flow run?
Also if you get the chance, could you move either the traceback or flow code to the thread to free up space in the main channel?

Martim Lobao

09/13/2021, 3:05 PM
prefect log:
INFO
    martim_peopledatalabs_com restarted this flow run
INFO
    martim_peopledatalabs_com restarted this flow run
INFO agent
    Submitted for execution: Task arn:aws:ecs:us-west-2:5567...
INFO GitHub
    Downloading flow from GitHub storage - repo: 'peopledatalabs/prefect', path: 'src/pdlapps/orchestration/flows/build_then_release.py', ref: 'prefect-testing'
INFO GitHub
    Flow successfully downloaded. Using commit: bac24...
INFO CloudFlowRunner
    Beginning Flow run for 'build-then-release'
INFO CloudTaskRunner
    Task 'Flow person-build': Starting task run...
INFO Flow person-build
    Flow Run: <https://cloud.prefect.io/pdl/flow-run/5133d6be>...
INFO CloudTaskRunner
    FAIL signal raised: FAIL('5133d6be... finished in state <Failed: "Some reference tasks failed.">')
flow setup:
person_build_flow = StartFlowRun(flow_name="person-build", project_name=get_stage(), wait=True)
release_flow = StartFlowRun(flow_name="release", project_name=get_stage(), wait=True)

with Flow(
    "build-then-release",
    executor=LocalDaskExecutor(num_workers=8),
    result=PrefectResult(),
    state_handlers=[slack_notifier, terminate_on_cancel],
) as flow:
    release_flow(upstream_tasks=[person_build_flow])

Kevin Kho

09/13/2021, 3:09 PM
Thanks for moving! Yeah I think the failed logs and error would appear in the subflow logs

Martim Lobao

09/13/2021, 3:11 PM
hey @Kevin Kho, thanks for the reply! I’ve noticed that when i first click the retry button, I often get an error message at the bottom of the page along the lines of “unfortunately, the job was not able to restart”. I’ll copy the message next time it pops up, but it’s been happening pretty consistently. the `StartFlowRun`s function normally when first called; this is a case where the job failed for whatever reason and needs to be restarted. in this case, I try to restart the parent “build-then-release” flow, but it appears to get an error when trying to restart the child flows. here, it seems to try to start the failed “person-build” flow run but fails because “some reference tasks failed”. The person-build run that was started by the parent flow doesn’t contain any log after it first failed.

Kevin Kho

09/13/2021, 3:16 PM
Ohh, I know what you are saying. Restarting a main flow just returns the result of the sub-flows. Here is why: StartFlowRun has an idempotency key by default. The restart will try to start a new flow run, but because there is already an existing run with the same idempotency key, it won't restart that sub-flow. The sub-flow needs to be individually restarted.
So what needs to happen is that you need to cache the results of the sub-flow and then kick off a new flow run from the main flow by supplying a different idempotency key.
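A rough sketch of that last suggestion, using the Client directly (the flow id is a placeholder; any key that has not been used before forces a brand-new run):
import uuid

from prefect.client import Client

client = Client()
new_run_id = client.create_flow_run(
    flow_id="<person-build-flow-id>",
    idempotency_key=str(uuid.uuid4()),  # a fresh key, so a new run is created
)
print(new_run_id)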

Martim Lobao

09/13/2021, 3:19 PM
hmm, is that intended? so the restart button in a flow of flows will never work and i have to create a new run each time?