David Beck
03/22/2022, 7:37 PM
Error during execution of task: OperationalError('08001', '[08001] [Microsoft][ODBC Driver 17 for SQL Server]Client unable to establish connection because an error was encountered during handshakes before login. Common causes include client attempting to connect to an unsupported version of SQL Server, server too busy to accept new connections or a resource limitation (memory or maximum allowed connections) on the server. (26) (SQLDriverConnect)')
The task is defined outside the scope of the flow and is then mapped over the queries generated by another task. See the rough pattern below with some values obscured.
The strange thing is that this pattern runs on my local machine and successfully pulls from our database. Even stranger, another flow uses SqlServerFetch with a KubernetesRun config in Prefect Cloud successfully, though that job is mapped in a different way. Any thoughts on this? The only thing that comes to mind is some issue with the driver selected when mapping the task.
Kevin Kho
current_timestamp = datetime.now() here might lead to current_timestamp being locked in to build time if you use pickle-based storage.
David Beck
03/22/2022, 7:44 PM
sf_db_host = prefect.config.salesforce.db_host
salesforce_query = SqlServerFetch(db_name=db_name, user=user, host=sf_db_host, fetch='all')

@task
def create_sql_queries(database: AnyStr,
                       schema: AnyStr,
                       table: AnyStr,
                       filter_col: AnyStr,
                       since: AnyStr,
                       until: AnyStr) -> List:
    """Generates the list of SQL queries to run against SQL DB"""
    return f"SELECT * FROM [{database}].[{schema}].[{table}] WITH (NOLOCK) \
             WHERE [{filter_col}] BETWEEN '{since}' AND '{until}'"

with Flow(FLOW_NAME, executor=LocalDaskExecutor(), run_config=LocalRun()) as flow:
    salesforce_password = PrefectSecret("PASSWORD")
    database_names = ['...']
    schema_names = ['...']
    source_tables = ['...']
    filters = ['...']
    last_timestamp = get_last_run_time(flow_name=flow.name)
    current_timestamp = datetime.now()
    queries = create_sql_queries.map(database=database_names,
                                     schema=schema_names,
                                     table=source_tables,
                                     filter_col=filters,
                                     since=unmapped(last_timestamp),
                                     until=unmapped(format_datetime_as_sql_datetime(current_timestamp)))
    salesforce_responses = salesforce_query.map(password=unmapped(salesforce_password), query=queries)
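Kevin's remark about current_timestamp = datetime.now() can be illustrated without Prefect. The sketch below is a plain-Python analogy, not Prefect code: `build_flow` and `get_current_timestamp` are hypothetical names standing in for the body of the `with Flow(...)` block and for a task body, respectively. It assumes that pickle-based storage serializes the already-built flow object, so any plain Python call made while building the flow is evaluated once and its result is stored.

```python
import pickle
import time
from datetime import datetime


def build_flow() -> dict:
    """Hypothetical stand-in for the `with Flow(...)` block: runs once, at build time."""
    # A plain Python call here is evaluated immediately, not deferred to run time.
    return {"until": datetime.now()}


def get_current_timestamp() -> datetime:
    """Hypothetical stand-in for a task body: evaluated each time it is called."""
    return datetime.now()


# "Register" the flow: the built object is serialized together with its timestamp.
stored = pickle.dumps(build_flow())

time.sleep(0.05)  # time passes between build and run

# "Run" the flow later: the unpickled value is still the build-time timestamp,
# while the function call yields a fresh one.
frozen_until = pickle.loads(stored)["until"]
fresh_until = get_current_timestamp()

print("build-time value:", frozen_until)
print("run-time value:  ", fresh_until)
```

If this is what is happening in the flow above, moving the datetime.now() call into its own task (so that it is evaluated at run time) would likely avoid the stale `until` value. That is a separate concern from the 08001 connection error.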