
Benjamin

07/02/2020, 7:30 PM
Hello everyone. I'm running a POC with Prefect in a very simple flow: read data from S3, standardize the numbers, and write the result back to S3. For the moment we're trying the local backend server (not the Prefect Cloud version) and we're getting some strange behavior: everything runs fine when we use
flow.run(executor=executor)
but not with
flow.register()
. We're using the FargateCluster to create a new cluster for the flow, and it's set up properly with flow.register, but no processing is done. I asked about this here a couple of days ago, but now I have a reproducible example. I'll provide the details in this thread. Thanks a lot
We're using the Dockerfile below, based on the Dask Docker image but with the prefect and dask-ml packages added:
FROM continuumio/miniconda3:4.7.12

RUN conda install --yes \
    -c conda-forge \
    python==3.8 \
    python-blosc \
    cytoolz \
    dask==2.19.0 \
    lz4 \
    nomkl \
    numpy==1.18.1 \
    pandas==1.0.1 \
    tini==0.18.0 \
    dask-ml==1.5.0 \
    prefect==0.12.1 \
    pyarrow==0.17.1 \
    python-snappy==0.5.4 \
    s3fs==0.4.2 \
    && conda clean -tipsy \
    && find /opt/conda/ -type f,l -name '*.a' -delete \
    && find /opt/conda/ -type f,l -name '*.pyc' -delete \
    && find /opt/conda/ -type f,l -name '*.js.map' -delete \
    && find /opt/conda/lib/python*/site-packages/bokeh/server/static -type f,l -name '*.js' -not -name '*.min.js' -delete \
    && rm -rf /opt/conda/pkgs

COPY prepare.sh /usr/bin/prepare.sh

RUN mkdir /opt/app

ENTRYPOINT ["tini", "-g", "--", "/usr/bin/prepare.sh"]
First we need to create some dummy data:
import dask.dataframe as dd
import numpy as np
import pandas as pd
def create_dataset(bucket, filepath, npartitions=10, nrows_per_partition=100000, ncols=100, mean=100., stddev=10.):
    data = np.random.random((nrows_per_partition*npartitions, ncols))*stddev + mean
    df = dd.from_array(data, chunksize=nrows_per_partition, columns=['col{0}'.format(i) for i in range(ncols)])
    df.to_parquet('s3://{bucket}/{filepath}'.format(bucket=bucket, filepath=filepath),
                  engine='pyarrow', compression='snappy')
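For example, the dataset can be generated with a call like the one below (the bucket name is a placeholder; use one your AWS credentials can write to):
# Hypothetical example call -- replace the bucket with one you own.
create_dataset(bucket='my-test-bucket', filepath='dataset')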
And we build the flow:
import dask.dataframe as dd
import numpy as np
import pandas as pd

from prefect import Flow, Parameter
from prefect.environments.execution.local import LocalEnvironment
from prefect.engine.executors import DaskExecutor
from dask_ml.preprocessing import StandardScaler
from prefect import task

@task
def read_files(bucket, input_filepath):
    file_pattern = 's3://{bucket}/{input_filepath}/*.parquet'.format(bucket=bucket, input_filepath=input_filepath)
    return dd.read_parquet(file_pattern, engine='pyarrow')

@task
def scale(feats_df):
    return StandardScaler().fit_transform(feats_df)

@task
def write_csv(feats_df, bucket, outfile):
    full_path = 's3://{bucket}/{outfile}'.format(bucket=bucket, outfile=outfile)
    feats_df.to_csv(full_path, encoding='utf-8', index=False)


with Flow("standardization") as flow:
    bucket = Parameter('bucket', default='') # need to add the bucket where dataset is here
    input_filepath = Parameter('input_filepath', default='dataset')
    output_file = Parameter('output_file', default='scaled_data')
    df = read_files(bucket=bucket, input_filepath=input_filepath)
    scaled_df = scale(df)
    write_csv(scaled_df, bucket=bucket, outfile=output_file)

cluster_kwargs = {
    'image': '',  # docker image here
    'cluster_name_template': 'prefect-test-{uuid}',
    'task_role_policies': ['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
    'n_workers': 1
}
executor = DaskExecutor(cluster_class='dask_cloudprovider.FargateCluster', cluster_kwargs=cluster_kwargs,
                        client_kwargs={'timeout': 300})
Now if we run it with:
flow.run(executor=executor)
It works like a charm. But when we run it with
flow.environment = LocalEnvironment(executor=executor)
flow.register()
The cluster is created and the Dask scheduler receives the task names, but no work is done.
Everything is summed up here:
I'm sorry, I couldn't share a public S3 bucket or Docker image from within my organization. But is this enough for you to investigate? Once again, thanks for the help.

Chris White

07/02/2020, 7:46 PM
Hi @Benjamin - since you are working with a locally hosted version of Server, can you confirm that both your Fargate environment and your Dask cluster have access to the API?

Benjamin

07/02/2020, 7:47 PM
Ah maybe not actually
So the dask cluster should be able to access the prefect server graphql api?

Chris White

07/02/2020, 7:48 PM
it sounds like your Agent does have access (because it picks up the flow) but maybe your Flow’s environment does not. Yup, every Prefect Task needs to be able to communicate with the API to report back state + logs
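For reference, a minimal sketch of one way to give the Fargate scheduler and workers access to the Server API, by forwarding Prefect config as environment variables through the cluster kwargs (the environment kwarg, the variable names, and the host below are assumptions that depend on the Prefect and dask-cloudprovider versions in use):
# Hedged sketch: point the Dask scheduler/worker containers at the Prefect Server API
# so tasks can report states and logs back. The host is a placeholder and must be
# reachable from inside the Fargate tasks; config keys may differ by Prefect version.
cluster_kwargs = {
    'image': '',  # docker image here
    'cluster_name_template': 'prefect-test-{uuid}',
    'task_role_policies': ['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
    'n_workers': 1,
    'environment': {  # env vars forwarded to the scheduler/worker containers
        'PREFECT__BACKEND': 'server',
        'PREFECT__CLOUD__API': 'http://<server-host>:4200',  # placeholder Server GraphQL endpoint
    },
}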

Benjamin

07/02/2020, 7:49 PM
I thought that the agent would do all communication with prefect server
but it makes a lot of sense now
I'll try fixing that and it'll probably work
Thanks a lot

Chris White

07/02/2020, 7:50 PM
Yea, no worries, that’s a common misunderstanding; most of our current documentation is centered around Cloud’s API (which doesn’t require any networking setup) and we’re actively working on improving our documentation + deployment story around the open source API as well
anytime!