Powered by Linen
prefect-community
  • Thomas Hoeck

    07/21/2020, 7:33 AM
    Hi all. Is there a way to get more descriptive tracebacks in Prefect Cloud? Currently I'm getting:
    Unexpected error: ZeroDivisionError('division by zero')
    Traceback (most recent call last):
    File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
    File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 952, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
    File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 182, in timeout_handler
    return fn(*args, **kwargs)
    File "<ipython-input-6-39643be0b44f>", line 9, in hello_task
    ZeroDivisionError: division by zero
    I know it is part of the Hybrid model that code isn't shared but is there a way to get a normal python traceback, so I can see the failed line?
    3 replies · 2 participants
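Note that the last frame already names the failing location: line 9 of the notebook cell, in `hello_task`. Python's traceback module prints the source text of a frame only when `linecache` can read the file, and a notebook cell name such as `<ipython-input-6-39643be0b44f>` is not a readable file where the flow runs. A minimal stdlib sketch that reproduces this (the cell name `<ipython-input-demo>` and the `hello_task` body are hypothetical):

```python
import traceback

# Hypothetical task body, compiled under a notebook-style filename that is not a
# file on disk, as happens when a task is defined in a Jupyter cell.
CELL_SOURCE = """
def hello_task():
    x = 1
    return x / 0
"""

namespace = {}
exec(compile(CELL_SOURCE, "<ipython-input-demo>", "exec"), namespace)


def formatted_traceback():
    """Call the task body and return the formatted traceback of its failure."""
    try:
        namespace["hello_task"]()
    except ZeroDivisionError:
        return traceback.format_exc()


tb = formatted_traceback()
# The frame line still carries file name, line number and function name;
# the source text is shown only if linecache can find it.
print(tb)
```

Defining tasks in a regular `.py` module that is also present where the flow runs may let the traceback include the source lines; whether it does depends on how your flow storage ships the code.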
  • Sven Teresniak

    07/21/2020, 8:22 AM
    https://docs.prefect.io/api/latest/environments/storage.html#local <- were the `stored_as_script` and `path` keyword arguments removed? From every storage class? (`Local` in my case)
    20 replies · 2 participants
  • Sven Teresniak

    07/21/2020, 9:48 AM
    I don't understand the proper use of the `context` dict. The documentation is not much more than a hello-world. I'd like to use Prefect's Context to pass some (configuration) constants to tasks, but that's not possible.
    #!/usr/bin/env python
    # coding: utf8
    
    import prefect
    from prefect import task, Flow
    from prefect.environments.storage.local import Local
    
    
    @task
    def print_context():
        prefect.context.get("logger").info("get-val is '%s'", prefect.context.get("val"))
        prefect.context.get("logger").info("dot-val is '%s'", prefect.context.val)
    
    
    with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow:
        with prefect.context(val="SPAM"):
            print_context()
    
    if __name__ == "__main__":
        flow.register()
    This prints `'None'` on the first line and throws an exception on the second. So the context's `val` is only valid inside the `with` block. But what's the purpose of `Context` if not to pass some simple constants around? I can also write
    with Flow("contexttest", storage=Local(directory="/flows/.prefect/flows")) as flow:
        prefect.context.val="SPAM"
        print_context()
    with the same result: not available in task.
    3 replies · 2 participants
  • Luis Muniz

    07/21/2020, 10:04 AM
    Hi guys, we are trying to iterate over a cursor of a long-running query, chunk the results, and dynamically spawn a task that will handle each chunk. The examples I can find in the documentation use a declarative approach, either defining the task graph inside the Flow DSL with the @task decorator, or using an explicit Task instance, but here too inside the Flow itself. What we would like is, inside a Task that handles a scrollable result set, to be able to dynamically spawn an undefined number of tasks, because the total size of the result set is unknown. I hope I have framed our use case properly. Here is a bit of pseudo-code to illustrate it:
    @task
    def get_data(chunk_size):
        # fetch connection
        curr = connection.cursor()
        curr.execute(sql.SQL("select id from big_table"))
        collection = curr.fetchmany(chunk_size)
        while collection:  # fetchmany returns an empty list when exhausted
            # spawn task(collection)  <----- *Here spawn a task*
            collection = curr.fetchmany(chunk_size)
    11 replies · 3 participants
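The chunking half of the pseudo-code above can be written as a plain generator over the DB-API `fetchmany` call; each yielded chunk is what would be handed to a downstream task. A minimal sketch using an in-memory `sqlite3` database as a stand-in for the real connection (`iter_chunks` and the `big_table` contents are hypothetical; dispatching chunks to Prefect tasks is not shown):

```python
import sqlite3


def iter_chunks(cursor, chunk_size):
    """Yield lists of rows from an executed cursor, chunk_size rows at a time."""
    while True:
        chunk = cursor.fetchmany(chunk_size)
        if not chunk:  # empty list once the result set is exhausted
            return
        yield chunk


# Stand-in database with a "big_table" of 10 ids.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE big_table (id INTEGER PRIMARY KEY)")
connection.executemany("INSERT INTO big_table (id) VALUES (?)", [(i,) for i in range(1, 11)])

cursor = connection.execute("SELECT id FROM big_table ORDER BY id")
chunks = [[row[0] for row in chunk] for chunk in iter_chunks(cursor, chunk_size=4)]
print(chunks)  # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
```

Because Prefect's `map` decides the number of child task runs from the length of an upstream result at run time, one option is a task that returns chunk boundaries (for example id ranges) and a mapped task that fetches each range; returning boundaries rather than rows keeps the upstream result small.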
  • Ben Davison

    07/21/2020, 12:36 PM
    Question about Parameters:
    3 replies · 2 participants
  • Preston Marshall

    07/21/2020, 1:40 PM
    For Fargate: it seems like there is a 2-minute stop timeout. Is this accurate? So if you have a task that runs for longer than 2 minutes, it is killed? How does that work?
    👀 1
    5 replies · 3 participants
  • Sven Teresniak

    07/21/2020, 3:52 PM
    I've been working with Prefect Server for a week now. It's fun! I like it! Prefect is my perfect connection for handling dataflows between Presto, Spark, Postgres, S3, etc. The setup as "all Prefect components as containers in one k8s pod" took me a day. 😞 I will soon handle about 1TB of fresh data every day for a variety of specialized services. Thanks.
    💯 1
    🚀 4
    5 replies · 2 participants
  • Florian L

    07/21/2020, 3:59 PM
    Hello, I just implemented the new LocalEnvironment with a LocalDaskExecutor, but as a result I've got a new problem: my context modifications are no longer shared between the different tasks/functions of my flows. My understanding is that each runs in its own thread and they don't communicate with each other. Is there a solution to that problem, aside from no longer using Dask? PS: I'm not expecting context to be shared instantly between tasks executed in parallel. I'm setting context at the beginning of my flow execution, and towards the end, several steps later.
    2 replies · 2 participants
  • Chris Goddard

    07/21/2020, 5:40 PM
    Hi there! I'm working on deploying Prefect on a server. In the short term I just want to run a flow as a script rather than run Prefect Server. That works fine locally, but for some reason on the server I keep getting errors because it's expecting a Prefect Cloud API key. Is there a configuration setting I'm missing?
    11 replies · 2 participants
  • Pedro Machado

    07/21/2020, 5:43 PM
    Hi everyone. I have a question about creating deterministic flow run names. I am working on a flow that will have a schedule with up to 18 clocks. If you are wondering why that many, here is some background. This flow pulls data from a reporting API that is organized around data bundles and different granularities (daily, weekly, monthly). Not all combinations are valid, but I have identified 18 valid ones so far. Since each combination is available at different times with different frequencies, we need a different schedule for each combination of parameters. The flow structure/logic is the same for all the combinations, so it makes sense to have a single flow. Is there a way to define flow run names based on a combination of parameters and `scheduled_start_time`? More generally, it would be great to be able to use context + parameters to define the flow run name.
    1 reply · 2 participants
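Whatever hook ends up supplying the name, the naming rule itself can be a pure function of the parameters and the scheduled start time. A minimal sketch (`deterministic_run_name` and the `bundle`/`granularity` parameter names are hypothetical; attaching the name to a flow run is not shown and depends on the Prefect version):

```python
from datetime import datetime, timezone


def deterministic_run_name(parameters, scheduled_start_time):
    """Build a stable flow-run name from parameters and the scheduled start time.

    Keys are sorted so the same parameters always give the same name, and the
    timestamp is normalized to UTC so clocks defined in different zones agree.
    """
    parts = [f"{key}={parameters[key]}" for key in sorted(parameters)]
    stamp = scheduled_start_time.astimezone(timezone.utc).strftime("%Y%m%dT%H%MZ")
    return "_".join(parts + [stamp])


name = deterministic_run_name(
    {"granularity": "daily", "bundle": "sales"},
    datetime(2020, 7, 21, 6, 0, tzinfo=timezone.utc),
)
print(name)  # bundle=sales_granularity=daily_20200721T0600Z
```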
  • Matt Wong-Kemp

    07/21/2020, 7:24 PM
    What's the 'proper' way to do async inside a prefect task? I'm dual-testing in Jupyter, so I'd like to keep the async bit working nicely, and as far as I can tell Prefect isn't running an event loop. At the minute I've got something that looks like this:
    import asyncio
    from prefect import task

    async def do_thing_async_impl(a, b, c):
        await ...
        ...

    @task
    def do_thing(a, b, c):
        return asyncio.get_event_loop().run_until_complete(do_thing_async_impl(a, b, c))
    but I'm getting an error:
    Future <Future pending cb=[BaseSelectorEventLoop._sock_connect_done(9)()]> attached to a different loop')
    Before I go digging into event loop fun, is there an easier way to do an async task?
    2 replies · 2 participants
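A common cause of "attached to a different loop" is an awaitable or client object (for example an HTTP session) created on one event loop and awaited on another. One way to avoid it is a small synchronous wrapper that creates and awaits the coroutine on a single fresh loop, falling back to a worker thread when a loop is already running, as in Jupyter. A sketch; `run_sync` is a hypothetical helper, not a Prefect API, and the `@task` decorator is left out so the block runs standalone:

```python
import asyncio
import concurrent.futures


async def do_thing_async_impl(a, b, c):
    # Placeholder for real async work (HTTP calls, DB queries, ...).
    await asyncio.sleep(0)
    return a + b + c


def run_sync(coro_fn, *args):
    """Run coro_fn(*args) to completion from synchronous code.

    The coroutine is created and awaited on one fresh event loop, so every
    future it creates belongs to that loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread (plain script or task-runner thread).
        return asyncio.run(coro_fn(*args))
    # A loop is already running here (e.g. Jupyter): use a worker thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_fn(*args))).result()


# A synchronous task body would then be: return run_sync(do_thing_async_impl, a, b, c)
print(run_sync(do_thing_async_impl, 1, 2, 3))  # 6
```

In the Jupyter path the notebook's own loop is blocked while the worker thread runs; that is usually acceptable for interactive testing but is a trade-off to be aware of.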
  • karteekaddanki

    07/21/2020, 10:27 PM
    Hey guys, I am trying to use result targets to cache some of my results, as suggested in https://docs.prefect.io/core/idioms/targets.html. However, a lot of my heavyweight tasks are run in C++ and are invoked via Python. How can I use targets effectively in this case? I've tried to `return None`, but as expected, this causes my task to always run. I've worked around this by adding a layer of indirection: the task returns a path to a file that contains the filename of the actual file generated by my C++ program (similar to empty targets in make: the presence of this file indicates that the task has run, and its contents point to the location of the task output). It would be nice if I could avoid this indirection and directly return a `Result` object that doesn't necessarily correspond to a serialized Python object. All downstream processes that consume these results treat them as locations. In other words, I am looking for behavior identical to Luigi. Thanks in advance.
    2 replies · 2 participants
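The indirection described above is the make/Luigi "marker target" pattern: the presence of a small file means the step ran, and its contents point at the real output. A stdlib sketch of that pattern, independent of Prefect's `Result` classes (`run_with_marker`, `produce_output` and the file names are hypothetical; the C++ call is stood in by a Python function that writes a file):

```python
import tempfile
from pathlib import Path


def run_with_marker(marker: Path, produce_output):
    """Run produce_output() only if marker is missing; return the output location.

    produce_output must write its artifact and return the artifact's path.
    The marker stores that path and is written last, so a crash mid-step
    leaves no marker and the step reruns.
    """
    if marker.exists():
        return Path(marker.read_text().strip())
    output_path = produce_output()
    marker.write_text(str(output_path))
    return output_path


# Usage, with a stand-in for the external C++ program.
workdir = Path(tempfile.mkdtemp())
calls = []


def produce_output():
    calls.append(1)
    out = workdir / "result.bin"
    out.write_bytes(b"\x00\x01")
    return out


marker = workdir / "step1.done"
first = run_with_marker(marker, produce_output)
second = run_with_marker(marker, produce_output)
print(first == second, len(calls))  # True 1
```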
  • Thomas La Piana

    07/22/2020, 5:29 AM
    I have my graphql URL behind traefik and am using auth with it. I'd like to be able to register a flow by just passing the auth headers to the register() method the same way that I can pass it to the graphql queries. Should I make a PR for this or is there another option to register the flow? Is the serialize + graphql option I've seen the correct solution here?
    6 replies · 2 participants
  • Adrien Boutreau

    07/22/2020, 8:56 AM
    Hello! We installed Prefect Core on an EC2 instance, and our customer is really happy with the product and its graphic design! Congrats! The only remaining issue is the Prefect agent: I started running it in the background (`prefect agent start &`), but at some point it seems to disappear and I don't understand why. Any idea how I should run it?
    2 replies · 2 participants
  • Matias Godoy

    07/22/2020, 9:54 AM
    Hello! I don't know if this has been reported before, but I found that the Parameters panel in the run section is broken (see attached image). Maybe it's because we're using really long parameters (a JWT token)
    3 replies · 3 participants
  • Iain Dillingham

    07/22/2020, 10:09 AM
    Hi community. I'm trying to get Prefect Server (v0.12.5) working locally, but GraphQL can't connect to Postgres. There are a couple of other questions in this channel that describe a similar issue, but both relate to config.toml. I haven't written a config.toml and I haven't set any `PREFECT__` environment variables, so I'm using the defaults.
    12 replies · 2 participants
  • Lance Haig

    07/22/2020, 10:20 AM
    Hi, I was looking at the project on github and I noticed that in this pull request https://github.com/PrefectHQ/prefect/pull/2492 The Nomad agent was removed. I am just curious why this is the case?
    2 replies · 2 participants
  • Marwan Sarieddine

    07/22/2020, 1:28 PM
    Hi folks, is it possible to change a flow’s execution environment after a flow is registered ?
    7 replies · 2 participants
  • Sven Teresniak

    07/22/2020, 2:00 PM
    Can I call a task from within another task? Is this allowed? Bad style? No problem at all? Any caveats? I suppose it's problematic because this could take the A out of DAG…
    3 replies · 2 participants
  • Richard Hughes

    07/22/2020, 2:48 PM
    Hi, I was wondering how scheduling the same task to run with different parameters at the same time works. It doesn't seem to work as I imagined it should; there seems to be a limitation of only one task at a specific time of day. Does anyone have any insight on this setup?
    3 replies · 2 participants
  • Matt Wong-Kemp

    07/22/2020, 3:12 PM
    Is there a way to set default context values for a flow? I'm wanting to put the endpoint to hit for API services into the context, and at the minute I'm entering this by hand every time I want to run from the UI. I guess ideally any prefect context values set when registering the flow would set them as defaults.
    16 replies · 2 participants
  • Michael C. Grant

    07/22/2020, 3:49 PM
    Hey folks, I'm experimenting with using a Dask Gateway cluster to handle workloads. Currently we're using a local Dask execution environment with success so we're good there. Does anyone happen to have a custom Dask Gateway worker with prefect preinstalled they'd be willing to share?
    👋 2
    👏 1
    💯 3
    25 replies · 3 participants
  • Shawn Marhanka

    07/22/2020, 9:59 PM
    Hi, I’ve been playing with Prefect Core for the past few weeks and recently moved over to experimenting with Prefect Cloud. We currently have all of our Prefect flows/tasks in a separate repo. If we register all of the flows in that repo and use Docker storage, can other apps in our ecosystem programmatically call those flows once they have authenticated to Prefect Cloud? I found `client.create_flow_run(flow_id, parameters=parameters)`, but I cannot find how to get the `flow_id` without registering. Is there a way to get all of the flow mappings (name + id) from a Cloud project and then use them when creating flow runs? Or am I going about this all wrong? Thanks for the help.
    4 replies · 2 participants
  • James Bennett Saxon

    07/23/2020, 1:58 AM
    I've read the intro docs, went through the tutorial, and was trying out some Prefect Tasks. First off was MySqlFetch because, well, MySQL... I feel like I've got things set up right, but I'm running into an unexpected error in the task runner when calling `MySqlFetch.run()`:
    ERROR:prefect.TaskRunner:Unexpected error: AttributeError('__enter__',)
    So I didn't want to get into debugging this because my code could be totally wrong. I was hoping to find some examples of using this and other Prefect Tasks. Are there examples for how to use this and other tasks?
    15 replies · 3 participants
  • Sven Teresniak

    07/23/2020, 7:59 AM
    Hi, I'm getting familiar with Prefect, but now I have a flow that I need some help with to make it elegant. I have something like this:
    def complex_task_generating_function(singleelement):
      with case(sometask, foo):
        anothertask(…)
        …
    
    with Flow("foo") as flow:
      param = Parameter("param", required=False)
      
      # generates a list of strings, based on param. len is 0…n
      elements_to_process = maybe_generate_work_items(param) 
      
      # when this evaluates to False, all the following is skipped, the apply_map as well!
      with case(isempty_task(elements_to_process), True):
        # now I either want to add one default element or 
        # somehow do the processing based on the following result
        generated_default = default_value_generator_task()
        
        # maybe so?
        elements_to_process = task(lambda x: [x])(generated_default)
    
      # now the tricky part.
      # elements_to_process is either a list or just one (runtime dependent) default value
      result = apply_map(complex_task_generating_function, elements_to_process)
    Problem is: `apply_map` does not know `skip_on_upstream_skip`. I cannot just use `map()` because `complex_task_generating_function` is not a task (it's the beef of the flow, so to speak, and in fact the logic of the flow). I found a workaround by doing something like this:
    @task(name="hack", skip_on_upstream_skip=False)
    def merger_hack(elements, default):
      return elements or [default]
    
    with Flow("foo") as flow:
      param = Parameter("param", required=False)  # same as above
      elements_to_process = maybe_generate_work_items(param)  # same as above
      
      with case(isempty_task(elements_to_process), True):
        generated_default = default_value_generator_task()
    
      final_list = merger_hack(elements_to_process, generated_default)
      result = apply_map(complex_task_generating_function, final_list)
    But writing code like the hack task, which basically checks whether the flow ran through the isempty case or not, seems odd. I don't want to "check" whether the flow took one path or another; the run path through the flow should decide this. How can I write this elegantly and simply? Sorry for the long question, but I want to learn how to use Prefect properly because I'm going to write a lot of flows in the future.
  • bruno.corucho

    07/23/2020, 9:32 AM
    Hey guys, I'd like to read millions of database records using Dask's read_sql_table function (which works using Dask alone) within Prefect, while still partitioning my data into n partitions, computing them in parallel, and merging them all together at the end. Do you have any best approaches/practices for Dask-specific functionality within Prefect? What would the procedure be after my method:
    from dask.dataframe import read_sql_table

    df = read_sql_table(table="peanuts", uri=connection,
                        index_col="peanut_id", columns=["peanut_details", "peanut_date"],
                        npartitions=1000)
    Should I return these partitions and do the delaying and computing using a Prefect's map() from within the flow's scope definition? Thanks in advance! 🙂 And have a great weekend!
    1 reply · 2 participants
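For intuition about `npartitions`: roughly, `read_sql_table` splits the range of `index_col` into contiguous slices and reads each slice with its own query, so partitions can be processed in parallel and combined at the end. A stdlib sketch of that partition, parallel map, merge shape, using `sqlite3` and a thread pool instead of Dask (`index_ranges`, `read_partition`, `process` and the `weight` column are hypothetical):

```python
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing
from pathlib import Path


def index_ranges(lo, hi, npartitions):
    """Split the inclusive id range [lo, hi] into npartitions half-open ranges."""
    step = (hi - lo + 1) / npartitions
    bounds = [lo + round(i * step) for i in range(npartitions)] + [hi + 1]
    return list(zip(bounds[:-1], bounds[1:]))


def read_partition(db_path, bounds):
    """Read one slice with its own connection (sqlite3 connections are not shared across threads by default)."""
    lo, hi = bounds
    with closing(sqlite3.connect(db_path)) as conn:
        return conn.execute(
            "SELECT peanut_id, weight FROM peanuts WHERE peanut_id >= ? AND peanut_id < ?",
            (lo, hi),
        ).fetchall()


def process(rows):
    """Per-partition work; here just the sum of one column."""
    return sum(weight for _, weight in rows)


with tempfile.TemporaryDirectory() as tmp:
    db_path = str(Path(tmp) / "peanuts.db")
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("CREATE TABLE peanuts (peanut_id INTEGER PRIMARY KEY, weight INTEGER)")
        conn.executemany("INSERT INTO peanuts VALUES (?, ?)", [(i, i) for i in range(1, 1001)])
        conn.commit()
        lo, hi = conn.execute("SELECT MIN(peanut_id), MAX(peanut_id) FROM peanuts").fetchone()

    ranges = index_ranges(lo, hi, npartitions=8)
    with ThreadPoolExecutor(max_workers=4) as pool:
        partials = list(pool.map(lambda b: process(read_partition(db_path, b)), ranges))
    total = sum(partials)  # merge step

print(len(partials), total)  # 8 500500
```

Mapped onto Prefect, the per-range read plus `process` would be the mapped task and the final `sum` a reduce task; whether that beats letting Dask schedule the partitions itself is a design choice.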
  • Sven Teresniak

    07/23/2020, 11:41 AM
    Hmmm, maybe "I'm holding it wrong", but I need an "else" functionality for `case`. The `ifelse` task doesn't seem to fit.
  • Thomas Hoeck

    07/23/2020, 12:16 PM
    Hi all! Is there a way to limit which repositories the Docker Agent is allowed to pull from? As I see it, if someone got access to your Prefect account, they could schedule your Docker Agent to run any image of their liking. This would have some pretty big security implications, since you have probably provided your Docker Agent with secrets, and it is probably running on your on-prem network. As I see it, this gives the Prefect Team (in theory) the ability to run code on all on-prem networks and extract the secrets set on the Docker Agent through env vars.
    22 replies · 4 participants
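The thread does not show an agent setting for this, so the following is not a Prefect feature; it only sketches the kind of check an image allowlist would need, for example in a wrapper around whatever starts the containers. `image_allowed` and the registry names are hypothetical. The subtlety is matching on a path boundary, so that `registry.example.com.evil.io/...` or `registry.example.com:5000/...` do not pass a plain string-prefix test:

```python
def image_allowed(image, allowed_prefixes):
    """Return True if image lives under one of the allowed registry/repository prefixes.

    A prefix matches only if it is followed by nothing, by "/", or by a bare
    ":tag" / "@digest" suffix. Docker Hub short names (e.g. "python:3.8")
    are NOT normalized here.
    """
    for prefix in allowed_prefixes:
        prefix = prefix.rstrip("/")
        if not image.startswith(prefix):
            continue
        rest = image[len(prefix):]
        if rest == "" or rest.startswith("/"):
            return True
        if rest[0] in ":@" and "/" not in rest:
            # Only a tag (":v1") or digest ("@sha256:...") follows the prefix.
            return True
    return False


ALLOWED = ["registry.example.com/data-team"]
print(image_allowed("registry.example.com/data-team/etl-flow:2020-07-23", ALLOWED))  # True
print(image_allowed("registry.example.com.evil.io/data-team/etl-flow", ALLOWED))     # False
```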
  • Klemen Strojan

    07/23/2020, 12:18 PM
    Is it possible for a scheduled flow to run on multiple agents at the same time, if all labels match? Is this expected or is it a bug? We are using Cloud. https://prefect-community.slack.com/archives/CL09KU1K7/p1595309512466100
    7 replies · 2 participants
  • Adam

    07/23/2020, 4:50 PM
    Hi everyone! My company is thinking of using Prefect but we'd like to talk through some of our use cases and see if it's a good fit. Who's the best person to talk with?
    👀 1
    8 replies · 3 participants

Laura Lorenz (she/her)

07/23/2020, 5:00 PM
Hi @Adam! Welcome to Prefect!! 🚀 Feel free to share a bit here if you like for community input, but if you are looking more for someone to talk directly to you can email david at prefect dot io to set up an intro call; a technical resource will be on the call too to walk through your engineering specifics as necessary 🙂

Joe Schmid

07/23/2020, 5:07 PM
Hi @Adam, welcome to the community! I'm CTO at a healthcare startup doing machine learning and data engineering pipelines with Prefect. We've been using Prefect for a while (more than a year) and would be happy to share our experience and answer questions. If it's helpful, feel free to post here or DM me any time.
:marvin: 2

Adam

07/27/2020, 9:08 AM
Thanks @Laura Lorenz (she/her) and @Joe Schmid. After reading through the docs, it seems Prefect would be a great fit for our needs. A follow-up question, though: I'm trying to understand the different options for deploying Prefect. We'd like to try Prefect Cloud + use one of our existing Kubernetes clusters to run the jobs. I'm just a bit confused by the "Execution Environments" and "Agents" sections in the Orchestration docs. How do these concepts relate to each other? Do you have a guide to a best-practice setup for running on Kubernetes?

Laura Lorenz (she/her)

07/27/2020, 2:58 PM
Hi @Adam, there are a ton of different ways to deploy Prefect, since overall we try to be platform-agnostic, but as a result it can be hard to get started on one specific deployment 🙂 Based on what you are saying, I would recommend a Kubernetes Agent deployed in your cluster, and if you want to customize the worker jobs that spawn from that agent, you can do so with the KubernetesJobEnvironment (or the DaskKubernetesEnvironment if you want high parallelism for your tasks by leveraging Dask). Here are a few other resources I recommend:
• blog post on deploying with Prefect Cloud + Kubernetes on AWS: https://medium.com/the-prefect-blog/seamless-move-from-local-to-aws-kubernetes-cluster-with-prefect-f263a4573c56
• execution layer demo video (https://youtu.be/50S4RqeEVVo) and companion guide (https://coda.io/@laura-lorenz/setting-up-your-execution-layer-a-companion-guide)
• the last slide in these slides from the PyCon 2020 open space, which has an architecture diagram of Cloud + your execution layer: https://docs.google.com/presentation/d/1TfOsYmsjgbwXRkiItb2ZeTW_oYxXWAWKMtEnEFOyPiA/edit?usp=sharing

Adam

07/27/2020, 3:21 PM
Thanks @Laura Lorenz (she/her). The KubernetesAgent does indeed sound like a great place to start for us. Regarding the jobs it creates, I assume it uses the Docker container that was built, right?
And thanks for those videos, love your execution layer demo video!

Laura Lorenz (she/her)

07/27/2020, 10:59 PM
Exactly, it will use the Docker container built for your flow storage. (Here’s a github ref if you are interested for where that image starts getting set in the agent’s code: https://github.com/PrefectHQ/prefect/blob/f250e90340480d285fce8d25d5139a943e3d42a4/src/prefect/agent/kubernetes/agent.py#L115). Glad you like the videos! 😇

Adam

07/28/2020, 1:34 PM
Thank you @Laura Lorenz (she/her), that makes a lot more sense now!