Benjamin
07/02/2020, 7:30 PM
It works with flow.run(executor=executor) but not with flow.register().
We're using the FargateCluster to create a new cluster for the flow, and the cluster is set up properly with flow.register, but no processing is done. I had asked about this a couple of days ago here, but now I have a reproducible example.
I'll provide the details in this thread.
Thanks a lot

# Dockerfile for the Dask worker image used by the FargateCluster
FROM continuumio/miniconda3:4.7.12

RUN conda install --yes \
        -c conda-forge \
        python==3.8 \
        python-blosc \
        cytoolz \
        dask==2.19.0 \
        lz4 \
        nomkl \
        numpy==1.18.1 \
        pandas==1.0.1 \
        tini==0.18.0 \
        dask-ml==1.5.0 \
        prefect==0.12.1 \
        pyarrow==0.17.1 \
        python-snappy==0.5.4 \
        s3fs==0.4.2 \
    # trim conda caches and unused files to keep the image small
    && conda clean -tipsy \
    && find /opt/conda/ -type f,l -name '*.a' -delete \
    && find /opt/conda/ -type f,l -name '*.pyc' -delete \
    && find /opt/conda/ -type f,l -name '*.js.map' -delete \
    && find /opt/conda/lib/python*/site-packages/bokeh/server/static -type f,l -name '*.js' -not -name '*.min.js' -delete \
    && rm -rf /opt/conda/pkgs

COPY prepare.sh /usr/bin/prepare.sh
RUN mkdir /opt/app

ENTRYPOINT ["tini", "-g", "--", "/usr/bin/prepare.sh"]
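Since FargateCluster pulls the worker image from a registry, the image built from this Dockerfile presumably has to be pushed (e.g. to ECR) and its URI filled into cluster_kwargs['image'] below. A rough sketch using the docker Python SDK; the repository URI is a placeholder and registry authentication is assumed to be configured already:

# Build and push the worker image so the Fargate tasks can pull it.
# The ECR repository URI is hypothetical; docker is assumed to be logged in to it.
import docker

repo = '123456789012.dkr.ecr.us-east-1.amazonaws.com/prefect-dask-worker'  # placeholder
client = docker.from_env()
client.images.build(path='.', tag='{0}:latest'.format(repo))
client.images.push(repo, tag='latest')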
import dask.dataframe as dd
import numpy as np
import pandas as pd


def create_dataset(bucket, filepath, npartitions=10, nrows_per_partition=100000, ncols=100, mean=100., stddev=10.):
    # Generate a random dataset and write it to S3 as snappy-compressed parquet.
    data = np.random.random((nrows_per_partition * npartitions, ncols)) * stddev + mean
    df = dd.from_array(data, chunksize=nrows_per_partition, columns=['col{0}'.format(i) for i in range(ncols)])
    df.to_parquet('s3://{bucket}/{filepath}'.format(bucket=bucket, filepath=filepath),
                  engine='pyarrow', compression='snappy')
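For completeness, the helper above would presumably be called once up front to generate the test dataset before running the flow; the bucket name here is a placeholder:

# One-off invocation to generate the sample dataset in S3 (placeholder bucket).
create_dataset(bucket='my-test-bucket', filepath='dataset')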
import dask.dataframe as dd
import numpy as np
import pandas as pd
from prefect import Flow, Parameter, task
from prefect.environments.execution.local import LocalEnvironment
from prefect.engine.executors import DaskExecutor
from dask_ml.preprocessing import StandardScaler


@task
def read_files(bucket, input_filepath):
    file_pattern = 's3://{bucket}/{input_filepath}/*.parquet'.format(bucket=bucket, input_filepath=input_filepath)
    return dd.read_parquet(file_pattern, engine='pyarrow')


@task
def scale(feats_df):
    return StandardScaler().fit_transform(feats_df)


@task
def write_csv(feats_df, bucket, outfile):
    full_path = 's3://{bucket}/{outfile}'.format(bucket=bucket, outfile=outfile)
    feats_df.to_csv(full_path, encoding='utf-8', index=False)


with Flow("standardization") as flow:
    bucket = Parameter('bucket', default='')  # need to add the bucket where the dataset is here
    input_filepath = Parameter('input_filepath', default='dataset')
    output_file = Parameter('output_file', default='scaled_data')
    df = read_files(bucket=bucket, input_filepath=input_filepath)
    scaled_df = scale(df)
    write_csv(scaled_df, bucket=bucket, outfile=output_file)

cluster_kwargs = {
    'image': '',  # docker image here
    'cluster_name_template': 'prefect-test-{uuid}',
    'task_role_policies': ['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
    'n_workers': 1,
}
executor = DaskExecutor(cluster_class='dask_cloudprovider.FargateCluster', cluster_kwargs=cluster_kwargs,
                        client_kwargs={'timeout': 300})

flow.run(executor=executor)

flow.environment = LocalEnvironment(executor=executor)
flow.register()
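A note on the two invocation paths above: flow.run(executor=executor) uses the executor passed in directly, while a run of the registered flow is executed by an agent and picks up the executor from flow.environment, which is why the LocalEnvironment assignment comes before flow.register(). Once registered, one way to start a run with an explicit bucket (the default above is empty) is through the Prefect client; the flow id and bucket name below are placeholders:

# Minimal sketch (assumed flow id and bucket name) of starting a run of the
# registered flow with explicit parameters, since the default bucket is empty.
from prefect import Client

client = Client()
flow_run_id = client.create_flow_run(
    flow_id='<flow-id-returned-by-flow.register()>',
    parameters={'bucket': 'my-test-bucket'},
)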
Chris White
07/02/2020, 7:46 PM

Benjamin
07/02/2020, 7:47 PM

Chris White
07/02/2020, 7:48 PM

Benjamin
07/02/2020, 7:49 PM

Chris White
07/02/2020, 7:50 PM