Pierre-Edouard
07/08/2022, 9:15 AM
# Clean_redis: this flow removes users whose keys do not match the pattern
@task
def ownBatcher(iterable, n):
    """Split *iterable* into consecutive batches of at most *n* items.

    A concrete list of lists is returned (instead of a generator) so the
    result can be sized, indexed and iterated by a downstream ``.map``.
    Every item ends up in exactly one batch; only the last batch may be
    shorter than *n*. An empty *iterable* gives an empty list.

    Args:
        iterable: any iterable or iterator of items, e.g. the keys produced
            by ``redis.Redis.scan_iter``.
        n: maximum number of items per batch; must be >= 1.

    Returns:
        list[list]: the batches, in iteration order.

    Raises:
        ValueError: if *n* is smaller than 1.
    """
    from itertools import islice

    # n <= 0 would never make progress (the old version looped forever).
    if n < 1:
        raise ValueError(f"batch size must be >= 1, got {n!r}")
    # iter() also accepts lists and other non-iterator iterables.
    iterator = iter(iterable)
    batches = []
    while True:
        # islice takes up to n items; an empty slice means the source is
        # exhausted. The previous generator did ``return batch`` here, which
        # silently discarded the final partial batch.
        batch = list(islice(iterator, n))
        if not batch:
            return batches
        batches.append(batch)
@task()
def filterList(userToFilter, pattern=r".*\.mydomain\.net"):
    """Return the keys in one batch that do NOT match *pattern* (keys to delete).

    Args:
        userToFilter: one batch of Redis keys (``bytes`` or ``str``), such as
            a single element of ``ownBatcher``'s result. Elements that are
            themselves tuples/lists of keys (grouper-style chunks) are
            flattened one level. Falsy entries such as ``None`` padding are
            skipped.
        pattern: regular expression applied with ``re.match`` to each
            decoded key. Keys that match are kept in Redis and therefore are
            not returned. The dots are escaped so that only a literal
            ``.mydomain.net`` counts as a match.

    Returns:
        list: the original (undecoded) key objects that did not match.
    """
    matcher = re.compile(pattern)
    users = []
    for entry in userToFilter:
        # A flat batch holds bare keys. Iterating a bytes key directly would
        # yield ints and crash on ``.decode``, so accept both batch shapes.
        keys = entry if isinstance(entry, (list, tuple)) else (entry,)
        for user in keys:
            if not user:
                continue
            # redis-py returns bytes by default. Invalid UTF-8 is replaced
            # rather than aborting the whole batch; the ASCII domain part
            # still decodes unchanged.
            text = (
                user.decode("utf-8", errors="replace")
                if isinstance(user, bytes)
                else user
            )
            if not matcher.match(text):
                users.append(user)
    return users
@task()
def removeUsers(redisCon, users):
    """Delete one batch of keys from Redis with a single ``delete`` call.

    Args:
        redisCon: Redis client whose ``delete`` method receives the keys.
        users: keys to remove. A falsy batch (``None`` or empty) is ignored.
    """
    # Presumably guards against issuing DEL with no arguments, which Redis
    # rejects; an empty batch simply means there is nothing to delete.
    if not users:
        return
    redisCon.delete(*users)
def main(host="127.0.0.1", port=6379, db=0, batch_size=5000):
    """Build the ``clean_redis`` flow and run it once.

    The flow scans every key in the given Redis database, splits the keys
    into batches of ``batch_size``, filters each batch with ``filterList``
    and deletes the keys it returns with ``removeUsers``.

    Args:
        host: Redis host (default matches the previously hard-coded value).
        port: Redis port.
        db: Redis database index.
        batch_size: number of keys handed to each mapped filter/delete run.
    """
    # The executor is given to the Flow constructor instead of being
    # assigned to flow.executor afterwards.
    with Flow("clean_redis", executor=LocalDaskExecutor()) as flow:
        redis_connection = redis.Redis(host=host, port=port, db=db)
        # NOTE(review): this client is created at build time and shared by
        # every mapped task; connections cannot easily be shared in a
        # parallel/distributed setting, so consider opening it inside the
        # tasks instead.
        # COUNT is only a hint for how many keys Redis examines per SCAN
        # call: it cuts round trips without changing which keys come back.
        user_batches = ownBatcher(
            redis_connection.scan_iter("*", count=batch_size), batch_size
        )
        users_to_remove = filterList.map(user_batches)
        removeUsers.map(unmapped(redis_connection), users_to_remove)
    flow.run()


# Guarded so that importing this module does not delete keys.
if __name__ == "__main__":
    main()
redsquare
07/08/2022, 9:25 AM
redisConnection.scan_iter(match='*.mydomain.net.*'):
Pierre-Edouard
07/08/2022, 9:27 AM
redsquare
07/08/2022, 9:32 AM
Pierre-Edouard
07/08/2022, 9:34 AM
redsquare
07/08/2022, 9:36 AM
Pierre-Edouard
07/08/2022, 9:43 AM
redsquare
07/08/2022, 9:47 AM
Pierre-Edouard
07/08/2022, 9:52 AM
redsquare
07/08/2022, 9:55 AM
Anna Geller
# 07/08/2022, 11:16 AM
# Sketch only: task bodies are elided with ``...``; the executor is passed
# directly to Flow() rather than assigned to flow.executor later.
@task
def ownBatcher(iterable, n):
    ...


@task()
def filterList(userToFilter):
    ...


@task()
def removeUsers(redisCon, users):
    ...


with Flow("clean_redis", executor=LocalDaskExecutor()) as flow:
    # your_DAG_here
    ...
The redisConnection call should ideally move into the ownBatcher task, since connections cannot easily be shared between tasks when running in a parallel/distributed setting.