在使用python创建web服务过程当中,往往有时候需要使用到多进程,这个时候就会出现以下一些问题:
多进程怎么管理?
python创建进程拷贝资源导致占用太多怎么办?
进程之间数据怎么通信?
出现某些变量无法被序列化的时候进程通信怎么办?
共享内存的时候数据传输问题?
以上问题是我个人在使用fastapi的时候遇到的,问题在于我想要使用fastapi结合某些机器学习的模型为别人提供服务,由于机器学习的模型太过于庞大并且每一次计算都相当的耗费计算资源,因此想要使用多进程的方式进行解决。以下是解决问题的过程!
直接使用fastapi的多进程 没错fastapi自己也有多进程,只需要在启动的时候添加一个wokers参数就可以了,如下:
1 uvicorn.run(app='main:app' , host="127.0.0.1" , port=8000 , reload=True , debug=True ,works=2 )
用起来确实可以,但是一启动就炸了,这个多进程本质上多个fastapi一起启动的用法,在我启动之后我就发现了事情的不对,内存占用飙升,因为我第一次开了16个进程,多进程诶!!不过分吧,然后就炸了。
使用python的多进程 我后来打算直接使用python的多进程,使用倒是没问题,但是我发现把开多进程的位置放在那个地方很有讲究,因为按照程序的设计,我想要的是每一次一个请求分配一个进程才合理,所以如果函数调用太深入,进程越靠后越慢,几乎没啥乱用,并且效率还会更低。并且然后更离谱的事情就出现了,在使用原生的多进程的时候,CPU倒是不会停留在一个CPU上了,但是每次跑满的也只有一个CPU,我日他仙人,这跟没有什么区别。
再就是进程创建使用的是fork模式,直接拷贝父进程的所有资源,我不出意外的内存又炸了。
使用线程池加共享内存 线程池是我后来想的,害怕请求太多内存炸了,想搞个线程池,然后加上一段共享内存,不久能够保证子进程内存不会激增,又能够保证多个进程数量被控制么。当时觉得简直机智!我才用python自己的进程池还有Manager.dict()保存一些较大的变量,比如机器学习的模型之类的。
但是,但是,但是……真正上手的时候发现,我需要等待进程的返回结果啊,我把进程池声明一个全局变量的话我该怎么去拿到子进程的结果呢?我还要等待进程?没办法,先妥协一下,把进程池改成多进程,每个路由下发一个进程。然后又是坑爹的事情,onnx模型属于复杂模型,不能够被序列化,报错:
1 can‘t pickle onnxruntime.capi.onnxruntime_pybind11_state.InferenceSession objects
所以从一个进程共享到另一个进程的时候报错了。我内心是崩溃的!因为没办法进程共享内存的话,意味着每一个子进程都要占用相当多的资源,这根本不可取。
最后的办法 最后使用了一种看起来复杂的办法,就是自己创建一个多进程的对象,在多进程对象当中注册一个共享内存的对象,在使用的时候一切都在对象内部发生,注册的对象也能够正常调用,也不会发生复杂模型在共享的时候序列化问题了。
部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 from app.selfmodel.bo_zh import main_tfrom cnocr import CnOcr from cnstd import CnStd import multiprocessingfrom multiprocessing import managersfrom concurrent.futures import ProcessPoolExecutorfrom app.ocr_cn import *from app.selftranslation import *from app.translation import *from utils.tools import *from setting.setting import *class GlobalObject : def __init__ (self ) -> None : self.ocr_std = CnStd() self.model_dict = self.create_model_dict() def create_model_dict (self ): model_dict = {"Helsinki-NLP/opus-mt-en-zh" : get_var("utils/save_var/opus_mt_en_zh.bin" )} return model_dict def getStd (self ): return self.ocr_std def get_model (self, model_list ): models = [] for i in model_list: models.append(self.model_dict.get(i)) return models def ocr (self, path ): res_cn, soc_cn = ocr_std(path, self) return res_std, soc_std class MyManager (managers.BaseManager): pass res_dict = {} def proc_callback (res ): res_dict[res.result()['task_id' ]] = res.result()['res' ] return res_dict def proc_worker_ocr (gobj, task_id, path: str = "" ): return {"task_id" : task_id, "res" : gobj.ocr(path)} def proc_worker_standTran (gobj, task_id, tran_dict ): models = gobj.get_model(tran_dict.get("model_name_list" )) return {"task_id" : task_id, "res" : tran_distribution(models, tran_dict)} def proc_worker_default (task_id ): return task_id class ServerExecutor : def __init__ (self ): MyManager.register("GlobalObject" , GlobalObject) manager = MyManager() manager.start() self.global_object = manager.GlobalObject() _cpu_cunt = multiprocessing.cpu_count() if cpu_cunt == None else cpu_cunt self.executor = ProcessPoolExecutor(round (_cpu_cunt*ocr_cunt)) self.executor_trans = ProcessPoolExecutor(round (_cpu_cunt*tran_cunt)) def submit (self, task_id, task_type, **kwargs ): if (task_type == "OcrManager" ): future = self.executor.submit( proc_worker_ocr, self.global_object, task_id, path=kwargs.get("path" , "" )) elif (task_type == "TranManager" ): future = self.executor_trans.submit( proc_worker_standTran, self.global_object, task_id, tran_dict=kwargs.get("tran_dict" )) else : future = self.executor.submit(proc_worker_default, task_id) return future executor = ServerExecutor()
注意: 子进程本身需要从主进程获取自己所需要的资源,如果子进程获取太多就会导致一个问题,进程太多资源抢占导致内存交换问题,程序会直接崩溃。