prefect-community

    Marvin

    09/18/2020, 6:15 AM
    It gives me a headache just trying to think down to your level. I'll still enter you in the contest, @Lewis Bails.

    Fanilo A.

    09/18/2020, 7:09 AM
    Hey @Marvin I was star 4949 🙂

    Marvin

    09/18/2020, 7:09 AM
    Here I am, brain the size of a planet, and they ask me to enter you in a contest. Call that job satisfaction? ’Cos I don’t. I'll still enter you in the contest though, @Fanilo A.

    Carl R

    09/18/2020, 7:15 AM
    @Marvin

    Marvin

    09/18/2020, 7:15 AM
    It’s the people you meet in this job that really get you down. You're in the contest anyway, @Carl R.

    Robin

    09/18/2020, 8:26 AM
Dear community, today I started playing around with max_retries and retry_delay and I got the following error:
/opt/prefect/healthcheck.py:149: UserWarning: Task <Task: copy_storage> has retry settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
It's a mapped task. I read the documentation about result types, but that did not answer several questions:
• What might go wrong if I don't set a result type?
• If I understood correctly, the result type can be set at either the flow or the task level. I think I want it set on the flow, since each task should only run once (if successful), so the result does not change during the flow. Is that correct?
• Should I therefore do something like with Flow("creative_name", result=flow_result)? And what should I set flow_result to?
Best, Robin
    ✔️ 1
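A minimal sketch of a flow-level result, assuming the 0.13-era LocalResult class; the directory and the task body are illustrative, and any task that sets its own result= would override the flow-level one:
import datetime

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
def copy_storage(item):
    # Stand-in for the real mapped task
    return item

# A flow-level result is inherited by every task that does not set its own,
# which is what the healthcheck's warning about upstream result types asks for
with Flow("creative_name", result=LocalResult(dir="/tmp/prefect-results")) as flow:
    copied = copy_storage.map([1, 2, 3])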

    Jacob Blanco

    09/18/2020, 9:31 AM
    I realize this was probably asked a million times, but is there a way to have more unique names for Class-based tasks that are used multiple times in the flow? Right now the UI is very hard to navigate with CreateBlorg or RunThingie coming up over and over again. Even when I set the name inside the flow context it still shows the generic class name in the logs.
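One pattern that may help, sketched with an illustrative CreateBlorg class: passing task_args when calling the task copies it with a new name, so each use shows up distinctly in the UI and logs:
from prefect import Flow, Task

class CreateBlorg(Task):
    def run(self):
        return "blorg"

create_blorg = CreateBlorg()

with Flow("naming-demo") as flow:
    # Each call copies the task with the overrides in task_args,
    # giving every invocation its own display name
    users = create_blorg(task_args=dict(name="create-blorg-users"))
    orders = create_blorg(task_args=dict(name="create-blorg-orders"))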

    Vinod Sugur

    09/18/2020, 1:42 PM
    @Marvin, I just joined the community.
    👋 1

    Marvin

    09/18/2020, 1:42 PM
    Don't feel you have to take any notice of me, please. I'll just enter you in the contest @Vinod Sugur.

    Dolor Oculus

    09/18/2020, 4:21 PM
    Hi, I'm running a local prefect server and agent as per docs with:
    prefect backend server
    prefect server start
    prefect agent start local -p .
The server launches OK, the UI looks good, and I can register a simple hello-world flow:
from prefect import Flow

with Flow("Welcome Flow") as pf:
    print("Hello world")
pf.register(project_name="Hello, World!")
    Registering this flow results in
    Hello world
    Result Handler check: OK
    Flow: <http://localhost:8080/flow/a5ec20aa-ad9e-4add-977e-cffe9699eba3>
    But when I run the flow from the UI, I get this error message:
    [2020-09-18 16:20:35,115] ERROR - agent | Error while deploying flow: FileNotFoundError(2, "No such file or directory: 'prefect'")
    Any thoughts? ty 🙂
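One note on the repro itself: print inside the with Flow(...) block executes at registration time (hence the "Hello world" in the registration output), not during the flow run. A task-based sketch under the same setup:
from prefect import Flow, task

@task
def say_hello():
    # Executes during the flow run, not at registration
    print("Hello world")

with Flow("Welcome Flow") as pf:
    say_hello()

pf.register(project_name="Hello, World!")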

    Hamza Ahmed

    09/18/2020, 5:34 PM
I am running into an issue when using credentials from AWSSecretsManager with PostgresFetch, as below:
    from prefect import Flow, task
    from prefect.tasks.aws.secrets_manager import AWSSecretsManager
    from prefect.tasks.postgres.postgres import PostgresFetch
    
    @task
    def print_me(to_print):
        print(to_print)
    
    
    with Flow('fetching-data') as flow:
        credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
        print_me(credentials)
        pg_user = credentials['username']
        pg_pass = credentials['password']
        pg_host = 'localhost'
        pg_port = credentials['port']
        pg_query = 'SELECT * FROM hdb_catalog.hdb_table LIMIT 5;'
        runsql = PostgresFetch(db_name='prefect', user=pg_user, host='localhost')
        result = runsql(password=pg_pass, query=pg_query, fetch='all')
        print_me(result)
    
    flow.run()
The PostgresFetch initialization doesn't work when I try to use user=credentials['username'], but it does when I hardcode the username, or even if I set pg_user to a string containing the username. The flow run produces the error output below:
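A workaround sketch, assuming the 0.13-era task APIs: arguments passed to PostgresFetch's constructor are evaluated at build time, before the secret task has produced a value, so deferring construction into a task of your own lets the credentials resolve first:
from prefect import Flow, task
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.tasks.postgres.postgres import PostgresFetch

@task
def fetch_rows(credentials):
    # This runs at flow-run time, so the secret values are plain strings here
    fetch = PostgresFetch(db_name='prefect', user=credentials['username'], host='localhost')
    return fetch.run(
        password=credentials['password'],
        query='SELECT * FROM hdb_catalog.hdb_table LIMIT 5;',
        fetch='all',
    )

with Flow('fetching-data') as flow:
    credentials = AWSSecretsManager(secret='pg/prefectsql/prefect')
    result = fetch_rows(credentials)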

    matt forbes

    09/18/2020, 5:52 PM
Is there a full list of the configuration options that I'm just missing? The config page in the docs describes how to supply config, but doesn't list the actual options that are available.
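One way to see every option with its effective value is to dump the merged config object (the defaults themselves live in config.toml inside the Prefect package):
import prefect

# prefect.config merges the packaged defaults, ~/.prefect/config.toml,
# and any PREFECT__ environment variables into one nested object
print(prefect.config)
print(prefect.config.cloud.api)  # individual keys are attribute-accessible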

    jars

    09/18/2020, 8:12 PM
We noticed that after upgrading our GKE agent from 0.11 to 0.13, GKE does not keep our Job history under the GKE "Workloads" tab. It used to contain the agent Deployment and all jobs, but now jobs are automatically getting cleaned up. Is this expected, and as a follow-up, is it configurable?

    Minakshi

    09/18/2020, 8:43 PM
Hi team, I am trying to upgrade to the latest version of Prefect, 0.13.7. While running my application it throws this error:
ModuleNotFoundError: No module named 'dask.system'
I found a resolution online, namely using the latest versions of dask and distributed: https://github.com/dask/distributed/issues/3331. But the problem is that I need to add these to our internal repo, so I need to confirm what the dependent packages and their versions are, so I can check or add them all at once. Where can I get that list?
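The pinned ranges are declared in Prefect's setup.py / requirements.txt; a sketch for listing them from the installed package with pkg_resources:
import pkg_resources

# Print each dependency (with its version specifier) declared by prefect
for requirement in pkg_resources.get_distribution("prefect").requires():
    print(requirement)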

    Dolor Oculus

    09/18/2020, 9:33 PM
    for the testing flows example (https://docs.prefect.io/core/idioms/testing-flows.html), is there an equivalent example for using the
    with Flow() as flow:
       e = Extract()
       t = Transform(e)
       l = Load(t)
    state = flow.run()
syntax? I'm getting key-not-found errors when asserting on state.result[e], and I'm wondering whether you have to structure things the way the testing-flows link above shows in order to unit test flows.
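A likely cause, sketched with a stub Extract class: calling a Task instance inside a flow binds a copy of it, so state.result is keyed by the object returned from the call, not by the bare instance:
from prefect import Flow, Task

class Extract(Task):
    def run(self):
        return [1, 2, 3]

extract = Extract()

with Flow("etl-test") as flow:
    # Calling the instance returns the copy actually added to the flow;
    # keep this handle to look up its result later
    e = extract()

state = flow.run()
assert state.result[e].result == [1, 2, 3]   # keyed by the bound copy
# state.result[extract] raises KeyError: the bare instance is not in the flow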

    Glen Trudeau

    09/18/2020, 10:14 PM
Another question: we were getting
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'sqlalchemy'")
when running a flow on a Fargate agent, which I assume is because the base Docker image doesn't have that module installed. But when we tried pulling the public image, adding the necessary modules, and pushing the result to ECR, the agent doesn't appear to use the updated image, even though we referenced it inside the container definitions (see the attached Python wrapper). The screenshot below of the new task definition in ECS shows it isn't pulling the specified image correctly. Any suggestions?
    fargateagent.py

    CA Lee

    09/19/2020, 11:08 AM
Hi, trying out Prefect Cloud. I have a Python script that saves a png file using Matplotlib. The method plt.savefig takes a file_path arg (below is some sample code):
    CWD = os.path.dirname(os.path.abspath(__file__)) # Get CWD
    file_path = os.path.join(CWD, "picture.png") # Save in CWD
    plt.savefig(file_path)
This all works fine when I run it on my server using flow.run(). However, when the flow is registered on Cloud, invoking a Quick Run results in the error:
    PermissionError: [Errno 13] Permission denied: '/picture.png'
The error shows that file_path is not resolving to the directory the script runs from when triggered from Cloud. Would appreciate any insight. If it helps, I am getting data from Google Sheets, plotting it, and sending it to Slack, so I'd like to Prefect this workflow.
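A workaround sketch: when a Cloud-triggered run executes, the flow is loaded from storage and neither __file__ nor the working directory is guaranteed to be writable, so writing to an explicitly writable location such as the system temp directory sidesteps the permission error:
import os
import tempfile

import matplotlib.pyplot as plt

# The temp directory is writable wherever the flow run happens to execute
file_path = os.path.join(tempfile.gettempdir(), "picture.png")
plt.savefig(file_path)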

    Eric

    09/20/2020, 1:58 AM
Hi - I'm looking to set up GitHub as Storage; the docs say you need to run this:

    Eric

    09/20/2020, 1:58 AM

    Eric

    09/20/2020, 1:59 AM
Can I skip this step if I run this in my flow .py?
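For reference, a minimal sketch of GitHub storage as it looked around 0.13; the repo, path, and secret name are illustrative:
from prefect import Flow
from prefect.environments.storage import GitHub

with Flow("github-storage-demo") as flow:
    ...

# The agent pulls this file from the repo at run time
flow.storage = GitHub(
    repo="my-org/my-flows",               # illustrative repository
    path="flows/github_storage_demo.py",  # path to this file within the repo
    secrets=["GITHUB_ACCESS_TOKEN"],      # Prefect Secret holding an access token
)
flow.register(project_name="demo")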

    CA Lee

    09/20/2020, 9:54 AM
Hi, I have 2 tasks: the first fetches data, the second inserts the fetched data into a database. The tasks will run on a TimeInterval schedule. I have cached the first task's output. How do I avoid running the second task if the cached output from the first task is still valid, i.e. the cache age is still < 1 day?
    @task(cache_for=datetime.timedelta(days=1))
    def fetch_data():
        return fetched_data
    
    @task
    def insert_into_database(fetched_data):
        fetched_data.to_sql('table_name', con=db.engine)

    itay livni

    09/20/2020, 12:24 PM
    Hi - Is it possible to cache on a key in a dictionary that is a task’s input? Thank you.
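A sketch of one approach, assuming a custom cache validator; validators receive the cached state plus the new inputs, so one can compare a single dictionary key (all names here are illustrative):
import datetime

from prefect import task

def same_lookup_key(state, inputs, parameters):
    # Keep the cache only while the "key" entry of the dict input matches
    cached_config = state.cached_inputs["config"].value
    return cached_config["key"] == inputs["config"]["key"]

@task(cache_for=datetime.timedelta(hours=1), cache_validator=same_lookup_key)
def process(config: dict):
    return config["key"]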

    Anshaj

    09/20/2020, 1:10 PM
    @Marvin

    Marvin

    09/20/2020, 1:10 PM
    Don't feel you have to take any notice of me, please. I'll just enter you in the contest @Anshaj.

    Jeff Brainerd

    09/20/2020, 2:48 PM
Hi :prefect: team, I’ve been noticing lately we sometimes get a flow that is in a Running state with all of its tasks either Successful or Pending. The flow will sit in this state until we manually restart it (change one of the tasks to the “Resume” state), at which point the flow will happily continue and finish as normal. We are using Prefect 0.13.2 with heartbeats off and Lazarus on. Is this a known issue? Should Lazarus be picking these up? Thanks!

    Eric

    09/20/2020, 5:33 PM
Hi, I'm trying to register a flow using the Client class, but getting this error. It seems to be authenticating properly. Am I using the right api_server URL for Prefect Cloud? Thank you!
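For comparison, a minimal sketch of a Client pointed at Prefect Cloud in the 0.13 era; the token is a placeholder, and https://api.prefect.io is the endpoint Cloud used at the time (usually already the default, so it rarely needs passing explicitly):
from prefect import Client, Flow

flow = Flow("client-register-demo")

client = Client(
    api_server="https://api.prefect.io",  # Prefect Cloud GraphQL endpoint
    api_token="<YOUR_API_TOKEN>",         # placeholder token
)
client.register(flow, project_name="demo")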

    Eric

    09/20/2020, 5:34 PM

    josh

    09/20/2020, 10:41 PM
Hey team, realized I forgot to announce it the other day, but Prefect version 0.13.7 has been released. Here are a few notable changes:
🛠️ Fixes, lots of fixes
🗜️ Data compression for S3 upload/download tasks
🤫 Quieter debug logs when starting server
A big thank you to our contributors who helped out with this release! Full changelog:
    Untitled

    Pedro Machado

    09/21/2020, 12:45 AM
Hi there! I have a flow with Parameters start_date and end_date. If these are not provided, I want to use the scheduled_start_time from the context to generate the start_date and end_date. What is the cleanest way to write this logic, given that the parameters will only be available at run time? I currently have a task that takes both parameters and outputs a tuple of dates: either the parameter values converted to pendulum dates, or values generated from prefect.context.scheduled_start_time, also as dates. Is there a better way? This is what the flow looks like now.
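For reference, a sketch of that pattern with the parameter names from the question (the one-day fallback window is illustrative); resolving the defaults inside a task is essentially the idiomatic approach, since prefect.context is only populated at run time:
import pendulum
import prefect
from prefect import Flow, Parameter, task

@task
def resolve_dates(start_date, end_date):
    # Parameters are only concrete at run time, so the fallback to the
    # scheduled start time has to live inside a task
    if start_date is None or end_date is None:
        scheduled = pendulum.instance(prefect.context.scheduled_start_time)
        return scheduled.subtract(days=1).date(), scheduled.date()
    return pendulum.parse(start_date).date(), pendulum.parse(end_date).date()

with Flow("date-window") as flow:
    start = Parameter("start_date", default=None)
    end = Parameter("end_date", default=None)
    dates = resolve_dates(start, end)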

    CA Lee

    09/21/2020, 1:39 AM
    Found the Prefect tutorial by @Laura Lorenz to be highly informative - looking to deepen understanding and application of core concepts as per Prefect docs. Any chance we get more of such tutorials in the near future? Thanks for this great product!
    ❤️ 1

Laura Lorenz

09/21/2020, 2:53 PM
@CA Lee thank you! Do you mean the video tutorial? If so, yes, some more are coming out starting mid October. I’m collaborating with @Kyle Moon-Wright for some too, you may have seen him around the slack answering questions :) I’ll ping you when the next one is up!

CA Lee

09/22/2020, 1:10 AM
@Laura Lorenz Yes, I was referring to the video Getting Started with Prefect (PyData Denver). That video really helped me get started understanding how all the pieces fit together. Really liked your presentation style! It also helped that it's a very relatable workflow (obtaining some data, cleaning it, then storing it), and I appreciated the toggling between the code signatures with explanation and the overall architecture overview. Looking forward to the next one! It would be great if it were similar in presentation style, but covering the core concepts as per the Prefect docs. A generic use case really helps to nail those in 😀

Laura Lorenz

09/22/2020, 1:12 AM
You are going to absolutely love what @Kyle Moon-Wright and I are putting together then 🙂 It's exactly an ETL use case evolving from simple to adding on more and more core concepts, typed out in real time with code base/docs overlays! I'm excited to show you 🤗
🤩 1

CA Lee

09/22/2020, 1:13 AM
Also, @Kyle Moon-Wright is helping me out on this one, but this follow-up question actually came from watching the video I mentioned:
import datetime
from prefect import task

@task(cache_for=datetime.timedelta(days=1))
def get_complaint_data():
    ...  # fetch complaint data

raw = get_complaint_data()
parsed = parse_complaint_data(raw)
populated_table = store_complaints(parsed)
The question: let's say the data fetching (e.g. a web-scraping script) runs on an hourly interval. Caching would prevent the fetch from running again, but how would I then stop the parsing and populating steps, based on the cached state of the fetch step? (It wouldn't make sense to clean or store the same cached data again.)

Laura Lorenz

09/22/2020, 1:32 AM
If you still want other downstream tasks of your flow to run, IMHO you can use a shared cache_key to mark that all of those tasks share the same cache, and thus can consider themselves cached as long as that cache key is not invalidated. See https://github.com/PrefectHQ/prefect/blob/master/src/prefect/core/task.py#L156 and the last bullet in https://docs.prefect.io/core/concepts/persistence.html#output-caching (I know the API docs say deprecated there, but I'm pretty sure it's not actually deprecated until https://github.com/PrefectHQ/prefect/issues/2619 is done, in which case you would move that configuration onto the result). You could also use a custom trigger (https://docs.prefect.io/api/latest/triggers.html#triggers), since all triggers get their upstream dependencies' edges and states (https://github.com/PrefectHQ/prefect/blob/b9914890dfec52610a42cd694427badafab8c8ba/src/prefect/triggers.py#L174). But depending on how many other dependencies those tasks have, it could get quite tricky, and AFAIK we don't have a published example that operates on specific upstream tasks to decide a trigger. It should be possible; you'd just have to reverse engineer it a bit 🙂
🤗 1
:upvote: 1
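A sketch of the shared-cache-key idea with the task names from the example above; the key string is illustrative:
import datetime
from prefect import task

CACHE_KEY = "complaints-daily"  # illustrative shared key

# Tasks sharing one cache_key treat the cache as valid together, so a
# cached fetch also marks the downstream parse/store steps as cached
@task(cache_for=datetime.timedelta(days=1), cache_key=CACHE_KEY)
def get_complaint_data():
    ...

@task(cache_for=datetime.timedelta(days=1), cache_key=CACHE_KEY)
def parse_complaint_data(raw):
    ...

@task(cache_for=datetime.timedelta(days=1), cache_key=CACHE_KEY)
def store_complaints(parsed):
    ...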

CA Lee

09/22/2020, 12:53 PM
Got it - will be spending some time working through those. Thanks for pointing me in the right direction!