#prefect-community

Darragh

06/30/2020, 10:05 AM
Hey guys, question around getting Parameter values and creating the execution environment for a flow. I need to use an input Parameter as a config value for creating a number of Dask workers, but it seems like I can’t get at the value till runtime. Sample code below. Any thoughts on how to get around it?
def main(registry, name, tag, base_image, env_vars, python_dependencies, execution):
    worker_count = 1

    with Flow("Parallell Orchestrator") as flow:
        bucket_name = Parameter("bucket_name", default="test-bucket")
        data_folder = Parameter("data_folder", default="data/parallellTest")
        collector_flow = Parameter("collector_flow", default="Parallell Test")
        partition_count = Parameter("partition_count", default=1)
        input_file = Parameter("input_file", default="data/parallellTest/output/results.jsonl")

        output_folder = data_folder + "/output"

        ############# THE MAGIC LINE #############
        worker_count = partition_count
        ##########################################
 
        # Run Partition Flow
        partitions = partition_json_data(bucket_name, input_file, data_folder, partition_count)

        # Run Parallell
        final_states = trigger_subflow.map(partitions, unmapped(bucket_name), unmapped(collector_flow))

        #  Run Collation Flow
        collate_results(bucket_name, output_folder, final_states)

    flow.storage = Docker(registry_url=registry, image_name=name, image_tag=tag, env_vars=env_vars,
                          base_image=base_image, python_dependencies=python_dependencies, prefect_version="master")

    flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=worker_count))

josh

06/30/2020, 11:49 AM
Unfortunately not. As it stands right now, Parameter values are only available at runtime, while the environment is instantiated at initialization. There are some planned enhancements coming down the pipe with regard to more environment customization / kwargs overriding at runtime that may be related to this. For now, environment callbacks may be able to satisfy what you’re after (https://docs.prefect.io/orchestration/execution/overview.html#environment-callbacks): you can provide an on_start function that computes the number of workers needed and sets it on the environment’s executor.
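A rough sketch of the callback pattern described above, for illustration only. It deliberately uses stand-in classes rather than Prefect's own, so `StubExecutor`, `StubEnvironment`, `make_on_start`, and the `WORKER_COUNT` environment variable are all hypothetical names assumed here; how the real environment invokes on_start, and which attribute the real executor reads for its worker count, should be checked against the linked docs.

```python
import os
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class StubExecutor:
    """Hypothetical stand-in for an executor; NOT Prefect's DaskExecutor."""
    n_workers: int = 1


@dataclass
class StubEnvironment:
    """Hypothetical stand-in for an environment that accepts an on_start callback."""
    executor: StubExecutor = field(default_factory=StubExecutor)
    on_start: Optional[Callable[[], None]] = None

    def run(self) -> int:
        # Invoke the callback before "running", then report the worker count in effect.
        if self.on_start is not None:
            self.on_start()
        return self.executor.n_workers


def make_on_start(env: StubEnvironment, env_var: str = "WORKER_COUNT",
                  default: int = 1) -> Callable[[], None]:
    """Build a callback that sets the worker count when the run starts.

    Assumption: the desired count is supplied through an environment
    variable, since Parameter values are not available at build time.
    """
    def on_start() -> None:
        raw = os.environ.get(env_var, str(default))
        # Never go below one worker.
        env.executor.n_workers = max(1, int(raw))
    return on_start


env = StubEnvironment()
env.on_start = make_on_start(env)
```

The point of the closure is that the worker count is resolved when the run starts rather than when the flow is built, which is the gap the original snippet runs into.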

Darragh

06/30/2020, 1:36 PM
Thanks Josh, I’ll have a read and see what comes out of it!