参考
https://shen-shanshan.github.io/articles/vllm-v1-整体流程从请求到算子执行/
https://gitee.com/omniai/omniinfer/tree/release_v0.6.0/
https://github.com/vllm-project/vllm/tree/v0.9.0
https://ai.gitcode.com/ascend-tribe/openPangu-Embedded-7B-V1.1
整体框架图和pangu7b模型图
1 LLMEngine - 核心引擎
位置: vllm/engine/llm_engine.py
主要职责:
- 请求处理和生命周期管理
- 调度协调和输出管理
- 多步迭代处理
- 与tokenizer和detokenizer集成
关键特性:- class LLMEngine:
- def __init__(self, model_config: ModelConfig,
- cache_config: CacheConfig,
- parallel_config: ParallelConfig,
- scheduler_config: SchedulerConfig,
- device_config: DeviceConfig,
- lora_config: LoRAConfig,
- vision_language_config: VisionLanguageConfig,
- speculative_config: SpeculativeConfig,
- decoding_config: DecodingConfig,
- observability_config: ObservabilityConfig,
- prompt_adapter_config: PromptAdapterConfig,
- executor_class: Type[ExecutorBase]):
复制代码 核心方法:
- add_request(): 添加新请求
- step(): 执行一个推理步骤
- abort_request(): 中断请求
- has_unfinished_requests(): 检查未完成请求
2 Scheduler - 调度器
位置: vllm/core/scheduler.py
主要职责:
- 实现连续批处理(Continuous Batching)
- 内存感知的请求调度
- 优先级管理和抢占策略
- Chunked prefill支持
调度策略:- def _schedule_chunked_prefill(self) -> SchedulerOutputs:
- """使用chunked prefill调度排队请求"""
- # 1. 计算调度预算
- budget = SchedulingBudget(
- token_budget=self.scheduler_config.max_num_batched_tokens,
- max_num_seqs=self.scheduler_config.max_num_seqs,
- )
- # 2. 预填充阶段调度
- prefills = self._schedule_prefills(budget)
- # 3. 解码阶段调度
- decodes = self._schedule_decodes(budget)
- # 4. 抢占和内存管理
- self._handle_preemption(budget)
复制代码 关键数据结构:- @dataclass
- class SchedulingBudget:
- token_budget: int # token预算
- max_num_seqs: int # 最大序列数
- _num_cached_tokens: int = 0 # 缓存token数
- _num_batched_tokens: int = 0 # 批处理token数
复制代码 3 BlockManager - 内存管理器
位置: vllm/core/block_manager.py
主要职责:
- 实现PagedAttention内存管理
- KV缓存的块分配和回收
- Copy-on-Write内存共享
- 前缀缓存管理
核心算法:- class SelfAttnBlockSpaceManager(BlockSpaceManager):
- def allocate(self, seq_group: SequenceGroup) -> None:
- """为序列组分配内存块"""
- # 1. 计算需要的块数
- num_required_blocks = self._calculate_required_blocks(seq_group)
- # 2. 检查可用块
- if not self._has_enough_blocks(num_required_blocks):
- raise MemoryError("Insufficient GPU blocks")
-
- # 3. 分配块并建立映射
- block_table = self._allocate_blocks(seq_group, num_required_blocks)
-
- # 4. 更新序列状态
- seq_group.block_tables = block_table
复制代码 内存优化策略:
- 前缀缓存: 相同前缀的序列共享KV缓存
- Copy-on-Write: 派生序列共享内存块
- 滑动窗口: 限制长序列的内存使用
- 分层内存: GPU/CPU/磁盘三级存储
4 Attention后端
位置: vllm/attention/backends/
支持的后端:
- FlashAttention: 最高性能的attention实现
- FlashInfer: 高性能推理专用后端
- XFormers: 替代性attention实现
- Triton: 自定义triton内核
- Placeholder: 无attention模型的占位符
后端选择逻辑:- def get_attn_backend(head_size: int, dtype: torch.dtype,
- kv_cache_dtype: Optional[str], block_size: int,
- is_attention_free: bool = False) -> type[AttentionBackend]:
- """根据配置选择最优attention后端"""
- # 1. 检查是否为无attention模型
- if is_attention_free:
- return PlaceholderAttentionBackend
- # 2. 检查FlashAttention支持
- if FlashAttentionBackend.is_supported(head_size, dtype, kv_cache_dtype):
- return FlashAttentionBackend
- # 3. 检查其他后端支持
- if FlashInferBackend.is_supported(head_size, dtype, kv_cache_dtype):
- return FlashInferBackend
- # 4. 默认使用Triton后端
- return TritonAttentionBackend
复制代码 5 Worker和ModelRunner
位置: vllm/worker/
Worker架构:- class Worker(LocalOrDistributedWorkerBase):
- """在GPU上执行(分区)模型的worker类"""
- def __init__(self, vllm_config: VllmConfig,
- local_rank: int, rank: int,
- distributed_init_method: str):
- # 1. 初始化设备
- self.device = torch.device(f"cuda:{local_rank}")
- # 2. 创建模型运行器
- self.model_runner = ModelRunnerClass(vllm_config)
- # 3. 创建缓存引擎
- self.cache_engine = CacheEngine(vllm_config.cache_config,
- vllm_config.model_config,
- self.device)
- # 4. 初始化内存
- self._init_memory()
复制代码 ModelRunner职责:
- 模型前向传播执行
- 输入张量准备
- Attention元数据管理
- CUDA图优化
6 Executor框架
位置: vllm/executor/
Executor类型:
- UnipartExecutor: 单GPU执行器
- MultiprocessingExecutor: 多进程执行器
- RayDistributedExecutor: 基于Ray的分布式执行器
- PipelineParallelExecutor: 流水线并行执行器
分布式执行:- class RayDistributedExecutor(DistributedExecutorBase):
- def __init__(self, vllm_config: VllmConfig,
- placement_group: Optional[PlacementGroup] = None):
- # 1. 初始化Ray集群
- self._init_ray_cluster()
- # 2. 创建远程worker
- self._init_workers(vllm_config, placement_group)
- # 3. 初始化并行状态
- self._initialize_parallel_state(vllm_config.parallel_config)
- # 4. 加载模型
- self._load_model(vllm_config.model_config)
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |