prefect-community
  • w

    William Smith

    08/24/2020, 9:16 AM
I'm on the developer tier - why are my flows sometimes stuck in Scheduled for a long time? I don't have anything else taking up a concurrency slot.
    j
    • 2
    • 11
  • t

    Trever Mock

    08/24/2020, 1:15 PM
Good morning! I had a quick question on registering flows. I'm trying to figure out the best way for our team to deploy our Prefect workflows in production. From what I can tell, there are two ways: 1) flow.register() in Python, 2) export the flow to JSON and import it using GraphQL. For option 1, we have a unique situation where we can't run Python commands in our production environment. For option 2, some of the Docker storage info changes from development to production, which means we have to edit the JSON files. So, hopefully a simple question: are there other/better ways to package a workflow for deployment that I might be missing? I can run Python commands in development, just not in production.
    k
    • 2
    • 2
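
One hedged option for Trever's case, since registration only needs network access to the API rather than the production machine itself: run flow.register() from a CI/CD step and parameterize the Docker storage per environment. A minimal sketch (names and environment variables are illustrative):

    import os
    from prefect import Flow
    from prefect.environments.storage import Docker

    flow = Flow("etl")  # assume the flow's tasks are attached elsewhere

    # storage details that differ between dev and prod come from the CI environment
    flow.storage = Docker(
        registry_url=os.environ["DOCKER_REGISTRY_URL"],
        image_name="etl-flow",
    )
    flow.register(project_name=os.environ.get("PREFECT_PROJECT", "dev"))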
  • r

    Richard Hughes

    08/24/2020, 3:42 PM
Hi, I had a VM that was abruptly turned off over the weekend and now I cannot get the Prefect Server UI to work. Is there a troubleshooting guide?
    k
    • 2
    • 22
  • k

    Kyle McEntush

    08/24/2020, 4:19 PM
Can a state handler launch another Prefect task? My end goal is to handle failed tasks via another task. Right now I've implemented everything in a state handler, but I don't get Prefect logging/tracking like I do with a task.
    k
    • 2
    • 4
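
A state handler is plain Python and can't launch a tracked Prefect task itself; a common hedged alternative is a dedicated downstream task with an any_failed trigger, which gets normal Prefect logging. A minimal sketch:

    from prefect import task, Flow
    from prefect.triggers import any_failed

    @task
    def risky():
        raise ValueError("boom")

    @task(trigger=any_failed)
    def handle_failure():
        # runs only when an upstream task failed, with full Prefect
        # logging/tracking because it is an ordinary task
        print("cleaning up after a failure")

    with Flow("failure-handling") as flow:
        handle_failure(upstream_tasks=[risky()])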
  • k

    Kyle McEntush

    08/24/2020, 7:30 PM
Two quick questions: within a flow (using the imperative API), how can I view reference tasks? And how can I set a task to be a reference task or not? My code currently consists of using flow.add_task() and task.set_upstream(). Specifically, I want to make sure that my triggers depend on only the tasks I think they really do. For example, in my pipeline (image attached), I want the trigger for the next task coming off of valid_unit_reducer to fire on any_successful for valid_unit_reducer, and not on any_successful for invalid_unit_reducer. Maybe this is the default behavior in Prefect, but my current understanding is that triggers relate to all tasks and not just the immediately upstream task.
    k
    • 2
    • 2
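
For the first question, a short sketch: Flow exposes reference tasks directly (assuming the flow and the valid_unit_reducer task from the message above):

    # inspect which tasks currently determine the flow's final state
    print(flow.reference_tasks)

    # pin the flow's final state to specific tasks
    flow.set_reference_tasks([valid_unit_reducer])

As for the second concern: a task's trigger only evaluates the states of its immediate upstream tasks, not every task in the flow.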
  • m

    Minakshi

    08/25/2020, 12:50 AM
Hi all, I am getting this error while importing Prefect:

    File "/Users/mkorad/PycharmProjects/altruistic-armadillo/src/**", line 1, in <module>
        from prefect import task, Flow, Parameter
    ModuleNotFoundError: No module named 'dask.system'

Any idea about this error?
    k
    • 2
    • 12
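
A hedged guess: dask.system only exists in newer Dask releases, so an outdated dask install alongside a recent Prefect typically causes this import error. A quick check:

    import dask
    print(dask.__version__)
    # if this is old, upgrading usually resolves the error:
    #   pip install --upgrade dask distributed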
  • a

    Alfie

    08/25/2020, 4:15 AM
hi, is there an easy way to figure out which agents are connecting to Apollo? I cannot get my agent to pick up a flow run, so I suspect the run is being picked up by some other agent that was started unintentionally. Thanks
    n
    • 2
    • 7
  • r

    Robin

    08/25/2020, 8:42 AM
Hey there, is there a general way to troubleshoot Kubernetes agents? I spun up an EKS cluster using pulumi and created an agent locally using prefect agent start kubernetes --token <token_id> --label k8s. It seems like the Kubernetes agent is running correctly (see attached images). However, when I submit a flow that has the same label k8s, it does not execute (see image)… 1. How do I make sure the Kubernetes agent is set up properly? 2. Which tests does Prefect Cloud already perform by itself to make sure that an agent is set up properly?
    j
    • 2
    • 11
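
One hedged thing to verify for Robin's setup: a flow run is only picked up when the agent's labels cover the flow's labels, and in this Prefect version the flow's labels come from its environment. A sketch (project name illustrative):

    from prefect.environments import RemoteEnvironment

    # the flow's labels must be a subset of the agent's labels ("k8s" here);
    # note that Local storage silently adds the registering machine's hostname
    # as an extra label, a common reason runs are never matched
    flow.environment = RemoteEnvironment(labels=["k8s"])
    flow.register(project_name="my-project")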
  • w

    William Smith

    08/25/2020, 10:11 AM
Hi all, I'm having an issue with a task that has a manual_only trigger: when I resume the task, it only resumes after ~10 minutes, when the Lazarus process picks it up. I would have expected it to start straight away. It works fine if I run the flow on my local machine, but it becomes an issue when I run it in the cloud. I've tried adding a LocalResult to my flow but that hasn't worked. Here is a pastebin with a very simple flow so hopefully you can reproduce the issue: https://dpaste.com/9DRBEHZXK
    j
    j
    • 3
    • 11
  • m

    Manuel Mourato

    08/25/2020, 2:10 PM
Hello all, apologies if this is a basic question, but I am trying to checkpoint the output of a task, like so:

    from datetime import timedelta
    import os

    from prefect.tasks.shell import ShellTask
    from prefect.engine.results import LocalResult
    from default_task_handler import tasks_notifications_handler

    os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"

    a = ShellTask(
        max_retries=3,
        retry_delay=timedelta(minutes=60),
        timeout=1800,
        state_handlers=[tasks_notifications_handler],
        checkpoint=True,
        result=LocalResult(dir="/home/my-user/weekly_execution"),
        command="ls /home/my-user/",
    )
    a.run()

The task runs and the weekly_execution directory is created, but nothing is persisted. What am I doing wrong? Is it mandatory that the task be part of a flow? Thank you. UPDATE: Indeed, if I run the task inside a flow, checkpointing works. Is there a way to do it for individual tasks?
    j
    • 2
    • 2
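
For reference, a minimal sketch of the working variant Manuel describes, reusing the ShellTask a from the snippet above (with PREFECT__FLOWS__CHECKPOINTING=true set before Prefect is imported):

    from prefect import Flow

    # checkpointing is applied by the flow runner, so a bare task.run()
    # never persists results; a one-task flow does
    with Flow("checkpoint-demo") as flow:
        listing = a()

    flow.run()  # result persisted under /home/my-user/weekly_execution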
  • w

    William Smith

    08/25/2020, 2:30 PM
Are there any examples of calling the LambdaInvoke task? I can't seem to get it working...
    j
    j
    • 3
    • 4
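
A hedged sketch of one way to call it; the function name and payload are illustrative, and AWS credentials are assumed to come from the usual boto3 chain:

    import json
    from prefect import Flow
    from prefect.tasks.aws.lambda_function import LambdaInvoke

    invoke = LambdaInvoke(
        function_name="my-function",            # illustrative
        payload=json.dumps({"event": "test"}),
    )

    with Flow("invoke-lambda") as flow:
        result = invoke()

    flow.run()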
  • l

    Lukas

    08/25/2020, 3:38 PM
I'm running a flow via the Fargate agent and am suddenly getting this message in the Fargate logs: [2020-08-25 15:34:14] DEBUG - prefect.CloudFlowRunner | Flow 'Fetch-Authors': start_time has not been reached; ending run. The flow is Submitted for execution, but Fargate basically shuts down and stops the task. Any idea why this could happen? I've run this flow a lot of times before and never experienced this.
    j
    • 2
    • 5
  • j

    Jason Oban

    08/25/2020, 5:01 PM
Is there a way to programmatically add and remove extra loggers, or can extra loggers only be configured via environment variables or the Prefect config?
    j
    • 2
    • 2
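
Besides the PREFECT__LOGGING__EXTRA_LOGGERS environment variable, a hedged programmatic route is to attach Prefect's configured handlers to a stdlib logger by hand (logger name illustrative):

    import logging
    from prefect.utilities.logging import get_logger

    mylogger = logging.getLogger("my_library")
    for handler in get_logger().handlers:
        mylogger.addHandler(handler)
    mylogger.setLevel(logging.INFO)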
  • k

    Kyle McEntush

    08/25/2020, 6:26 PM
Is there a way to do a full stop if a task fails? In my graph, the mapped validate_unit tasks all get TriggerFailed. My invalid_unit_reducer is set to trigger on any failure. Is there a way I can set it to trigger on failures that aren't trigger failures, but rather hard fails? In my setup, a trigger failure should be treated differently than a true failure.
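
A hedged sketch of one approach: triggers are plain functions over the upstream states, and TriggerFailed is a subclass of Failed, so it can be filtered out explicitly:

    from prefect import task
    from prefect.engine import signals
    from prefect.engine.state import Failed, TriggerFailed

    def any_hard_failed(upstream_states):
        # fire on real failures, but not on propagated trigger failures
        if any(
            isinstance(s, Failed) and not isinstance(s, TriggerFailed)
            for s in upstream_states.values()
        ):
            return True
        raise signals.TRIGGERFAIL("no hard failures upstream")

    @task(trigger=any_hard_failed)
    def reduce_invalid_units():  # illustrative stand-in for invalid_unit_reducer
        ...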
  • k

    Kyle McEntush

    08/25/2020, 6:26 PM
    Apologies for the second post. Forgot to include the image
    j
    • 2
    • 4
  • m

    Marwan Sarieddine

    08/25/2020, 7:39 PM
Hi folks - I am running into API errors trying to poll the status of a flow run. It seems like a Cloud API issue - is anyone else facing similar issues?
    status = client.get_flow_run_info(flow_run_id)
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 990, in get_flow_run_info
        result = self.graphql(query).data.flow_run_by_pk  # type: ignore
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 281, in graphql
        retry_on_api_error=retry_on_api_error,
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 237, in post
        retry_on_api_error=retry_on_api_error,
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 373, in _request
        token = self.get_auth_token()
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 503, in get_auth_token
        self._refresh_access_token()
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 630, in _refresh_access_token
        token=self._refresh_token,
      File "~/.pyenv/versions/3.7.7/envs/etl/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['refresh_token'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
    j
    • 2
    • 4
  • m

    Marwan Sarieddine

    08/25/2020, 8:19 PM
Hmm - the Prefect Cloud UI is not reflecting the correct status of the task runs. I see from the logs that tasks are going into a Mapped or Cached state, but the Gantt chart only shows Pending task runs for some reason, even after refreshing the flow-run page (please see the attached screenshot).
    👀 1
    j
    m
    c
    • 4
    • 7
  • a

    An Hoang

    08/25/2020, 11:07 PM
How is everyone versioning the output based on which version of the flow/task created it? I was thinking of recreating this approach in Prefect, where the Task object has a version and a hash that combines the version of the current Task object with those of all Tasks before it. The output file would contain the final/flow-level hash. Any ideas on how to approach this differently?
    j
    • 2
    • 3
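
A hedged sketch of the chained-hash idea: each task's hash folds its own version into the hashes of everything upstream, and the flow-level hash tags the output file (all names illustrative):

    import hashlib

    def chain_hash(version: str, upstream_hashes: list) -> str:
        # combine a task's own version with all upstream hashes
        h = hashlib.sha256(version.encode())
        for up in sorted(upstream_hashes):  # sort for order-independence
            h.update(up.encode())
        return h.hexdigest()[:12]

    extract_h = chain_hash("v1", [])
    transform_h = chain_hash("v2", [extract_h])
    output_path = f"result_{transform_h}.parquet"  # illustrative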
  • r

    Riley Hun

    08/26/2020, 1:19 AM
Hi everyone, hope you're having a good week so far. Pardon my ignorance, but a quick question: I am using a static Dask cluster hosted on Google Kubernetes Engine on GCP, set to auto-scale depending on CPU usage. I am running a mapped task to unzip and extract files from several zip folders, but I'm noticing that the number of Dask workers remains at 1. Should I expect Prefect mapped tasks to spawn additional Dask workers to accomplish the parallelization? Checking the logs, it looks like the files are extracted sequentially even though I am using a mapped task.
    m
    • 2
    • 2
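
One hedged thing to check: mapped tasks only fan out across Dask workers when the flow run uses a DaskExecutor pointed at the cluster's scheduler; otherwise mapping executes sequentially. A sketch (scheduler address illustrative):

    from prefect.engine.executors import DaskExecutor

    executor = DaskExecutor(address="tcp://dask-scheduler:8786")
    flow.run(executor=executor)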
  • h

    Howard Cornwell

    08/26/2020, 2:12 PM
Hey, been searching for a while but having no luck; what's the correct environment variable for configuring the Prefect Server DB host? I've tried PREFECT__SERVER__DATABASE__HOST but it doesn't appear to work. Thanks
    j
    • 2
    • 4
  • s

    Slackbot

    08/26/2020, 4:38 PM
    This message was deleted.
    j
    • 2
    • 1
  • p

    Paweł

    08/26/2020, 6:11 PM
Hello all Prefect folks! I have a quick question: can I somehow "force" prefect-ui to use the network of the container in which it's deployed? I deployed the whole stack on k8s, and right now I need to forward Apollo to my localhost to use the UI (I don't want to expose anything without auth).
    j
    • 2
    • 4
  • j

    josh

    08/26/2020, 7:47 PM
Hey team, Prefect version 0.13.4 has been released! Here are a few notable changes: 📚 New Databricks task library task (I couldn't find the brick emoji) 🕵️ Custom YAML for k8s agents 🔗 Coupled versioning for Core / Server / UI. A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled
    :prefect: 3
    🚀 6
    :party-parrot: 1
  • r

    Rob Fowler

    08/26/2020, 10:34 PM
I have an API question. If I have a flow that uses the same task a few times, how can I get the task results from the state when it finishes, given that the result dictionary is keyed by the task name?
  • r

    Rob Fowler

    08/26/2020, 10:42 PM
here is a non-working example, but it shows what I mean:
    from prefect import Flow, task, Parameter
    from flows.freq import request
    
    @task(name="Creating an order")
    def create_order(opts, user_agent, auth, js):
        return request(opts, 'POST', f"{opts.try_url}/orders", user_agent, auth, js)
    
    def create_service_requests(delivery_type):
        return {'dt': delivery_type}
    
    with Flow("azure_subscription_change_smacc") as flow:
        opts = Parameter('opts')
        user_agent = Parameter('user_agent')
        auth = Parameter('auth')
        regular_order_id_js = create_service_requests("Regular")
        accelerated_order_id_js = create_service_requests("Accelerated")
        regular_result = create_order(opts, user_agent, auth, regular_order_id_js)
        accelerated_result = create_order(opts, user_agent, auth, accelerated_order_id_js)
    
    if __name__ == '__main__':
        state = flow.run(opts={}, user_agent="blah", auth="blahauth")
  • r

    Rob Fowler

    08/26/2020, 10:44 PM
Ideally, I'd like to get regular_result and accelerated_result without using an index like state.result[0], etc.
  • r

    Rob Fowler

    08/26/2020, 11:12 PM
ok got it: if I have the task result objects, I can use them as an index into the results:
    def check(state):
        print(f"regular result: {state.result[regular_result].result}")
    👍 2
  • b

    Bob Colner

    08/27/2020, 3:17 AM
After upgrading from Prefect core 0.11.2 to 0.13.4, my flow is failing with an error related to 'Cloud', which is strange since I'm not using Prefect Cloud (or the local server/UI). Looks like it is related to the Slack notifications. Any ideas? (full logs posted in the thread)
    c
    • 2
    • 6
  • a

    Alfie

    08/27/2020, 3:58 AM
    Hi Team, now I’m using local storage to store flows in a local storage. But to my case, it’s a proper solution to store the flow into db. Any guides to achieve on this? Thanks
    k
    • 2
    • 6
  • s

    Sandeep Aggarwal

    08/27/2020, 1:32 PM
Hello team, it looks like the delete_flow_run mutation is clashing with Hasura's auto-generated mutation schemas. I am self-hosting Prefect and, as part of a data retention policy, I need to clean up old data objects. I am using the mutation below to clean up old flow/task runs:

    mutation($created_before: timestamptz) {
        delete_flow_run(where: {created: {_lt: $created_before}}) {
            affected_rows
        }
        delete_flow_run_state(where: {created: {_lt: $created_before}}) {
            affected_rows
        }
        delete_log(where: {created: {_lt: $created_before}}) {
            affected_rows
        }
        delete_task_run(where: {created: {_lt: $created_before}}) {
            affected_rows
        }
        delete_task_run_state(where: {created: {_lt: $created_before}}) {
            affected_rows
        }
    }

The request fails with the errors below:

    2020-08-27T12:40:17.586Z {"message":"Unknown argument \"where\" on field \"delete_flow_run\" of type \"Mutation\".","locations":[{"line":3,"column":37}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}
    2020-08-27T12:40:17.586Z {"message":"Cannot query field \"affected_rows\" on type \"success_payload\".","locations":[{"line":7,"column":9}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}
    2020-08-27T12:40:17.586Z {"message":"Field \"delete_flow_run\" argument \"input\" of type \"delete_flow_run_input!\" is required, but it was not provided.","locations":[{"line":3,"column":21}],"extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}

When I remove delete_flow_run from the above mutation, everything works fine.
    d
    • 2
    • 15
d

Dylan

08/27/2020, 1:54 PM
Hi @Sandeep Aggarwal, I have great news! You shouldn’t have to do this manually. The database is set to delete all dependent objects on cascade. So, as long as you delete the flow runs, all task runs, logs, etc will be deleted.
Try only deleting flow runs with that where clause. affected_rows does look like something from Hasura's schema, as our mutations only have success and error. Does affected_rows appear in the schema of the Interactive API?
s

Sandeep Aggarwal

08/27/2020, 2:00 PM
Thanks @Dylan for the quick response. I am referring to the Hasura docs for auto-generated schemas: https://hasura.io/docs/1.0/graphql/core/mutations/delete.html
This runs fine via the Hasura UI; however, when I run it via the client, it clashes with the one provided by Prefect.
d

Dylan

08/27/2020, 2:07 PM
That’s correct, we actually filter Hasura’s auto-generated schemas and compose mutations so that they work to achieve a specific goal across many tables
s

Sandeep Aggarwal

08/27/2020, 2:10 PM
Makes sense. The issue is that Prefect's mutation only accepts the ID of the flow run to be deleted. So in order to use it, I will have to query all flow runs and then delete them one by one. Is there any option to perform a bulk delete?
d

Dylan

08/27/2020, 2:10 PM
Not at this time, but that’s a great feature request!
Would you mind opening an issue on the server repo?
s

Sandeep Aggarwal

08/27/2020, 2:14 PM
Sure
d

Dylan

08/27/2020, 2:15 PM
Thank you!
And just to be clear, is the above mutation working in the Hasura UI as you expected?
s

Sandeep Aggarwal

08/27/2020, 2:16 PM
Yup it works as expected.
d

Dylan

08/27/2020, 2:16 PM
Great 👍
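
For reference, a hedged sketch of the per-run cleanup discussed above, using Prefect's Client; the input field name flow_run_id is inferred from the validation error earlier in the thread, not confirmed:

    from prefect import Client

    client = Client()

    # cascade deletes on flow runs remove task runs, logs, etc. (per Dylan)
    runs = client.graphql(
        """
        query($before: timestamptz) {
          flow_run(where: {created: {_lt: $before}}) { id }
        }
        """,
        variables={"before": "2020-08-01T00:00:00Z"},
    )
    for run in runs.data.flow_run:
        client.graphql(
            """
            mutation($id: UUID!) {
              delete_flow_run(input: {flow_run_id: $id}) { success }
            }
            """,
            variables={"id": run.id},
        )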