prefect-community
  • s

    source_creator

    07/19/2019, 9:05 PM
    Hi everybody! Is there a way to run multiple schedules? I would like one schedule that runs every hour and another that runs twice a day, both of which call the same function but handle the result differently 🙂
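    One way to get this behavior (a minimal sketch, assuming the 0.6-era prefect.schedules API; the flow names, task names, and cron string are illustrative) is to build two flows around the same task, each with its own schedule and its own handler:

    from datetime import datetime, timedelta

    from prefect import task, Flow
    from prefect.schedules import CronSchedule, IntervalSchedule

    @task
    def fetch_data():
        # the shared work both schedules should trigger
        return {"value": 42}

    @task
    def handle_hourly(result):
        print("hourly handling:", result)

    @task
    def handle_twice_daily(result):
        print("twice-daily handling:", result)

    # one flow per schedule, both reusing the same fetch task
    hourly_schedule = IntervalSchedule(start_date=datetime.utcnow(), interval=timedelta(hours=1))
    with Flow("hourly", schedule=hourly_schedule) as hourly_flow:
        handle_hourly(fetch_data())

    with Flow("twice-daily", schedule=CronSchedule("0 6,18 * * *")) as twice_daily_flow:
        handle_twice_daily(fetch_data())

    # hourly_flow.run() / twice_daily_flow.run() would then execute on their schedules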
  • r

    Robert Medeiros

    07/19/2019, 10:23 PM
    Hey folks, n00b question: I'd like to create a number of flows dynamically (based on the content of a configuration file). Is there an example somewhere of how best to accomplish this?
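    A hedged sketch of building flows in a loop from parsed configuration (the config shape and task names are invented for illustration):

    from prefect import task, Flow

    # pretend this came from e.g. yaml.safe_load(open("flows.yml"))
    CONFIG = {
        "daily_sales": {"source": "sales_db", "target": "warehouse"},
        "hourly_events": {"source": "events_api", "target": "warehouse"},
    }

    @task
    def extract(source):
        print(f"extracting from {source}")
        return [1, 2, 3]

    @task
    def load(rows, target):
        print(f"loading {len(rows)} rows into {target}")

    def build_flow(name, cfg):
        with Flow(name) as flow:
            rows = extract(cfg["source"])
            load(rows, cfg["target"])
        return flow

    flows = {name: build_flow(name, cfg) for name, cfg in CONFIG.items()}
    flows["daily_sales"].run()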
  • l

    LucaSoato

    07/22/2019, 8:20 AM
    Hi guys, first of all, good job with Prefect! I'm wondering if it's possible to install it in a distributed mode, to avoid a single point of failure.
  • c

    Chris White

    07/23/2019, 9:45 PM
    Hey everybody! Just wanted to make a small announcement that a new Tutorial has been added to our documentation that covers many common questions we’ve heard, such as:
    - how / when Tasks are auto-added to Flows
    - recommended best practices in writing Tasks
    - how to better test your custom tasks
    - and a lot of other info about Prefect Tasks
    The link is here: https://docs.prefect.io/guide/tutorials/task-guide.html Enjoy!!
    🎉 7
    👍 1
  • w

    Wai Kiat Tan

    07/24/2019, 7:03 AM
    Hi guys! I came across Prefect while looking for a suitable workflow tool and I love using Prefect so much. I recently upgraded to 0.6.0 and my existing workflow keeps emitting this warning:
    [2019-07-24 06:56:03,439] WARNING - prefect.TaskRunner | Task 'x': can't use cache because it is now invalid
    Any idea?
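    That warning seems to come from output caching: a cached state exists for the task but is no longer considered valid (for example, its duration expired), so the task simply re-runs. A minimal sketch of the knobs involved, assuming the 0.6-era cache_for / cache_validators API:

    from datetime import timedelta

    from prefect import task, Flow
    from prefect.engine import cache_validators

    @task(cache_for=timedelta(hours=1), cache_validator=cache_validators.duration_only)
    def expensive_lookup():
        # re-runs (and logs the "can't use cache" warning) once the hour is up
        return 42

    with Flow("caching-example") as flow:
        expensive_lookup()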
  • o

    orcaman

    07/24/2019, 8:44 AM
    Hi everyone. I have this situation where, after refactoring some of the code that builds the flow by adding tasks, there seems to be a memory leak caused by an infinite loop. I would like to compare the flow generated by the new code to the one generated by the old code. Is there a way of doing this? My dot visualization is too complex to inspect by hand (a very big graph, about a hundred nodes). I've tried using the flow.sorted_tasks function and outputting the result to a CSV so I can compare the two projects, but the function seems to return a different ordering even for the same program (I guess the internal sorting mechanism isn't deterministic across runs). Any ideas would be appreciated. Thanks!
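    A hedged sketch of one way to get a deterministic, diff-able description of a flow: dump the sorted task names and the sorted edge list instead of relying on sorted_tasks (which is a topological order, not a stable one). This assumes flow.tasks and flow.edges expose task objects and (upstream_task, downstream_task) pairs, and that task names are unique enough to diff on:

    def flow_fingerprint(flow):
        """Return a deterministic text description of a flow's structure."""
        tasks = sorted(t.name for t in flow.tasks)
        edges = sorted(
            (e.upstream_task.name, e.downstream_task.name) for e in flow.edges
        )
        lines = ["# tasks"] + tasks + ["# edges"] + [f"{u} -> {d}" for u, d in edges]
        return "\n".join(lines)

    # write one fingerprint per version of the code, then diff the two files
    with open("flow_new.txt", "w") as f:
        f.write(flow_fingerprint(flow))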
  • m

    Mathis Börner

    07/24/2019, 10:00 AM
    Hi everyone. I am interested in using Prefect for a more event-based workflow. To be more precise, the flow has to be initialized when a new file comes in and run/retried a bit later in the program. I have read PIN-8 multiple times and was wondering whether someone is already working on it and, if so, how I can contribute.
  • m

    Mathis Börner

    07/24/2019, 10:04 AM
    Another question I've got: I have to run the same flow for each file, but with different configs. I wanted to use prefect.context but struggled to save a flow together with its context. So my idea was to have a Parameter('config') followed by a manual_only task to "set up" the task: run the parameter task -> sleep the flow -> store the flow to fully process it later. It feels kind of off to do it this way, so is there a better way to achieve what I need?
  • m

    Mathis Börner

    07/24/2019, 10:29 AM
    import time
    
    import prefect
    from prefect import task, Flow, Parameter
    from prefect import tags
    from prefect.engine import signals
    from prefect.environments.storage import LocalStorage
    from prefect.environments import LocalEnvironment
    from prefect.utilities.tasks import pause_task
    from prefect.engine.state import Resume
    
    
    @task
    def hold(cfg):
        pause_task()
        return cfg
    
    @task
    def process(cfg):
        try:
            return prefect.context.key + cfg['value']
        except Exception:
            raise signals.FAIL()
    
    def prepare_flow(flow, cfg, key, name, storage):
        with prefect.context(key=key):
            flow.run(cfg=cfg)
            previous_name, flow.name = flow.name, name
            storage_name = storage.add_flow(flow)
        flow.name = previous_name
        return storage_name
    
    
    def run_flow(flow_name, storage):
        flow_test = storage.get_flow(flow_name)
        resume_states = {t: Resume()
                         for t in flow_test.get_tasks(tags='holdpoint')}
        flow_test.run(task_states=resume_states)
    
    
    if __name__ == '__main__':
        env = LocalEnvironment()
        storage = LocalStorage('storage')
        with Flow('Using Context', environment=env) as flow:
            cfg = Parameter('cfg')
            with tags('holdpoint'):
                cfg = hold(cfg)
            process(cfg)
        cfg = {'value': 'def'}
        key = 'abc'
        flow_name = prepare_flow(flow, cfg, key, 'test_flow', storage)
        time.sleep(5)
        run_flow(flow_name, storage)
  • m

    Mathis Börner

    07/24/2019, 10:29 AM
    This is my current test flow.
  • b

    Brian McFeeley

    07/24/2019, 8:50 PM
    is it possible to map a task over a list of results generated by a previous task, where the task has a signature like
    @task
    def do_something(x, y):
        # X is the same for each invocation
        # Y is the result we're mapping over
    I tried something like foo.map("two", "parameters"), but when I print from inside these functions, only the second param contains what I expect
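    The usual way to hold one argument fixed while mapping over the other is unmapped (a hedged sketch; the import path has moved between versions):

    from prefect import task, Flow, unmapped

    @task
    def make_ys():
        return ["a", "b", "c"]

    @task
    def do_something(x, y):
        # x is the same for every invocation, y varies
        return f"{x}-{y}"

    with Flow("unmapped-example") as flow:
        results = do_something.map(unmapped("two"), make_ys())

    flow.run()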
  • j

    Joe Schmid

    07/25/2019, 3:01 PM
    Does anyone have a working example of specifying a result_handler and having write() & read() functions called? (Mainly interested in result_handler on a Task, but Flow-level or overall default is fine, too.) I can't seem to get my ResultHandler called and I'm sure it's something silly I'm doing. (I can post a simple example if it helps, but I suspect others have this working just fine.)
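    A hedged sketch of a custom handler, assuming the 0.6-era ResultHandler base class in prefect.engine.result_handlers; a common gotcha is that write()/read() are only invoked when checkpointing is enabled (e.g. when running against Cloud, or with checkpointing turned on in Prefect's config), so a handler can look like it is never called:

    from prefect import task, Flow
    from prefect.engine.result_handlers import ResultHandler

    class PrintingResultHandler(ResultHandler):
        """Toy handler that 'stores' results in memory and logs every call."""

        def __init__(self):
            self._store = {}
            super().__init__()

        def write(self, result):
            key = str(id(result))
            self._store[key] = result
            print(f"write() called, key={key}")
            return key  # the location handed back to Prefect

        def read(self, key):
            print(f"read() called, key={key}")
            return self._store[key]

    # checkpoint / result_handler kwargs assumed from the 0.6-era Task API
    @task(checkpoint=True, result_handler=PrintingResultHandler())
    def produce():
        return {"answer": 42}

    with Flow("result-handler-example") as flow:
        produce()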
  • b

    Brian McFeeley

    07/25/2019, 3:30 PM
    I've got a question about nested mapping. Consider a trivial example:
    from prefect import task, Flow

    @task
    def extract():
        return [1, 2, 3]
    
    @task
    def transform1(x):
        return [x, x * 2, x * 3]
    
    @task
    def load(x):
        print(f"output: {x}")
    
    
    if __name__ == '__main__':
        with Flow("mapping test") as flow:
            e = extract()
            t1 = transform1.map(e)
            l = load(t1)
        flow.run()
    This works like we expect and generates a new list for each of the items in the original list, with output
    [[1, 2, 3], [2, 4, 6], [3, 6, 9]]
    . However, let's say I wanted to then map over each of the items in each of these sublists. If I add a new task:
    @task
    def transform2(x):
        return x * 2
    And then change my flow to:
    e = extract()
    t1 = transform1.map(e)
    t2 = transform2.map(t1)
    l = load(t2)
    I see that the second task received each sublist as a whole in its mapped execution, rather than mapping over the inner scalars, so the output is
    [[1, 2, 3, 1, 2, 3], [2, 4, 6, 2, 4, 6], [3, 6, 9, 3, 6, 9]]
    i.e. we duplicated each list instead of multiplying each element by 2. An example from real life here might be:
    - generate or grab a list of S3 buckets
    - get the list of files in each bucket
    - process each file individually
    Any ideas? My docs search is coming up dry.
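    For reference, a hedged sketch of the usual workaround at this point: since mapping does not recurse into sublists, add an explicit reduce step that flattens the list-of-lists, then map again over the flattened list:

    from prefect import task, Flow

    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform1(x):
        return [x, x * 2, x * 3]

    @task
    def flatten(list_of_lists):
        # reduce step: gather the mapped results and flatten them
        return [item for sublist in list_of_lists for item in sublist]

    @task
    def transform2(x):
        return x * 2

    @task
    def load(x):
        print(f"output: {x}")

    with Flow("nested mapping test") as flow:
        e = extract()
        t1 = transform1.map(e)
        flat = flatten(t1)         # reduces the mapped results...
        t2 = transform2.map(flat)  # ...then fans out again over scalars
        load(t2)

    flow.run()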
  • b

    Brian McFeeley

    07/25/2019, 5:43 PM
    If my tasks need to make some writes/updates to a database, are there any recommendations for managing connections to prevent contention and avoid creating a new connection per invocation? Looking at https://docs.prefect.io/guide/core_concepts/tasks.html#overview I wonder if it might be a reasonable idea to create one connection per task as state that lives inside the task. We've done that with Celery in the past. Does that make sense?
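    A hedged sketch of the "connection as task state" idea using a Task subclass; sqlite3 stands in for the real database client, and because tasks may be pickled and shipped to Dask workers the connection is created lazily in run() rather than in __init__:

    import sqlite3

    from prefect import Flow, Task

    class WriteRows(Task):
        """Reuses one connection per task instance, created lazily at run time."""

        def __init__(self, db_path, **kwargs):
            self.db_path = db_path
            self._conn = None  # not created here, so the task stays picklable
            super().__init__(**kwargs)

        def _connection(self):
            if self._conn is None:
                self._conn = sqlite3.connect(self.db_path)
                self._conn.execute("CREATE TABLE IF NOT EXISTS events (value INTEGER)")
            return self._conn

        def run(self, rows):
            conn = self._connection()
            with conn:
                conn.executemany("INSERT INTO events VALUES (?)", [(r,) for r in rows])

    write_rows = WriteRows("events.db")

    with Flow("db-example") as flow:
        write_rows(rows=[1, 2, 3])

    flow.run()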
  • c

    Chris Hart

    07/26/2019, 1:44 AM
    heyy, so I'm running a task that fetches data from a third party api, returns the response, which is passed to the next task (to index in elasticsearch)... now I need to add pagination support in the first task, and I'm wondering if it's feasible to make the task itself repeat for however many pages are needed, so as not to blow memory or connection limits downstream.. if so could someone recommend the patterns in the docs for this?
  • c

    Chris Hart

    07/26/2019, 1:45 AM
    I would also like to pass in parameters to the first task to help drive the api calls
  • c

    Chris

    07/26/2019, 7:10 AM
    Hi everyone, has anyone had any issues with leaked semaphores when running a flow with the Dask Executor? It looks like somewhere during the flow, one of the Dask workers fails (might be some connection/timeout issue). Just before it fails I get a
    there appear to be 1 leaked semaphores to clean up at shutdown
    warning. Also, rather than killing the flow or restarting the failed task, it restarts upstream tasks which had already run successfully. Does anyone know what the cause of the failure might be and whether it’s possible to restart only the failed task?
  • p

    paularmand

    07/26/2019, 1:42 PM
    yes
  • p

    paularmand

    07/26/2019, 1:42 PM
    yes on the leaked semaphores.
  • c

    Chris Hart

    07/26/2019, 5:14 PM
    regarding doing looping for pagination using task mappings.. that seems like maybe the only way, since I can't seem to access the raw data returned from the initial task run from within the flow definition (outside the task), unless I'm missing something, but poking around the Task object in a debugger doesn't show the result data
  • c

    Chris Hart

    07/26/2019, 5:16 PM
    I have some "page info" data in that payload, such as has_next_page: true/false, that could be used for optionally triggering the task again by passing in the limit/offset/cursor etc. Are there any tutorials or examples of mapping with loops?
  • c

    Chris Hart

    07/26/2019, 5:24 PM
    seems like I might need triggers and/or indexing to determine if the task should be called again
  • c

    Chris Hart

    07/26/2019, 6:46 PM
    this seems to be the most analogous example for pagination looping with tasks: https://docs.prefect.io/guide/tutorials/advanced-mapping.html#scaling-to-all-episodes only instead of URL lists, I'm mapping cursors and offsets
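    A condensed, hedged sketch of that general shape adapted to cursor/offset pagination instead of URL lists (the endpoint, page size, and task names are placeholders, not from the tutorial): one upstream task enumerates every offset, and the fetch task is mapped over them:

    from prefect import task, Flow, Parameter, unmapped

    PAGE_SIZE = 100

    @task
    def get_total_count(endpoint):
        # placeholder: ask the API how many records exist
        return 1050

    @task
    def build_offsets(total):
        return list(range(0, total, PAGE_SIZE))

    @task
    def fetch_page(offset, endpoint):
        # placeholder: GET {endpoint}?limit=PAGE_SIZE&offset={offset}
        return [f"{endpoint}:{offset}:{i}" for i in range(PAGE_SIZE)]

    @task
    def index_page(records):
        print(f"indexing {len(records)} records")

    with Flow("paginated-ingest") as flow:
        endpoint = Parameter("endpoint")
        offsets = build_offsets(get_total_count(endpoint))
        pages = fetch_page.map(offsets, unmapped(endpoint))
        index_page.map(pages)

    flow.run(parameters={"endpoint": "https://api.example.com/items"})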
  • c

    Chris Hart

    07/29/2019, 4:26 PM
    random Q about Executor plans (I caught wind of Dask maybe not being the designated future/best thing).. but am coming up on a need to move from LocalExecutor to something more parallel. Is there an up-to-date recommendation for that?
  • c

    Chris White

    07/29/2019, 5:44 PM
    Hi @Chris Hart posting here just so my response is visible: Dask is absolutely still our recommended executor for parallel / distributed workloads. The re-run behavior described above only occurs if a dask worker dies, which is typically caused by a resource constraint. Additionally, if the workflow is running on Cloud we very explicitly prevent task reruns so it’s not an issue at all (other than some noisy logs). For this reason we recommend users understand the memory requirements of their tasks and flows. That being said, I do plan on opening an issue on the distributed repository to try and prevent this behavior since it is annoying.
    😎 1
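    For reference, a hedged sketch of switching from the default executor to Dask (assuming the 0.6-era prefect.engine.executors import path; the scheduler address is a placeholder and flow is whatever Flow object you have already built):

    from prefect.engine.executors import DaskExecutor

    # connect to an already-running dask-scheduler...
    executor = DaskExecutor(address="tcp://10.0.0.5:8786")

    # ...or omit the address to let the executor spin up a local cluster
    # executor = DaskExecutor()

    flow_state = flow.run(executor=executor)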
  • b

    Brian McFeeley

    07/29/2019, 7:02 PM
    I'm running into what looks like a pythonpath/importing error when attempting to run on a local dask cluster:
    Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'utils'")
    which references shared static functions in a package called utils. Any ideas how I might go about debugging this to make sure that code is available at the time of execution? When I look at the logs from the dask workers themselves, it looks like they fail while trying to deserialize a task:
    distributed.worker - WARNING - Could not deserialize task
    Traceback (most recent call last):
      File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/worker.py", line 1272, in add_task
        self.tasks[key] = _deserialize(function, args, kwargs, task)
      File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/worker.py", line 3060, in _deserialize
        function = pickle.loads(function)
      File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
        return pickle.loads(x)
    ModuleNotFoundError: No module named 'utils'
    Does all the code have to live in the same file as the flow/task definitions somehow?
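    A hedged note on this one: the flow is serialized and sent to the workers, but imports of your own modules are resolved on the worker side, so the utils package has to be importable there (installed into the same environment, on the PYTHONPATH, or shipped over). One quick way to test that theory with dask.distributed, assuming utils is a single-file module (installing the package on the workers is the more robust fix):

    from dask.distributed import Client

    client = Client("tcp://127.0.0.1:8786")  # your scheduler address

    # push the module file to every currently-connected worker, then re-run the flow
    client.upload_file("utils.py")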
  • b

    Brian McFeeley

    07/29/2019, 9:45 PM
    Running into a bit of an inscrutable error that I strongly suspect is not really prefect's issue, but rather an issue with my dask cluster setup. When submitting a flow run to my cluster, I see these warnings repeatedly:
    distributed.client - WARNING - Couldn't gather 4 keys, rescheduling {'extract-2e6b9db4-5567-486a-af2f-8ad07bc6a77b': ('<tcp://172.17.0.2:43849>',), 'transform2-b5e3d1d0-fb14-44f6-afe6-6b47ec5ab277': ('<tcp://172.17.0.2:43849>',), 'transform1-8156dfd1-1a53-4213-bf66-c70d1aab724f': ('<tcp://172.17.0.2:43849>',), 'load-fa2ae950-213e-4044-9773-fdacd7b057b4': ('<tcp://172.17.0.2:43849>',)}
    I'm trying to deploy my cluster on Aptible, and I suspect I've not set up enough plumbing to expose the right ports so the scheduler and worker processes can communicate. Does this ring any bells?
  • b

    Brett Papineau

    07/29/2019, 10:29 PM
    Hello. So I'm investigating whether we can use Prefect as our job (task) scheduler in a function-driven data warehouse. This investigation comes after determining that maintaining significant workarounds for Airflow is unsustainable debt. On preliminary investigation I've noticed a few problems and have a few questions, if you have the time. I will be putting them in a thread reply.
  • j

    Jie Lou

    07/30/2019, 4:19 PM
    Hello. My flow contains a dask-ml prediction model such as logistic regression/xgboost etc. Is it possible to use the Dask executor to run the flow as well? It just failed for both algorithms. Or should I just use a normal sklearn model inside the flow? Without specifying executor=dask_executor, the flow works fine.
  • c

    Chris Hart

    07/30/2019, 10:16 PM
    a followup question on mapping tasks for pagination: I’m using the technique of having an upstream task that builds a giant dictionary of all possible pages, and then maps that into the task that then ingests (“hydrates”) the data from an API.. it is working yay! my question now is: any suggestions for how I can chunk the pages list or yield/stream it to the next task? reason being is that I want to a) limit the memory footprint of the downstream task and b) limit the size of failures that would then need to be retried, preventing duplicate fetching at scale
it seems that the generation of subtasks for the mapped items is working and I wonder if just switching it to the Dask Executor will allow for the parallelization of the downstream one to proceed, since right now the one that hydrates all records is going sequentially & locally
(and thus keeping all data from all pages in memory before proceeding to the last task map)

Chris White

07/31/2019, 4:42 AM
Great questions; a few things to call out:
- at this exact moment, “chunking” or batching of mapped tasks is not a first-class feature, but it’s something we recently began considering implementing. Consequently, to do this today you need to manually create the batches yourself (i.e., map over a list-of-lists). Feel free to open an issue formally requesting this!
- using the DaskExecutor will certainly gain you parallelism of the mapped tasks at each level of mapping; I’m not 100% sure if this is all you’re asking about, so let me know if there was something more subtle here that I’m missing
- one caveat with mapping is that the results (outputs) of all the mapped tasks will be “reduced”, i.e., gathered into memory, so just be mindful of this when considering your resource requirements
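A hedged sketch of the manual batching described above: chunk the page list into a list of lists and map the hydrate task over the chunks, so each mapped run (and each retry) only touches one chunk; the names and sizes are placeholders:

from datetime import timedelta

from prefect import task, Flow

CHUNK_SIZE = 50

@task
def build_pages():
    # placeholder for the upstream task that enumerates every page
    return [{"offset": i} for i in range(1000)]

@task
def chunk(pages):
    # list-of-lists: each element becomes one mapped hydrate run
    return [pages[i:i + CHUNK_SIZE] for i in range(0, len(pages), CHUNK_SIZE)]

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def hydrate(page_batch):
    # fetch just this batch; a retry only refetches CHUNK_SIZE pages
    return [f"record-for-{p['offset']}" for p in page_batch]

with Flow("chunked-hydration") as flow:
    hydrate.map(chunk(build_pages()))

flow.run()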

Chris Hart

07/31/2019, 4:07 PM
ah ok cool thanks! will start by adding a dedicated batching/chunking task for that
💯 1