a

    ARun

    2 years ago
    Hey guys, I'm trying to map an array of dictionary items into a function. My tasks keep failing from random errors, but it looks like the task isn't actually taking in the dictionary, instead it's taking in the first letter of every object. for example, I have an array in the format [{},{},{},{}...] the dict contains about a dozen attributes I am trying to map the array to the function like function.map(array) but i keep getting random errors like file not found(one of the attributes). Instead of using the actual fully qualified file name, what's being passed in is just a letter
    nicholas

    nicholas

    2 years ago
    Hi @ARun - can you share a min example of the code you're using to map over your list? It sounds like you're calling map on the wrong element
    a

    ARun

    2 years ago
    sure, this is essential what I have. I have also tried appending all the results to d, and sending htat into the function. My goal is too get initiatePii to execute in parallel.
    with Flow("runPii", environment=LocalEnvironment(executor=DaskExecutor())) as flow:
        driverTable = query.get_df(sql_query)
        d = []
        for _, db in driverTable.iterrows():
            dict = {
                "TargetIntgDB": intg,
                "TargetDbName": db[0],
                "DatabaseName": db[0],
                "SchemaName": db[1],
                "TargetSchemaName": db[2],
                "TargetPiiSchemaName": db[3],
                "file": file
            }
        queries = initiatePii.map(**dict)
    that runs the process, but puts in the incorrect data, if i do initatePii(**dict), it works perfectly
    nicholas

    nicholas

    2 years ago
    Can you show your tasks as well?
    a

    ARun

    2 years ago
    sure, this is the current attempt. Now I am getting this error
    Unexpected error: ValueError('Could not infer an active Flow context.')
    @task
    def initiatePii(
        db
    ):
    
        dict = {
            "intg_database_name_v": db['TargetIntgDB'],
            "target_database_name_v": db['TargetDbName'],
            "database_name_v": db['DatabaseName'],
            "source_schema_v": db['SchemaName'],
            "target_schema_v": db['TargetSchemaName'],
            "target_pii_schema_v": db['TargetPiiSchemaName'],
        }
        <http://logging.info|logging.info>(dict)
        data = findAndReplace(dict, db['file'])
        sql = query.run(data)
        queries = []
        sql_2 = " "
        y = True
        for row in sql:
            sql_1 = "\n".join(row)
            sql_2 += sql_1 + "\n"
            if "SELECT " in sql_2 and y:
                sql_2 += (
                    " -- This code was autogenerated on {0} by the pii/python process".format(
                        datetime.now().strftime("%m/%d/%Y")
                    )
                    + "\n"
                )
                y = False
            if ";" in sql_2:
                queries.append(sql_2)
                sql_2 = " "
                y = True
        return queries
    nicholas

    nicholas

    2 years ago
    It looks like you're trying to map over the dict instead of the list of dicts. You should be passing in
    d
    instead of
    dict
    (which should be appended to the
    d
    , if i understand correctly). I'd recommend you move that operation to its own task and pass the result of that task (which should be a list) to
    initiatePii.map
    Without seeing all your code, I'm not sure where that error is being raised.
    But that's raised when you're trying to access a flow context from outside a task or
    with
    block