    Benjamin

    2 years ago
    Hello everyone. I'm running a POC with Prefect on a very simple flow: read data from S3, standardize the numbers, and write the result back to S3. For the moment we're using the local backend (Prefect Server, not the Prefect Cloud version) and we're seeing strange behavior: everything runs fine when we use
    flow.run(executor=executor)
    but not with
    flow.register()
    . We're using FargateCluster to create a new cluster for the flow; the cluster is set up properly when the flow is registered with flow.register(), but no processing is done. I asked about this here a couple of days ago, but now I have a reproducible example. I'll provide the details in this thread. Thanks a lot.
    We're using the Dockerfile below, which is based on the Dask Docker image with the prefect and dask-ml modules added:
    FROM continuumio/miniconda3:4.7.12
    
    RUN conda install --yes \
        -c conda-forge \
        python==3.8 \
        python-blosc \
        cytoolz \
        dask==2.19.0 \
        lz4 \
        nomkl \
        numpy==1.18.1 \
        pandas==1.0.1 \
        tini==0.18.0 \
        dask-ml==1.5.0 \
        prefect==0.12.1 \
        pyarrow==0.17.1 \
        python-snappy==0.5.4 \
        s3fs==0.4.2 \
        && conda clean -tipsy \
        && find /opt/conda/ -type f,l -name '*.a' -delete \
        && find /opt/conda/ -type f,l -name '*.pyc' -delete \
        && find /opt/conda/ -type f,l -name '*.js.map' -delete \
        && find /opt/conda/lib/python*/site-packages/bokeh/server/static -type f,l -name '*.js' -not -name '*.min.js' -delete \
        && rm -rf /opt/conda/pkgs
    
    COPY prepare.sh /usr/bin/prepare.sh
    
    RUN mkdir /opt/app
    
    ENTRYPOINT ["tini", "-g", "--", "/usr/bin/prepare.sh"]
    First we need to create some dummy data:
    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    def create_dataset(bucket, filepath, npartitions=10, nrows_per_partition=100000, ncols=100, mean=100., stddev=10.):
        # Generate random data, wrap it in a Dask DataFrame and write it to S3 as Parquet
        data = np.random.random((nrows_per_partition * npartitions, ncols)) * stddev + mean
        df = dd.from_array(data, chunksize=nrows_per_partition, columns=['col{0}'.format(i) for i in range(ncols)])
        df.to_parquet('s3://{bucket}/{filepath}'.format(bucket=bucket, filepath=filepath),
                      engine='pyarrow', compression='snappy')
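    For example (just an illustration: 'my-bucket' is a placeholder bucket name, and 'dataset' matches the flow's default input_filepath parameter below), the dummy data could be generated with:
    create_dataset(bucket='my-bucket', filepath='dataset')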
    And we build the flow:
    import dask.dataframe as dd
    import numpy as np
    import pandas as pd
    
    from prefect import Flow, Parameter
    from prefect.environments.execution.local import LocalEnvironment
    from prefect.engine.executors import DaskExecutor
    from dask_ml.preprocessing import StandardScaler
    from prefect import task
    
    @task
    def read_files(bucket, input_filepath):
        file_pattern = 's3://{bucket}/{input_filepath}/*.parquet'.format(bucket=bucket, input_filepath=input_filepath)
        return dd.read_parquet(file_pattern, engine='pyarrow')
    
    @task
    def scale(feats_df):
        return StandardScaler().fit_transform(feats_df)
    
    @task
    def write_csv(feats_df, bucket, outfile):
        full_path = 's3://{bucket}/{outfile}'.format(bucket=bucket, outfile=outfile)
        feats_df.to_csv(full_path, encoding='utf-8', index=False)
    
    
    with Flow("standardization") as flow:
        bucket = Parameter('bucket', default='')  # set this to the bucket where the dataset lives
        input_filepath = Parameter('input_filepath', default='dataset')
        output_file = Parameter('output_file', default='scaled_data')
        df = read_files(bucket=bucket, input_filepath=input_filepath)
        scaled_df = scale(df)
        write_csv(scaled_df, bucket=bucket, outfile=output_file)
    
    cluster_kwargs = {
        'image': '',  # Docker image here
        'cluster_name_template': 'prefect-test-{uuid}',
        'task_role_policies': ['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
        'n_workers': 1
    }
    executor = DaskExecutor(cluster_class='dask_cloudprovider.FargateCluster', cluster_kwargs=cluster_kwargs,
                            client_kwargs={'timeout': 300})
    Now if we run it with:
    flow.run(executor=executor)
    It works like a charm. But running with
    flow.environment = LocalEnvironment(executor=executor)
    flow.register()
    The cluster is created and the Dask scheduler receives the task names, but no work is done.
    Everything is summed up here.
    I'm sorry I couldn't share a public S3 bucket or Docker image from within my organization, but is this enough for you to investigate? Once again, thanks for the help.
    Chris White

    2 years ago
    Hi @Benjamin - since you are working with a locally hosted version of Server, can you confirm that both your Fargate environment and your Dask cluster have access to the API?
    Benjamin

    2 years ago
    Ah, maybe not, actually.
    So the Dask cluster should be able to access the Prefect Server GraphQL API?
    Chris White

    2 years ago
    It sounds like your Agent does have access (because it picks up the flow) but maybe your Flow’s environment does not. Yup, every Prefect Task needs to be able to communicate with the API to report back state + logs.
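    For example, one way to do that is to pass the Prefect API location into the Fargate tasks as environment variables. This is only a rough sketch, not a verified recipe: the environment kwarg assumes your dask-cloudprovider version forwards it into the task definitions, the Server address is a placeholder, and the exact Prefect config key can differ between versions:
    # Rough sketch: expose the self-hosted Prefect Server API to the scheduler and
    # worker tasks so that task runs can report states and logs back to it.
    cluster_kwargs = {
        'image': '',  # Docker image here
        'cluster_name_template': 'prefect-test-{uuid}',
        'task_role_policies': ['arn:aws:iam::aws:policy/AmazonS3FullAccess'],
        'n_workers': 1,
        # Assumes dask-cloudprovider passes 'environment' through to the Fargate task
        # definitions; the endpoint below is a placeholder and must be reachable from
        # inside those tasks.
        'environment': {
            'PREFECT__CLOUD__API': 'http://<prefect-server-host>:4200',
        },
    }
    Since your Agent already picks up the flow, that side of the networking looks fine; it's the Dask cluster that needs the same access.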
    Benjamin

    2 years ago
    I thought that the agent would do all the communication with Prefect Server
    but it makes a lot of sense now
    I'll try fixing that and it'll probably work
    Thanks a lot
    Chris White

    2 years ago
    Yea, no worries, that’s a common misunderstanding; most of our current documentation is centered around Cloud’s API (which doesn’t require any networking setup), and we’re actively working on improving our documentation + deployment story around the open source API as well
    anytime!