摘要:最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。
最近在用python的一款异步web框架sanic搭建web服务,遇到一个需要加特定锁的场景:同一用户并发处理订单时需要排队处理,但不同用户不需要排队。
如果仅仅使用async with asyncio.lock的话。会使所有请求都排队处理。
import asyncioimport datetimelock = asyncio.Lock # 模拟下单处理 print(f"{datetime.datetime.now.strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}") await asyncio.sleep(1) # 假设处理需要 1 秒 print(f"{datetime.datetime.now.strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done")# 定义一个测试函数 # 创建四个任务,模拟两个 UserID 的并发请求 tasks = [ asyncio.create_task(place_order(1, 101)),1, 102)),2, 201)),2, 202)), ] # 等待所有任务完成 await asyncio.gather(*tasks) # 运行测试函数 asyncio.run(test)这显然不是想要的结果,第二种方案是定义一个字典,key使用user_id,value为asyncio.Lock,每次执行前从字典里面获取lock,相同的user_id将会使用同一个lock,那就实现了功能。
locks = {} if user_id not in locks: locks[user_id] = asyncio.Lock async with locks[user_id]:# 定义一个测试函数 tasks = [ ] # 运行测试函数但是这个方案会有缺点是,user_id执行完成之后没有释放资源,当请求的user_id变多之后,势必会造成占用过多的资源。继续改进方案,将locks的value加一个计数器,当获取lock时计数器加1,使用完之后计数器-1,当计数器变为小于等于0时,释放locks对应的key。最后将这个功能封装为一个类方便其他地方调用。
import asyncio self.lock = asyncio.Lock mutex_locks[key] = MutexObj self.__mutex_obj = mutex_locks[key] self.__key = key def lock(self): """ 获取锁 :return: """ self.__mutex_obj.count += 1 return self.__mutex_obj.lock 释放锁 :return: """ if self.__mutex_obj.count from utils.mutex import Mutex, mutex_lockslocks = {} try: # 模拟下单处理 print(f"{datetime.datetime.now.strftime('%Y-%m-%d %H:%M:%S:%f')} Processing order {order_id} for user {user_id}") await asyncio.sleep(1) # 假设处理需要 1 秒 print(f"{datetime.datetime.now.strftime('%Y-%m-%d %H:%M:%S:%f')} Order {order_id} for user {user_id} is done") mutex.release print print(mutex_locks)# 定义一个测试函数 tasks = [ ] # 运行测试函数至此实现了我的需求,此方案只考虑了单应用场景,如果是分布式部署,需要更换方案如redis锁,这里暂且不考虑。如果有其他实现方式,欢迎留言交流。
来源:寂寞的咖啡