
mithalee mohapatra

12/24/2020, 1:41 AM
Has anyone faced an issue mapping a few tasks that use an SFTP connection with LocalDaskExecutor()? I see that the process hangs when it hits the mapped tasks. The mapped tasks work fine with a Dask cluster and with just flow.run(). The same workflow also works fine with an FTP connection, though.

Dylan

12/24/2020, 3:27 PM
Hi @mithalee mohapatra! Would you mind sharing a reproducible example? Once we can replicate what you’re seeing, we can help fix it 😄

mithalee mohapatra

12/28/2020, 6:10 PM
from prefect import task, Flow
from datetime import timedelta
#from prefect.schedules import IntervalSchedule
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
import pendulum
from prefect.engine.executors import LocalDaskExecutor
from prefect.engine.executors import DaskExecutor
import prefect
import boto3
from prefect.environments.execution.local import LocalEnvironment
import pysftp
from boto3.session import Session
from test.ftp_interface import SFTPConn
from lbx.lib.s3 import get_directories
from prefect import Flow, Parameter, flatten, task, unmapped
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.engine.results import S3Result
from prefect import task, Flow, Parameter
import tempfile
from prefect.engine.executors import DaskExecutor
from prefect import Flow, task
#import botocore
import os  # needed by third_level_dir
from prefect.engine import signals  # needed for signals.FAIL


#SFTP Credentials:
def get_sftp_credentials():
    secret_credential = AWSSecretsManager(secret='TEST/FTP/prefect').run()
    SFTP_USER = secret_credential['UserName']
    SFTP_PASS = secret_credential['Password']
    SFTP_HOST = 'ftp.test.com'
    return SFTP_USER, SFTP_PASS, SFTP_HOST


#Get list of directories within a directory
@task
def top_level_dir(newbatchid):
    """
    Returns the list of subdirectories present in the top-level folder,
    i.e. the new batch to be processed.
    """
    SFTP_USER, SFTP_PASS, SFTP_HOST = get_sftp_credentials()
    conn = SFTPConn(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
    dir_list_all = conn.get_dirlist(newbatchid)
    return dir_list_all


#Get list of files in directory
@task
def second_level_dir(sub_dir):
    """
    Returns the list of all files present in each subdirectory.
    """
    print('test')
    SFTP_USER, SFTP_PASS, SFTP_HOST = get_sftp_credentials()
    conn = SFTPConn(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
    files = conn.get_dirlist(sub_dir)
    #print(files)
    return files


#Upload to S3 bucket
@task
def third_level_dir(filename, bucket):
    """
    Stores each file in a temp directory and then uploads it to S3
    from the temp location.
    """
    s3 = boto3.resource('s3')
    s3_connect = boto3.client('s3')
    try:
        s3.meta.client.head_bucket(Bucket=bucket)
    except Exception as e:
        raise signals.FAIL(e)
    dir = os.path.dirname(filename)
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_path = tmpdirname + dir
        filename_new = tmpdirname + filename
        #Check dir if exists else create New, Refresh, Update
        if not os.path.exists(local_path):
            os.makedirs(local_path)
        SFTP_USER, SFTP_PASS, SFTP_HOST = get_sftp_credentials()
        conn = SFTPConn(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
        sftp = conn.get_connection(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
        try:
            sftp.get(filename, filename_new, preserve_mtime=True)
        except:
            raise signals.FAIL(f'Failed to download {filename}')
        try:
            s3_path = 'ABCD/' + filename[1:]
            s3_connect.upload_file(filename_new, bucket, s3_path)
        except:
            raise signals.FAIL(f'Failed to upload to S3 {filename_new}')
    return 'success'


if __name__ == '__main__':
    with Flow("Hello") as flow:
        bucket = Parameter('bucket', default='test-sandbox')
        top = top_level_dir(newbatchid='20201221')
        second = second_level_dir.map(sub_dir=top)
        third = third_level_dir.map(flatten(second), unmapped(bucket))
    executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
    #flow.register(project_name="Hello, World_mm!")
    flow.run(executor=executor)
Hi, this is the sample code I am trying to run: get the SFTP credentials, retrieve the list of directories, and upload the files in each of the directories. It looks like if I use "flatten", my process just hangs after the second task. With distributed Dask, and with only flow.run(), the process works. But with the LocalDaskExecutor it just hangs without throwing any error.
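For reference, a minimal sketch of a reproducible example using the same imports as the snippet above, with the SFTP and S3 calls stubbed out (the stub return values are made up). If this version runs cleanly under the LocalDaskExecutor, the hang is more likely tied to the SFTP connection than to flatten/map itself:

from prefect import Flow, Parameter, task, flatten, unmapped
from prefect.engine.executors import LocalDaskExecutor

@task
def top_level_dir(newbatchid):
    # stand-in for the SFTP directory listing
    return [f"{newbatchid}/sub_{i}" for i in range(3)]

@task
def second_level_dir(sub_dir):
    # stand-in for listing the files in one subdirectory
    return [f"{sub_dir}/file_{i}.csv" for i in range(4)]

@task
def third_level_dir(filename, bucket):
    # stand-in for the download-and-upload step
    return f"s3://{bucket}/{filename}"

if __name__ == "__main__":
    with Flow("flatten-repro") as flow:
        bucket = Parameter("bucket", default="test-sandbox")
        top = top_level_dir(newbatchid="20201221")
        second = second_level_dir.map(sub_dir=top)
        third = third_level_dir.map(flatten(second), unmapped(bucket))

    flow.run(executor=LocalDaskExecutor(scheduler="processes", num_workers=6))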

Kyle Moon-Wright

12/28/2020, 6:46 PM
Hey @mithalee mohapatra, from what I'm reading, SFTP may not be thread safe. Is using the LocalDaskExecutor a requirement for your flow? If so, this may require a refactor that passes a single connection.

mithalee mohapatra

12/28/2020, 6:49 PM
Thank you Kyle. For now I have to use the LocalDaskExecutor. I can try passing a single connection, but won't that block the parallel processing in Dask?

Kyle Moon-Wright

12/28/2020, 6:52 PM
Hmm, great question - I don’t think it will block parallelism AFAIK.

mithalee mohapatra

12/28/2020, 6:53 PM
Thank you. Let me try.
from prefect import task, Flow
from datetime import timedelta
#from prefect.schedules import IntervalSchedule
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
import pendulum
from prefect.engine.executors import LocalDaskExecutor
from prefect.engine.executors import DaskExecutor
import prefect
import boto3
from prefect.environments.execution.local import LocalEnvironment
import pysftp
from boto3.session import Session
from test.ftp_interface import SFTPConn
from test.s3 import get_directories
from prefect import Flow, Parameter, flatten, task, unmapped
from prefect.tasks.aws.secrets_manager import AWSSecretsManager
from prefect.engine.results import S3Result
from prefect import task, Flow, Parameter
import tempfile
from prefect.engine.executors import DaskExecutor
from prefect import Flow, task
#import botocore
import os  # needed by third_level_dir
from prefect.engine import signals  # needed for signals.FAIL


@task
def get_sftp_credentials():
    secret_credential = AWSSecretsManager(secret='TEST/FTP/prefect').run()
    SFTP_USER = secret_credential['UserName']
    SFTP_PASS = secret_credential['Password']
    SFTP_HOST = 'ftp.test.com'
    #new
    conn = SFTPConn(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
    #return SFTP_USER,SFTP_PASS,SFTP_HOST
    return conn


@task
def top_level_dir(conn, newbatchid):
    """
    Returns the list of subdirectories present in the top-level folder,
    i.e. the new batch to be processed.
    """
    #SFTP_USER,SFTP_PASS,SFTP_HOST= get_sftp_credentials()
    #conn = SFTPConn(SFTP_host=SFTP_HOST,SFTP_user=SFTP_USER,SFTP_pass=SFTP_PASS)
    dir_list_all = conn.get_dirlist(newbatchid)
    return dir_list_all


@task
def second_level_dir(conn, sub_dir):
    """
    Returns the list of all files present in each subdirectory.
    """
    print('test')
    #SFTP_USER,SFTP_PASS,SFTP_HOST= get_sftp_credentials()
    #conn = SFTPConn(SFTP_host=SFTP_HOST,SFTP_user=SFTP_USER,SFTP_pass=SFTP_PASS)
    files = conn.get_dirlist(sub_dir)
    #print(files)
    return files


@task
def third_level_dir(filename, bucket, conn):
    """
    Stores each file in a temp directory and then uploads it to S3
    from the temp location.
    """
    s3 = boto3.resource('s3')
    s3_connect = boto3.client('s3')
    try:
        s3.meta.client.head_bucket(Bucket=bucket)
    except Exception as e:
        raise signals.FAIL(e)
    dir = os.path.dirname(filename)
    with tempfile.TemporaryDirectory() as tmpdirname:
        local_path = tmpdirname + dir
        filename_new = tmpdirname + filename
        #Check dir if exists else create New, Refresh, Update
        if not os.path.exists(local_path):
            os.makedirs(local_path)
        #SFTP_USER,SFTP_PASS,SFTP_HOST= get_sftp_credentials()
        #conn = SFTPConn(SFTP_host=SFTP_HOST,SFTP_user=SFTP_USER,SFTP_pass=SFTP_PASS)
        #NOTE: SFTP_HOST/SFTP_USER/SFTP_PASS are no longer defined in this scope
        sftp = conn.get_connection(SFTP_host=SFTP_HOST, SFTP_user=SFTP_USER, SFTP_pass=SFTP_PASS)
        try:
            sftp.get(filename, filename_new, preserve_mtime=True)
        except:
            raise signals.FAIL(f'Failed to download {filename}')
        try:
            s3_path = 'TEST_raw/' + filename[1:]
            s3_connect.upload_file(filename_new, bucket, s3_path)
        except:
            raise signals.FAIL(f'Failed to upload to S3 {filename_new}')
    return 'success'


if __name__ == '__main__':
    with Flow("Hello") as flow:
        bucket = Parameter('bucket', default='test-sandbox')
        conn = get_sftp_credentials()
        top = top_level_dir(conn, newbatchid='20201221')
        second = second_level_dir.map(unmapped(conn), sub_dir=top)
        third = third_level_dir.map(flatten(second), unmapped(bucket), unmapped(conn))
    executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
    #flow.register(project_name="Hello, World_mm!")
    flow.run(executor=executor)
@Kyle Moon-Wright I am now passing just one connection, but it still hangs after the second task. It is not getting to the third task.
It just hangs after the highlighted line.

Kyle Moon-Wright

12/28/2020, 7:45 PM
Hmm, it’s strange that the first few mapped tasks on that level succeeded. The only other idea I have is to use threads with your executor to check for different behavior:
executor = LocalDaskExecutor(scheduler="threads", num_workers=6)

mithalee mohapatra

12/28/2020, 8:33 PM
"threads" does not change the behavior.

Kyle Moon-Wright

12/28/2020, 8:43 PM
Hmm, ok. Theory #2 is an assignment error. Can we try separating our flow definition like this:
with Flow("Hello") as flow:
    bucket = Parameter("bucket", default="test-sandbox")
    conn = get_sftp_credentials()
    top = top_level_dir(conn, newbatchid="20201221")
    second = second_level_dir.map(unmapped(conn), sub_dir=top)
    third = third_level_dir.map(flatten(second), unmapped(bucket), unmapped(conn))

executor = LocalDaskExecutor(scheduler="processes", num_workers=6)

if __name__ == "__main__":
    # flow.register(project_name="Hello, World_mm!")
    flow.run(executor=executor)
I also recall some issue with using the LocalDaskExecutor with the flow.run method, so I'll look into this next.

mithalee mohapatra

12/28/2020, 8:48 PM
Ok sure, let me try separating it.

Kyle Moon-Wright

12/28/2020, 8:49 PM
Another potential route could be to attach a local run_config to the flow:
from prefect.run_configs import LocalRun

flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=6)
flow.run_config = LocalRun()
flow.run()
but this one may only be relevant when setting the .executor attribute on the flow, so it's possibly irrelevant for us here.

mithalee mohapatra

12/28/2020, 8:57 PM
Yes. Eventually I am going to schedule it in the Prefect UI. Something like:
if __name__ == '__main__':
    schedule = Schedule(
        clocks=[IntervalClock(
            start_date=pendulum.datetime(2020, 12, 22, hour=6, minute=3, second=0),
            interval=timedelta(minutes=2),
        )]
    )
    environment = LocalEnvironment(executor=LocalDaskExecutor(scheduler="processes", num_workers=6))
    with Flow("Hello", schedule, environment=environment) as flow:
        bucket = Parameter('bucket', default='test-sandbox')
Separating the flow does not work either. It hangs at the same step.

Kyle Moon-Wright

12/28/2020, 9:17 PM
You mentioned this was working with the DaskExecutor?

mithalee mohapatra

12/28/2020, 9:19 PM
yes it does.
As an alternative, I am thinking of using a Prefect shell task to start the Dask workers and use distributed Dask. That way I can still schedule my flow in the Prefect UI.
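For reference, a hedged sketch of that workaround: start the Dask scheduler and workers out of band (for example from a shell or a ShellTask), then point a DaskExecutor at the scheduler address so the flow can still be registered and scheduled from the UI. The address and worker count below are placeholders, not from the thread:

# Started outside the flow (shell commands; these could also be wrapped in a ShellTask):
#   dask-scheduler                                  # serves tcp://127.0.0.1:8786
#   dask-worker tcp://127.0.0.1:8786 --nprocs 6     # six worker processes
from prefect import Flow
from prefect.engine.executors import DaskExecutor
from prefect.environments.execution.local import LocalEnvironment

environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://127.0.0.1:8786")  # placeholder scheduler address
)

with Flow("Hello", environment=environment) as flow:
    ...  # same tasks and mapping as above

# flow.register(project_name="Hello")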

Kyle Moon-Wright

12/28/2020, 9:49 PM
Definitely a viable option. Apologies for the runaround with these potential solutions; my only conclusion is that the DaskExecutor manages memory better, while the LocalDaskExecutor is hanging without sufficient memory, per this doc:
Using a DaskExecutor with a local cluster is very similar to using a LocalDaskExecutor with processes=True. You may find it more performant in certain situations (this scheduler does a better job about managing memory), but generally they should perform equivalently for most Prefect workflows.
Hence the DaskExecutor is able to execute your flow and allocate resources efficiently for your workload.
If this is a major block or your workaround doesn’t end up being viable, we can also open an issue for visibility with the Core developers.
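For reference, the doc's suggestion translated into a hedged sketch: a DaskExecutor with no address spins up a temporary local distributed cluster for the run, which is close to a LocalDaskExecutor with processes but with the distributed scheduler managing memory. The cluster_kwargs values here are illustrative:

from prefect.engine.executors import DaskExecutor

# With no address given, Prefect creates a temporary local Dask cluster for this run.
executor = DaskExecutor(cluster_kwargs={"n_workers": 6, "threads_per_worker": 1})

flow.run(executor=executor)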

mithalee mohapatra

12/28/2020, 10:09 PM
@Kyle Moon-Wright Thank you so much Kyle.

Ben Rosen

12/29/2020, 1:05 AM
Depending on how many tasks are being created/submitted, this might be too much for the local Dask scheduler and it's hanging. @mithalee mohapatra If you have a large number of tasks being mapped over in third_dir, try lowering that number and see if that helps.
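One hedged way to lower that number without dropping files is to batch the flattened file list so each mapped task handles a group of files instead of a single file; the task names and batch size here are illustrative, not from the thread:

from prefect import task, unmapped

@task
def chunk_files(file_lists, size=50):
    # flatten the per-directory file lists and group them into batches of `size`
    flat = [f for sub in file_lists for f in sub]
    return [flat[i:i + size] for i in range(0, len(flat), size)]

@task
def upload_batch(filenames, bucket):
    # run the per-file download/upload logic for a whole batch inside one mapped task
    for filename in filenames:
        ...  # same body as third_level_dir

# in the flow definition:
#   batches = chunk_files(second)
#   third = upload_batch.map(batches, unmapped(bucket))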