# prefect-community
d
Hi again, so I'm running a flow with KubernetesRun config in Prefect Cloud with a SqlServerFetch task that is receiving this error:
```
Error during execution of task: OperationalError('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]Client unable to establish connection because an error was encountered during handshakes before login. Common causes include client attempting to connect to an unsupported version of SQL Server, server too busy to accept new connections or a resource limitation (memory or maximum allowed connections) on the server. (26) (SQLDriverConnect)')
```
The task is first defined outside the scope of the flow, and it is mapped over the queries generated by another task. See the rough pattern below, with some values obscured. The strange thing is that this pattern runs on my local machine and successfully pulls from our database. Even stranger, another flow uses SqlServerFetch with a KubernetesRun config in Prefect Cloud successfully, though that job is mapped in a different way. Any thoughts on this? The only thing that comes to mind is some issue with the driver selected when mapping the task.
k
Hey @David Beck, could you move the code to this thread when you get a chance to keep the main channel cleaner. I suspect this error might just be because of too many concurrent connections? How is the job that works mapped in a different way? Also, what storage do you use?
The `current_timestamp = datetime.now()` here might lead to `current_timestamp` being locked in to build time if you use pickle-based storage.
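The following plain-Python stand-in shows why pickle-based storage freezes that value. No Prefect is involved, and the dict "flows" and function names are hypothetical. A value computed while the flow is built is stored in the pickle as a constant. A callable is only evaluated when the flow runs. In Prefect 1.x, you get the second behaviour by computing the timestamp inside a `@task`, which runs at flow-run time.

```python
import pickle
import time
from datetime import datetime


def build_flow_frozen() -> bytes:
    # Mimics `current_timestamp = datetime.now()` inside `with Flow(...)`:
    # the call runs once, while the flow is built, and only its result
    # ends up in the serialized flow.
    flow = {"until": datetime.now()}
    return pickle.dumps(flow)


def build_flow_deferred() -> bytes:
    # Mimics wrapping the call in a task: only a reference to the callable
    # is stored, so nothing is evaluated at build time.
    flow = {"until": datetime.now}
    return pickle.dumps(flow)


def run_flow(serialized: bytes) -> datetime:
    # Mimics a later flow run loading the pickled flow from storage.
    flow = pickle.loads(serialized)
    until = flow["until"]
    return until() if callable(until) else until


frozen = build_flow_frozen()
deferred = build_flow_deferred()
build_time = pickle.loads(frozen)["until"]

time.sleep(0.05)  # time passes between registration and the flow run
print(run_flow(frozen) == build_time)   # True: still the build-time value
print(run_flow(deferred) > build_time)  # True: evaluated at run time
```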
d
flow:
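```python
from datetime import datetime

import prefect
from prefect import Flow, task, unmapped
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.sql_server import SqlServerFetch

# db_name, user, FLOW_NAME, get_last_run_time and format_datetime_as_sql_datetime
# are defined elsewhere (obscured in this post).

sf_db_host = prefect.config.salesforce.db_host
# A single task instance, defined outside the flow and mapped below
salesforce_query = SqlServerFetch(db_name=db_name, user=user, host=sf_db_host, fetch='all')


@task
def create_sql_queries(database: str,
                       schema: str,
                       table: str,
                       filter_col: str,
                       since: str,
                       until: str) -> str:
    """Generates one SQL query to run against the SQL DB (called once per mapped element)"""
    return (f"SELECT * FROM [{database}].[{schema}].[{table}] WITH (NOLOCK) "
            f"WHERE [{filter_col}] BETWEEN '{since}' AND '{until}'")


with Flow(FLOW_NAME, executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:

    salesforce_password = PrefectSecret("PASSWORD")

    database_names = ['...']
    schema_names = ['...']
    source_tables = ['...']
    filters = ['...']

    last_timestamp = get_last_run_time(flow_name=flow.name)
    # Plain Python: evaluated once, when the flow is built, not on each run
    current_timestamp = datetime.now()

    # One query per element of the mapped lists
    queries = create_sql_queries.map(database=database_names,
                                     schema=schema_names,
                                     table=source_tables,
                                     filter_col=filters,
                                     since=unmapped(last_timestamp),
                                     until=unmapped(format_datetime_as_sql_datetime(current_timestamp)))

    # One SqlServerFetch run (and one database connection) per query
    salesforce_responses = salesforce_query.map(password=unmapped(salesforce_password), query=queries)
```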
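The next sketch approximates in plain Python how the `.map` call above pairs its inputs. It is not Prefect's implementation. `map_like`, `create_sql_query` and the sample values are hypothetical, and the sketch assumes the mapped lists all have the same length. Each index produces one child run with the i-th element of every mapped list, and each `unmapped` value is passed whole to every child. Every element of `queries` then becomes one mapped `salesforce_query` run.

```python
from typing import Any, Callable, Dict, List


def map_like(fn: Callable[..., Any],
             mapped: Dict[str, List[Any]],
             unmapped: Dict[str, Any]) -> List[Any]:
    """Call fn once per index with the i-th element of every mapped list."""
    lengths = {len(values) for values in mapped.values()}
    if len(lengths) != 1:
        # Keep the sketch simple: require equal-length mapped inputs.
        raise ValueError("mapped lists must all have the same length")
    (n,) = lengths
    results = []
    for i in range(n):
        kwargs = {name: values[i] for name, values in mapped.items()}
        kwargs.update(unmapped)  # unmapped values go whole to every call
        results.append(fn(**kwargs))
    return results


def create_sql_query(database: str, schema: str, table: str,
                     filter_col: str, since: str, until: str) -> str:
    # Same query shape as create_sql_queries, written as a plain function.
    return (f"SELECT * FROM [{database}].[{schema}].[{table}] WITH (NOLOCK) "
            f"WHERE [{filter_col}] BETWEEN '{since}' AND '{until}'")


# Hypothetical inputs standing in for the obscured lists in the flow.
queries = map_like(
    create_sql_query,
    mapped={
        "database": ["db_a", "db_b"],
        "schema": ["dbo", "dbo"],
        "table": ["accounts", "contacts"],
        "filter_col": ["modified_at", "modified_at"],
    },
    unmapped={"since": "2021-01-01 00:00:00", "until": "2021-01-02 00:00:00"},
)
print(len(queries))  # 2: one child run per index
print(queries[0])
```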
k
Also, is the error intermittent or consistent? I am wondering if you may have been unlucky with many concurrent connections at the time of the flow run?
d
The error is consistent on every cloud run.
k
What is different about the mapped job that works?
d
The other mapped job creates a wrapper task that executes a single run of the SqlServerFetch task, à la `sql_fetch.run(query=query, password=password)`. So in effect, the mapping happens on the wrapper task and not on the inner SqlServerFetch task.
k
I think…that should generate the same number of connections? Or is my thinking wrong?
Do you think there might be a difference with the number of concurrent connections from Local and on the Kubernetes pod?
d
I don't know. So the SqlServerFetch task is created as a global variable in the working flow and then set to run under each mapped task.
k
Ah, I see what you are thinking. It’s the run method, though, that creates the connection and then closes it on line 100.
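This toy model shows why sharing one task instance does not share a connection. `FakeSqlFetch`, `wrapper` and the sample queries are hypothetical, and no real database or Prefect code is involved. Because the connection is opened and closed inside `run`, mapping a wrapper over three queries still opens three connections. That is the same count as mapping the fetch task directly.

```python
from typing import List


class FakeSqlFetch:
    """Toy stand-in for a fetch task whose run() owns its connection."""

    def __init__(self) -> None:
        self.opened = 0    # total connections ever opened
        self.open_now = 0  # connections currently open

    def run(self, query: str) -> str:
        # Open a connection for this call only...
        self.opened += 1
        self.open_now += 1
        result = f"rows for: {query}"
        # ...and close it before returning.
        self.open_now -= 1
        return result


# One shared instance, like the global SqlServerFetch in the working flow.
sql_fetch = FakeSqlFetch()


def wrapper(query: str) -> str:
    # Equivalent of a wrapper task calling sql_fetch.run(query=query, ...).
    return sql_fetch.run(query=query)


queries: List[str] = ["q1", "q2", "q3"]
results = [wrapper(q) for q in queries]  # stand-in for mapping the wrapper
print(sql_fetch.opened)    # 3: one connection per call, despite one instance
print(sql_fetch.open_now)  # 0: each call closed its own connection
```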
d
It's possible. When a SqlServerFetch task is mapped, does it create multiple connections?
k
Yes, each mapped instance creates an independent connection, because you can’t pass connections through Dask (on a cluster), for example.
d
Ah, I see. Does this mean that mapping a SqlServerFetch task, or any of the SqlServer tasks, is not advisable?
Again, I'm not sure why the wrapper task instance seems to work, but I might try implementing it that way if it doesn't work otherwise.
k
You can map if the database can handle that many concurrent connections
If you are on Prefect Cloud, you can limit the concurrent task runs, or you can reduce the processes/threads of your LocalDaskExecutor to limit it, if that helps.
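This toy model shows why fewer threads means fewer simultaneous connections. `ConnectionCounter` and `run_mapped` are hypothetical, and a `ThreadPoolExecutor` stands in for the executor's worker pool. This is not how Prefect or Dask are implemented. Each mapped call holds its own connection while it runs, so the peak number of open connections can never exceed the number of workers.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


class ConnectionCounter:
    """Tracks how many fake connections are open at the same time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.open_now = 0
        self.peak = 0

    def fetch(self, query: str) -> str:
        # "Open" a connection for this mapped call and record the peak.
        with self._lock:
            self.open_now += 1
            self.peak = max(self.peak, self.open_now)
        try:
            time.sleep(0.02)  # pretend to run the query
            return f"rows for: {query}"
        finally:
            # "Close" the connection when the call finishes.
            with self._lock:
                self.open_now -= 1


def run_mapped(queries: List[str], max_workers: int) -> int:
    """Run every query on a bounded pool and return the peak open connections."""
    counter = ConnectionCounter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(counter.fetch, queries))
    return counter.peak


queries = [f"q{i}" for i in range(12)]
print(run_mapped(queries, max_workers=12))  # can reach 12 connections at once
print(run_mapped(queries, max_workers=3))   # never more than 3
```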
d
Okay. How would I limit the processes/threads in the cloud?
k
There is limiting here.
Or you can define the number of workers here
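Here is a minimal sketch of both options, assuming Prefect 1.x (the `prefect.executors` and `prefect.tasks.sql_server` modules). The connection values, the `sql-server` tag, the flow name and the worker count are placeholders. `num_workers` is passed through to Dask's local scheduler and caps how many tasks, mapped SqlServerFetch children included, run at once within a single flow run. The tag is only a label until you set a concurrency limit for it in Prefect Cloud. That limit should then apply to every task run carrying the tag.

```python
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.sql_server import SqlServerFetch

# Option 1: limit parallelism inside one flow run.
# num_workers is forwarded to Dask's local scheduler (threads here),
# so at most 4 tasks, and therefore at most 4 fetch connections, run at once.
executor = LocalDaskExecutor(scheduler="threads", num_workers=4)

# Option 2 (Prefect Cloud): tag the task, then set a concurrency limit for
# the tag in the Cloud UI. Connection values below are placeholders.
salesforce_query = SqlServerFetch(
    db_name="my_db",
    user="my_user",
    host="my-host",
    fetch="all",
    tags=["sql-server"],
)

with Flow("sql-fetch-example", executor=executor) as flow:
    ...  # build the mapped queries and call salesforce_query.map(...) as before
```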