Hey guys, I'm trying to map an array of dictionary items into a function. My tasks keep failing with seemingly random errors, and it looks like the task isn't actually receiving the dictionary; instead it's receiving the first letter of every object. For example, I have an array in the format [{},{},{},{}...], where each dict contains about a dozen attributes. I'm trying to map the array to the function like function.map(array), but I keep getting errors like file not found (for one of the attributes). Instead of the actual fully qualified file name, what's being passed in is just a letter.
Hi @ARun - can you share a min example of the code you're using to map over your list? It sounds like you're calling map on the wrong element
Sure, this is essentially what I have. I have also tried appending all the results to d and sending that into the function. My goal is to get initiatePii to execute in parallel.
with Flow("runPii", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
    driverTable = query.get_df(sql_query)
    d = []
    for _, db in driverTable.iterrows():
        dict = {
            "TargetIntgDB": intg,
            "TargetDbName": db[0],
            "DatabaseName": db[0],
            "SchemaName": db[1],
            "TargetSchemaName": db[2],
            "TargetPiiSchemaName": db[3],
            "file": file
        }
    queries = initiatePii.map(**dict)
That runs the process but passes in the incorrect data. If I do initiatePii(**dict), it works perfectly.
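One plausible reading of the "single letter" symptom, sketched in plain Python rather than Prefect: a mapped call takes the i-th element of each argument, and strings are iterable, so unpacking one dict into the mapper walks each string character by character. `mapped_call`, `show`, `show_row`, and the sample values are hypothetical stand-ins for illustration only, not Prefect's implementation.

```python
def mapped_call(func, **iterables):
    """Call func once per index, taking the i-th element of every keyword argument."""
    keys = list(iterables)
    return [func(**dict(zip(keys, values))) for values in zip(*iterables.values())]


def show(file, schema):
    # Stand-in for a task that receives the values as separate keyword arguments.
    return (file, schema)


def show_row(db):
    # Stand-in for a task that receives one whole dict per call.
    return (db["file"], db["schema"])


single = {"file": "query.sql", "schema": "public"}

# Unpacking a single dict hands plain strings to the mapper,
# so each call receives one character from each string.
letters = mapped_call(show, **single)

# Mapping over a list of dicts hands one whole dict to each call.
rows = [single, {"file": "other.sql", "schema": "audit"}]
whole = mapped_call(show_row, db=rows)
```

Here `letters[0]` is `("q", "p")`, while `whole[0]` is `("query.sql", "public")`.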
Can you show your tasks as well?
sure, this is the current attempt. Now I am getting this error
Unexpected error: ValueError('Could not infer an active Flow context.')
def initiatePii(

    dict = {
        "intg_database_name_v": db['TargetIntgDB'],
        "target_database_name_v": db['TargetDbName'],
        "database_name_v": db['DatabaseName'],
        "source_schema_v": db['SchemaName'],
        "target_schema_v": db['TargetSchemaName'],
        "target_pii_schema_v": db['TargetPiiSchemaName'],
    }
    data = findAndReplace(dict, db['file'])
    sql = query.run(data)
    queries = []
    sql_2 = " "
    y = True
    for row in sql:
        sql_1 = "\n".join(row)
        sql_2 += sql_1 + "\n"
        if "SELECT " in sql_2 and y:
            sql_2 += (
                " -- This code was autogenerated on {0} by the pii/python process".format(
                + "\n"
            y = False
        if ";" in sql_2:
            sql_2 = " "
            y = True
    return queries
It looks like you're trying to map over the dict instead of the list of dicts. You should be passing in the list instead of the single dict (which should be appended to the list, if I understand correctly). I'd recommend you move that operation to its own task and pass the result of that task (which should be a list) to the mapped call.
Without seeing all your code, I'm not sure where that error is being raised.
But that's raised when you're trying to access a flow context from outside a task or
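Going back to the suggestion of building the list in its own step: a minimal sketch, assuming the driver table rows can be read as tuples in the column order used in the flow above. `build_rows` and the sample values are hypothetical; the dict keys are copied from the thread. The Prefect wiring is shown only as comments, since it depends on the rest of the flow.

```python
def build_rows(driver_rows, intg, file):
    """Return one dict per driver-table row, using the keys from the flow in this thread."""
    rows = []
    for db in driver_rows:
        rows.append({
            "TargetIntgDB": intg,
            "TargetDbName": db[0],
            "DatabaseName": db[0],
            "SchemaName": db[1],
            "TargetSchemaName": db[2],
            "TargetPiiSchemaName": db[3],
            "file": file,
        })
    return rows


# Hypothetical sample data standing in for the rows of driverTable.
sample = [("sales", "raw", "clean", "pii"), ("hr", "raw", "clean", "pii")]
rows = build_rows(sample, intg="intg_db", file="/tmp/pii.sql")

# In the flow, build_rows would be decorated as a task and its result mapped:
#   rows = build_rows(...)             # called inside the `with Flow(...)` block
#   queries = initiatePii.map(rows)    # one initiatePii run per dict
```

Each mapped run then receives a complete dict, so `db['file']` inside the task is the full file name rather than a single character.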