Milton Tavares Neto
04/30/2021, 3:14 PM

Jenny
04/30/2021, 3:17 PM

Jenny
04/30/2021, 3:18 PM

Milton Tavares Neto
04/30/2021, 3:18 PM

Kevin Kho
The Flow is assigned to the convert_flow variable in the with statement, so flow.executor should probably be convert_flow.executor, and you should probably return convert_flow.

Milton Tavares Neto
04/30/2021, 3:18 PM

import os

import dask.bag as db
import prefect
from prefect import Flow, Parameter, task
from prefect.executors import DaskExecutor
from prefect.run_configs import UniversalRun

SCHEMA = {...}

@task(timeout=10800)
def to_parquet(dumps_path, status_table, blocksize="128MB"):
    logger = prefect.context.get("logger")  # Prefect's task logger
    json_files_path = os.path.join(dumps_path, 'dump', status_table, '*')
    # convert_to_json is defined elsewhere (not shown in the thread)
    bag = db.read_text(json_files_path, blocksize=blocksize).map(convert_to_json)
    df = bag.to_dataframe(meta=SCHEMA)
    parquet_path = os.path.join(dumps_path, 'prefect_parquet', status_table)
    logger.info(f"Saving status table as parquet @ {parquet_path}")
    df.to_parquet(parquet_path, compression='snappy', schema='infer', engine='pyarrow-dataset')

def convert_to_parquet_flow():
    with Flow('ConvertToParquet') as convert_flow:
        dumps_path = Parameter('dumps_path', default="s3://kdt-staging-dynamodbexport-test")
        status_table = Parameter('status_table', default='STATUS_TABLE')
        blocksize = Parameter('blocksize', default="256MB")
        to_parquet(dumps_path, status_table, blocksize)
    convert_flow.executor = DaskExecutor()
    convert_flow.run_config = UniversalRun()
    return convert_flow
Kevin Kho

Kevin Kho

Milton Tavares Neto
04/30/2021, 3:20 PM

Milton Tavares Neto
04/30/2021, 3:21 PM

Kevin Kho
with Flow is deferred execution that happens at runtime. task is also deferred, but the function is not, so we may be running into problems.
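A minimal sketch of the deferred execution Kevin describes, assuming Prefect 1.x (the task and flow names here are made up for illustration):

from prefect import Flow, task

@task
def add_one(x):
    return x + 1

with Flow("demo") as flow:
    result = add_one(1)  # only registers the task in the graph; nothing runs yet

state = flow.run()                   # execution is deferred until here
print(state.result[result].result)   # 2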
Milton Tavares Neto
04/30/2021, 3:31 PM
Unexpected error: KilledWorker('to_parquet-f5821bf3359e41ac931067976aea0a97', <Worker 'tcp://172.32.0.48:43493', name: 0, memory: 0, processing: 1>)
Scheduler and worker logs don't help either.

Kevin Kho
Try it with task.run. Is that running on local?

Milton Tavares Neto
04/30/2021, 3:34 PM

Milton Tavares Neto
04/30/2021, 3:34 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 3:35 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 3:36 PM

Milton Tavares Neto
04/30/2021, 4:10 PM

Milton Tavares Neto
04/30/2021, 4:12 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 4:19 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 4:28 PM

from dask.distributed import Client

c = Client()         # starts a local Dask cluster and connects a client
to_parquet.run(...)  # call the task's underlying function directly, outside a Flow
Milton Tavares Neto
04/30/2021, 4:29 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 4:38 PM

Milton Tavares Neto
04/30/2021, 4:38 PM

Milton Tavares Neto
04/30/2021, 4:39 PM

Milton Tavares Neto
04/30/2021, 4:40 PM

Milton Tavares Neto
04/30/2021, 4:43 PM

Milton Tavares Neto
04/30/2021, 4:45 PM

Kevin Kho
Did that work with task.run? Anyway, I think that to utilize the other 3 workers you need a slight refactor so that you can map your function across some list. This will allow Prefect to distribute the task to the workers. Maybe apply your logic by breaking up the list of files into smaller lists and passing those to a task that can be mapped.
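A minimal sketch of the refactor Kevin describes, assuming Prefect 1.x; the chunking task, the fsspec-based file listing, and the chunk size are illustrative, not from the thread:

import fsspec
from prefect import Flow, Parameter, task

@task
def list_file_chunks(dumps_path, status_table, chunk_size=100):
    # hypothetical helper: expand the glob of dump files and split the
    # listing into smaller lists, one list per mapped task run
    _, _, paths = fsspec.get_fs_token_paths(f"{dumps_path}/dump/{status_table}/*")
    return [paths[i:i + chunk_size] for i in range(0, len(paths), chunk_size)]

@task
def convert_chunk(files):
    ...  # read this chunk of JSON files and write its parquet output

with Flow("ConvertToParquetMapped") as mapped_flow:
    dumps_path = Parameter("dumps_path")
    status_table = Parameter("status_table")
    chunks = list_file_chunks(dumps_path, status_table)
    convert_chunk.map(chunks)  # Prefect fans the chunks out across the Dask workers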
Kevin Kho

Milton Tavares Neto
04/30/2021, 5:05 PM

Kevin Kho

Kevin Kho
When you use the Prefect map, the task gets serialized and sent to the workers. Each Dask worker already contains an independent process. When you run your code with Prefect, it already runs in one worker. This means that your Dask code is already running inside one worker as a local process, which is why it does not distribute the operation. You will either need to refactor into independent processes to leverage the Prefect map and then combine the results in a later task, or use a worker client to resubmit the Dask operations back to the Dask scheduler to get your operations parallelized with Dask.
Milton Tavares Neto
04/30/2021, 6:08 PM

Milton Tavares Neto
04/30/2021, 6:09 PM

Milton Tavares Neto
04/30/2021, 6:10 PM

Kevin Kho

Milton Tavares Neto
04/30/2021, 6:11 PM

Kevin Kho

Kevin Kho

Milton Tavares Neto
04/30/2021, 6:18 PM

Kevin Kho