Powered by Linen
prefect-community
  • a

    Arlo Bryer

    10/29/2019, 3:58 PM
    I'd be very curious to discuss a few points/user experience with a couple of people if possible/before committing.
    j
    4 replies · 2 participants
  • g

    George Coyne

    10/30/2019, 5:35 PM
    Hey everyone, what’s the best way to set a task to run last and always run? I am using the
    all_finished
    trigger, but for whatever reason the task is running first. Thanks in advance
    c
    a
    +1
    8 replies · 4 participants
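A note on the all_finished question above: in Prefect 0.x the trigger only decides whether a task runs once its upstreams have finished; the run order comes from explicit edges (e.g. set_upstream on the final task, combined with trigger=all_finished so it runs even if upstreams fail). A minimal pure-Python sketch of why the edge set, not the trigger, fixes the ordering (task names are hypothetical):

```python
def topo_order(deps):
    """Toy topological run order: a task runs only after all of its upstreams."""
    order, done = [], set()
    pending = dict(deps)
    while pending:
        # a task is ready once every one of its upstreams has completed
        ready = sorted(t for t, ups in pending.items() if ups <= done)
        if not ready:
            raise ValueError("dependency cycle")
        for t in ready:
            order.append(t)
            done.add(t)
            del pending[t]
    return order

# the cleanup task lists every other task as an upstream, so it runs last
deps = {"extract": set(), "transform": {"extract"}, "cleanup": {"extract", "transform"}}
```

Without those edges, all_finished is trivially satisfied at the start (zero upstreams are all finished), which matches the "task runs first" symptom.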
  • a

    Aliza Rayman

    10/31/2019, 2:20 PM
    Hi everyone, I'm new to Prefect and stumbled upon it after using Airflow for my current project and feeling dissatisfied. I am creating workflows to do production data monitoring/ business rule checking and alerting on big data. Has anyone else used Prefect for a similar problem and could share feedback or advice? Thank you!
    a
    7 replies · 2 participants
  • w

    Wai Kiat Tan

    11/01/2019, 1:54 AM
    Just got invited to Prefect Cloud! Thanks to @Chris White and the team!
    🚀 3
    :marvin: 3
    c
    1 reply · 2 participants
  • i

    itay livni

    11/03/2019, 7:25 PM
    Hi - A quibble with the error message received when parameters go unused in a flow:
    ValueError: Flow.run received the following unexpected parameters: unused_param1, unused_param5
    Perhaps there could be a dedicated error message for unused params?
  • i

    itay livni

    11/03/2019, 7:26 PM
    Basically I find myself establishing Parameters in flows before I write some of the functions
    j
    2 replies · 2 participants
  • a

    Argemiro Neto

    11/04/2019, 11:38 PM
    Hi there, quick question: I saw once how to capture inside a task that the upstream tasks were SKIPPED, but I can't find it any more. Does anyone know where I can find it?
    c
    6 replies · 2 participants
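For detecting skipped upstreams as in the question above: in Prefect 0.x this is governed by a task's trigger and its skip_on_upstream_skip flag (set it to False for the task to run even when upstreams skip). A trigger is essentially a predicate over upstream states; a pure-Python sketch, with strings standing in for Prefect State objects (a real trigger would check state.is_skipped()):

```python
def skipped_upstreams(upstream_states):
    # report which upstreams ended Skipped; states are plain strings here,
    # standing in for Prefect State objects
    return sorted(name for name, state in upstream_states.items() if state == "Skipped")

def run_even_if_skipped(upstream_states):
    # trigger-style predicate: run as long as every upstream is finished,
    # regardless of whether it succeeded, failed, or skipped
    return all(state in ("Success", "Failed", "Skipped") for state in upstream_states.values())
```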
  • t

    Tsang Yong

    11/05/2019, 5:45 PM
    Hi Community, can someone help me understand task map a bit better? If I have a list that gets mutated in size during a flow run (like from 1 -> 2 dimensional), will map automatically break up the new mutated 2D list into tasks?
  • t

    Tsang Yong

    11/05/2019, 5:46 PM
    so far in my attempts I can map out the original 1D list in my task run but not the 2D list.
  • t

    Tsang Yong

    11/05/2019, 5:48 PM
    from prefect import task, Flow

    my_list = [1, 3, 4]

    @task
    def map1_fn(item):
        # each call returns a two-element list, so the mapped result is 2D
        new_item = [item+1, item+1]
        print("map1_fn: {}".format(item))
        return new_item

    @task
    def map2_fn(item):
        # expecting item = 2 but got [2, 2] etc
        print("map2_fn: {}".format(item))
        return item

    with Flow("workflow") as flow:
        my_other_list = map1_fn.map(item=my_list)
        my_another_list = map2_fn.map(item=my_other_list)

    flow.run()
    c
    8 replies · 2 participants
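On the 2D mapping question above: map only iterates one level deep, so a mapped task that returns a list produces a list of lists, and the next map hands each inner list downstream whole. The usual fix is an explicit flatten step between the two mapped tasks. A pure-Python sketch of the data flow (in the real flow, flatten would be one more @task, and the list comprehensions stand in for .map calls):

```python
from itertools import chain

def map1_fn(item):
    # each mapped call returns a list, so the overall result is 2D
    return [item + 1, item + 1]

def flatten(list_of_lists):
    # explicit flatten step inserted between the two mapped tasks
    return list(chain.from_iterable(list_of_lists))

def map2_fn(item):
    # now receives scalars (2, 2, 4, 4, ...) instead of [2, 2]
    return item

my_list = [1, 3, 4]
nested = [map1_fn(i) for i in my_list]   # what map1_fn.map produces
flat = flatten(nested)                   # the extra task
result = [map2_fn(i) for i in flat]      # what map2_fn.map would now see
```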
  • r

    RyanB

    11/06/2019, 4:51 PM
    It appears I can't use the context supplied logger when using the DaskExecutor as I get an exception that the context cannot be pickled?
    j
    j
    +1
    11 replies · 4 participants
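On the pickling error above: Dask ships task functions and their captured state to workers via pickle, and live handles such as a logger's streams don't survive that, so anything captured at flow-build time can fail. The usual pattern is to fetch the logger inside the task body at run time (in Prefect 0.x, logger = prefect.context.get("logger")) rather than closing over it. A small pure-Python demonstration of the underlying constraint:

```python
import pickle
import sys

def is_picklable(obj):
    # mirrors the check Dask effectively performs before shipping state
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False

# a logger wrapping a live stream behaves like the stream itself:
stream_ok = is_picklable(sys.stdout)      # False: open handles can't be pickled
data_ok = is_picklable({"rows": [1, 2]})  # True: plain data travels fine
```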
  • d

    dhume

    11/06/2019, 7:56 PM
    I’m running into a problem when building my Docker storage object. I have my flows and some python files of shared functions. However I cannot
    add_flow
    any flow that relies on those shared functions. Is there a simple workaround without turning the shared functionality into its own library and adding it to
    python_dependencies
    ?
    c
    1 reply · 2 participants
  • t

    Tsang Yong

    11/07/2019, 12:10 AM
    Is there a recommended way to build workflow code and organize local imports? How should I properly organize the workflow code / utils imports over a Dask cluster? I’m new to Python and I find myself having to build eggs and upload them, which is not familiar territory for me.
    c
    3 replies · 2 participants
  • m

    Maikel Penz

    11/07/2019, 3:13 AM
    Hey.. question about
    task retries
    . Shouldn't the code below
    retry
    only every 30 seconds? When I run it the output goes from 1 to 40 in less than a second and finishes. I expected it would take 30 seconds x 40 runs to finish (as per my
    if loop_count == 40:
    to stop)
    from prefect import Flow, task, context
    from prefect.engine.signals import LOOP
    import datetime
    
    @task(max_retries=50, retry_delay=datetime.timedelta(seconds=30))
    def test_retry():
        loop_count = context.get("task_loop_count", {})
        print(loop_count)
    
        if loop_count == 40:
            return "finished"
    
        raise LOOP(message="Next run")
    
    with Flow("test-pipeline") as flow:
        test_retry()
    
    flow.run()
    c
    5 replies · 2 participants
  • t

    Tsang Yong

    11/07/2019, 5:25 PM
    Is there an example of how to catch a failed ShellTask in Prefect? Looking at the docs page it’s not clear to me how to catch these other than via a raised prefect.engine.signals.FAIL
    j
    4 replies · 2 participants
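On catching a failing ShellTask: in Prefect 0.x a nonzero exit code surfaces as a failed task state, which downstream tasks can react to via triggers (e.g. any_failed) or state handlers, rather than try/except in user code. The behavior is roughly this pure-Python sketch, with FAIL standing in for prefect.engine.signals.FAIL:

```python
import subprocess

class FAIL(Exception):
    """Stand-in for prefect.engine.signals.FAIL."""

def run_shell(command):
    # roughly what ShellTask does: a nonzero exit becomes a failure signal
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        raise FAIL("command exited {}: {}".format(result.returncode, result.stderr.strip()))
    return result.stdout
```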
  • t

    Tsang Yong

    11/07/2019, 7:57 PM
    Hello. Have another question while checking on the ifelse() flow. What is my best approach if I have a lot of downstream tasks that depend on an ifelse decision? Do I wrap all the downstream tasks with an ifelse()?
    c
    b
    +1
    10 replies · 4 participants
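Regarding wrapping many downstream tasks in ifelse(): usually only the head task of each branch needs to be gated, because in Prefect 0.x skips propagate downstream (tasks default to skip_on_upstream_skip=True), so everything under a skipped branch head is skipped automatically. A pure-Python sketch of that propagation (task names hypothetical):

```python
def propagate_skips(deps, skipped_heads):
    # deps: task -> set of upstream tasks; a task skips if any upstream skipped
    skipped = set(skipped_heads)
    changed = True
    while changed:
        changed = False
        for t, ups in deps.items():
            if t not in skipped and ups & skipped:
                skipped.add(t)
                changed = True
    return skipped

deps = {
    "branch_a": set(), "a1": {"branch_a"}, "a2": {"a1"},
    "branch_b": set(), "b1": {"branch_b"},
}
# gate only branch_a; its whole subtree skips with it
result = propagate_skips(deps, {"branch_a"})
```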
  • m

    Melanie Kwon

    11/07/2019, 8:52 PM
    Hello, quick question: prefect’s dask scheduler does not seem to support the security specification that allows for things like ssl auth. Do you guys have any general recommendations in terms of executor authentication/security?
    z
    c
    +1
    5 replies · 4 participants
  • m

    Mitchell Bregman

    11/08/2019, 9:48 PM
    Hi there! My team is exploring Prefect as a workflow engine to support hundreds of data integrity checks on an internal survey management system containing tens of thousands of survey responses. We like Prefect because of its clean implementation, seemingly lower learning curve, and the ability to connect complex dependencies. I am tasked with building a prototype "flow" which can serve as an example that supports thousands of API calls, data models/checks, db reads and writes. My goal is to hit the ground running with the Prefect Core framework, using a threaded environment that can schedule tasks (i.e. API calls) in parallel, read and write to PG in bulk, and perform other various tasks such as existence checking, data integrity, etc. Coming from a Luigi background, a lot of these things are taken care of for me. Our biggest pain point with Luigi is its dependency management model + rigid existence checking, which can be a huge time suck as these checks are performed on 1 thread. I am seeking scalable granularity in this workflow. As I read through these docs, I am seeing your concept of
    Executors
    as well as the
    DaskExecutor
    object - which seems to be the proper choice. Now, when I start exploring this idea of
    mapping
    and connecting these task dependencies together, I get a little flustered without a more complex Prefect pipeline example... If it were possible, would you be able to point me to a larger scale example on GH or elsewhere; something that has multiple modules + a nicely defined project structure?
    j
    c
    6 replies · 3 participants
  • b

    Brad

    11/09/2019, 12:19 AM
    Hi team, I just got onto prefect cloud and tried installing the prefect cli via pipx (https://github.com/pipxproject/pipx) but the install didn’t pick up the
    click
    requirement from dask
    🧐 1
    j
    3 replies · 2 participants
  • b

    Braun Reyes

    11/11/2019, 4:43 AM
    omg! just touched the Dask project for the first time... it’s pretty darn cool.
    🙌 1
    🚀 2
  • b

    Braun Reyes

    11/11/2019, 4:44 AM
    that delayed object usage looks awfully familiar 🤔
  • b

    Brett Naul

    11/11/2019, 4:55 AM
    bit of a philosophical q: why are `Parameter`s required to be unique rather than just all receiving the input? I mentioned to @Chris White that I need to duplicate a large part of one flow in multiple places, so I was toying with a pattern like
    from prefect import Flow, Parameter, task

    def add(x, y):
        return x + y

    def add_to_x(y):
        x = Parameter('x')
        total = task(add)(x, y)

    with Flow('flow') as f:
        add_to_x(1)
        add_to_x(2)
    in this case you could just pass in
    x
    to the “factory” function instead, but in practice I have lots of parameters so it feels a bit clumsy. I’m sure there was a good reason for enforcing that parameters be unique; but doesn’t the fact that we’re able to assert that it’s unique mean that we could also just grab a reference to the already-existing task? 🤔
    c
    3 replies · 2 participants
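One way to get the behavior Brett describes, without changing the uniqueness rule, is for the factory itself to hand back the already-created parameter: create each Parameter once, cache it by name, and reuse the reference. A pure-Python sketch of the caching pattern (the dict literal stands in for a prefect.Parameter):

```python
_param_cache = {}

def get_param(name):
    # create the parameter once per name, then return the same object
    if name not in _param_cache:
        _param_cache[name] = {"name": name}  # stand-in for Parameter(name)
    return _param_cache[name]

def add_to_x(y):
    x = get_param('x')  # same object on every call, so no duplicate tasks
    return (x, y)
```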
  • p

    Phoebe Bright

    11/11/2019, 1:58 PM
    Hi, I've just started using Prefect with Django and I have a couple of questions.
    1. Is there some way of running a flow that includes a Django context so I can access the model methods? I can use API calls instead; is this a better way to do it?
    2. How do I trigger the correct workflow? The overall flow is like this: check to see if any new records have been added to a Django model/postgres table; for each new record, work out what type it is and therefore which workflow to run, e.g. if type == 1, run flow1 (taska, taskb, taskc), elif type == 2, run flow2 (taska, taskz). What is the best approach?
    - Should this all be in Prefect? (not sure how to do that)
    - Should I have a piece of external code that checks for new records and what type they are, and then calls the correct Prefect flow with the new record as a parameter?
    - Is there a better way of doing this!?
    j
    6 replies · 2 participants
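For Phoebe's second question, a common shape is a small external dispatcher: poll for new records, look up the right flow for each record's type, and kick it off with the record id as a parameter (in Prefect 0.x, flow.run(parameters={...})). A pure-Python sketch with hypothetical flow callables:

```python
def dispatch(record, flows):
    # pick the flow registered for this record's type and run it;
    # in Prefect this call would be flow.run(parameters={"record_id": ...})
    flow = flows.get(record["type"])
    if flow is None:
        raise ValueError("no flow registered for type {}".format(record["type"]))
    return flow(record["id"])

flows = {
    1: lambda record_id: ("flow1", record_id),  # taska, taskb, taskc
    2: lambda record_id: ("flow2", record_id),  # taska, taskz
}
```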
  • j

    Jason

    11/11/2019, 4:19 PM
    Does prefect support a yield generator model of passing data from 1 task to the next?
  • j

    Jason

    11/11/2019, 4:20 PM
    For instance, when processing a CSV, can you yield per row?
    j
    c
    3 replies · 3 participants
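On the generator question: Prefect tasks exchange whole return values, not yields, so the idiomatic per-row pattern is one task that reads the CSV into a list of rows and a second task mapped over that list. A pure-Python sketch of the data flow (the list comprehension stands in for process_row.map(rows)):

```python
import csv
import io

def read_rows(csv_text):
    # returns all rows up front; mapping then fans out one task per row
    return list(csv.reader(io.StringIO(csv_text)))

def process_row(row):
    # per-row work that would otherwise live in a generator body
    return [cell.upper() for cell in row]

rows = read_rows("a,b\nc,d\n")
processed = [process_row(r) for r in rows]  # stands in for process_row.map(rows)
```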
  • m

    Mitchell Bregman

    11/11/2019, 9:28 PM
    hey guys, just out of curiosity... in
    get_module_metadata
    I am returning a class object; is this the reason for the double-headed dependency arrow? It seems as though all the other tasks, where I am returning standard Python data types, do not have a double-headed one...
    j
    c
    7 replies · 3 participants
  • t

    Tsang Yong

    11/12/2019, 12:26 AM
    hey, I’m getting these in the logs. Is this something in my workflow code, or in the Dask config, or in Prefect?
    dask-worker[65048]: distributed.core - INFO - Event loop was unresponsive in Worker for 3.21s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
    j
    j
    +1
    6 replies · 4 participants
  • i

    itay livni

    11/12/2019, 8:49 PM
    Hi - What is the best practice for running a
    flow
    from a
    flow
    ? Is it advisable? If so, should I call
    main()
    or the
    flow
    directly?
    main()
    being a function in the module with the other
    flow
    .
    z
    j
    4 replies · 3 participants
  • r

    RyanB

    11/13/2019, 12:16 AM
    question on dask-kubernetes, when trying to connect to a local dask scheduler created using KubeCluster() I get:
    ERROR - prefect.FlowRunner | Unexpected error: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']",)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/flow_runner.py", line 393, in get_flow_run_state
        with executor.start():
      File "/usr/lib/python3.6/contextlib.py", line 81, in __enter__
        return next(self.gen)
      File "/usr/local/lib/python3.6/dist-packages/prefect/engine/executors/dask.py", line 74, in start
        self.address, processes=self.local_processes, **self.kwargs
      File "/usr/local/lib/python3.6/dist-packages/distributed/client.py", line 651, in __init__
        "Unexpected keyword arguments: {}".format(str(sorted(kwargs)))
    ValueError: Unexpected keyword arguments: ['processes', 'silence_logs']
    c
    j
    +1
    8 replies · 4 participants
  • b

    Brett Naul

    11/13/2019, 5:05 PM
    just curious, any eta for 0.7.2?
    j
    c
    4 replies · 3 participants
j

josh

11/13/2019, 5:06 PM
This week 😄
c

Chris White

11/13/2019, 5:06 PM
Friday is the plan!
b

Brett Naul

11/13/2019, 5:06 PM
thanks!
c

Chris White

11/15/2019, 9:49 PM
Hey @Brett Naul - FYI 0.7.2 is out! Your feedback / actual PRs are a big part of this release:
- failed heartbeat thread logging
- ability to name flow runs from the Python Client
- your bug fix
- no autogeneration of constants anymore
to name a few haha
🤩 1