Thread
#prefect-community
    Darragh

    Darragh

    2 years ago
    Hey guys, question around getting Parameter values and creating the execution environment for a flow. I need to use an input Parameter as a config value for creating a number of Dask workers, but it seems like I can’t get at the value till runtime. Sample code below. Any thoughts on how to get around it?
    def main(registry, name, tag, base_image, env_vars, python_dependencies, execution):
        worker_count = 1
    
        with Flow("Parallell Orchestrator") as flow:
            bucket_name = Parameter("bucket_name", default="test-bucket")
            data_folder = Parameter("data_folder", default="data/parallellTest")
            collector_flow = Parameter("collector_flow", default="Parallell Test")
            partition_count = Parameter("partition_count", default=1)
            input_file = Parameter("input_file", default="data/parallellTest/output/results.jsonl")
    
            output_folder = data_folder + "/output"
    
            ############# THE MAGIC LINE #############
            worker_count = partition_count
            ##########################################
     
            # Run Partition Flow
            partitions = partition_json_data(bucket_name, input_file, data_folder, partition_count)
    
            # Run Parallell
            final_states = trigger_subflow.map(partitions, unmapped(bucket_name), unmapped(collector_flow))
    
            #  Run Collation Flow
            collate_results(bucket_name, output_folder, final_states)
    
        flow.storage = Docker(registry_url=registry, image_name=name, image_tag=tag, env_vars=env_vars,
                              base_image=base_image, python_dependencies=python_dependencies, prefect_version="master")
    
        flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=worker_count))
    j

    josh

    2 years ago
    Unfortunately not, as it stands right now Parameter values are only used at runtime and the environment is instantiated at initialization. There are some planned enhancements coming down the pipe with regards to more environment customization / kwargs overriding at runtime that may be related to this. For now, environment callbacks may be able to satisfy what you’re after https://docs.prefect.io/orchestration/execution/overview.html#environment-callbacks where you can provide an
    on_start
    function that computes the amount of needed workers and sets it on the environment’s executor
    Darragh

    Darragh

    2 years ago
    Thanks Josh, I’ll have a read and see what comes out of it!