SGLang EPD
Mooncake Transfer Engine
MooncakeTransferEngine:
封装 `from mooncake.engine import TransferEngine` 提供的 API，
通过 `engine = init_mooncake_transfer_engine()` 对外暴露初始化接口。
EPD using Mooncake:
关键代码摘录（`...` 表示省略的代码）：
| class MMEncoder:
def __init__(...): self.mm_global_cache = EmbeddingCacheController() self.engine = init_mooncake_transfer_engine() self.embedding_to_send = dict()
async def encode(...): mm_data = EmbeddingData() self.embedding_to_send[req_id] = mm_data
async def _send(...): self.engine.register(embedding.data_ptr(), embedding.nbytes) self.engine.transfer_sync( session_id, embedding.data_ptr(), buffer_address, embedding.nbytes ) self.engine.deregister(embedding.data_ptr()) mm_data.embedding = None、 serialized_data = pickle.dumps(mm_data) buffer = None
class MMReceiverBase:
def __init__(...): self.embeddings_engine = init_mooncake_transfer_engine() self.embeddings_buffer = dict() async def recv_mm_data(...): embedding_port, recv_socket = get_zmq_socket_on_host() mm_data = self._extract_url_data(request_obj) asyncio.create_task(self.encode(req_id, mm_data, embedding_port, "encode", "send")) return await asyncio.wait_for(self._recv_mm_data(req_id, recv_socket, mm_processor, prompt)) if error: self._cleanup_mooncake_buffer(req_id) async def encode(...): for ...: encode_requests.append( { "encoder_idx": idx, "mm_items": [ mm_item.get("url") for mm_item in mm_data_modality[ cum_num_items : cum_num_items + assigned_num ] ], "num_parts": total_num_parts, "part_idx": part_idx, "req_id": part_req_id, "modality": modality.name, "prefill_host": self.host, "embedding_port": embedding_port, } ) async with aiohttp.ClientSession(...) as session: buffer_address = await self.allocate_embedding_buffer( req_id, total_embedding_bytes, ) for ...: buffer_address_adjust = offset + buffer_address response_json.update( { "session_id": self.embeddings_engine.session_id, "buffer_address": buffer_address_adjust, } )
async def allocate_embedding_buffer(...): embeddings = torch.empty(total_bytes, dtype=torch.uint8) self.embeddings_engine.register( embeddings.data_ptr(), embeddings.nbytes, ) self.embeddings_buffer[req_id] = embeddings return embeddings.data_ptr()
async def _recv_mm_data(...): recv_embedding = None recv_embedding_data: MultiModalEmbeddingData = None while ...: parts = await recv_socket.recv_multipart(copy=False) recv_obj: EmbeddingData = pickle.loads(parts[0]) if error: self._cleanup_mooncake_buffer(req_id) raw_buffer = self.embeddings_buffer.pop(req_id) self.embeddings_engine.deregister(raw_buffer.data_ptr()) recv_embedding = recv_embedding_data.get_embedding(is_concat=True) return mm_inputs def _cleanup_mooncake_buffer(req_id): embeddings = self.embeddings_buffer.pop(req_id, None) self.embeddings_engine.deregister(embeddings.data_ptr())
Mooncake Store
请分析 `python/sglang/srt/mem_cache/storage/mooncake_store/` 目录下代码的整体架构和功能，并阐述它在 SGLang 的 EPD 场景下是如何使用的？有什么好处？
要求图文并茂（流程图、架构图），并解释其中涉及的技术原理，生成一份 Markdown 格式的报告，放到当前根目录下。
当使用 Mooncake 作为 EPD 的传输后端时，请分析 `python/sglang/srt/disaggregation/` 目录下的 `encode_server` 和 `encode_receiver` 是如何进行交互的（例如：消息同步、buffer 分配、数据传输等）？
要求图文并茂（流程图、架构图），并解释其中涉及的技术原理，生成一份 Markdown 格式的报告，放到当前根目录下。