prefect-community
  • j

    Jonah Benton

    08/22/2019, 8:02 PM
    Hi folks, I'm working locally with Prefect core, stringing together a bunch of functions that hit Twitter's API: with a screen name, get the user object; get the user's followees; get tweets of the user's followees; get urls mentioned in tweets, etc. Each function is task-decorated, and since these are fan-out operations, each step in the flow uses map on the results of the previous step. Since Twitter has strict rate limits I want to cache outputs of each task during testing, so am using the cache_for option in the task decorator, with an hour as the duration... and this does not seem to be working. Every flow run hits the Twitter API rather than pulling results from a cache, and I see "cache is now invalid" in the logs:
    [2019-08-22 19:31:25,037] INFO - prefect.TaskRunner | Task 'auth_name_to_id': Starting task run...
    [2019-08-22 19:31:25,038] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': Starting task run...
    [2019-08-22 19:31:25,038] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[1]': can't use cache because it is now invalid
    [2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[1]': finished task run for task with final state: 'Cached'
    [2019-08-22 19:31:25,200] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': Starting task run...
    [2019-08-22 19:31:25,200] WARNING - prefect.TaskRunner | Task 'auth_name_to_id[0]': can't use cache because it is now invalid
    [2019-08-22 19:31:25,358] INFO - prefect.TaskRunner | Task 'auth_name_to_id[0]': finished task run for task with final state: 'Cached'
    [2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_name_to_id': finished task run for task with final state: 'Mapped'
    [2019-08-22 19:31:25,359] INFO - prefect.TaskRunner | Task 'auth_to_friend': Starting task run...
    [2019-08-22 19:31:25,360] INFO - prefect.TaskRunner | Task 'auth_to_friend[0]': Starting task run...
    [2019-08-22 19:31:25,360] WARNING - prefect.TaskRunner | Task 'auth_to_friend[0]': can't use cache because it is now invalid
    [2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Unexpected error: TwitterError([{'message': 'Rate limit exceeded', 'code': 88}])
    [2019-08-22 19:31:25,605] INFO - prefect.TaskRunner | Task 'auth_to_friend[1]': finished task run for task with final state: 'Retrying'
    ....
    Should something like this just work? Without looking too closely at the code, the thought that comes to mind is that the functions themselves take dicts, pulling out particular keys, and return lists of dicts. I don't work a lot in Python and don't know how it handles equality tests of dicts: is the use of dicts in the task calls potentially screwing up caching?
    j
    c
    • 3
    • 18
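    A minimal sketch related to the caching question above, assuming the Prefect 0.x-era API: cache_for enables caching, and cache_validators control when a cached result is reused (all_inputs compares the task's inputs between runs, as an alternative to the default duration-based validator). The task body and names here are hypothetical stand-ins for the Twitter calls described, not the asker's actual code.
    import datetime
    from prefect import task, Flow
    from prefect.engine.cache_validators import all_inputs

    @task(cache_for=datetime.timedelta(hours=1), cache_validator=all_inputs)
    def auth_name_to_id(screen_name):
        # hypothetical stand-in for the Twitter user lookup
        return {"screen_name": screen_name, "id": len(screen_name)}

    with Flow("twitter-cache-sketch") as flow:
        ids = auth_name_to_id.map(["alice", "bob"])

    state = flow.run()   # first run executes the tasks and populates the cache
    state = flow.run()   # a run within the hour may reuse the Cached states instead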
  • j

    Jonah Benton

    08/23/2019, 12:54 AM
    Hi folks, same context as my question above. I am realizing I am misunderstanding how task mapping/iterating currently works. On a first quick read of the Prefect docs I assumed that given a fan-out structure:
    results1 = task1.map(input_list)
    results2 = task2.map(results1)
    results3 = task3.map(results2)
    where task1 and task2 return lists: I assumed a copy of task2 gets launched for every item in every list produced by task1, and similarly that a copy of task3 gets launched for every item in every list produced by every instance of task2. This doesn't seem to be the case; instead, a copy of task2 is launched for every instance of task1, and a copy of task3 is launched for every instance of task2. This means that task1 gets to process individual items, but tasks 2 and 3 have to process lists of items. Is there some simple magic, e.g. by making a custom Task class, where I can achieve the behavior I'm looking for, where individual tasks process individual items, and lists returned from tasks are automatically "flattened" by the Prefect machinery?
    j
    • 2
    • 13
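    Prefect core did not offer automatic flattening at the time of this thread, so the following is only a hedged sketch of the workaround the question hints at: an intermediate task that flattens each list-of-lists before the next .map(), so every downstream copy receives a single item. All task names here are hypothetical.
    from prefect import task, Flow

    @task
    def get_followees(user):
        # hypothetical fan-out: each user yields a list of followees
        return ["{}-friend-{}".format(user, i) for i in range(3)]

    @task
    def flatten(nested):
        # collapse a list of lists into a single flat list so the next map sees items
        return [item for sublist in nested for item in sublist]

    @task
    def get_tweets(followee):
        return "tweets of {}".format(followee)

    with Flow("flatten-sketch") as flow:
        followee_lists = get_followees.map(["a", "b"])
        followees = flatten(followee_lists)      # un-nest between mapping levels
        tweets = get_tweets.map(followees)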
  • m

    Markus Binsteiner

    08/23/2019, 9:26 AM
    Hi there. I'm having problems with a dask 'deserializing' error, but can't for the life of me figure out what's wrong. Pretty sure it's something silly I did, but I have no idea where to start looking. It seems to happen only for the last task, when I want to get to the results, I'm getting a huge error stack, the important bits seem to be:
  • m

    Markus Binsteiner

    08/23/2019, 9:27 AM
    Untitled
  • m

    Markus Binsteiner

    08/23/2019, 9:27 AM
    It somehow seems to try to serialize an object of the prefect "Success" class....
  • m

    Markus Binsteiner

    08/23/2019, 9:29 AM
    Any idea where I could start looking as to what's wrong?
  • m

    Markus Binsteiner

    08/23/2019, 9:30 AM
    I've inserted some debug statements into the cloudpickle source code, and can confirm that it indeed is choking on a "Success" object. I never return one in my tasks, only dicts and a simple class that should be easy to deserialize...
    c
    • 2
    • 4
  • j

    Jerry Thomas

    08/23/2019, 10:22 AM
    Say I want to handle errors by skipping the items with errors. I saw https://docs.prefect.io/api/0.5.4/tasks/control_flow.html#filtertask but it doesn't have an example of how to use it. How do I apply FilterTask to the results of add so that the next task in the flow only receives the values that were not skipped?
    import prefect
    from prefect import task, Flow

    def skipped(item):
        return isinstance(item, prefect.engine.signals.SKIP) or isinstance(item, prefect.engine.result.NoResultType)

    @task
    def add(x, y):
        try:
            return x / y
        except:
            raise prefect.engine.signals.SKIP

    with Flow("filter-skipped") as flow:
        res = add.map([10, 2, 4], [1, 0, 3])
        # res = FilterTask(res)   # how should this be applied?
    c
    • 2
    • 1
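    A hedged sketch of one way to wire FilterTask into the flow from the question above: FilterTask is instantiated first and then called on the mapped result, and its default filter_func drops values such as NoResults and Exceptions (a custom predicate, e.g. one built from the skipped() helper above, can be passed instead). The report task is a hypothetical downstream consumer.
    from prefect import task, Flow
    from prefect.engine import signals
    from prefect.tasks.control_flow import FilterTask

    @task
    def add(x, y):
        try:
            return x / y
        except ZeroDivisionError:
            raise signals.SKIP("division by zero")

    @task
    def report(values):
        print(values)

    drop_bad = FilterTask()  # or FilterTask(filter_func=my_predicate)

    with Flow("filter-skipped") as flow:
        res = add.map([10, 2, 4], [1, 0, 3])
        kept = drop_bad(res)   # downstream only sees the values that passed the filter
        report(kept)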
  • j

    Jerry Thomas

    08/23/2019, 11:31 AM
    This code just hangs after the logger. It seems to have something to do with the fact that I am reading a file.
    import json
    import prefect
    from prefect import task, Flow
    from prefect.utilities.logging import get_logger  # assuming this is the get_logger in use

    def get_data():
        with open("data/data.json", "r") as f:
            input = json.load(f)
        return input

    @task
    def test(config):
        logger = prefect.context.get("logger")
        logger.info(config)

    def main(values):
        with Flow("custom") as flow:
            logger = get_logger("Flow")
            logger.info("flow started")
            logger.info(values)  # this works
            test(values)  # this does not start

        flow.run()

    main(get_data())
    If I use the flow logger to print the values fetched from the file, it works, but the task does not seem to accept the values when they are fetched from a file. If I convert get_data() into a task and run it in a distributed environment, where would it read the file from?
    c
    • 2
    • 7
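    A hedged sketch of one way to restructure the snippet above so the file read happens inside the flow itself: make get_data a task, so on a distributed executor it reads data/data.json on whichever worker runs it, and do the logging from inside tasks via the logger Prefect places in context.
    import json
    import prefect
    from prefect import task, Flow

    @task
    def get_data():
        # executed wherever the executor schedules this task, so the file
        # must be reachable from that worker's filesystem
        with open("data/data.json", "r") as f:
            return json.load(f)

    @task
    def test(config):
        logger = prefect.context.get("logger")
        logger.info(config)

    with Flow("custom") as flow:
        config = get_data()
        test(config)

    flow.run()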
  • b

    Brad

    08/23/2019, 11:34 AM
    (Copying this from the caching thread above) Hi guys, has there been any update to the caching mechanisms in the past few weeks? I’m interested in persisting results across runs to a local (or remote) cache. I would also like to be able to update a task’s status to Cached based on the input parameters. I’ve been playing around with the state_handlers and trigger hooks but I can’t seem to achieve what I want.
    c
    • 2
    • 4
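    Prefect core didn't ship a cross-run persistence layer at this point, so what follows is only a rough sketch of the state_handlers hook being discussed: a handler that pickles a task's result to a local directory whenever the task finishes successfully. The cache directory is hypothetical, and restoring the result on a later run (the harder half of the request) would still need extra logic.
    import os
    import pickle

    def persist_result(task, old_state, new_state):
        # state handlers receive (task, old_state, new_state) and return the new state
        if new_state.is_successful():
            os.makedirs(".prefect-cache", exist_ok=True)
            path = os.path.join(".prefect-cache", task.name + ".pkl")
            with open(path, "wb") as f:
                pickle.dump(new_state.result, f)
        return new_state

    # usage: @task(state_handlers=[persist_result])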
  • d

    Dmitry Dorofeev

    08/24/2019, 9:58 PM
    Hi folks, if I created a periodic workflow that runs every hour, how can I stop it? Assume I am running on Dask.
    c
    j
    • 3
    • 6
  • m

    Mikhail Akimov

    08/24/2019, 11:26 PM
    Hi! When a flow/task is retrying in, say, 1 minute, its scheduled_start_time in context is +1 minute from the failed one. I'm using scheduled_start_time much like I would use a DagRun's timestamp key in Airflow (in an SQL template), so this behaviour isn't helpful. Am I doing it wrong? Can you get the original task's metadata/context?
    j
    • 2
    • 26
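    A hedged workaround sketch for the scheduled_start_time question above: capture the timestamp once in its own upstream task and pass it downstream as data, so a retrying task keeps receiving the original value instead of re-reading a shifted value from context. Task names and the query are hypothetical.
    import datetime
    import prefect
    from prefect import task, Flow

    @task
    def run_timestamp():
        # read the schedule time once, in a task that is unlikely to retry
        return prefect.context.get("scheduled_start_time")

    @task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))
    def render_query(ts):
        # on retry this task re-receives the same ts value as data
        return "SELECT * FROM events WHERE day = '{}'".format(ts)

    with Flow("timestamped-query") as flow:
        ts = run_timestamp()
        query = render_query(ts)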
  • a

    Akshay Verma

    08/25/2019, 4:48 PM
    Hi, how can we combine the logging of Prefect with the logging of the tasks it is running?
    c
    • 2
    • 1
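    A minimal sketch of one approach: inside a task, use the logger Prefect places in context, so task-level messages go through the same handlers and formatting as Prefect's own TaskRunner/FlowRunner log lines. The task itself is a hypothetical example.
    import prefect
    from prefect import task

    @task
    def transform(rows):
        logger = prefect.context.get("logger")
        logger.info("transforming %s rows", len(rows))
        return rows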
  • b

    Braun Reyes

    08/26/2019, 5:42 AM
    hello... newbie question... why doesn't a flow failure return a non-zero exit code? If one were to run this self-contained inside of a docker container without using prefect cloud, to start... how do you get the process to exit the docker run with an error?
    c
    j
    • 3
    • 8
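    A hedged sketch of the usual workaround: flow.run() returns the final State rather than raising on failure, so the container entrypoint can translate that state into an exit code explicitly.
    import sys
    from prefect import task, Flow

    @task
    def might_fail():
        raise ValueError("boom")

    with Flow("exit-code") as flow:
        might_fail()

    state = flow.run()
    # propagate failure to `docker run` by exiting non-zero ourselves
    sys.exit(0 if state.is_successful() else 1)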
  • g

    Gopal

    08/26/2019, 12:56 PM
    Hi, I am just starting to use Prefect for workflow management (migrating from Celery). I have a few basic questions: 1) Is it possible to run a task asynchronously, just like some_celery_task.delay()? 2) Is it possible to deploy Prefect flows without using the Prefect cloud server? 3) For periodic tasks using Prefect schedules, it appears that flow.run(schedule=periodic_schedule) is a blocking call. Is there something like a celery periodic task which gets triggered automatically in the background? Thanks
    j
    c
    • 3
    • 13
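    A hedged sketch of the schedule API as of this era, relevant to question 3 above: the schedule is attached to the Flow and flow.run() blocks, firing one run per interval. Pushing the run onto a background thread is only a hypothetical way to keep the main program free; it is not a built-in Prefect feature.
    import threading
    from datetime import timedelta
    from prefect import task, Flow
    from prefect.schedules import IntervalSchedule

    @task
    def heartbeat():
        print("tick")

    schedule = IntervalSchedule(interval=timedelta(hours=1))

    with Flow("periodic", schedule=schedule) as flow:
        heartbeat()

    # flow.run() honours the schedule but blocks the calling thread;
    # running it off the main thread keeps the program responsive, as long as
    # the process itself is kept alive
    runner = threading.Thread(target=flow.run, daemon=True)
    runner.start()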
  • m

    Michael Adkins

    08/26/2019, 3:29 PM
    Hi! I'm exploring prefect for genomics on a kubernetes cluster. Unfortunately, genomics involves a lot of outside tooling which needs to be called from the shell. It looks like both prefect and dask have poor support for managing files and shell applications, as they are designed for in-memory python datasets. Additionally, it appears that there's not a straightforward way to specify containers for specific tasks (e.g. we have a container with tooling installed) or to create file system resource requirements for a worker. Just want to check whether my understanding is correct or if prefect could be extended to this use-case.
    c
    j
    • 3
    • 13
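    For the shell-tooling part of the question above, a hedged sketch using ShellTask, which did exist for wrapping command-line tools; per-task containers and filesystem resource constraints are the parts the thread identifies as not straightforward. The command string is a hypothetical example.
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    align = ShellTask(name="align-reads")

    with Flow("genomics-shell") as flow:
        # run an external bioinformatics tool from the shell
        result = align(command="bwa mem ref.fa reads.fq > out.sam")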
  • k

    Ke Qiang

    08/26/2019, 4:10 PM
    Hi, do you think it is a good time to consider using prefect in a production environment now? Is there some well-known success use case?
    j
    • 2
    • 3
  • a

    An Hoang

    08/26/2019, 8:51 PM
    Hi, to read all the related material on dask's interaction with prefect, do we just search for dask in this channel and then issues on github? Is there going to be a Dask-Prefect FAQ page to synthesize all the common problems/tips and tricks when using both?
    c
    • 2
    • 12
  • f

    Feliks Krawczyk

    08/27/2019, 5:26 AM
    Hey there, was wondering if someone who knows about prefect can let me know whether prefect would actually work in my particular use-case. Currently I'm using Airflow, but I'm convinced its scheduler is broken in terms of what I want to do, and I don't believe I'd ever have the time to try to fix it.
    Context:
    - I have a service where I allow people to submit a bunch of parameters that I convert into Airflow DAGs.
    - Essentially every DAG is the same as it's all templated, and they aren't particularly complicated. 95% of them are 3 operators: Start (send a message) -> Send a spark-submit job to EMR or Databricks -> Finish (send a Success / Fail message). In more complicated cases I let people chain multiple spark-submit jobs, but like I said it's all templated.
    - I have ~2000 DAGs, and it's likely to grow; my service is "self-service" so I let people create / delete / edit as much as they please. So I let DAGs be mutable.
    Airflow has done a pretty good job so far, but it's creaking and adding hours of overhead to scheduled DAGs. It's got plenty of provisioned resources, but most of them sit idle instead of actually processing. For clarity: I have provisioned it to run up to 128 tasks at once over 8 workers (8 workers * 16 parallel tasks). Yet it will only ever reach that peak when it's got over 200 DAGs scheduled (think 00 UTC). However, when I have 70 DAGs scheduled to run, it'll crawl through maybe 8-15 at a time instead of using all the resources, so my DAG queue grows forever instead of depleting. It makes backfills incredibly slow. For someone to backfill something over a year's time at daily intervals will take over a week. What I would have expected is that Airflow would use all the resources at its disposal, but alas it does not, and I'm frustrated with it. In terms of the architecture, I have 8 worker nodes and 1 node for the scheduler and webserver. I use celery and redis as my message queue and broker. Currently I use EFS to keep DAGs in sync across all workers.
    My question is.. is prefect ready for a use-case like mine? I need:
    - A scheduler / workers that will run at the capacity they are allotted
    - Something that can handle 1000s of DAGs
    - Easy mutability of DAGs, i.e. changing schedules / adding and deleting tasks
    I don't need all of Airflow's bells and whistles. My operators are currently extensions of the BashOperator / DatabricksOperator and some simple Python functions. Happy to answer questions. I am really keen to PoC (proof of concept) Prefect, but I want to know if it's ready for me.
    j
    c
    • 3
    • 10
  • a

    Akash

    08/28/2019, 10:03 AM
    Hi, my team is currently looking at alternatives to Airflow for our workflow management needs. Couple of high-level pain-points for us are Airflow's scheduler and low flexibility in terms of components such as the metastore DB. Apart from Prefect, Pachyderm comes to mind as an alternative. Can someone experienced with both Prefect and Pachyderm weigh the two against each other? Or maybe provide a resource that may help?
    m
    j
    • 3
    • 5
  • h

    hayssam

    08/29/2019, 12:06 AM
    Hi, just getting started evaluating Prefect, looking good so far, as simple and expressive as needed to be usable 😉 Quick noob question about the flow semantics. Suppose I define this ETL-like flow:
    @task
    def parse_file():
    [...]
    
    @task(skip_on_upstream_skip=False)
    def compute_aggregates_from_db():
    [...]
    
    with Flow('ETL ISM') as flow:
        e = parse_file() 
        t = process_dataframe(e) 
        l = store_dataframe_in_db(t, table_name="test_prefect") 
        ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
        statistics = compute_aggregates_from_db(upstream_tasks=[l])  
    flow.run()
    The flow is expected to output the statistics at every run. Suppose that I want to conditionally run the [e, t, l] part of the flow only if the file has been modified since the last insertion in postgres. Rows in postgres are timestamped, and should_refresh_table indicates whether an update is required or not. Should I:
    a. Perform the check in store_dataframe_in_db and raise a prefect.engine.signals.SKIP accordingly => I can avoid the l part of the flow, but e and t are still executed
    b. Add a prefect.tasks.control_flow.conditional.ifelse on l: same result
    c. Add a prefect.tasks.control_flow.conditional.ifelse on e: all its downstreams are skipped, which is the desired behavior
    Is this the approach you would recommend? What was a bit surprising for me is that the condition is applied on the upstream e task, while I (wrongly) tried to condition the downstream l task, expecting that all upstreams are skipped as they are not needed.
    j
    • 2
    • 3
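    A hedged sketch of option (c) from the question above: attaching the condition to the upstream extract task, so that when the condition is false, e is skipped and the skip propagates to t and l while compute_aggregates_from_db still runs. The task bodies are hypothetical placeholders for the ones elided with [...] above.
    from prefect import task, Flow
    from prefect.tasks.control_flow import ifelse
    from prefect.tasks.core.constants import Constant

    @task
    def should_refresh_table(table_name):
        return True  # hypothetical check against the row timestamps in postgres

    @task
    def parse_file():
        return "dataframe"

    @task
    def process_dataframe(df):
        return df

    @task
    def store_dataframe_in_db(df, table_name):
        pass

    @task(skip_on_upstream_skip=False)
    def compute_aggregates_from_db():
        return "statistics"

    with Flow("ETL ISM") as flow:
        e = parse_file()
        t = process_dataframe(e)
        l = store_dataframe_in_db(t, table_name="test_prefect")
        # condition the *upstream* task: if no refresh is needed, e is skipped
        # and the skip flows down to t and l
        ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
        statistics = compute_aggregates_from_db(upstream_tasks=[l])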
  • f

    Feliks Krawczyk

    08/29/2019, 1:57 AM
    Sorry this might be a silly question, but still coming from the Airflow world. In your documentation:
    from prefect import task, Flow
    
    @task
    def create_cluster():
        cluster = create_spark_cluster()
        return cluster
    
    @task
    def run_spark_job(cluster):
        submit_job(cluster)
    
    @task
    def tear_down_cluster(cluster):
        tear_down(cluster)
    
    
    with Flow("Spark") as flow:
        # define data dependencies
        cluster = create_cluster()
        submitted = run_spark_job(cluster)
        tear_down_cluster(cluster)
    
        # wait for the job to finish before tearing down the cluster
        tear_down_cluster.set_upstream(submitted)
    Wouldn’t you also need to add submitted.set_upstream(cluster) to ensure this works? Or is there some hidden logic that means when you define a flow it runs the lines in order, so submitted runs only after cluster? If that is the case, how do you get tasks to run in parallel?
    c
    • 2
    • 8
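    A hedged sketch expanding on the docs example above: passing a task's output as an argument already creates the upstream data dependency, so run_spark_job waits on create_cluster without set_upstream; set_upstream is only needed for wait-for ordering without data. Tasks with no dependency between them (job_a and job_b below, both hypothetical) can run concurrently under a parallel executor; the DaskExecutor import path shown is the 0.x-era one.
    from prefect import task, Flow
    from prefect.engine.executors import DaskExecutor

    @task
    def create_cluster():
        return "cluster-123"

    @task
    def job_a(cluster):
        return "a"

    @task
    def job_b(cluster):
        return "b"

    @task
    def tear_down_cluster(cluster):
        pass

    with Flow("Spark-parallel") as flow:
        cluster = create_cluster()
        a = job_a(cluster)          # depends on cluster via the argument
        b = job_b(cluster)          # independent of job_a, so it can run in parallel
        teardown = tear_down_cluster(cluster)
        teardown.set_upstream(a)    # pure ordering, no data passed
        teardown.set_upstream(b)

    flow.run(executor=DaskExecutor())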
  • f

    Feliks Krawczyk

    08/29/2019, 1:57 AM
    Sorry this question may be answered in further documentation… But I’m going through each doco page 1 by 1
  • f

    Feliks Krawczyk

    08/29/2019, 1:57 AM
    And this isn’t clear to me yet
  • e

    emre

    08/29/2019, 11:34 AM
    Hello everyone, I started getting a warning for each task in any of my flows, logging: “can’t use cache because it is now invalid”. The odd thing is that I am not using caches at all. This warning should be triggered if task.cache_for is not None according to the task_runner.py source code. I double checked that all my tasks have a cache_for value of None. I’ve been away from Prefect for about a month and don’t remember getting this warning at all, and I haven’t changed versions. Has anyone experienced a similar issue?
    j
    • 2
    • 6
  • a

    An Hoang

    08/29/2019, 12:48 PM
    Quick question: if I build a flow that takes all the CSVs in a directory and generates a report, is there any way that Prefect could detect a new CSV being added and rerun the pipeline? Or is scheduling it to run every couple of hours the way to go (not reactive)? Also, how would you structure and link a parameter file with each CSV (maybe some CSVs share the same parameter file) to parameterize the Flow to process it a certain way?
    j
    • 2
    • 4
  • j

    jazzydag

    08/29/2019, 3:42 PM
    Hi everyone! My name is Damien. I'm a Data Scientist and I'm quite familiar with luigi https://github.com/spotify/luigi (I contributed once to fix a tiny bug). prefect seems fun and reliable. I'll try it. Maybe I'll have some questions.
    💯 2
    🎉 1
    a
    j
    • 3
    • 3
  • k

    Kyle Foreman (Convoy)

    08/29/2019, 5:16 PM
    Any advice on testing retry handlers? For something that's hard for me to make intentionally fail, but where I want to see how the retry handler behaves, is there a convenient way to set the state to Failed manually?
    c
    j
    k
    • 4
    • 34
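    A hedged sketch of one way to exercise a handler without forcing a real failure: since a state handler is just a function of (task, old_state, new_state), the states can be constructed by hand and the handler called directly in a test. The handler itself is hypothetical.
    from prefect import task
    from prefect.engine.state import Failed, Retrying, Running

    def my_retry_handler(task_obj, old_state, new_state):
        # hypothetical handler under test
        if isinstance(new_state, Retrying):
            print("will retry:", new_state.message)
        return new_state

    @task(state_handlers=[my_retry_handler])
    def flaky():
        pass

    # drive the handler directly with hand-built states
    my_retry_handler(flaky, Running(), Failed(message="simulated failure"))
    my_retry_handler(flaky, Failed(message="simulated failure"), Retrying(message="retrying"))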
  • g

    Gopal

    08/30/2019, 3:28 AM
    Is it possible to have dependencies between two or more flows in Prefect? Basically a Flow (or Flows) inside a Flow.
    c
    • 2
    • 1
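    A hedged sketch of one simple pattern from this era, not an official sub-flow feature: wrap the child flow's run() inside a task of the parent flow, so the parent treats the entire child flow as a single step and can fail if it fails.
    from prefect import task, Flow

    @task
    def child_step():
        return "child done"

    with Flow("child") as child_flow:
        child_step()

    @task
    def run_child_flow():
        # run the child flow and surface its failure to the parent
        state = child_flow.run()
        if not state.is_successful():
            raise ValueError("child flow failed")

    with Flow("parent") as parent_flow:
        run_child_flow()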
  • d

    Derek Izuel

    08/30/2019, 7:59 PM
    Hello, I am trying to use the 'gmail_notifier' utility to send me a message when my process fails. I used the code from the example as a state handler for an individual task. just like this: @task(state_handlers=[gmail_notifier(only_states=[Failed])]) and I get the following error: NameError: name 'Failed' is not defined I have tried many different combinations of state classes and ignore/only parameters - I get the same message. Thank you.
    c
    • 2
    • 3
c

Chris White

08/30/2019, 8:09 PM
Hi @Derek Izuel if you haven’t imported the Failed state object, then python doesn’t know what you’re referring to. You’ll need to do:
from prefect.engine.state import Failed
d

Derek Izuel

08/30/2019, 8:28 PM
totally missed that - thank you!
c

Chris White

08/30/2019, 8:29 PM
anytime!
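For completeness, a minimal sketch combining that import with the decorator from the original question (the task body is a placeholder):
from prefect import task
from prefect.engine.state import Failed
from prefect.utilities.notifications import gmail_notifier

@task(state_handlers=[gmail_notifier(only_states=[Failed])])
def my_task():
    pass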