• Avi A

    2 years ago
    Dask Scheduler: I’m using a LocalDaskExecutor with the default scheduler=threads, but it only runs one task at a time (mapped tasks). Am I missing some extra argument that allows tasks to run concurrently?
    1 replies
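    A minimal sketch of how parallel mapping is usually enabled with the local Dask executor (the num_workers value and the work task are illustrative, and the import path may be prefect.engine.executors on older releases):

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def work(x):
        return x * 2

    with Flow("concurrent-map") as flow:
        results = work.map(list(range(10)))

    # num_workers caps how many mapped children run at once; some versions
    # default to a small pool, so setting it explicitly makes the intent clear
    flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=8))
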
  • Pedro Machado

    2 years ago
    Hi everyone. I am trying to get a better understanding of mapped tasks and the memory implications/best practices when using a large number of them. I have a pipeline that:
    1) queries a DB for a list of app IDs (usually about 25k),
    2) calls an API for each app N times, where N is currently 14 different countries (the JSON response for each app is not too big: a single dictionary with several columns), and
    3) combines and stores the output of all tasks in S3.
    This is currently implemented in Airflow with one branch per country. Each branch queries the API repeatedly for each app and stores the results in a single file per country; if the task fails, all apps need to be reprocessed. What I am wondering is:
    * If I create a mapped task that gets the list of apps + countries, does it create all 350k (14 * 25k) child tasks in memory at once and put them in some sort of queue, or are they lazily created?
    * I suppose that if I did nothing special about caching the results to an external system like S3, it would hold all the data in memory until it gets to the reducer task that dumps the output to a file. This may require a lot of memory because the reducer won't start until all children finish. Correct?
    * Would this be alleviated if I used caching to S3? Would the memory be released once each task's results are persisted to S3?
    * Each child task's output would be pretty small, and it seems that having that many S3 files with a little data in each is not great. Would you recommend that instead of having a child task process a single app, it processes a small batch of, say, 50 apps?
    * I suppose there is no garbage collection on persisted results. Is the recommendation to use S3 lifecycle rules to clear old task outputs? Thanks!
    6 replies
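    One common way to keep the mapped-task count and memory in check is to map over small batches and persist each child's output; a rough sketch, where the bucket name, batch size, and the fetch/combine tasks are illustrative and an S3Result is assumed for checkpointing:

    from prefect import Flow, task, unmapped
    from prefect.engine.results import S3Result

    @task
    def get_app_ids():
        # illustrative stand-in for the DB query that returns ~25k app IDs
        return ["app-1", "app-2", "app-3"]

    @task
    def make_batches(app_ids, batch_size=50):
        # each mapped child handles a small batch instead of a single app
        return [app_ids[i:i + batch_size] for i in range(0, len(app_ids), batch_size)]

    @task(checkpoint=True, result=S3Result(bucket="my-bucket"))
    def fetch_batch(batch, country):
        # illustrative API call per app; checkpointing persists the child's
        # output so it does not have to be recomputed on retry
        return [{"app_id": app_id, "country": country} for app_id in batch]

    @task
    def combine_and_store(all_results):
        # reduce step: flatten the per-batch results and write one file to S3
        ...

    with Flow("app-metrics") as flow:
        batches = make_batches(get_app_ids())
        data = fetch_batch.map(batches, country=unmapped("US"))
        combine_and_store(data)
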
  • Avi A

    2 years ago
    Hey community! I’m having a problem with LocalDaskExecutor. I keep getting the following error messages, which are probably related:
    Error message: can't start new thread
    Error message: 'DummyProcess' object has no attribute 'terminate'
    BlockingIOError: [Errno 11] Resource temporarily unavailable
    4 replies
  • jars

    2 years ago
    Quick question - the docs (https://docs.prefect.io/api/latest/environments/storage.html#docker) say the Docker storage parameter secrets (List[str], optional) is "a list of Prefect Secrets which will be used to populate prefect.context for each flow run. Used primarily for providing authentication credentials." It's not clear how I can access these secrets inside my flow. I've tried exploring the prefect.context object, but can't seem to find anything. Any examples or guidance?
    5 replies
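    A small sketch of the pattern that usually works here, assuming MY_API_KEY is one of the names passed to the Docker storage secrets argument (the task and flow names are illustrative):

    import prefect
    from prefect import Flow, task

    @task
    def use_credentials():
        # at runtime the storage-level secrets are loaded into the context,
        # so they can be read from prefect.context.secrets by name
        api_key = prefect.context.secrets["MY_API_KEY"]
        ...

    with Flow("docker-secrets") as flow:
        use_credentials()

    The PrefectSecret task in prefect.tasks.secrets is another way to pull a named secret into the flow as a task result.
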
  • Sumant Agnihotri

    2 years ago
    Hi all. I'm new to Prefect (and to development) and have a question. I understand that Prefect can be used to set up flows that run tasks in a particular order, among other things. Does it also implement queueing? So far in my projects I've only used Redis + Celery to run big tasks in a queue. PS: I've mostly worked in web dev.
    4 replies
  • Avi A

    2 years ago
    Core UI - how do I stop a running flow? I couldn’t find any reference to it, and the only way for me to do this right now is to hard kill the agent itself. Any way of doing so from the UI / CLI?
    6 replies
  • Max Lei

    2 years ago
    Hi all, is there a recommended approach for objects that cannot be pickled? For example, in a machine learning pipeline where you need to do some ETL, some of the steps require tools whose objects cannot be pickled (SFrames, Spark, etc.), so I imagine I might put all of those items in one step and pipe the results to a CSV file. The next step could then train a model using the location of that CSV file. I may also want one more step that computes metrics and sends them to MLflow, but it's possible the model cannot be pickled easily. Is Prefect the wrong tool for this job? I would also like a cron schedule and a UI to look at the results. It seems I may need to engineer the problem around Prefect versus a single script and Linux crontab. Any suggestions?
    4 replies
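    One way to sketch this with Prefect is to keep every unpicklable object local to a single task and pass only small, picklable values (file paths) between tasks; the paths and task names below are illustrative:

    from prefect import Flow, task

    @task
    def etl_to_csv():
        # build the SFrame / Spark DataFrame entirely inside this task and
        # write it out, returning only a plain string path
        csv_path = "/tmp/features.csv"
        return csv_path

    @task
    def train_model(csv_path):
        # train from the file; save the model artifact to disk instead of
        # returning the (possibly unpicklable) model object itself
        model_path = "/tmp/model.bin"
        return model_path

    @task
    def log_metrics(model_path):
        # load the model, compute metrics, and push them to MLflow
        ...

    with Flow("ml-pipeline") as flow:
        log_metrics(train_model(etl_to_csv()))
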
  • Richard Pitre

    2 years ago
    I have the same task running at different nodes of my graph. Now I have a task that must run after one of them is finished. How can I set up this dependency? I am using the Flow framework.
    1 replies
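    A minimal sketch of a purely state-based dependency in the functional API, assuming the repeated task and the follow-up task look something like this:

    from prefect import Flow, task

    @task
    def repeated_work(label):
        # the same task used at several nodes of the graph
        ...

    @task
    def follow_up():
        ...

    with Flow("dependencies") as flow:
        a = repeated_work("first")
        b = repeated_work("second")
        # upstream_tasks sets a dependency without passing any data;
        # listing only `a` makes follow_up wait for that particular node
        follow_up(upstream_tasks=[a])

    If it instead needs only one of several copies to have succeeded, a trigger such as prefect.triggers.any_successful can be set on the downstream task while listing all copies in upstream_tasks (triggers are still evaluated only after every upstream task has finished).
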
  • philip

    2 years ago
    I want to get my Parameter in a state_handler. It works in a local test, but it raises this error on Prefect Server:
    Exception raised while calling state handlers: KeyError(<Parameter: fisrt_num>,)
    How can I solve this problem? Here is my code:
    from prefect import Flow, Parameter, task

    @task
    def add(x, y):
        return x+y
    
    def post_to_slack(task, old_state, new_state):
        if new_state.is_failed():
            msg = "Task {0} finished in state {1}".format(task, new_state)
            
        if new_state.is_finished():
            p = new_state.result[num].result
            print(p)
            
        return new_state
    
    with Flow("Tutorial", state_handlers=[post_to_slack]) as flow:
        x = Parameter("fisrt_num")
        y = Parameter("Second_num")
    
        num = add(x,y)
    2 replies
  • Arsenii

    2 years ago
    A kind of a general question for the dev team: are there any plans to allow more granular control over flow execution rights? For example, some of the flows I made are run exclusively by an automated script (through client.create_flow_run(version_group_id=...)), and there's a chance of screwing up some important data if any user tries to run them manually. Another use case I can think of is letting certain members of a team access only specific projects. Thanks!
    1 replies