Milton Tavares Neto
04/30/2021, 3:14 PM

Jenny
04/30/2021, 3:17 PM

Jenny
04/30/2021, 3:18 PM

Milton Tavares Neto
04/30/2021, 3:18 PM

Kevin Kho
…Flow as the convert_flow variable in the with statement. flow.executor should probably be convert_flow.executor, and you should probably return convert_flow.
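Concretely, a minimal sketch of the rename Kevin is suggesting (Milton's corrected code below already applies it):

    def convert_to_parquet_flow():
        with Flow('ConvertToParquet') as convert_flow:
            ...
        convert_flow.executor = DaskExecutor()  # not flow.executor: 'flow' is never defined
        return convert_flow                     # return the flow object that was built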
Milton Tavares Neto
04/30/2021, 3:18 PM

import os

import dask.bag as db
import prefect
from prefect import Flow, Parameter, task
from prefect.executors import DaskExecutor
from prefect.run_configs import UniversalRun

SCHEMA = {...}  # column dtypes elided in the original message

@task(timeout=10800)
def to_parquet(dumps_path, status_table, blocksize="128MB"):
    logger = prefect.context.get("logger")  # Prefect's task logger
    json_files_path = os.path.join(dumps_path, 'dump', status_table, '*')
    # read the JSON dump files into a bag, then into a dask dataframe
    # (convert_to_json is defined elsewhere in the project)
    bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
    df = bag.to_dataframe(meta=SCHEMA)
    parquet_path = os.path.join(dumps_path, 'prefect_parquet', status_table)
    logger.info(f"Saving status table as parquet @ {parquet_path}")
    df.to_parquet(parquet_path, compression='snappy', schema='infer', engine='pyarrow-dataset')

def convert_to_parquet_flow():
    with Flow('ConvertToParquet') as convert_flow:
        dumps_path = Parameter('dumps_path', default="s3://kdt-staging-dynamodbexport-test")
        status_table = Parameter('status_table', default='STATUS_TABLE')
        blocksize = Parameter('blocksize', default="256MB")
        to_parquet(dumps_path, status_table, blocksize)
    convert_flow.executor = DaskExecutor()
    convert_flow.run_config = UniversalRun()
    return convert_flow

Kevin Kho
Kevin Kho

Milton Tavares Neto
04/30/2021, 3:20 PM

Milton Tavares Neto
04/30/2021, 3:21 PM

Kevin Kho
with Flow is deferred execution that happens at runtime. task is also deferred, but the function is not, so we may be running into problems.
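A minimal sketch of the distinction Kevin describes (Prefect 1.x; the function names are illustrative): plain function calls inside the with Flow block execute immediately while the flow is being built, whereas tasks only add nodes to the DAG and run later.

    from prefect import Flow, task

    def plain_fn(x):
        print("build time:", x)   # runs while the flow is constructed

    @task
    def deferred_fn(x):
        print("run time:", x)     # runs only when the flow executes

    with Flow("demo") as flow:
        plain_fn(1)       # prints "build time: 1" right here
        deferred_fn(2)    # adds a task node; nothing prints yet

    flow.run()            # now prints "run time: 2"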
Milton Tavares Neto
04/30/2021, 3:31 PM

Unexpected error: KilledWorker('to_parquet-f5821bf3359e41ac931067976aea0a97', <Worker 'tcp://172.32.0.48:43493', name: 0, memory: 0, processing: 1>)
Scheduler and worker logs don't help either.

Kevin Kho
…task.run. Is that running on local?

Milton Tavares Neto
04/30/2021, 3:34 PM

Milton Tavares Neto
04/30/2021, 3:34 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 3:35 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 3:36 PM

Milton Tavares Neto
04/30/2021, 4:10 PM

Milton Tavares Neto
04/30/2021, 4:12 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 4:19 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 4:28 PM

# testing the task's underlying function directly against a local
# Dask client (arguments elided in the original message)
from dask.distributed import Client

c = Client()
to_parquet.run(...)

Milton Tavares Neto
04/30/2021, 4:29 PM

Kevin Kho

Kevin Kho
04/30/2021, 4:38 PM

Milton Tavares Neto
04/30/2021, 4:38 PM

Milton Tavares Neto
04/30/2021, 4:39 PM

Milton Tavares Neto
04/30/2021, 4:40 PM

Milton Tavares Neto
04/30/2021, 4:43 PM

Milton Tavares Neto
04/30/2021, 4:45 PM

Kevin Kho
…task.run? Anyway, I think to utilize the other 3 workers, you need a slight refactor so that you can map your function across some list. This will allow Prefect to distribute the task to the workers. Maybe apply your logic by breaking up the list of files into smaller lists and passing those to a task that can be mapped.
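A rough sketch of that refactor (the helper names and the chunking scheme are hypothetical, not from Milton's code):

    import glob
    import os

    from prefect import Flow, Parameter, task
    from prefect.executors import DaskExecutor

    @task
    def list_json_files(dumps_path, status_table):
        # illustrative: collect the JSON dump files for one table
        return sorted(glob.glob(os.path.join(dumps_path, "dump", status_table, "*")))

    @task
    def split_into_batches(paths, batch_size=100):
        return [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]

    @task
    def batch_to_parquet(paths):
        # stand-in for converting one batch of JSON files to parquet
        print(f"converting {len(paths)} files")

    with Flow("ConvertToParquet") as flow:
        dumps_path = Parameter("dumps_path")
        status_table = Parameter("status_table")
        files = list_json_files(dumps_path, status_table)
        batches = split_into_batches(files)
        batch_to_parquet.map(batches)   # one mapped task per batch, spread across the Dask workers

    flow.executor = DaskExecutor()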
Kevin Kho

Milton Tavares Neto
04/30/2021, 5:05 PM

Kevin Kho

Kevin Kho
…Prefect map, the task gets serialized and sent to the workers. Each Dask worker already contains an independent process. When you run your code with Prefect, it already runs in one worker. This means that your Dask code is already running inside one worker as a local process, which is why it does not distribute the operation. You will either need to refactor into independent processes to leverage the Prefect map and then combine the results in a later task, or you will need to use a worker client to resubmit the Dask operations back to the Dask scheduler to get your operations parallelized with Dask.
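A sketch of the worker-client option, assuming the task is executing on a worker of the same Dask cluster (SCHEMA and convert_to_json are from Milton's earlier snippet):

    import os

    import dask.bag as db
    from dask.distributed import worker_client
    from prefect import task

    @task(timeout=10800)
    def to_parquet(dumps_path, status_table, blocksize="128MB"):
        json_files_path = os.path.join(dumps_path, "dump", status_table, "*")
        parquet_path = os.path.join(dumps_path, "prefect_parquet", status_table)
        # Reconnect to the parent scheduler from inside the worker so the
        # bag/dataframe computation is distributed across the whole cluster
        # rather than running as a local process on this one worker.
        with worker_client() as client:
            bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
            df = bag.to_dataframe(meta=SCHEMA)
            # build the write lazily, then hand it to the cluster explicitly
            writes = df.to_parquet(parquet_path, compression="snappy",
                                   schema="infer", engine="pyarrow-dataset",
                                   compute=False)
            client.compute(writes, sync=True)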
Milton Tavares Neto
04/30/2021, 6:08 PM

Milton Tavares Neto
04/30/2021, 6:09 PM

Milton Tavares Neto
04/30/2021, 6:10 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 6:11 PM

Kevin Kho

Kevin Kho

Milton Tavares Neto
04/30/2021, 6:18 PM

Kevin Kho