#prefect-community

Pierre-Edouard

07/08/2022, 9:15 AM
Hello community. I've been stuck on my code for days. It's really annoying because it was supposed to be the easiest flow I've developed with Prefect. The algorithm: I want to read the contents of a big Redis database (redis.scan_iter()), filter to keep the keys that do not match a pattern, and then remove those records (easy!). My problem: I don't want to load all of my Redis keys into memory in the first task (my Dask workers will not handle it) and only start filtering afterwards. I want to generate batches of keys and process each batch directly (hence the map feature). If I run my code without .map(), it works, but it's sequential (I load all my keys before filtering). When I run it with .map(), the filtering task immediately returns a Failed status without printing any error. I have no idea why it fails! Does anyone have a clue?
#Clean_redis: this flow removes users that do not match the pattern
@task
def ownBatcher(iterable, n):
    while True:
        batch = []
        for i in range(n):
            try:
                batch.append(next(iterable))
            except StopIteration:
                return batch
        if len(batch) != 0:
            yield batch


@task()
def filterList(userToFilter):
    pattern = '.*\.mydomain.net.*'
    users = []

    for userTuple in userToFilter:
        for user in userTuple:
            if user:
                if not re.match(pattern, user.decode("utf-8")):
                    users.append(user)
    return users

@task()
def removeUsers(redisCon, users):
    if(users):
        redisCon.delete(*users)

def main():

    with Flow("clean_redis") as flow:
       redisConnection = redis.Redis(host='127.0.0.1', port=6379, db=0)
       userBatch = ownBatcher(redisConnection.scan_iter('*'), 5000)
       usersToRemove = filterList.map(userBatch)
       removeUsers.map(unmapped(redisConnection), usersToRemove)

    flow.executor = LocalDaskExecutor()
    flow.run()

main()
I'm using Prefect 1.2.2. I have tested many different ways to make batches, and multiple times I was blocked because scan_iter() is a generator and Prefect doesn't handle it.
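A possible explanation and workaround, as a minimal sketch rather than a confirmed diagnosis: in the posted ownBatcher, `return batch` inside a generator ends the iteration without yielding that batch, so the final partial batch is dropped. In addition, Prefect 1.x mapping appears to expect an indexable result such as a list, so a task that returns a generator may fail when mapped. The helpers `batched` and `make_batches` are hypothetical names, not from the thread. They build a plain list of batches, which means all keys (though not their values) are held in memory at once.

```python
from itertools import islice


def batched(iterable, n):
    """Yield lists of up to n items, including the final partial batch."""
    if n < 1:
        raise ValueError("n must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice pulls at most n items and stops cleanly at exhaustion
        batch = list(islice(iterator, n))
        if not batch:
            return
        yield batch


def make_batches(iterable, n):
    """Materialise the batches as a list so the result is indexable."""
    return list(batched(iterable, n))
```

Inside a task, the body would return `make_batches(connection.scan_iter("*"), 5000)` instead of yielding, assuming the connection is opened in that task.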

redsquare

07/08/2022, 9:25 AM
Why not run the filter in the scan on redis rather than locally
redisConnection.scan_iter(match='*.mydomain.net.*')

Pierre-Edouard

07/08/2022, 9:27 AM
I can't, because Redis uses a basic glob-style filtering language that has no negation.
And here I want all records that do NOT match '.*\.mydomain.net.*'.
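Since the MATCH glob cannot express negation, the inverted filter has to run client-side. A minimal sketch of that step, assuming a flat list of keys (the posted filterList iterates over nested tuples instead) and a hypothetical helper name `keys_to_remove`; the second dot in the pattern is escaped here, which is slightly stricter than the pattern in the thread:

```python
import re

# Keys that match this pattern are the ones to keep in Redis
PATTERN = re.compile(r".*\.mydomain\.net.*")


def keys_to_remove(keys, pattern=PATTERN):
    """Return the non-empty keys that do NOT match the pattern."""
    selected = []
    for key in keys:
        if not key:
            continue
        # redis-py returns bytes unless decode_responses=True is set
        text = key.decode("utf-8") if isinstance(key, bytes) else key
        if not pattern.match(text):
            selected.append(key)
    return selected
```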

redsquare

07/08/2022, 9:32 AM
ah sorry misread

Pierre-Edouard

07/08/2022, 9:34 AM
No problem 🙂 Thanks a lot, it was still good advice 😉

redsquare

07/08/2022, 9:36 AM
perhaps lua to the rescue - issue a lua script to ttl all the non matching keys

Pierre-Edouard

07/08/2022, 9:43 AM
Developing it is easy in any language. I just want to use Prefect for this task because of its scheduling and parallelism features. And I want to get better at Prefect so I can implement several other flows in the coming months.

redsquare

07/08/2022, 9:47 AM
Sure - I just hear redis scan and production and it leaves me with tears

Pierre-Edouard

07/08/2022, 9:52 AM
Why? Is scan() not safe? (I know that keys() is totally unusable in prod, but I had in mind that scan() is clean. I'm no Redis expert, so I trust your experience 😉)

redsquare

07/08/2022, 9:55 AM
SCAN blocks in redis - so if you have a large number of concurrent services using redis they will be impacted - especially if you have a large number of keys

Anna Geller

07/08/2022, 11:16 AM
Steve gave you some really good suggestions - are you good now @Pierre-Edouard? Purely from the Prefect perspective, your flow is currently fine for local development, but you may try to attach a LocalDaskExecutor for parallelism and move the flow object out to the main scope so that you can use the same code for both local and registered flow runs:
@task
def ownBatcher(iterable, n):
    ...

@task()
def filterList(userToFilter):
    ...

@task()
def removeUsers(redisCon, users):
    ...

with Flow("clean_redis", executor=LocalDaskExecutor()) as flow:
     # your_DAG_here
The redisConnection call should ideally move into the ownBatcher task, since connections cannot easily be shared between tasks when running in a parallel/distributed setting.
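A rough sketch of that suggestion, under stated assumptions: each function receives a connection factory (`connect`, a hypothetical parameter) and opens its own connection rather than sharing one object across tasks. `FakeRedis` is an illustrative in-memory stand-in so the example runs without a server. In a real flow the functions would carry `@task` and the factory could be something like `lambda: redis.Redis(host='127.0.0.1', port=6379, db=0)`.

```python
from itertools import islice


class FakeRedis:
    """Hypothetical in-memory stand-in for redis.Redis (illustration only)."""

    def __init__(self, store):
        self.store = store  # a shared set of keys

    def scan_iter(self, match="*"):
        # Ignores `match`; iterates over a snapshot of all keys
        return iter(list(self.store))

    def delete(self, *keys):
        # Mirrors redis-py: returns the number of keys actually removed
        removed = 0
        for key in keys:
            if key in self.store:
                self.store.discard(key)
                removed += 1
        return removed


def scan_batches(connect, n):
    """Open a connection inside the function and return a list of key batches."""
    keys = connect().scan_iter("*")
    batches = []
    while True:
        batch = list(islice(keys, n))
        if not batch:
            return batches
        batches.append(batch)


def remove_users(connect, users):
    """Open a fresh connection and delete the given keys, if any."""
    if not users:
        return 0
    return connect().delete(*users)
```

Passing a factory instead of a live connection keeps the task inputs simple to ship to workers, at the cost of one connection per task run.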