• Rob Fowler

    1 year ago
    I have an issue with LocalDaskExecutor; I have reduced my code down to a simple example. In this example, if I run the tasks, each sleeping for 3 seconds, the flow does not map some of them. I suspect it's my use of a mapped task. The range is simply a list of numbers. What happens is that it does not run 'slow_task' for every item in the list.
    I get: Flow run FAILED: some reference tasks failed, on account of it never scheduling one of the slow_task workers. If there is any load at all on the machine it fails; if the machine is at 1% CPU it works.
    # python slow.py --range=10 --sleep_time=3
    from time import sleep
    import argparse
    
    from prefect import Flow, Parameter, unmapped, task, context
    from prefect.engine.executors import LocalDaskExecutor
    
    
    @task(timeout=9)
    def slow_task(opts, item, scripts):
        logger = context.get('logger')
        logger.info(f"==== IN TASK {item} Sleeping {opts.sleep_time}")
        sleep(opts.sleep_time)
        logger.info(f"## Awake {item}")
        return item
    
    
    @task
    def produce_range(opts):
        return range(opts.range)
    
    
    with Flow("PS Version") as flow:
        scripts = Parameter('scripts')
        opts = Parameter('opts')
    
        nrange = produce_range(opts)
        results = slow_task.map(item=nrange,
                                scripts=unmapped(scripts),
                                opts=unmapped(opts))
    
    
    if __name__ == '__main__':
    
        parser = argparse.ArgumentParser(description='test pywinrm')
        parser.add_argument('--workers', type=int, default=10)
        parser.add_argument('--sleep_time', type=int, default=2)
        parser.add_argument('--range', type=int, default=10)
    
        opts = parser.parse_args()
    
        executor = LocalDaskExecutor(num_workers=opts.workers)
        flow.run(executor=executor,
                 scripts="hello",
                 opts=opts)
    38 replies
  • Rob Fowler

    1 year ago
    Another thread: let's say I can't get the above fixed. I was thinking I could just use a concurrent.futures executor. The submit-and-wait model seems pretty normal, and I can submit a thread for execution simply with a concurrent.futures.ThreadPoolExecutor, but the wait needs a bit more than the documentation provides, as it does not pass the 'future' produced by the submit and expects something more complex returned (I can wait on it). Is there anyone who can point me to a shortcut on this? https://gist.github.com/mianos/b89e70d23b812930daf115d2b76cf71b
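    A minimal sketch of that idea, assuming Prefect 0.x's Executor base class (the class name and the recursive wait handling are assumptions, not the documented API):
    from contextlib import contextmanager
    import concurrent.futures

    from prefect.engine.executors.base import Executor


    class ThreadPoolPrefectExecutor(Executor):
        """Hypothetical sketch: back Prefect's submit/wait model with a
        concurrent.futures.ThreadPoolExecutor."""

        def __init__(self, max_workers=None):
            self.max_workers = max_workers
            self.pool = None
            super().__init__()

        @contextmanager
        def start(self):
            # keep the pool open for the duration of the flow run
            with concurrent.futures.ThreadPoolExecutor(self.max_workers) as pool:
                self.pool = pool
                try:
                    yield
                finally:
                    self.pool = None

        def submit(self, fn, *args, **kwargs):
            return self.pool.submit(fn, *args, **kwargs)

        def wait(self, futures):
            # Prefect may hand back a single future or a (possibly nested)
            # list of futures mixed with plain values; resolve recursively.
            if isinstance(futures, concurrent.futures.Future):
                return futures.result()
            if isinstance(futures, list):
                return [self.wait(f) for f in futures]
            return futures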
    9 replies
  • Yanghui Ou

    1 year ago
    Hi, I am trying to replace a make-based workflow with Prefect. I was wondering how I can implement a file-centric workflow. If an upstream task doesn’t have a return value but generates a file instead, and the downstream task takes that file as input and processes it, what is the best way to specify such a dependency? Here’s a simple mock-up of my question:
    from prefect import Flow, Task

    class GenerateFile( Task ):
      def run( self ):
        with open( 'result.txt', 'w' ) as f:
          f.write( f'This file is generated by {self.name}.' )

    class ProcessFile( Task ):
      def run( self ):
        with open( 'result.txt', 'r' ) as f:
          print( f.read() )

    gen_task   = GenerateFile()
    print_task = ProcessFile()

    with Flow( 'test caching' ) as flow:
      gen_result   = gen_task()
      print_result = print_task( upstream_tasks=[ gen_result ] )
    Is there a better way to do it other than manually setting upstream_tasks? Another question: how can I specify the generated file as a target, so that I get the same caching behavior as make? I tried
    gen_task   = GenerateFile( target='result.txt', checkpoint=True, result=LocalResult( dir='.' ) )
    but it does not seem to work.
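    One way to avoid wiring upstream_tasks by hand is to pass the file path itself as data, which also gives target something to checkpoint. A minimal sketch, assuming Prefect 0.x's functional API (the task names are illustrative):
    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task(target="result.txt", checkpoint=True, result=LocalResult(dir="."))
    def generate_file():
        path = "result.txt"
        with open(path, "w") as f:
            f.write("This file is generated by generate_file.")
        return path  # returning the path is what creates the data dependency

    @task
    def process_file(path):
        with open(path) as f:
            print(f.read())

    with Flow("test caching") as flow:
        process_file(generate_file())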
    3 replies
  • Marwan Sarieddine

    1 year ago
    Hi Dask folks - is there an environment variable I can set so that dask-kubernetes does not use a nanny?
    15 replies
  • raphBL

    1 year ago
    Hi, I had the misfortune of advocating for Prefect against Airflow at a data-scientist weekly meeting, and ended up with a small workshop/demo to do next time. Do you have any material to share with me? In particular, I found the meteo demo you posted on Slack really cool, if it’s available somewhere. Of course, I’ll start with the Hello World and refer them to the awesome docs 🙌 Wish you all a nice day/night, around the world!
    9 replies
  • Newskooler

    1 year ago
    Hi 👋, if I want to do a conditional retry of a task based on a provided parameter, is this the way to do it: https://docs.prefect.io/core/examples/parameterized_flow.html, or is there a way to define this in the decorator itself (or otherwise)?
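    For reference, a minimal sketch of what declaring retries in the decorator itself looks like in Prefect 0.x (the numbers are illustrative; whether they can depend on a runtime Parameter is exactly the open question):
    from datetime import timedelta
    from prefect import task

    @task(max_retries=3, retry_delay=timedelta(seconds=30))
    def flaky_task():
        ...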
    10 replies
  • Newskooler

    1 year ago
    Hi (again) 👋, a question regarding the execution order of mapped tasks (some of which fail and retry). I observed that with 1 worker on LocalDaskExecutor, if a task fails, no other mapped task is executed during its retry wait time. Is this the expected behaviour, and is it possible to have the other mapped tasks keep going while the one waiting to be retried is in this “wait” stage? I read through https://docs.prefect.io/core/examples/retries_with_mapping.html and it does not seem to address this.
    5 replies
  • Alberto de Santos

    1 year ago
    Hi community! Can anyone share a (best-practice) script to set up Agents, Flows and create_flow_run? Not the standard one, but a prod-like script, allowing for instance to set up an agent if it is down, ensure the agent is working properly, and so on…
    60 replies
  • Alexander

    1 year ago
    Heya, community 🖖. I've been poking Prefect for a couple of weeks already, trying to understand how it can be used in a production environment. I like the almost cloud-native support via Docker, but it has its quirks. The most difficult part of setting up a production CI process with Prefect is flow registration. I just don't get it. It works nicely in a local environment, when you run Prefect server and agents locally from the same python venv and your code is in one place.
    1. To register a flow, you have to actually execute the python file. This means your flow registration environment must be identical to your flow production execution environment, which gives you no choice but to use Docker for your production environment. With a CI system that does not support docker-in-docker, this makes everything harder.
    2. If you have many flows, you have to register them one by one: you either write a script that registers every flow in a folder, or maintain a single script where all flows are registered, which itself needs maintaining. I need to write a considerable amount of code to maintain more than one flow.
    3. The local agent is just not enough for production. If you use LocalAgent, it must run in the flow production environment. If you update that environment (say, add a new dependency), you need to restart the local agent, but you can't, because it may be executing tasks.
    4. Docker agent, my favourite, has its own quirks. For example, I was extremely surprised to find that it overrides some settings in the task execution container with its own (like logging level). The other thing is, again, multi-flow registration. Either every flow object gets its own Docker storage, which means 100 flows, 100 Docker images built; or you have one Docker storage for all flows, which means you must have a central flow registration script that creates the storage, assigns flows to it, builds it, then assigns the storage back to the flows and registers them. And you need to write this script yourself.
    5. Every registration bumps a new flow version in the UI. If you don't want that, you need to come up with some checks or hash comparisons to understand whether the flow has changed and needs registering. Again, you do it yourself.
    I was able to solve all of these problems with the following workflow (sketched in the code below):
    1. Build the flow production environment Docker image.
    2. docker run this image and call a flow registration script I wrote myself.
    3. In this script, iterate over all python scripts in the flows folder; import these scripts (instead of the exec approach used in extract_flow_from_file or whatever) and put each flow object into a list.
    4. Create a Docker storage with the desired settings - it uses the same production environment Dockerfile as step 1 - and add all flows to this storage.
    5. Build this storage.
    6. Assign the built storage object to all flows.
    7. Register all flows.
    I am lucky that all flows are in the same project and have the same registration settings (for now); it will be painful to come up with an approach for per-flow registration customization in such a generic script. All this required significant experimentation and reading of the Prefect source code (it is magnificent, no jokes - I had real pleasure reading it). I wish the Prefect docs included best practices for flow registration and production CI setup. What are the best practices in the Prefect community for registering production flows? What is your best choice for running tasks? How do you deliver flow source code to prod?
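    A hypothetical sketch of the central registration script described above (Prefect 0.x API; the directory layout, registry URL, and project name are assumptions):
    import importlib.util
    from pathlib import Path

    from prefect.storage import Docker  # prefect.environments.storage on older 0.x releases

    FLOWS_DIR = Path("flows")  # assumed layout: one `flow` object per file


    def load_flow(path):
        # import the module rather than exec'ing it, then grab its flow object
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module.flow


    flows = [load_flow(p) for p in sorted(FLOWS_DIR.glob("*.py"))]

    # one shared Docker storage, so 100 flows build one image rather than 100
    storage = Docker(dockerfile="Dockerfile", registry_url="registry.example.com")
    for flow in flows:
        storage.add_flow(flow)
    storage = storage.build()

    for flow in flows:
        flow.storage = storage
        flow.register(project_name="production", build=False)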
    7 replies
  • Alexander

    1 year ago
    I think a solution here could be some functionality in agents to monitor a directory and register flows from it when a file changes. This would solve several problems: the flow version would be bumped only when the file changes; there would be no need to write anything yourself; and flow registration would be streamlined, similar to how the Airflow scheduler works - you just put flow source code in some directory and don't care. What do you think? I would love to try to implement this myself 😎
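    For the version-bump part specifically, a minimal sketch assuming Prefect 0.14+, where Flow.serialized_hash() is available (the project name is an assumption):
    # registration is skipped (no new version) while the hash is unchanged
    flow.register(
        project_name="production",
        idempotency_key=flow.serialized_hash(),
    )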
    4 replies