prefect-community
  • Giang Hoang Le (12/18/2019, 2:54 AM)
    Hi Team, I would like to ask: I have been developing a system based on Prefect Core. I have found that to do distributed programming, I need to write my functions as separate classes, and the classes themselves are not stateful. So I would like to ask: with current Docker container technology, is it possible to develop normal functions that can be used in a distributed environment without writing separate classes? Regards, Giang
    7 replies · 2 participants
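    A minimal sketch of the pattern the question above is after, assuming Prefect 0.9-era APIs: tasks are plain decorated functions (no classes, no shared state), and the same flow can run locally or on a Dask cluster just by swapping the executor. The task bodies and the scheduler address are placeholders.
    from prefect import task, Flow
    from prefect.engine.executors import DaskExecutor

    # plain functions become tasks; no classes and no state to manage
    @task
    def extract():
        return [1, 2, 3]

    @task
    def double(x):
        return x * 2

    with Flow("plain-functions") as flow:
        doubled = double.map(extract())

    # placeholder scheduler address; any reachable Dask cluster works
    flow.run(executor=DaskExecutor(address="tcp://dask-scheduler:8786"))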
  • Jake Schmidt (12/18/2019, 4:33 PM)
    Hi gang, a(nother) question about nested fanouts: I have a `dict` where the values are variable-length lists. I'd like to map a fanout on each key, then a fanout for each element in that key's value, such that the operation on each value also receives the result of the "outer fanout's" operation on its key. Would the best approach be to use a nested flow? I saw https://github.com/PrefectHQ/prefect/issues/1311 is similar...
    9 replies · 3 participants
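    One way to sidestep nested mapping (which a single level of `.map` does not handle directly) is to run the outer fanout first, then flatten the dict into (outer result, element) pairs and map once over the flat list. A sketch with placeholder tasks and data, assuming Prefect 0.9-era APIs:
    from prefect import task, Flow

    @task
    def get_data():
        # dict with variable-length list values
        return {"a": [1, 2], "b": [3, 4, 5]}

    @task
    def get_keys(data):
        return sorted(data)

    @task
    def outer_op(key):
        return f"outer({key})"

    @task
    def flatten(data, keys, outer_results):
        # pair every element with the outer result computed for its key
        by_key = dict(zip(keys, outer_results))
        return [(by_key[k], item) for k in keys for item in data[k]]

    @task
    def inner_op(pair):
        outer_result, item = pair
        return f"{outer_result} -> inner({item})"

    with Flow("nested-fanout") as flow:
        data = get_data()
        keys = get_keys(data)
        outer_results = outer_op.map(keys)
        pairs = flatten(data, keys, outer_results)
        inner_op.map(pairs)

    flow.run()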
  • Jake Schmidt (12/18/2019, 6:51 PM)
    I'm evaluating Prefect along with Metaflow and Kubeflow Pipelines for 1) general ETL / analytics workflows and 2) machine learning training + deployment workflows. So far, I'm most excited about Prefect. Prefect's native support for multiple environments (local, dask-local, dask-distributed, dask-kubernetes...) makes it accessible to anyone doing analytics at any scale in my company. Its checkpointing feature makes debugging easier. The thing I like about Metaflow is that you can specify dependencies and resources at the task level (i.e. run this task on AWS Batch with 2 GPUs), and seamlessly inspect run artifacts in a Jupyter notebook. The thing I like about Kubeflow Pipelines (which we currently run in production) is how tasks can be authored, shared, and composed into new pipelines, and its UI supports arbitrary, rich output from each task. Does Prefect Core / Cloud envision supporting any of these features?
    4 replies · 4 participants
  • George Coyne (12/18/2019, 10:23 PM)
    Hey guys, there was some talk of task looping the other day, I have a similar question. I am converting a lambda. In said lambda one function outputs a list which I then iterate over. What is the prefect-est way to achieve this?
    def handler(event, context):
        recipient_list = check_state()
        service = service_account_login()
        completed_samples = []
        for r in recipient_list:
            # message = create_message(recipient=r, email_from=EMAIL_FROM)
            # send_message(service, USER_ID, message)
            logger.info(f"Message sent to {r[3]} for sampleid {r[4]}")
            completed_samples.append(r[4])
        # Write to state table for completed samples
        write_state(completed_samples)
    4 replies · 2 participants
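    For the question above, the most Prefect-native translation of the loop is usually a mapped task: one task produces the recipient list, a second task is mapped over it, and a final task collects the mapped results. A sketch with hypothetical stand-ins for the lambda's helpers, assuming Prefect 0.9-era APIs:
    from prefect import task, Flow

    @task
    def check_state():
        # hypothetical: rows still needing an email
        return [("", "", "", "a@example.com", "sample-1")]

    @task
    def notify(recipient):
        # hypothetical stand-in for create_message + send_message
        print(f"Message sent to {recipient[3]} for sampleid {recipient[4]}")
        return recipient[4]

    @task
    def write_state(completed_samples):
        # hypothetical: write completed sample ids back to the state table
        print(completed_samples)

    with Flow("notify-recipients") as flow:
        recipients = check_state()
        completed = notify.map(recipients)
        write_state(completed)

    flow.run()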
  • Jake Schmidt (12/18/2019, 10:51 PM)
    How do I access the `result` of a given task in a flow? I see that `flow.run().result` is a `dict` keyed by `Task`s but `flow.run().result[prefect.Task('task_name')]` gives a `KeyError`. I can't find the answer in the documentation or the API reference for `State`.
    5 replies · 3 participants
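    The `KeyError` above comes from building a brand-new `Task` to use as the key; `state.result` is keyed by the task objects that actually live in the flow. A sketch of two ways to get at them, assuming Prefect 0.9-era APIs:
    from prefect import task, Flow

    @task(name="add")
    def add(x, y):
        return x + y

    with Flow("example") as flow:
        total = add(1, 2)

    state = flow.run()

    # key into state.result with the task object bound inside the flow...
    print(state.result[total].result)

    # ...or look the task up on the flow by name instead of constructing a new Task
    add_task = flow.get_tasks(name="add")[0]
    print(state.result[add_task].result)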
  • Jake Schmidt (12/18/2019, 11:23 PM)
    Sorry for all the questions... how can I tell a nested flow to run on the same distributed dask executor as the outer flow? Right now the inner flow is just running the default synchronous executor.
    1 reply · 2 participants
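    A nested flow started with its own `.run()` does not inherit the outer flow's executor, so one workaround is to hand the inner `flow.run()` an equivalent executor explicitly. A sketch, assuming Prefect 0.9-era APIs and a placeholder Dask scheduler address:
    from prefect import task, Flow
    from prefect.engine.executors import DaskExecutor

    DASK_ADDRESS = "tcp://dask-scheduler:8786"  # placeholder scheduler address

    @task
    def preprocess(x):
        return x + 1

    with Flow("inner") as inner_flow:
        preprocess(1)

    @task
    def run_inner_flow():
        # without an explicit executor this nested run falls back to the
        # default synchronous executor
        return inner_flow.run(executor=DaskExecutor(address=DASK_ADDRESS))

    with Flow("outer") as outer_flow:
        run_inner_flow()

    outer_flow.run(executor=DaskExecutor(address=DASK_ADDRESS))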
  • itay livni (12/19/2019, 6:13 AM)
    The new simplified visualizations in prefect-core look great
    :upvote: 8
    📊 1
  • Arsenii (12/23/2019, 5:00 AM)
    Hey all, I get this error after installing Prefect via pip and then trying to run LocalAgent. Any ideas? Also tried installing `prefect[all-extras]` and `dask[complete]`:
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 5, in <module>
        from prefect.cli import cli
      File "/usr/local/lib/python3.7/site-packages/prefect/__init__.py", line 8, in <module>
        import prefect.triggers
      File "/usr/local/lib/python3.7/site-packages/prefect/triggers.py", line 48, in <module>
        from prefect.engine import signals, state
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/__init__.py", line 3, in <module>
        import prefect.engine.executors
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/__init__.py", line 30, in <module>
        from prefect.engine.executors.dask import DaskExecutor, LocalDaskExecutor
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 10, in <module>
        from distributed import Client, Future, fire_and_forget, worker_client
      File "/usr/local/lib/python3.7/site-packages/distributed/__init__.py", line 3, in <module>
        from .actor import Actor, ActorFuture
      File "/usr/local/lib/python3.7/site-packages/distributed/actor.py", line 7, in <module>
        from .client import Future, default_client
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 69, in <module>
        from .worker import dumps_task, get_client, get_worker, secede
      File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 22, in <module>
        from dask.system import CPU_COUNT
    ModuleNotFoundError: No module named 'dask.system'
    1 reply · 1 participant
  • Kamil Okáč (12/23/2019, 12:38 PM)
    Hi. Is there a way to reuse a resource (a database connection in my case) over mapped task runs (in a local threaded environment with multiple workers)?
    ✔️ 1
    4 replies · 3 participants
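    One low-tech option for the question above is to keep the connection in a thread-local and create it lazily, so each worker thread opens a single connection and every mapped run executed on that thread reuses it. A sketch using sqlite as a placeholder database, assuming Prefect 0.9-era APIs:
    import sqlite3
    import threading

    from prefect import task, Flow
    from prefect.engine.executors import LocalDaskExecutor

    _local = threading.local()  # one connection per worker thread

    def get_conn():
        # lazily open the connection the first time a thread needs it,
        # then reuse it for every mapped run executed on that thread
        if not hasattr(_local, "conn"):
            _local.conn = sqlite3.connect("example.db")  # placeholder database
        return _local.conn

    @task
    def lookup(record_id):
        cur = get_conn().execute("SELECT ?", (record_id,))
        return cur.fetchone()

    with Flow("reuse-connection") as flow:
        lookup.map(list(range(10)))

    flow.run(executor=LocalDaskExecutor(scheduler="threads"))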
  • Andrew Schechtman-Rook (12/23/2019, 8:13 PM)
    Hi prefectionists! I'm playing around with prefect core, and I'd love some advice/guidance on something I'm totally stuck on. I have a fair number of tasks which end up pulling data from somewhere, or spit out the result of a complicated operation I'd rather not repeat all the time. I'd like to be able to cache the results of these operations, but I want to cache to/from disk rather than having to pass around large datasets in `prefect.context.caches`. I've tried a bunch of things to try to do this, including:
    • Turning on caching, and specifying `LocalResultHandler` as the task/flow's `result_handler`. In this case caching works, but it doesn't use the `result_handler` to store/retrieve the task results.
    • Turning on `checkpointing` and specifying `LocalResultHandler` as `result_handler`. This works to save the result to disk, but that appears to be a one-way operation: AFAICT there aren't any hooks in prefect to pull the checkpointed data back in when restoring from cache.
    • Using custom `state_handler`s to essentially intercept the results and do the file I/O. I'll admit I've spent the least time on this approach; it seems like it might work, but I don't have a good enough grasp on prefect to know either how to implement it or even if it's a good idea.
    Anyone had to implement something like this, or have any additional ideas?
  • Alex Goodman (12/23/2019, 9:27 PM)
    Indeed, @Andrew Schechtman-Rook! The `LocalResultHandler` does store information without regard to the task in question; however, the state object returned from each task run does keep track of the path written to. You can then persist the task-to-file mapping separately any way you'd like. You could persist all of the state returned by `flow.run()`, or persist state as it becomes available with a state_handler. An example of caching this for the whole flow using the `LocalResultHandler` + some state manipulation:
    import os

    import prefect
    import slugify
    from prefect.engine.result_handlers import LocalResultHandler

    with prefect.Flow("example", result_handler=LocalResultHandler(dir=".prefect")) as flow:
        x_result = x()
        y_result = y(x_result)
        z(y_result)

    flow_state = flow.run()

    # record which file each task's result was written to, keyed by task name
    for task, task_state in flow_state.result.items():
        task_name = slugify.slugify(task.name)
        filepath = task_state._result.safe_value.value
        with open(os.path.join(".prefect", task_name), "w") as fh:
            fh.write(filepath)
    In this way, the `.prefect` directory would have a result file for each task as well as a "tracking" file for each task, named by the task name:
    tree .prefect
    .prefect
    ├── prefect-2g3sboyi
    ├── prefect-ixeqv82p
    ├── prefect-km5ufh9y
    ├── x
    ├── y
    └── z
    Note, I used slugify to encode the task name, but you have several options here. A good option if you have odd task names is to set the `task.slug` and use that to key off of. If you wanted to do the same thing but in a task state_handler:
    # a task state handler: persist the result file path whenever a task succeeds
    def handler(task, old, new):
        if new.is_successful():
            task_name = slugify.slugify(task.name)
            filepath = new._result.safe_value.value
            with open(os.path.join(".prefect", task_name), "w") as fh:
                fh.write(filepath)
        return new
    21 replies · 2 participants
  • Ryan Connolly (12/24/2019, 6:05 PM)
    Is there a good example of nesting flows? Example: I want to build a flow that I can call individually, but I also want to map over that flow.
    6 replies · 4 participants
  • Daryll Strauss (12/26/2019, 8:16 PM)
    Hi Folks,
  • Daryll Strauss (12/26/2019, 8:18 PM)
    Is there a reason that IntervalClock sets the minimum interval at 60 seconds? It would be useful in my application to allow shorter intervals even if they fired multiple times in the same scheduling cycle. My tests with IntervalClock and IntervalSchedule seem to indicate short intervals work as expected.
    👀 2
    5 replies · 2 participants
  • Jake Schmidt (12/27/2019, 5:42 PM)
    Is it possible to map over a generator such that elements `yield`ed would be mapped / computed immediately? Is this in the scope of ‘depth first execution’?
    1 reply · 2 participants
  • Adam (12/29/2019, 11:21 PM)
    Hey all - I have a question about idempotency. I'm just getting started using prefect (both core & cloud) and am most familiar with Luigi for managing data workflows. Luigi has a concept of a `Target` for outputs, and if that Target exists (you get to define "exists"), the task won't run and will return the already-completed result. Is there something similar for Prefect, or is this something I would have to implement manually? Caching seems like it would work, but does that cache persist across runs, or if different `Flow`s are using the same `Task`? As a concrete example: I'm writing a single row per day into a psql database. The day is defined by a `Parameter("date")`. If I run my Flow repeatedly, it keeps inserting rows for the same date. Instead, I'd like to check for the existence of the row with that date and return with the appropriate State (probably `Success`?).
    4 replies · 2 participants
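    Short of a Luigi-style `Target`, one manual approach to the question above is to check for the row inside the task and raise a `SUCCESS` signal when it already exists, so the task finishes without re-inserting. A sketch where `row_exists` / `insert_row` are hypothetical stand-ins for the real psql calls, assuming Prefect 0.9-era APIs:
    from prefect import task, Parameter, Flow
    from prefect.engine import signals

    def row_exists(date):
        # hypothetical existence check against the psql table
        return False

    def insert_row(date):
        # hypothetical insert
        pass

    @task
    def insert_daily_row(date):
        if row_exists(date):
            # finish the task as successful without re-inserting the row
            raise signals.SUCCESS(f"row for {date} already exists")
        insert_row(date)

    with Flow("daily-load") as flow:
        date = Parameter("date")
        insert_daily_row(date)

    flow.run(parameters={"date": "2019-12-29"})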
  • John Ramirez (12/30/2019, 4:47 PM)
    Hey everyone - I am trying to see how the results of a mapping task are passed to the next stage but don’t know how to get the result. Please help!
    2 replies · 2 participants
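    For the question above, a mapped task's results are handed to the next stage automatically: a downstream mapped task receives them element-wise, while a regular downstream task receives the whole list. A minimal sketch, assuming Prefect 0.9-era APIs:
    from prefect import task, Flow

    @task
    def numbers():
        return [1, 2, 3]

    @task
    def double(x):
        return 2 * x

    @task
    def total(values):
        # a non-mapped downstream task sees the mapped results as a list
        return sum(values)

    with Flow("map-then-reduce") as flow:
        doubled = double.map(numbers())
        summed = total(doubled)

    state = flow.run()
    print(state.result[summed].result)  # 12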
  • John Ramirez (01/02/2020, 5:03 PM)
    Hey everyone - I am trying to use the `result_handler` with an `@task` with no success. Does anyone have a good example to demonstrate how to code it?
    6 replies · 2 participants
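    A sketch of attaching a result handler directly to an `@task`, assuming Prefect 0.9-era APIs; depending on the version and where the flow runs, checkpointing may also need to be enabled in the Prefect config:
    from prefect import task, Flow
    from prefect.engine.result_handlers import LocalResultHandler

    # checkpoint=True asks Prefect to write this task's return value
    # through the handler when the task finishes
    @task(checkpoint=True, result_handler=LocalResultHandler(dir=".prefect"))
    def compute():
        return {"answer": 42}

    with Flow("checkpointed") as flow:
        compute()

    flow.run()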
  • Juarez Rudsatz (01/02/2020, 8:31 PM)
    Hi Folks! I'm playing around with prefect core, and I'd love an example of an ETL doing:
    1. executing a select query in a database server and retrieving the results
    2. transforming some fields in each row
    3. writing to a CSV file
    I was wondering what's the proper way to build this, and I have some questions as well:
    • Would this case scale to millions of rows if using prefect?
    • Should I orchestrate with prefect and use a library for ETL like [bonobo](https://www.bonobo-project.org)?
    • Can I separate the e/t/l steps into tasks and reuse them for writing to AWS S3, for example?
    Regards, Juarez
    1 reply · 2 participants
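    A small sketch of the extract / transform / load split asked about above, with an in-memory stand-in for the database query; each step is its own task, so the load step could be swapped for an S3 upload without touching the others. Assumes Prefect 0.9-era APIs:
    import csv

    from prefect import task, Flow

    @task
    def extract():
        # placeholder for a real select query (psycopg2, SQLAlchemy, ...)
        return [{"id": 1, "name": "ada"}, {"id": 2, "name": "grace"}]

    @task
    def transform(rows):
        return [{**row, "name": row["name"].title()} for row in rows]

    @task
    def load(rows):
        with open("out.csv", "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=["id", "name"])
            writer.writeheader()
            writer.writerows(rows)

    with Flow("etl") as flow:
        load(transform(extract()))

    flow.run()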
  • John Ramirez (01/03/2020, 7:17 PM)
    Is it possible to give a custom name to a task that is used in multiple places in the workflow, to differentiate them in `flow.visualize()`?
    2 replies · 3 participants
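    One way to get distinct names in `flow.visualize()` is to create separately named instances of the same task class, so each use shows up as its own labelled node. A sketch with hypothetical task and flow names, assuming Prefect 0.9-era APIs (rendering the graph also requires graphviz):
    from prefect import Task, Flow

    class Download(Task):
        def run(self, url):
            return f"contents of {url}"

    # two instances of the same task, named so visualize() can tell them apart
    download_customers = Download(name="download-customers")
    download_orders = Download(name="download-orders")

    with Flow("named-tasks") as flow:
        download_customers(url="https://example.com/customers")
        download_orders(url="https://example.com/orders")

    flow.visualize()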
  • Alex Cano (01/03/2020, 7:49 PM)
    Hey guys, I know in the past you've mentioned that generators don't necessarily play well with prefect since cloudpickle can't serialize a generator. Have you taken a look at this project? I don't know much of C or cloudpickle's internals, so I'm not sure if this could be used to solve that problem, but I would love to hear your thoughts on it: https://github.com/llllllllll/cloudpickle-generators. Edit: There's also an open issue for the cloudpickle project about integrating it, but they seem hesitant as it might break the PyPy and Jython implementations (https://github.com/cloudpipe/cloudpickle/issues/146)
    🤔 1
    5 replies · 2 participants
  • Javier Garcia (01/03/2020, 8:16 PM)
    Hey guys - glad to be part of the community; I should have joined sooner. I've been devouring your docs and ran into some broken links. The `API Reference` links in https://docs.prefect.io/core/task_library/google.html#google-cloud-storage are returning 404 -> https://docs.prefect.io/api/unreleased/tasks/google.html#prefect-tasks-google-storage-gcsdownload
    1 reply · 2 participants
  • alexandre kempf (01/04/2020, 1:33 PM)
    Hello guys! I'm wondering if it is possible to run a flow inside a flow! The main idea would be to have a flow (flowModel) that is responsible for training a model and, between the loading and the training parts, runs a flow for data preprocessing (flowPreproc). My main problem is that I'm passing a task object to my second flow instead of a value (see the example). I cannot think of a way to do it, and I don't know if I lack skills with prefect or if this is bad practice. Thank you in advance! 🙂 Keep up the good work, your tool is amazing 🙂 Here is a minimal example:
    from prefect import task, Parameter, Flow
    import numpy as np
    
    @task
    def augment_data(a, b=10):
        return np.sqrt(a)+np.sqrt(b)
    
    with Flow("augment") as flowAug:
        a = Parameter("a")
        aug = augment_data(a, 10)
    
    @task
    def load_data(c):
        return c
    
    @task
    def train_model(model, d):
        print("Training, {} with {}!".format(model, d))
        return model
    
    
    with Flow("training") as flowModel:
        init = Parameter("c")
        model = Parameter("model")
        data = load_data(init)
        state = flowAug.run(a=data)
        aug_data = state.result[aug].result
        result = train_model(model, aug_data)
    
    state_model = flowModel.run(c=5, model="SVM")
    8 replies · 3 participants
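    A common workaround for the example above is to trigger the inner flow from inside a task, so its parameter receives a concrete value at runtime rather than an unresolved task object. A sketch that reuses `flowAug`, `aug`, `load_data` and `train_model` from the message above; whether this is good practice is a separate question:
    from prefect import task, Flow, Parameter

    @task
    def run_augmentation(a_value):
        # runs the preprocessing flow with a real value and unwraps its result
        state = flowAug.run(a=a_value)
        return state.result[aug].result

    with Flow("training") as flowModel:
        init = Parameter("c")
        model = Parameter("model")
        data = load_data(init)
        aug_data = run_augmentation(data)
        result = train_model(model, aug_data)

    state_model = flowModel.run(c=5, model="SVM")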
  • Braun Reyes (01/07/2020, 1:14 AM)
    Hey there... wondering about '_requires_ result handlers for all flows with keyed edges that are registered with Prefect Cloud' in the master branch. I rebased our fork and now some flows are failing to register. Is there a good way to understand this change? Is this so that flow runs can pick up from the last known successful task?
    3 replies · 3 participants
  • Braun Reyes (01/07/2020, 1:16 AM)
    Noticed that Docker storage did not get the sensible defaults
  • Braun Reyes (01/07/2020, 1:17 AM)
    which makes sense since the underlying storage is ephemeral
  • Braun Reyes (01/07/2020, 1:18 AM)
    I think we will just force S3 storage on our flows until we get S3 storage working with the Fargate agent
  • Braun Reyes (01/07/2020, 1:21 AM)
    Though it would be interesting to understand the reasoning behind forcing persistent storage between tasks. I am all for having one proper way to do things... just would be good to understand.
  • Braun Reyes (01/07/2020, 1:22 AM)
    An example for migration would be nice, as I feel this will break a decent number of flows once it comes out
  • Braun Reyes (01/07/2020, 1:23 AM)
    final note....I do understand master is 'not prod ready' and we are good with adjusting on our end.