ARun

09/11/2020, 4:17 PM
Hey guys, I'm trying to map an array of dictionary items into a function. My tasks keep failing with seemingly random errors, and it looks like the task isn't actually receiving the dictionary; instead, it's receiving the first letter of every object. For example, I have an array in the format [{},{},{},{}...], where each dict contains about a dozen attributes. I am trying to map the array to the function like function.map(array), but I keep getting errors like file not found (the file is one of the attributes). Instead of the actual fully qualified file name, what's being passed in is just a letter.

nicholas

09/11/2020, 4:22 PM
Hi @ARun - can you share a min example of the code you're using to map over your list? It sounds like you're calling map on the wrong element

ARun

09/11/2020, 4:51 PM
Sure, this is essentially what I have. I have also tried appending all the results to d and sending that into the function. My goal is to get initiatePii to execute in parallel.
with Flow("runPii", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    driverTable = query.get_df(sql_query)
    d = []
    for _, db in driverTable.iterrows():
        dict = {
            "TargetIntgDB": intg,
            "TargetDbName": db[0],
            "DatabaseName": db[0],
            "SchemaName": db[1],
            "TargetSchemaName": db[2],
            "TargetPiiSchemaName": db[3],
            "file": file
        }
    queries = initiatePii.map(**dict)
That runs the process but passes in the incorrect data. If I do initiatePii(**dict), it works perfectly.

nicholas

09/11/2020, 4:53 PM
Can you show your tasks as well?

ARun

09/11/2020, 5:04 PM
Sure, this is the current attempt. Now I am getting this error:
Unexpected error: ValueError('Could not infer an active Flow context.')
@task
def initiatePii(
    db
):

    dict = {
        "intg_database_name_v": db['TargetIntgDB'],
        "target_database_name_v": db['TargetDbName'],
        "database_name_v": db['DatabaseName'],
        "source_schema_v": db['SchemaName'],
        "target_schema_v": db['TargetSchemaName'],
        "target_pii_schema_v": db['TargetPiiSchemaName'],
    }
    logging.info(dict)
    data = findAndReplace(dict, db['file'])
    sql = query.run(data)
    queries = []
    sql_2 = " "
    y = True
    for row in sql:
        sql_1 = "\n".join(row)
        sql_2 += sql_1 + "\n"
        if "SELECT " in sql_2 and y:
            sql_2 += (
                " -- This code was autogenerated on {0} by the pii/python process".format(
                    datetime.now().strftime("%m/%d/%Y")
                )
                + "\n"
            )
            y = False
        if ";" in sql_2:
            queries.append(sql_2)
            sql_2 = " "
            y = True
    return queries

nicholas

09/11/2020, 5:15 PM
It looks like you're trying to map over the dict instead of the list of dicts. You should be passing in d instead of dict (which should be appended to d, if I understand correctly). I'd recommend you move that operation to its own task and pass the result of that task (which should be a list) to initiatePii.map.
Without seeing all your code, I'm not sure where that error is being raised. But that's raised when you're trying to access a flow context from outside a task or with block.
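
The difference between mapping over the unpacked dict and mapping over the list of dicts can be sketched in plain Python. This is only an illustration under stated assumptions: fake_map is a hypothetical, simplified stand-in for element-wise mapping and is not Prefect's API, and the file names and schema values are made up. It shows why a string value that gets mapped ends up being passed one character at a time, and why a list of dicts delivers one whole dict per call.

```python
from typing import Any, Callable, Dict, List


def fake_map(fn: Callable[..., Any], **mapped: Any) -> List[Any]:
    """Hypothetical stand-in for element-wise mapping (not Prefect's API).

    Indexes into every keyword argument in parallel and calls fn once
    per position, stopping at the shortest argument.
    """
    length = min(len(value) for value in mapped.values())
    return [
        fn(**{name: value[i] for name, value in mapped.items()})
        for i in range(length)
    ]


def show_file(file: str, SchemaName: str) -> str:
    """Return the 'file' argument exactly as it was received."""
    return file


def read_file_key(db: Dict[str, str]) -> str:
    """Return the 'file' entry of one whole dict."""
    return db["file"]


# Unpacking a single dict: each string value is itself indexed,
# so every call receives single characters instead of full values.
row = {"file": "pii_template.sql", "SchemaName": "public"}  # made-up values
broken = fake_map(show_file, **row)

# Mapping over a list of dicts: every call receives one complete dict.
rows = [
    {"file": "a.sql", "SchemaName": "s1"},  # made-up values
    {"file": "b.sql", "SchemaName": "s2"},
]
fixed = fake_map(read_file_key, db=rows)

print(broken)
print(fixed)
```

In the flow from this thread, the second form corresponds to appending each dict to d and calling initiatePii.map(d), as suggested above. Prefect also exports an unmapped wrapper for arguments that should be passed whole to every mapped call rather than iterated, which may be worth checking in the mapping docs for your version.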