• itay livni

    2 years ago
    Hi - In a Flow I am looking to: (1) run a set of tasks, (2) do something with the result in another task, i.e. munge data. What is the suggested way to implement this? Is it using set_upstream? The example I see, say_hello.set_upstream(second_add, flow=flow), waits for a specific task to finish, not a set of tasks. Does this get handled under the hood?
    Chris White · 2 replies
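    In Prefect Core's functional API, a task that receives the results of several upstream tasks waits for all of them, so a fan-in like this usually needs no explicit set_upstream call. A minimal sketch with hypothetical task names:
    from prefect import Flow, task

    @task
    def make_part_a():
        return [1, 2]

    @task
    def make_part_b():
        return [3, 4]

    @task
    def munge(parts):
        # receives the results of all upstream tasks as one list
        return [x for part in parts for x in part]

    with Flow("fan-in") as flow:
        a = make_part_a()
        b = make_part_b()
        combined = munge([a, b])  # runs only after both a and b finish

    flow.run()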
  • Kristoffer Sahlin

    2 years ago
    Hi, I have a recursive workflow problem that needs to be parallelized. Is it possible to handle it with Prefect? Specifically, I have n batches of input labeled 1,...,n, and a function merge_consecutive_batches. Assume that n is a power of 2. The function merge_consecutive_batches takes two consecutive batches, 1 and 2, 3 and 4, etc., and produces merged output for each pair, labeled, say, 1_2, 3_4, … Now I want to apply merge_consecutive_batches recursively until I'm down to a single output batch. For example, in the second iteration, merge_consecutive_batches will take as input the two consecutive batches 1_2 and 3_4 and produce the output batch 1_2_3_4. Naturally, the jobs producing 1_2 and 3_4 have to be completed before starting this job (and they have to be run in parallel for scalability reasons). Is there a solution with Prefect for this scenario?
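    One possible sketch (not an official pattern): because a Prefect flow's structure is built before it runs, the whole merge tree can be constructed with an ordinary Python loop at build time, and each level's pairs can then run in parallel under an executor such as the DaskExecutor. Task and batch names below are made up:
    from prefect import Flow, task

    @task
    def load_batch(label):
        return [label]  # placeholder for reading batch `label`

    @task
    def merge_consecutive_batches(left, right):
        return left + right  # placeholder merge logic

    n = 8  # assumed to be a power of 2

    with Flow("recursive-merge") as flow:
        level = [load_batch(i) for i in range(1, n + 1)]
        # keep merging consecutive pairs until a single batch remains
        while len(level) > 1:
            level = [
                merge_consecutive_batches(level[i], level[i + 1])
                for i in range(0, len(level), 2)
            ]
        final_batch = level[0]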
  • Jeff Yun

    2 years ago
    Hi! Given an unrun flow, what's the best way to get the inputs passed into the tasks (like the output of task_runner.get_task_inputs(state, upstream_states), but without having to run the flow)?
    Chris White · 14 replies
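    One way to see which upstream results feed which task arguments without running anything is to walk the flow's edges; each Edge records the keyword (key) that the upstream result is bound to. A small sketch, assuming a built Flow object named flow:
    # inspect dependencies and input keys without running the flow
    for edge in flow.edges:
        print(f"{edge.upstream_task} -> {edge.downstream_task} (key={edge.key})")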
  • dmo

    2 years ago
    Hello! I'm looking into Prefect and am having a hard time finding a good way to handle shared code. Let's say I have a utility function in my_utility.py:
    def get_answer_to_everything():
        return 42
    And then I define my flow in one_of_many_flows.py:
    import my_utility
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor


    @task
    def run_imported_function():
        return my_utility.get_answer_to_everything()


    with Flow("Imported function") as flow1:
        run_imported_function()

    flow1.run()  # 42
    flow1.run(executor=DaskExecutor(address=IP))  # ModuleNotFoundError: No module named 'my_utility'
    I know this is due to how serialization works in Python. Dask does provide a way to upload files to workers through Executor.upload_file (https://stackoverflow.com/questions/39295200/can-i-use-functions-imported-from-py-files-in-dask-distributed), but I have found no way to do anything similar through the Prefect API. Am I looking in the wrong place, or is this use case out of scope?
    Chris White · 14 replies
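    Outside of Prefect's own API, one workaround sketch is to connect a distributed Client to the same scheduler and push the module to the workers with upload_file before running the flow (IP is the scheduler address from the example above):
    from distributed import Client
    from prefect.engine.executors import DaskExecutor

    client = Client(IP)                   # connect to the same Dask scheduler
    client.upload_file("my_utility.py")   # ship the module to every worker

    flow1.run(executor=DaskExecutor(address=IP))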
  • Chris Eberly

    2 years ago
    hello again, one more question: is there a way to “yield” a batch of results to start feeding into a mapping process? The example would be, in your x-files tutorial, if we got say 10 episode urls at a time, could we start the scraping process asynchronously and then collect the results at the end? apologies if i’m missing something obvious, or if this is not a good use case
    Chris White · 6 replies
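    For reference, the non-streaming shape of that tutorial pattern is plain task mapping, where each mapped child is an independent task run; the task names below are hypothetical stand-ins for the tutorial's tasks:
    from prefect import Flow, task

    @task
    def get_episode_urls():
        return ["url-1", "url-2", "url-3"]  # stand-in for the URL-collection step

    @task
    def scrape_episode(url):
        return f"scraped {url}"  # stand-in for the per-episode scraping step

    with Flow("x-files-mapping") as flow:
        urls = get_episode_urls()
        episodes = scrape_episode.map(urls)  # one mapped task run per URL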
  • itay livni

    2 years ago
    Hi - Is it possible to have a class with x number of methods as tasks?
    Chris White · 5 replies
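    One sketch of the idea (not necessarily the recommended pattern): since the task decorator wraps any callable, bound methods of an ordinary instance can be wrapped at flow-build time. The class and method names are made up:
    from prefect import Flow, task

    class Pipeline:
        def load(self):
            return [1, 2, 3]

        def double(self, data):
            return [x * 2 for x in data]

    pipeline = Pipeline()

    with Flow("class-methods") as flow:
        data = task(pipeline.load)()          # wrap each bound method as a task
        doubled = task(pipeline.double)(data)

    flow.run()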
  • itay livni

    2 years ago
    Hi - A couple of general questions: (1) What is the use case for an imperative flow vs. functional flow? (2) When is mixing the two appropriate?
    Chris White · 3 replies
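    For context, here is the same two-task dependency written in both styles (the task names are hypothetical):
    from prefect import Flow, task

    @task
    def say_hello():
        print("hello")

    @task
    def say_goodbye():
        print("goodbye")

    # functional API: dependencies come from how tasks are called in the context
    with Flow("functional") as functional_flow:
        hello = say_hello()
        goodbye = say_goodbye(upstream_tasks=[hello])

    # imperative API: tasks and dependencies are declared on the flow explicitly
    imperative_flow = Flow("imperative")
    imperative_flow.add_task(say_hello)
    imperative_flow.add_task(say_goodbye)
    say_goodbye.set_upstream(say_hello, flow=imperative_flow)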
  • John Faucett

    2 years ago
    Hi, is there any more documentation on parameters other than here: https://docs.prefect.io/core/concepts/parameters.html? I’m having a confusing time getting them to work, especially when I need more than one.
    Chris White · 20 replies
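    For reference, a minimal sketch of two parameters in the functional style; the names are made up:
    from prefect import Flow, Parameter, task

    @task
    def greet(name, greeting):
        return f"{greeting}, {name}!"

    with Flow("params") as flow:
        name = Parameter("name", default="world")
        greeting = Parameter("greeting", default="hello")
        greet(name, greeting)

    # override both parameters at run time
    flow.run(parameters={"name": "Prefect", "greeting": "hi"})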
  • John Faucett

    2 years ago
    An imperative example would be really helpful.
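    A sketch of the same parameters wired up in the imperative style, using a Task subclass and bind(); the names are made up:
    from prefect import Flow, Parameter, Task

    class Greet(Task):
        def run(self, name, greeting):
            return f"{greeting}, {name}!"

    flow = Flow("imperative-params")
    name = Parameter("name", default="world")
    greeting = Parameter("greeting", default="hello")

    # bind() attaches the task to the flow and maps each parameter
    # to one of its keyword arguments
    Greet().bind(name=name, greeting=greeting, flow=flow)

    flow.run(parameters={"name": "Prefect", "greeting": "hi"})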
  • An Hoang

    2 years ago
    I'm trying to run this example: https://docs.prefect.io/core/tutorials/dask-cluster.html with my HPC Dask cluster. I get this error:
    [2019-10-21 21:17:43,141] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
    [2019-10-21 21:17:43,144] INFO - prefect.FlowRunner | Starting flow run.
    [2019-10-21 21:17:43,145] ERROR - prefect.FlowRunner | Unexpected error: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
    Traceback (most recent call last):
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 393, in get_flow_run_state
        with executor.start():
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/contextlib.py", line 112, in __enter__
        return next(self.gen)
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 74, in start
        self.address, processes=self.local_processes, **self.kwargs
      File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/distributed/client.py", line 649, in __init__
        "Unexpected keyword arguments: {}".format(str(sorted(kwargs)))
    ValueError: Unexpected keyword arguments: ['processes', 'silence_logs']
    [2019-10-21 21:17:43,149] ERROR - prefect.Flow: dask-example | Unexpected error occured in FlowRunner: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
    conda versions:
    # Name                    Version                   Build  Channel
    dask                      2.6.0                      py_0    conda-forge
    dask-core                 2.6.0                      py_0    conda-forge
    dask-glm                  0.2.0                      py_1    conda-forge
    dask-jobqueue             0.6.3                      py_0    conda-forge
    dask-ml                   1.0.0                      py_1    conda-forge
    
    # Name                    Version                   Build  Channel
    prefect                   0.6.6                      py_0    conda-forge
    cluster configuration:
    from dask_jobqueue import LSFCluster

    cluster = LSFCluster(
        queue='all_corradin',
        cores=47,
        # processes=2,
        walltime='5000:00',
        memory='250GB',
        local_directory="/tmp",
        job_extra=['-o /dev/null', '-e /dev/null'],
        scheduler_port=8786,
        worker_dashboard_address=8788,
    )
    cluster.scale(17)
    Chris White · 4 replies
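    The traceback shows the ValueError coming from distributed's Client itself rather than from the flow or cluster code: the Client constructor rejects keyword arguments it does not recognize. A small reproduction sketch (the scheduler address is a placeholder):
    from distributed import Client

    # Passing the same keyword arguments straight to distributed.Client
    # reproduces the error from the traceback above:
    #   ValueError: Unexpected keyword arguments: ['processes', 'silence_logs']
    client = Client("tcp://127.0.0.1:8786", processes=False, silence_logs=False)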