ARun
09/11/2020, 4:17 PM
nicholas
09/11/2020, 4:22 PM
ARun
# 09/11/2020, 4:51 PM  (message timestamp from the chat export)
with Flow("runPii", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    # Driver table: each row describes one database to process; the columns
    # are read positionally below.
    driverTable = query.get_df(sql_query)

    # Build ONE list holding a parameter dict per driver row. The task takes a
    # single `db` argument, so it has to be mapped over a list of dicts rather
    # than being called with the dict unpacked as keyword arguments.
    d = [
        {
            "TargetIntgDB": intg,
            # Column 0 feeds both the target and the source database name.
            "TargetDbName": db[0],
            "DatabaseName": db[0],
            "SchemaName": db[1],
            "TargetSchemaName": db[2],
            "TargetPiiSchemaName": db[3],
            "file": file,
        }
        for _, db in driverTable.iterrows()
    ]

    # The mapped call must stay inside the `with Flow(...)` block; outside of
    # it Prefect raises "Could not infer an active Flow context."
    queries = initiatePii.map(d)
nicholas
09/11/2020, 4:53 PM
ARun
09/11/2020, 5:04 PM
Unexpected error: ValueError('Could not infer an active Flow context.')
@task
def initiatePii(db):
    """Run the PII template for one driver entry and return its SQL statements.

    Args:
        db: Mapping for a single driver row. The keys read here are
            ``TargetIntgDB``, ``TargetDbName``, ``DatabaseName``,
            ``SchemaName``, ``TargetSchemaName``, ``TargetPiiSchemaName``
            and ``file``.

    Returns:
        A list of strings. Each string is the text accumulated from the
        query result up to and including a row that contains ``;``.
    """
    # Placeholder-name -> value mapping handed to findAndReplace.
    # NOTE(review): presumably these are template variables substituted into
    # db['file'] -- confirm against findAndReplace.
    replacements = {
        "intg_database_name_v": db['TargetIntgDB'],
        "target_database_name_v": db['TargetDbName'],
        "database_name_v": db['DatabaseName'],
        "source_schema_v": db['SchemaName'],
        "target_schema_v": db['TargetSchemaName'],
        "target_pii_schema_v": db['TargetPiiSchemaName'],
    }
    logging.info(replacements)

    data = findAndReplace(replacements, db['file'])
    sql = query.run(data)

    queries = []
    statement = " "        # text of the statement currently being assembled
    needs_stamp = True     # the autogenerated note is added once per statement

    for row in sql:
        # Each row is an iterable of strings; its fields go on separate lines.
        statement += "\n".join(row) + "\n"

        # Add the provenance comment the first time the statement contains a
        # SELECT.
        if "SELECT " in statement and needs_stamp:
            statement += (
                " -- This code was autogenerated on {0} by the pii/python process".format(
                    datetime.now().strftime("%m/%d/%Y")
                )
                + "\n"
            )
            needs_stamp = False

        # A semicolon anywhere in the buffer closes the statement.
        if ";" in statement:
            queries.append(statement)
            statement = " "
            needs_stamp = True

    # NOTE(review): text left in the buffer without a ';' is discarded here --
    # confirm that is intended.
    return queries
nicholas
09/11/2020, 5:15 PM
`d` instead of `dict` (which should be appended to the `d`, if I understand correctly). I'd recommend you move that operation to its own task and pass the result of that task (which should be a list) to `initiatePii.map` inside the `with` block