prefect-community
  • m

    Mihai H

    12/09/2019, 3:17 PM
    Hi there!
    👋 4
  • m

    Mihai H

    12/09/2019, 3:18 PM
    We are a startup from the Netherlands, currently evaluating different workflow engines
  • m

    Mihai H

    12/09/2019, 3:19 PM
    Prefect seems really interesting to us, and it seems to overcome some of Airflow's limitations
  • m

    Mihai H

    12/09/2019, 3:19 PM
    But I do have some quick questions for the time being.
    n
    1 reply · 2 participants
  • m

    Mihai H

    12/09/2019, 3:20 PM
    I haven't seen any CLI tool available if Prefect is run on premises...am I missing something?
    n
    c
    4 replies · 3 participants
  • m

    Mihai H

    12/09/2019, 3:20 PM
    How can I view running tasks and their status?
  • m

    Mihai H

    12/09/2019, 3:20 PM
    How can I peek into the execution flow?
  • m

    Mihai H

    12/09/2019, 3:21 PM
    How about querying a task state?
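    A minimal sketch of answering the three questions above for a local run with Prefect Core 0.x: flow.run() returns the flow's final State, whose .result maps each task to its own State (the inc task here is illustrative; for live status you would typically watch the logs or the Cloud/Server UI).
    from prefect import task, Flow

    @task
    def inc(x):
        return x + 1

    with Flow("inspect-states") as flow:
        result = inc(1)

    # the flow's final State; its .result dict maps each task to that task's State
    flow_state = flow.run()
    task_state = flow_state.result[result]
    print(task_state.is_successful())  # True or False
    print(task_state.result)           # the task's return value, e.g. 2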
  • l

    Luke Orland

    12/09/2019, 5:04 PM
    How can we specify an upstream task for all copies of a task created by .map?
    c
    1 reply · 2 participants
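    A minimal sketch for the .map question above, assuming the 0.x behavior that an upstream_tasks entry passed to .map is a state-only dependency which must finish before any of the mapped copies run (task names are illustrative):
    from prefect import task, Flow

    @task
    def setup():
        print("runs before any mapped copy")

    @task
    def process(item):
        return item * 2

    with Flow("map-with-upstream") as flow:
        # setup is not mapped over; the mapped task cannot start until it finishes
        results = process.map([1, 2, 3], upstream_tasks=[setup])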
  • w

    Walter Gillett

    12/09/2019, 10:49 PM
    Having read through the Prefect docs, I love the lightweight philosophy, Python-friendliness, and improvements over Airflow. But the documentation is lacking; are there plans to expand it? For our computationally heavy use case, we would want each task to run in its own Docker container and independently on a collection of worker nodes (e.g., as a k8s job). The documentation doesn't address this common use case. There is a barebones k8s API reference but no conceptual material or examples. The closest thing I can find is https://docs.prefect.io/core/tutorials/dask-cluster.html which says "Take this example to the next level by storing your flow in a Docker container and deploying it with Dask on Kubernetes using the excellent dask-kubernetes project! Details are left as an exercise to the reader. 😉" Ideally this exercise would not be left to the reader. But beyond that, we don't want to store the entire flow in a single Docker container; rather, each task should get its own Docker container, since each task has different computation requirements (CPU-heavy vs. RAM-heavy vs. I/O-heavy vs. needing access to a large reference DB vs. ...), and parallel tasks should be able to run on different workers. Please advise. Also: Prefect Cloud sounds appealing as a persistence solution, but do we have the safety net of being able to implement our own persistence - are there API hooks to support that?
    c
    2 replies · 2 participants
  • c

    CJ Wright

    12/09/2019, 10:52 PM
    Hi Walter, these are all excellent questions! I'll be right back with an answer for you
  • a

    Arsenii

    12/10/2019, 11:58 AM
    Hi all, a question regarding triggers and Prefect-specific best practices. I'm writing a dynamically-built flow that basically consists of three tasks:
    1. Get the list of items to work with.
    2. Map a function over the items on that list. The task might raise a SKIP halfway through if some condition is met.
    3. Map a function over the results of step 2.
    I'm coming from an Airflow background and was expecting a none_skipped trigger to apply to step 3, with the expected behavior that it would run if and only if step 2 succeeded. But as far as I understand, all triggers in Prefect treat skipping as success, which is not what I want. So my question is: is this designed to deter people from using Prefect in a non-Prefect way? Should I instead use branching logic? Or is it a known missing feature to be implemented later? This is easily workaround-able with some manual checking logic in step 3, but still. Thanks!
    e
    c
    5 replies · 3 participants
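    Not a full answer to the trigger question above, but one related knob worth noting is skip_on_upstream_skip, which (by default) propagates skips downstream instead of treating them as successes; a minimal sketch with illustrative task names:
    from prefect import task
    from prefect.engine import signals

    @task
    def maybe_skip(item):
        if item is None:  # illustrative condition
            raise signals.SKIP("nothing to do for this item")
        return item

    # skip_on_upstream_skip defaults to True: the task is skipped, not run,
    # when one of its upstream tasks was skipped
    @task(skip_on_upstream_skip=True)
    def use_result(value):
        return value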
  • d

    DiffyBron

    12/10/2019, 12:25 PM
    Hi, I'm trying to get the output from an upstream task (task_a). How do I do so in this example? I want the rand_num to show when it is successful. How can I retrieve it through a flow?
    #!/usr/bin/env python
    
    import random
    
    from prefect.triggers import all_successful, all_failed
    from prefect import task, Flow
    
    @task(name="Task A")
    def task_a():
        rand_num = float(random.random())
        curr_limits = float(0.5)
        if rand_num < curr_limits:
            raise ValueError(f'{rand_num} is less than {curr_limits}')
        return rand_num
    
    @task(name="Task B", trigger=all_successful)
    def task_b():
        pass
    
    @task(name="Task C", trigger=all_failed)
    def task_c():
        pass
    
    if __name__ == '__main__':
        with Flow('My triggers') as flow:
            success = task_b(upstream_tasks=[task_a])
            fail = task_c(upstream_tasks=[task_a])
    
        flow.set_reference_tasks([success])
        flow_state = flow.run()
    j
    9 replies · 2 participants
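    For reference, one way to pull Task A's return value out of the local run above (continuing the snippet, so flow_state and task_a are the names defined there):
    # flow_state.result maps each task in the flow to its final State
    task_a_state = flow_state.result[task_a]
    if task_a_state.is_successful():
        print("rand_num was", task_a_state.result)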
  • d

    David Ojeda

    12/10/2019, 8:02 PM
    Hello Prefects, so Metaflow (by Netflix) was recently released. I skimmed the docs a bit and there are some similarities with Prefect (like having a DAG), while other completely different approaches (like how flows are defined with the next and join member functions of FlowSpec)… I was wondering what you guys think at Prefect? Competition is always good (IMO) to make things improve on both sides. Are there any functionalities that may be interesting to port to Prefect?
    z
    n
    2 replies · 3 participants
  • d

    DiffyBron

    12/10/2019, 8:30 PM
    Howdy, how do I set the schedule in New York time?
    schedules.Schedule(
            # fire every day
            clocks=[schedules.clocks.IntervalClock(timedelta(days=1))],
            # but only on weekdays
            filters=[filters.is_weekday],
        # and only at 8:15am, 9:30am, 3:50pm, or 4pm
            or_filters=[
                filters.between_times(pendulum.time(hour=8, minute=15), pendulum.time(hour=8, minute=15)),
                filters.between_times(pendulum.time(hour=9, minute=30), pendulum.time(hour=9,minute=30)),
                filters.between_times(pendulum.time(hour=15, minute=50), pendulum.time(hour=15,minute=50)),
                filters.between_times(pendulum.time(hour=16), pendulum.time(hour=16)),
            ],
            not_filters=[
                filters.between_dates(12, 25, 12, 25)
            ]
        )
    z
    4 replies · 2 participants
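    One way to anchor this to New York time (as the fuller example a few messages down also does) is to give the clock a timezone-aware start_date; a minimal sketch, assuming the 0.x behavior where the filters are evaluated against the clock's local times:
    import pendulum
    from datetime import timedelta
    from prefect.schedules import Schedule, clocks, filters

    schedule = Schedule(
        # the start_date's timezone makes the emitted times New York local times,
        # which the between_times filters then compare against
        clocks=[clocks.IntervalClock(
            interval=timedelta(minutes=1),
            start_date=pendulum.datetime(2019, 12, 10, tz="America/New_York"),
        )],
        filters=[filters.is_weekday],
        or_filters=[
            filters.between_times(pendulum.time(8, 15), pendulum.time(8, 15)),
            filters.between_times(pendulum.time(15, 0), pendulum.time(15, 0)),
        ],
    )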
  • g

    Giang Hoang Le

    12/10/2019, 11:03 PM
    OOP (polymorphism) in Prefect: Hi guys, I would like to ask whether there is any OOP support in the Prefect framework. I would like to create a base structure shared by the various methods for different sources. Is this possible in the Prefect framework? Kind regards, Giang
    j
    18 replies · 2 participants
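    The class-based API is one way to get that kind of structure: subclass prefect.Task, put the shared pieces in a base class, and override run() per source; a minimal sketch with illustrative class names:
    from prefect import Flow, Task

    class BaseExtract(Task):
        """Shared structure for every extract task."""
        def run(self):
            raise NotImplementedError

    class PostgresExtract(BaseExtract):
        def run(self):
            return "rows from postgres"

    class S3Extract(BaseExtract):
        def run(self):
            return "objects from s3"

    with Flow("polymorphic-extracts") as flow:
        pg = PostgresExtract()()  # instantiate the Task, then call it to add it to the flow
        s3 = S3Extract()()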
  • b

    Brett Naul

    12/11/2019, 12:50 AM
    I'm having a weird issue that I haven't seen anything like before: I'm trying to run a BigQueryLoadGoogleCloudStorage task (which I wrote, but that isn't really helping 😬) and passing in schema as a list; by the time schema is used in run, somehow the order of the list is being altered, causing the columns to be mis-identified. Any idea what might be messing with the schema input to my task? Some kind of argument validation or copy, maybe…? Context: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/google/bigquery.py#L346-L409
    j
    5 replies · 2 participants
  • i

    itay livni

    12/11/2019, 6:31 AM
    Hi - I ran the example shown in Dynamic DAGs: Task Looping -- the visualization is not showing a loop. Is there a way to overcome this? For clarity and understanding, visualizing loops is important. They should become first-class citizens alongside ifelse and select case (IMO) 🙂
    c
    2 replies · 2 participants
  • d

    DiffyBron

    12/11/2019, 9:30 AM
    Hi, I'm trying to make the app say "hello world" when:
    1. it is a weekday
    2. it is 8:15am, 9:30am, 3:50pm, or 4pm
    3. it is not Christmas
    4. all in the New York timezone
    Is the following code the right approach?
    #!/usr/bin/env python
    
    import pendulum
    
    from datetime import timedelta
    from prefect import task, Flow, schedules
    from prefect.schedules import filters, Schedule
    
    @task
    def say_hello():
        print("hello world")
    
    if __name__ == '__main__':
        curr_schedule = Schedule(
            # Fire every min
            clocks=[schedules.clocks.IntervalClock(interval=timedelta(minutes=1), start_date=pendulum.datetime(2019, 1, 1, tz='America/New_York'))],
            # Only on weekdays
            filters=[filters.is_weekday],
            # and only at 8.15am, 9.30am, 3.50pm, 4pm
            or_filters=[
                filters.between_times(pendulum.time(hour=8, minute=15), pendulum.time(hour=8, minute=15)),
                filters.between_times(pendulum.time(hour=9, minute=30), pendulum.time(hour=9,minute=30)),
                filters.between_times(pendulum.time(hour=15, minute=50), pendulum.time(hour=15,minute=50)),
                filters.between_times(pendulum.time(hour=16), pendulum.time(hour=16)),
            ],
            # do not run on Christmas
            not_filters=[
            filters.between_dates(12, 25, 12, 25)
            ]
        )
    
        with Flow('Sounds alerts', curr_schedule) as flow:
            say_hello()
    
        flow.run()
    j
    19 replies · 2 participants
  • d

    Dean Magee

    12/12/2019, 12:22 AM
    Two questions... is there a way to trigger a flow to run and pass in specific parameters that change each time? Is there a way to run a flow from inside another flow?
    c
    3 replies · 2 participants
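    On the first question, a minimal sketch of per-run values with a Parameter in Prefect 0.x (the parameter name is illustrative):
    from prefect import task, Flow, Parameter

    @task
    def greet(name):
        print(f"hello {name}")

    with Flow("parametrized") as flow:
        name = Parameter("name", default="world")
        greet(name)

    # each run can supply different parameter values
    flow.run(parameters={"name": "Dean"})
    flow.run(parameters={"name": "Prefect"})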
  • p

    Philip Billaudelle

    12/12/2019, 11:44 AM
    Hi 👋 I have been running into the following DeprecationWarning, which seems to lead to an ERROR in my Prefect runner:
    ERROR - prefect.FlowRunner | Unexpected error: DeprecationWarning("Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 482, in get_flow_run_state
        key_states = set(flatten_seq([all_final_states[t] for t in reference_tasks]))
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/collections.py", line 29, in flatten_seq
        if isinstance(item, collections.Iterable) and not isinstance(
      File "/usr/local/lib/python3.7/collections/__init__.py", line 52, in __getattr__
        DeprecationWarning, stacklevel=2)
    DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
    j
    5 replies · 2 participants
  • w

    Wayne

    12/12/2019, 2:09 PM
    Hello! I'm in the process of building an integration between Finance and Salesforce. Is there any documentation, or are there any examples, of working with Prefect and Salesforce? I didn't see Salesforce in the task library...
    j
    j
    7 replies · 3 participants
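    There was no Salesforce task in the library at the time; a minimal sketch of wrapping a third-party client in an ordinary task (the simple-salesforce package and the placeholder credentials are assumptions, not part of Prefect):
    from prefect import task, Flow
    from simple_salesforce import Salesforce  # third-party client, not part of Prefect

    @task
    def query_opportunities(username, password, security_token):
        # in practice, pull credentials from Prefect Secrets or environment
        # variables rather than passing literals around
        sf = Salesforce(username=username, password=password, security_token=security_token)
        return sf.query("SELECT Id, Amount FROM Opportunity LIMIT 10")

    with Flow("salesforce-pull") as flow:
        records = query_opportunities("user@example.com", "password", "token")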
  • a

    Andrew Vaccaro

    12/12/2019, 5:28 PM
    How would I go about using Parameters with the imperative API? I have a few tasks that need to run in order but don't have data dependencies (all in a database already).
    c
    m
    8 replies · 3 participants
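    A minimal sketch of feeding a Parameter to tasks through the imperative API (illustrative task names, assuming 0.x's flow.set_dependencies with keyword_tasks):
    from prefect import Flow, Parameter, Task

    class RunSQL(Task):
        def run(self, schema):
            print(f"running against {schema}")

    flow = Flow("imperative-params")
    schema = Parameter("schema", default="public")
    step_one = RunSQL(name="step-one")
    step_two = RunSQL(name="step-two")

    # keyword_tasks feeds the parameter's value into run(); upstream_tasks adds
    # the ordering-only dependency between the two steps
    flow.set_dependencies(task=step_one, keyword_tasks={"schema": schema})
    flow.set_dependencies(task=step_two, keyword_tasks={"schema": schema},
                          upstream_tasks=[step_one])

    flow.run(parameters={"schema": "analytics"})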
  • b

    Braun Reyes

    12/13/2019, 6:12 AM
    I am working on a change to the FargateTaskEnvironment to add the ability to manage Prefect flow task definitions using task definition revisions instead of just adding a new family every time. Does anyone on the Prefect team know if I can get at the flow_id and version from within the Environment setup code? Part of my process involves adding a tag for flow_id and version to the task definition that gets created.
    c
    3 replies · 2 participants
  • b

    Braun Reyes

    12/13/2019, 6:14 AM
    trying to do something like this
    flow_id = prefect.context.get("flow_id", "unknown")[:8]
    flow_version = int(prefect.context.get("version", "0"))
  • b

    Braun Reyes

    12/13/2019, 3:41 PM
    Hello, I have gotten this error when executing a flow in Fargate:
    2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Beginning Flow run for 'dbt_run' 
    2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Starting flow run. 
    2019-12-13 1:40am	prefect.CloudFlowRunner	ERROR	 Unexpected error: TypeError("start() missing 1 required positional argument: 'self'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
        with executor.start():
      File "/usr/local/lib/python3.7/contextlib.py", line 239, in helper
        return _GeneratorContextManager(func, args, kwds)
      File "/usr/local/lib/python3.7/contextlib.py", line 82, in __init__
        self.gen = func(*args, **kwds)
    TypeError: start() missing 1 required positional argument: 'self'
    This was a Fargate task started by the FargateTaskEnvironment.
    j
    25 replies · 2 participants
  • s

    Stuart Young

    12/13/2019, 5:13 PM
    Hi there. I was wondering if it is possible to connect two flows. For instance, if a flow completes and all of its terminal tasks are successful, can I get it to trigger another flow?
    i
    2 replies · 2 participants
  • o

    orcaman

    12/15/2019, 11:46 AM
    Hi everybody. I was wondering what's the proper way to implement the following: I would like to stop the entire flow as soon as any task fails (even if there are tasks that do not depend on the failed task). Any suggestions?
    j
    1 reply · 2 participants
  • o

    orcaman

    12/15/2019, 11:48 AM
    I thought I should somehow message the flow from a state change handler to tell it to stop, but I'm not sure how to do this.
  • a

    Aliza Rayman

    12/15/2019, 5:14 PM
    Hi! I've been encountering this problem both on Core and Cloud: when I map a lot of tasks I've been getting an lz4 error and my whole flow fails. Anyone have advice?
    c
    5 replies · 2 participants
c

Chris White

12/15/2019, 5:15 PM
Hi @Aliza Rayman - would you mind posting the full traceback?
a

Aliza Rayman

12/15/2019, 5:18 PM
Untitled
@Chris White here it is
👀 1
c

Chris White

12/15/2019, 5:23 PM
This error suggests that your dask workers do not have the same software installed as the machine from which you are submitting jobs; check out this dask issue for more info: https://github.com/dask/distributed/issues/2605
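A quick way to confirm that kind of mismatch (assuming you can reach the Dask scheduler; the address below is a placeholder) is distributed's built-in version check:
from dask.distributed import Client

# reports the package versions seen by the client, scheduler and workers,
# and flags mismatches in the relevant ones (lz4, cloudpickle, ...)
client = Client("tcp://dask-scheduler:8786")
print(client.get_versions(check=True))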
a

Aliza Rayman

12/15/2019, 5:29 PM
Thank you, I'll check it out!
👍 1