Darragh
06/30/2020, 10:05 AMdef main(registry, name, tag, base_image, env_vars, python_dependencies, execution):
worker_count = 1
with Flow("Parallell Orchestrator") as flow:
bucket_name = Parameter("bucket_name", default="test-bucket")
data_folder = Parameter("data_folder", default="data/parallellTest")
collector_flow = Parameter("collector_flow", default="Parallell Test")
partition_count = Parameter("partition_count", default=1)
input_file = Parameter("input_file", default="data/parallellTest/output/results.jsonl")
output_folder = data_folder + "/output"
############# THE MAGIC LINE #############
worker_count = partition_count
##########################################
# Run Partition Flow
partitions = partition_json_data(bucket_name, input_file, data_folder, partition_count)
# Run Parallell
final_states = trigger_subflow.map(partitions, unmapped(bucket_name), unmapped(collector_flow))
# Run Collation Flow
collate_results(bucket_name, output_folder, final_states)
flow.storage = Docker(registry_url=registry, image_name=name, image_tag=tag, env_vars=env_vars,
base_image=base_image, python_dependencies=python_dependencies, prefect_version="master")
flow.environment = LocalEnvironment(executor=DaskExecutor(n_workers=worker_count))
josh
06/30/2020, 11:49 AM
`on_start` function that computes the number of needed workers and sets it on the environment’s executor
Darragh
06/30/2020, 1:36 PM