目录

一、开篇:项目概述

二、基础铺垫:内存池核心概念

池化技术

三、前置实现:定长内存池(基础组件)

四、核心架构:三级缓存设计(ThreadCache→CentralCache→PageCache)

五、核心流程:内存申请与释放全解析

六、进阶优化:关键功能升级与优化

七、测试与验证:性能与正确性保障

八、项目收尾:源码与扩展


一、开篇:项目概述

        在 C/C++ 的学习过程中,mallocfree 是再熟悉不过的两个接口。但在真正的高并发场景下,传统的内存分配方式往往很难兼顾效率与线程安全:频繁申请和释放内存会带来较大的系统开销,而多线程竞争又会进一步放大性能问题。也正因如此,越来越多高性能系统开始使用专门设计的内存分配器来替代系统默认方案。
        TCMalloc 就是其中非常经典的一种。它是 Google 开源的高性能内存分配器,名字来源于 Thread-Caching Malloc,核心思想是通过线程本地缓存来减少锁竞争,从而提升多线程环境下的内存分配效率。凭借优秀的性能表现,TCMalloc 被广泛应用于工业级项目中,甚至 Go 语言的内存分配器设计也借鉴了它的部分思想。
        这个项目并不是对 TCMalloc 的完整复现,而是一次“抓住核心、化繁为简”的实现尝试。我在理解其整体设计思路的基础上,提炼出高并发内存池中最关键的部分,手动实现了一个 Mini 版高并发内存池。这样做的目的,不只是为了写出一个能运行的项目,更重要的是借助这个过程去真正理解:一个高性能内存分配器到底是如何组织内存、如何减少竞争、又是如何在效率与复杂度之间做平衡的。
        整个项目的实现过程中,会涉及不少底层与工程化知识,比如 C/C++、链表与哈希桶等数据结构、操作系统内存管理、多线程、互斥锁、单例模式 等内容。对于我来说,这不仅是一个内存池项目,更是一次把零散知识点串起来、从“会用”走向“理解原理”的实践过程。

二、基础铺垫:内存池核心概念

池化技术

        所谓池化,核心思想其实并不复杂:提前准备好一批可复用的资源,在需要时直接取用,使用完后再归还池中,而不是每次都重新创建和销毁。
        这样做的目的,是为了减少重复创建资源的成本,提高系统整体运行效率。

在软件开发中,池化技术非常常见。比如:

  • 连接池:预先创建数据库连接,避免频繁建立和断开连接;
  • 线程池:提前创建一组线程,任务到来后直接分配执行;
  • 对象池:复用一些创建代价较高的对象实例。

        这些技术虽然应用场景不同,但本质上解决的都是同一类问题:资源创建与销毁成本高,而资源又会被频繁使用。

        内存管理同样存在类似问题。如果程序频繁调用 mallocfree 来申请和释放内存,就会不断触发底层分配与回收逻辑,在高并发场景下还可能带来额外的锁竞争与性能损耗。既然如此,一个很自然的思路就是:能不能像管理线程、连接那样,把内存也放到“池”里统一管理和复用?

1. 什么是内存池?

        所谓 内存池(Memory Pool),可以简单理解为:提前向系统申请一大块内存,然后自行管理这块内存的分配与回收,而不是每次都直接调用 mallocfree

也就是说,内存池本质上做的是一层“中间管理”

  • 当程序需要内存时,不一定立刻向操作系统申请;
  • 当程序释放内存时,也不一定马上归还给操作系统;
  • 而是先由内存池统一接管、复用和调度这些内存资源。

这样做的核心目的,是为了减少频繁申请和释放内存带来的开销,提高内存分配效率。

可以把它类比成“水池”:

  • 操作系统提供的是总水源;
  • 内存池先提前蓄一池水;
  • 程序真正用水时,直接从池子里取;
  • 用完后也先回收到池子里,留给下一次继续使用。

因此,内存池的关键价值并不在于“创造内存”,而在于 更高效地组织和复用内存

2. 为什么需要内存池?

        在小型程序里,直接使用 mallocfree 通常没有太大问题。但在高并发、频繁申请小块内存的场景下,传统分配方式往往会暴露出几个明显问题。

(1)频繁系统调用,开销较大

        内存的申请和释放并不是“零成本”的。标准库的 mallocfree 底层仍然需要和操作系统的内存管理机制打交道,例如通过堆空间扩展、页分配等方式完成内存获取。
        如果程序中存在大量、频繁的小对象申请与释放,那么这些操作累积起来会产生明显的性能损耗。

(2)容易产生内存碎片

        程序申请内存的大小通常并不固定,有的申请几十字节,有的申请几百字节,释放顺序也未必和申请顺序一致。
        久而久之,内存空间中就可能出现很多“零零散散的小空洞”。这些空洞单独看似乎都能用,但当需要一块连续较大的空间时,却可能因为无法拼接而利用率下降,这就是常说的 内存碎片问题

图中这种情况,就属于外部碎片(External Fragmentation)
        一开始,进程地址空间中有一块连续的大空间,如左图所示。此时这块空间是完整的,理论上无论申请 128Byte、256Byte 还是更大的内存,都有机会从这片连续区域中切分出来,这样一来,原本完整的一大块连续空间,被切分成了若干个大小不同的已使用内存块。此时内存虽然被占用了,但整体布局还是比较规整的。
        问题出现在后续的释放阶段。假设 vector 对象销毁,释放了最上面的 256Byte;随后 list 对象也销毁,释放了最下面的 128Byte。于是就形成了右图所示的状态:空闲空间被分成了上下两段,中间仍然被 mapmysql 占据。
        系统中确实还有空闲内存:上面有 256Byte、下面有 128Byte。
        但这 384Byte 并不是一整块连续空间,而是被中间的已分配内存隔开了。于是,如果此时程序想申请一块 大于 256Byte 的连续内存,哪怕总空闲空间已经达到 384Byte,仍然可能申请失败。因为对于内存分配器来说,它需要的是一块足够大的连续区域,而不是多个零散的小块之和。

        除了外部碎片,内存管理中还经常会提到另一个概念:内部碎片(Internal Fragmentation)
两者的区别可以简单理解为

  • 外部碎片:空闲空间很多,但不连续,导致申请失败;
  • 内部碎片:已经分配出去的内存块,比用户真正需要的更大,多出来的部分没有被有效使用。

比如用户只申请了 30Byte,但分配器为了对齐,实际给了 32Byte,那么多出来的 2Byte 就属于内部碎片。

(3)多线程环境下锁竞争严重

        在高并发程序中,多个线程可能同时申请和释放内存
        如果所有线程都直接访问同一套全局分配结构,就必然需要加锁保护。线程一多,锁竞争就会变得激烈,性能也会迅速下降。
        这也是为什么高性能内存分配器往往会引入 线程本地缓存(Thread Local Cache) 的原因:让每个线程优先使用自己的“小仓库”,尽量减少线程之间的直接竞争。

(4)内存复用能力不足

        很多业务场景下,内存对象的生命周期都很短,而且申请大小具有明显规律。
        例如某类对象总是申请 64 字节、128 字节、256 字节,那么如果每次都重新向系统申请、释放,显然效率不高。
        内存池通过维护空闲块链表,可以让已经释放的内存块快速复用,避免重复开辟与销毁。

三、前置实现:定长内存池(基础组件)

        所谓“定长”,指的是内存池中每次分配出去的内存块大小是固定的。比如,一个定长内存池可能专门管理 64Byte 的内存块,用户每次申请时,拿到的都是同样大小的空间。虽然这种设计还不足以直接应对复杂场景,但它非常适合作为整个内存池体系的底层基础组件。
        从实现角度看,定长内存池解决的是一个很纯粹的问题:如何高效地管理一批大小相同、可以重复利用的内存块?

1. 为什么要先实现定长内存池?

直接上来就实现完整的高并发内存池,会同时遇到很多问题:

  • 不同大小内存如何分类;
  • 多个自由链表如何组织;
  • 线程竞争如何处理;
  • 中心缓存如何调度;
  • 大块内存如何按页切分;

定长内存池具备内存池的几个关键特征:

  • 一次向系统申请一大块内存
  • 按固定大小切分成多个小块
  • 使用自由链表管理空闲块
  • 分配时从链表取,释放时归还链表
  • 避免频繁调用系统分配接口

2. 定长内存池的基本思想

定长内存池的设计思路并不复杂,可以概括成下面几步:
        首先,内存池不会在每次申请时都立刻调用 malloc。
        相反,它会在内部维护一块较大的内存区域。当用户申请内存时,如果池中还有空闲块,就直接返回其中一块;如果池中已经没有可用块了,再统一向系统申请一大块新空间,切分后挂到空闲链表中。

整个过程可以抽象成这样:

  1. 向系统申请一大块原始内存
  2. 按固定大小切成若干小块
  3. 用链表把这些小块串起来
  4. 分配时从链表头摘下一个块
  5. 回收时再把该块头插回链表

        既然是内存池,那么我们首先得向系统申请一块内存空间,然后对其进行管理。要想直接向堆申请内存空间,在Windows下,可以调用VirtualAlloc函数;在Linux 下,可以调用brk或mmap函数。

// 平台相关头文件包含
#ifdef _WIN64
#include <Windows.h>
#else
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
#include <cstring>
#endif

inline static void* SystemAlloc(size_t kpage)
{
    // 计算总字节数:页数 * 每页字节数
    size_t total_bytes = kpage << PAGE_SHIFT;

#ifdef _WIN64
    // Windows平台:使用VirtualAlloc申请内存
    // MEM_COMMIT: 提交内存页
    // MEM_RESERVE: 保留内存地址空间
    // PAGE_READWRITE: 可读可写权限
    void* ptr = VirtualAlloc(0, total_bytes, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
    // Linux平台:根据内存大小选择不同分配策略
    void* ptr = nullptr;
    const size_t MMAP_THRESHOLD = 128 * 1024; // mmap阈值(128KB)

    // 小内存(<128KB):使用sbrk扩展堆内存
    if (total_bytes < MMAP_THRESHOLD)
    {
        void* current_brk = sbrk(0); // 获取当前堆顶地址
        if (current_brk == (void*)-1)
        {
            throw std::bad_alloc();
        }
        void* new_brk = (char*)current_brk + total_bytes; // 新堆顶地址
        if (brk(new_brk) == -1) // 扩展堆
        {
            throw std::bad_alloc();
        }
        ptr = current_brk;
    }
    else
    {
        // 大内存(>=128KB):使用mmap申请匿名映射内存
        ptr = mmap(nullptr, total_bytes, PROT_READ | PROT_WRITE, 
                  MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
        if (ptr == MAP_FAILED)
        {
            throw std::bad_alloc();
        }
    }
#endif
    
    // 验证内存申请是否成功
    if (ptr == nullptr)
        throw std::bad_alloc();

    return ptr;
}

3. 定长内存池的成员变量

从本质上说,定长内存池主要管理两类资源:

  • 一类是向系统申请来的大块连续内存
  • 另一类是释放后可重复利用的小块内存

 _memory:指向当前大块内存的起始位置

——记录大块内存当前可切分位置,或者记录这块大内存的起始地址。

        定长内存池并不是每次申请对象时都直接调用 mallocnew,而是先一次性向系统申请一整块较大的连续内存,然后再从这块内存中按固定大小切出一个个小块供用户使用。
        为了能够管理这块大内存,我们首先需要一个指针来记录它的起始地址:

        方便切分内存块:这里之所以通常使用 char*,是因为 char* 在做指针运算时最方便,指针的类型决定了指针向前或向后走一步有多大距离,指针每移动 n,表示向后移动 n 个字节,这样在切分内存块时就可以精确控制偏移量。

size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);

_memory += obj_size;// 移动内存指针

_remainBytes:记录当前大块内存还剩多少可用字节

——当前这块大内存还能不能继续被切分使用。

内存池在不断切分大块内存时,还需要知道:

  • 这块内存还剩多少空间;
  • 剩余空间是否还能再切出一个对象大小的小块;
  • 如果剩余空间不够,是不是该重新向系统申请一块新内存。

_freeList:管理已经释放回来的空闲内存块

——管理已经释放回来、可被重复利用的定长内存块。

        除了“从大块内存中继续切分”之外,定长内存池还有一个更重要的目标,就是复用已经释放过的内存块
        用户释放回来的一块定长内存,通常不会立刻还给操作系统,而是会先挂到一个“空闲链表”上,等下次有相同大小的申请时,直接从链表中取出复用。因此,还需要一个指针来指向这条自由链表的头结点,例如:

4. 定长内存池管理释放对象与申请对象

管理释放回来的对象

——把释放回来的对象重新串起来,挂到自由链表 _freeList 上,等待后续再次分配。

        对于还回来的定长内存块,我们可以用自由链表将其链接起来,但我们并不需要为其专门定义链式结构,我们可以让内存块的前4个字节(32位平台)或8个字节(64位平台)作为指针,存储后面内存块的起始地址即可。

为什么是前 4 / 8 个字节?

这是因为链表要存 next 指针,而指针大小与平台有关:

  • 32 位平台 下,指针大小通常是 4 字节
  • 64 位平台 下,指针大小通常是 8 字节

所以,当一个对象被释放后,我们只需要把这块对象内存的起始部分解释成“一个指针”,就能够把它链接到自由链表中。

// 适配void*类型的NextObj函数
// 把 obj 这块内存的起始地址,解释成一个 void* 类型的指针存储位置;
// 返回这块位置本身的引用;
// 这样我们就可以直接读写“当前对象块的 next 指针”。
inline static void*& NextObj(void* obj)
{
    return *(void**)obj;
}

        此在向自由链表插入被释放的内存块时,先让该内存块的前4个字节或8个字节存储自由链表中第一个内存块的地址,然后再让_freeList指向该内存块即可,也就是一个简单的链表头插操作。

当某个对象不再使用时,我们需要做两件事:

  1. 显式调用对象析构函数,释放对象内部可能管理的资源;
  2. 把对象挂回 _freeList,供后续复用
// 释放对象
void Delete(T* obj)
{
	// 显示调用析构函数
	obj->~T();

	// 头插到自由链表 (回收内存)
	// 将对象挂到 _freelist 头部
	/**(void**)obj = _freelist;*/
	NextObj(obj) = _freelist;// 新对象的next指向当前链表头
	_freelist = obj;// 更新链表头
}

申请对象

——当用户再次申请对象时,内存池应该优先考虑“复用”而不是“重新切分”。

第一优先级:先看 _freeList 是否有空闲块

        如果自由链表不为空,说明之前有对象已经释放回来,那么此时最合适的做法就是直接从链表头取一个对象块出来使用。
        取链表头的代码本质上就是头删

// 优先从自由链表中获取 (重复利用已释放的内存)
if (_freelist)
{
	// 头删逻辑:取出链表头部的内存块
	/*void* _next = *((void**)_freelist);*/
	void* _next = NextObj(_freelist);// 先找到当前头节点的下一个节点
	obj = (T*)_freelist;// 把当前头节点取出来作为本次要返回的对象
	_freelist = _next;// 最后让 _freelist 指向原来的下一个节点
}

第二优先级:如果 _freeList 为空,再从大块内存中切分

        如果自由链表为空,就说明当前没有可直接复用的对象块。这时只能继续从 _memory 指向的大块内存中划出一小块来分配。
        不过,在切分之前还必须先判断:当前剩余字节数 _remainBytes 是否足够切出一个对象大小的块。
        如果当前 _remainBytes < objSize,说明 _memory 指向的这一批大块内存已经不够再切出一个对象。这时就需要重新向系统申请一块新的大内存,然后更新内存池状态:

  • 重新设置 _memory
  • 重新设置 _remainBytes

例如可以一次申请 128KB:

// 自由链表为空,从大块内存池中切分
// 如果剩余空间不足以分配一个对象
if (_remainbyte < sizeof(T))
{
	// 重新向系统申请大块内存 (128KB)
	_remainbyte = 128 * 1024;
	// 这里使用 malloc 申请大块内存
	// 也可以使用 SystemAlloc,但需要处理页对齐逻辑
	_memory = (char*)SystemAlloc(_remainbyte); 
	if (_memory == nullptr)
	{
		throw std::bad_alloc();
	}
}

       如果当前_remainBytes >= objSize,就可以从 _memory 指向的位置取出一段 objSize 大小的内存作为当前对象块:

obj = (T*)_memory;// 当前对象地址就是 _memory 所指向的位置;
// 确保对象大小至少能存下一个指针 (用于自由链表链接)
// 即使 T 的大小很小 (如 char),分配的空间也要至少是指针大小 (4或8字节)
size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);

_memory += obj_size;// _memory 向后移动 objSize 字节,指向下一块可切分位置;
_remainbyte -= obj_size;// _remainBytes 减去对应大小,记录剩余可用空间。

        为了保证释放时每一块都能挂回自由链表,需要确保每块对象的大小至少能够容纳一个指针,因此对象块大小通常这样确定:

// 确保对象大小至少能存下一个指针 (用于自由链表链接)
// 即使 T 的大小很小 (如 char),分配的空间也要至少是指针大小 (4或8字节)
size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);

        需要注意的是,分配到原始内存 不等于 对象已经构造完成。从 _freeList 取出来也好,从 _memory 切出来也好,我们拿到的都只是“一段可用内存”,而不是一个已经初始化完成的 T 类型对象。因此,在返回给用户之前,还需要使用定位 new,在这段内存上显式调用对象构造函数:

new(obj) T;

 关于定位new与普通new的区别

  • 普通new:先分配内存,再调用构造函数;
  • 定位new:不分配内存,只在指定的地址上调用构造函数;

为什么要使用定位new?

1、如果程序频繁创建和销毁小对象,频繁调用new/delete可能会带来:

  • 性能开销;
  • 内存碎片;
  • 分配器锁竞争;

2、如果使用普通new

T* obj = new T;

那系统会再次去堆上申请新内存。这就绕开了你的内存池,相当于你前面从池里取内存的工作全白做了。

定长内存池整体代码如下:

#pragma once
#include "Common.h"

// 定长内存池模板
// 专门用于申请固定大小的对象 (例如 Span 对象)
// 相比 malloc/free,定长内存池性能更高,因为省去了计算大小和查找最适内存块的开销
template<class T>
class ObjectPool {
public:
	// 申请对象
	T* New()
	{
		T* obj = nullptr;
		// 优先从自由链表中获取 (重复利用已释放的内存)
		if (_freelist)
		{
			// 头删逻辑:取出链表头部的内存块
			/*void* _next = *((void**)_freelist);*/
			void* _next = NextObj(_freelist);// 先找到当前头节点的下一个节点
			obj = (T*)_freelist;// 把当前头节点取出来作为本次要返回的对象
			_freelist = _next;// 最后让 _freelist 指向原来的下一个节点
		}
		else
		{
			// 自由链表为空,从大块内存池中切分
			// 如果剩余空间不足以分配一个对象
			if (_remainbyte < sizeof(T))
			{
				// 重新向系统申请大块内存 (128KB)
				_remainbyte = 128 * 1024;
				// 这里使用 malloc 申请大块内存
				// 也可以使用 SystemAlloc,但需要处理页对齐逻辑
				_memory = (char*)SystemAlloc(_remainbyte); 
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			
			obj = (T*)_memory;
			// 确保对象大小至少能存下一个指针 (用于自由链表链接)
			// 即使 T 的大小很小 (如 char),分配的空间也要至少是指针大小 (4或8字节)
			size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			
			_memory += obj_size;// 移动内存指针
			_remainbyte -= obj_size;// 每切出一块内存,就减去对应大小
		}
		
		// 使用定位 new 初始化对象 (调用构造函数)
		new(obj)T;
		return obj;
	}

	// 释放对象
	void Delete(T* obj)
	{
		// 显示调用析构函数
		obj->~T();

		// 头插到自由链表 (回收内存)
		// 将对象挂到 _freelist 头部
		/**(void**)obj = _freelist;*/
		NextObj(obj) = _freelist;// 新对象的next指向当前链表头
		_freelist = obj;// 更新链表头
	}

private:
	// 指向大块内存的指针 (用于切分)
	char* _memory = nullptr;
	// 大块内存剩余字节数
	size_t _remainbyte = 0;
	// 自由链表头指针 (回收的对象挂在这里)
	void* _freelist = nullptr;
};

四、核心架构:三级缓存设计(ThreadCache→CentralCache→PageCache)

        前面的定长内存池,只解决了一个相对基础的问题:如何高效管理一批固定大小、可重复利用的内存块。但在真实的高并发内存分配场景中,仅靠一个定长内存池显然是不够的。因为实际申请的内存大小并不统一,同时还要面对多线程并发访问、全局共享与局部复用之间的平衡、大块内存的切分与回收等问题。

        在实现内存池时我们一般需要考虑到效率问题和内存碎片的问题,但对于高并发内存池来说,我们还需要考虑在多线程环境下的锁竞争问题

        因此,在这个项目中,内存池并没有采用“单层统一管理”的方式,而是设计成一个三级缓存结构:

  • ThreadCache:线程级缓存
  • CentralCache:中心级缓存
  • PageCache:页级缓存

高并发内存池设计整体框架:

1. 为什么要设计三级缓存?

1)锁竞争严重

        多个线程同时申请和释放内存时,必然频繁争抢同一把锁,性能会迅速下降。

2)小对象申请过于频繁

        对于 8B、16B、64B 这种高频小对象,如果每次都走全局结构甚至系统调用,开销会非常大。

3)大块内存不适合按对象粒度管理

        对于页级内存,如果仍然按“小块链表”的思路组织,会导致切分、合并和回收都很困难。

这个项目采用了一个很典型、也很实用的分层设计:

  • 线程本地层 优先满足本线程的小对象申请,尽量做到无锁;
  • 中央共享层 负责在线程之间转运和批量分发小对象;
  • 页缓存层 只管理以页为单位的大块连续内存。

2. 整体设计理念:小对象尽量本地化,大对象统一页管理

该项目对申请内存对象路径整体分为两类:

第一类:小于等于 256KB 的申请

这部分走三级缓存结构,由 ThreadCache -> CentralCache -> PageCache 协同完成。

第二类:大于 256KB 的申请

这部分不再走线程缓存和中心缓存,而是直接按页对齐后交给 PageCache 处理。代码中用 MAX_BYTES = 256 * 1024 作为小对象与大对象的分界线。

3. 第一层:ThreadCache —— 每个线程独享的小对象缓存

设计理念:尽量让线程在本地完成内存分配与释放,避免频繁加锁。
        从代码上出发,ThreadCache 通过 thread_local ThreadCache* TLS_thread_cache 实现线程局部存储,每个线程拥有自己独立的一份 ThreadCache 实例。也就是说,不同线程访问的是各自独立的缓存结构,因此绝大多数小对象分配过程都不需要加锁。

ThreadCache的核心数据结构

ThreadCache 内部维护的是:

FreeList _freeLists[BLOCK_NUM]; // 自由链表哈希桶数组
                                // BLOCK_NUM=208,对应[1,256KB]范围内的所有内存大小
                                // 索引由SizeClass::HB_Index()计算得到
                                // 每个桶独立管理对应大小的内存块,实现O(1)时间复杂度的内存分配

这意味着:

  • 用户申请 6 字节、7 字节、8 字节,最终可能都落到同一个对齐后的桶;
  • 用户申请 129 字节和 140 字节,也可能落到 16 字节对齐后的同一组桶中。

        所以,ThreadCache 的本质不是“按原始大小存对象”,而是:按对齐后的大小类别管理小对象自由链表。

ThreadCache 的分配逻辑

ThreadCache::Alloc(size) 的流程:

  • 先对用户申请大小进行对齐:SizeClass::RoundUp(size)
  • 再计算该大小对应的桶下标:SizeClass::HB_Index(size)
  • 如果对应桶的 FreeList 非空,直接弹出一个对象返回
  • 如果为空,就从 CentralCache 批量获取一批对象回来

ThreadCache 的释放逻辑

ThreadCache::Free(obj, size) 会先根据大小定位桶,然后把对象头插回对应的 FreeList。如果某个链表过长,就说明这个线程缓存了太多空闲对象,此时会调用 ListTooLong(),把一部分对象链表批量归还给 CentralCache

ThreadCache整体代码:

ThreadCache.h:

#pragma once
#include "Common.h"

// 定长内存池模板
// 专门用于申请固定大小的对象 (例如 Span 对象)
// 相比 malloc/free,定长内存池性能更高,因为省去了计算大小和查找最适内存块的开销
template<class T>
class ObjectPool {
public:
	// 申请对象
	T* New()
	{
		T* obj = nullptr;
		// 优先从自由链表中获取 (重复利用已释放的内存)
		if (_freelist)
		{
			// 头删逻辑:取出链表头部的内存块
			/*void* _next = *((void**)_freelist);*/
			void* _next = NextObj(_freelist);// 先找到当前头节点的下一个节点
			obj = (T*)_freelist;// 把当前头节点取出来作为本次要返回的对象
			_freelist = _next;// 最后让 _freelist 指向原来的下一个节点
		}
		else
		{
			// 自由链表为空,从大块内存池中切分
			// 如果剩余空间不足以分配一个对象
			if (_remainbyte < sizeof(T))
			{
				// 重新向系统申请大块内存 (128KB)
				_remainbyte = 128 * 1024;
				// 这里使用 malloc 申请大块内存
				// 也可以使用 SystemAlloc,但需要处理页对齐逻辑
				_memory = (char*)SystemAlloc(_remainbyte); 
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			
			obj = (T*)_memory;
			// 确保对象大小至少能存下一个指针 (用于自由链表链接)
			// 即使 T 的大小很小 (如 char),分配的空间也要至少是指针大小 (4或8字节)
			size_t obj_size = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			
			_memory += obj_size;// 移动内存指针
			_remainbyte -= obj_size;// 每切出一块内存,就减去对应大小
		}
		
		// 使用定位 new 初始化对象 (调用构造函数)
		new(obj)T;
		return obj;
	}

	// 释放对象
	void Delete(T* obj)
	{
		// 显示调用析构函数
		obj->~T();

		// 头插到自由链表 (回收内存)
		// 将对象挂到 _freelist 头部
		/**(void**)obj = _freelist;*/
		NextObj(obj) = _freelist;// 新对象的next指向当前链表头
		_freelist = obj;// 更新链表头
	}

private:
	// 指向大块内存的指针 (用于切分)
	char* _memory = nullptr;
	// 大块内存剩余字节数
	size_t _remainbyte = 0;
	// 自由链表头指针 (回收的对象挂在这里)
	void* _freelist = nullptr;
};

ThreadCache.cpp:

// ============================================================================// 高并发内存池 - ThreadCache.cpp// 线程本地缓存层实现// 提供无锁内存分配与释放,实现慢启动策略和内存回收机制
#include "CentralCache.h"
#include "ThreadCache.h"

// 定义与声明分离
thread_local ThreadCache* TLS_thread_cache = nullptr;

// @param size: 申请的原始字节数
// @return: 分配的内存指针
// @note: 快速分配小内存 (<=256KB),大内存直接调用系统分配
void* ThreadCache::Alloc(size_t size)
{
	assert(size <= MAX_BYTES);
	// 1. 计算内存大小对应的对齐值
	 size_t alignSize = SizeClass::RoundUp(size);
	// 2. 计算对齐值对应的桶索引
	 size_t index = SizeClass::HB_Index(size);
	// 3. 检查对应FreeList是否有可用内存块
	if (!_freeLists[index].Empty())
	{
		// 3.1 有可用内存块,直接从FreeList弹出返回
		return _freeLists[index].Pop();
	}
	else
	{
		// 3.2 无可用内存块,从CentralCache批量获取
		return FetchFromCentralCache(index, alignSize);	
	}
}

// -----------------------------------------------------------------------// 内存释放函数// -----------------------------------------------------------------------
// @param obj: 要释放的内存指针
// @param size: 内存块的原始字节数
// @note: 快速释放小内存 (<=256KB),大内存直接调用系统释放
void ThreadCache::Free(void* obj, size_t size)
{
	assert(obj);
	assert(size <= MAX_BYTES); // 大内存直接调用系统释放

	// 1. 计算内存大小对应的桶索引
	 size_t index = SizeClass::HB_Index(size);
	// 2. 将内存块归还到对应FreeList
	_freeLists[index].Push(obj);

	// 3. 检查FreeList是否过长,若过长则回收部分内存到CentralCache
	//    避免ThreadCache占用过多内存,平衡内存使用
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);// _freeLists[index]过长,回收部分内存
	}
}

// 过长链表内存回收
// @param list: 要检查的FreeList对象
// @param size: 单个内存块的大小
// @note: 将ThreadCache中过长的内存块链表回收一部分到CentralCache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	// 1. 从list中取出MaxSize数量的内存块
	list.PopRange(start, end, list.MaxSize());

	// 2. 将取出的内存块链表释放到CentralCache
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

// 从CentralCache批量获取内存对象
// @param index: 内存大小对应的FreeList桶索引
// @param alignSize: 对齐后的内存块大小
// @return: 分配给应用层的内存指针
// @note: 实现慢启动策略,动态调整批量获取的内存块数量
void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize)
{
	// 慢启动策略:控制向CentralCache批量获取的内存对象数量
	// 1. 初始获取数量较少,避免一次性获取过多内存
	// 2. 根据内存大小动态调整,小对象多获取,大对象少获取
	// 3. 随着ThreadCache使用频率增加,获取数量逐渐增长
	// batchNum : 期望获取的内存对象数量,初始值为1,最大值由SizeClass::NumMoveSize()计算得到
	 size_t batchNum = (std::min)(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(alignSize));
	if (batchNum == _freeLists[index].MaxSize())
	{
		// 慢启动阈值增长:表明该大小内存使用频繁,下次获取更多
		_freeLists[index].MaxSize() += 1;
	}
	// 初始化内存块链表指针
	void* start = nullptr;
	void* end = nullptr;
	// 从CentralCache批量获取内存块
	 size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, alignSize);
	assert(actualNum >= 1);
	
	// 处理获取的内存块
	if (actualNum == 1)
	{
		// 3.1 只获取到一个内存块,直接返回给应用层
		assert(start == end);
		return start;
	}
	else {
		// 3.2 获取到多个内存块,将剩余内存块保存到ThreadCache的FreeList
		//      - 第一个内存块直接返回给应用层
		//      - 剩余内存块批量插入到对应FreeList
		_freeLists[index].PushRange(NextObj(start), end, actualNum-1);
		return start;
	}
}

4. 第二层:CentralCache —— 线程之间共享的中心调度层

——负责线程间共享小对象,批量分发对象块,跟踪 Span 的使用状态,并在合适时将完整空闲 Span 归还给 PageCache。

设计理念:当线程本地不够用时,多个线程之间如何共享和调度内存。
CentralCache 是一个全局单例对象,它不直接面向应用层,而是作为 ThreadCachePageCache 之间的桥梁存在。

static CentralCache SR_Inst; // 单例实例

CentralCache 的核心结构

CentralCache 内部维护:

 // SpanList哈希桶数组
 // 1. 数组长度 BLOCK_NUM = 208,与ThreadCache的桶数量一致
 // 2. 每个索引对应一个SpanList,管理特定大小的Span
 // 3. 每个Span管理一段连续的页内存,切分成相同大小的小内存块
 SpanList _spanlists[BLOCK_NUM];

这里同样是 208 个桶,并且和 ThreadCache 的桶数量是一致的。不同的是:

  • ThreadCache 的每个桶里挂的是“小对象链表”
  • CentralCache 的每个桶里挂的是“Span 链表”

对于Span的定义

一个 Span 主要描述:Span 是一段连续页内存的管理单元,它既知道自己占了多少页,也知道这些页切成了什么规格的小对象。

  • 从哪一页开始:_pageId
  • 一共有多少页:_npage
  • 这批页被切成了多大的小对象:_objsize
  • 当前还有哪些空闲对象:_freeList
  • 已经分配出去多少对象:_useCount

CentralCache 如何向 ThreadCache 分发对象?

CentralCache::FetchRangeObj(start, end, n, size) 负责批量ThreadCache 提供对象。

其基本流程是:

  1. 根据对象大小算出桶下标;
  2. 给当前桶加锁;
  3. 在对应的 SpanList 中找一个 _freeList 不为空的 Span;
  4. 从该 Span 的自由链表上切下一段对象链表返回给 ThreadCache
  5. 增加该 Span 的 _useCount,表示这些对象已被线程缓存拿走。

中心层没有可用 Span 时

这就会进入 CentralCache::GetOneSpan() 的逻辑:

  • 先在当前桶里遍历 SpanList,查找是否有可用 Span;
  • 如果没有,则暂时解开当前桶锁;
  • 再去加 PageCache 的大锁;
  • PageCache 申请一个合适页数的新 Span;
  • 申请回来后,按照当前对象大小把这个 Span 切分成一个个小对象,组织成 span->_freeList
  • 最后再把这个 Span 挂回当前桶。

CentralCache 的回收逻辑

        当 ThreadCache 某个桶太长时,会批量把一段对象链表还给 CentralCache
CentralCache::ReleaseListToSpans(start, size) 会依次遍历这些对象,把对象头插回 span->_freeList,同时减少 span->_useCount,并通过 PageCache::MapObjectToSpan(ptr) 找到每个对象所属的 Span。

        一旦某个 Span 的 _useCount 减到 0,说明这个 Span 切出的对象已经全部归还,没有线程再持有它,此时就会:

  • 将该 Span 从当前 CentralCache 桶中摘下;
  • 归还给 PageCache 进一步处理。

5. 第三层:PageCache —— 以页为单位管理大块内存

PageCache 是整个内存池架构中最底层的一层,它直接面向操作系统,主要负责:

  • 按页申请大块连续内存
  • 把大 Span 切分成更小的 Span(以页构成的)
  • 回收空闲 Span 并尝试和前后邻接的 Span 合并

PageCache 的核心结构

PageCache 内部维护:

 // 哈希桶数组
 // 数组长度 PAGES_NUM = 129
 // 下标 1 挂 1 页的 Span,下标 128 挂 128 页的 Span
SpanList _spanlists[PAGES_NUM];

其中 PAGES_NUM = 129,也就是说它的桶下标表示“页数”,含义是:

  • _spanlists[1] 挂 1 页的 Span
  • _spanlists[2] 挂 2 页的 Span
  • _spanlists[128] 挂 128 页的 Span

其中 PAGES_NUM = 129,也就是说它的桶下标表示“页数”,含义是:

  • _spanlists[1] 挂 1 页的 Span
  • _spanlists[2] 挂 2 页的 Span
  • _spanlists[128] 挂 128 页的 Span

PageCache 如何分配 Span?——PageCache::NewSpan(k) 的逻辑分三步

1)如果请求页数大于 128 页

直接向系统申请这么多页,单独创建一个 Span 返回,不走常规桶管理。

2)如果对应页数桶非空

直接从 _spanlists[k] 中弹出一个 Span 返回。

3)如果对应页数桶为空

向更大的桶中查找第一个可用 Span,找到后把它拆成两部分:

  • 前一部分:正好是当前请求的 k 页,返回给调用者;
  • 后一部分:剩余页重新挂回对应页数的桶中。

4)如果所有桶都没有

直接向系统申请一个 128 页的大 Span,放入 128 页桶中,再递归重新申请。

PageCache 为什么还要维护页号映射表?

std::unordered_map<PAGE_ID, Span*> _pageid_span_map;

这个映射主要有两个用途:

1)对象释放时定位所属 Span

无论是小对象还是大对象,只要给定对象地址 ptr,都可以先右移 PAGE_SHIFT 算出页号,再通过 MapObjectToSpan(ptr) 找到它所属的 Span。

2)Span 合并时定位前后邻居

ReleaseSpanToPageCache(span) 中,代码会通过:

  • span->_pageId - 1 找前一页
  • span->_pageId + span->_npage 找后一页

        再去映射表中查相邻页是否属于某个空闲 Span。若前后邻居存在、未使用且合并后页数不超过 128 页,就执行合并。

所以 PageCache 不仅负责按页数分类,还负责按页号定位,这两套映射是同时存在的:

  • 页数 -> 桶
  • 页号 -> Span

PageCache 的回收与合并

——整个内存池的页级碎片整理工作。

CentralCache 归还一个已完全空闲的 Span 时,PageCache::ReleaseSpanToPageCache() 会尝试:

  1. 向前看,能不能和前一个空闲 Span 合并;
  2. 向后看,能不能和后一个空闲 Span 合并;
  3. 合并完之后,再把新的大 Span 挂回对应页数的桶。

这一步非常关键,因为如果没有合并机制,那么页内存会越来越碎,虽然总量还够,但连续页申请会越来越困难。

6. 三层之间的协同工作

情况一:申请小对象(<= 256KB)

第一步:进入 ConcurrentAlloc

入口函数先判断大小是否超过 MAX_BYTES。若没有超过,则走线程缓存路径。

第二步:ThreadCache 优先尝试本地分配

  • 根据大小对齐
  • 映射到对应桶
  • 如果 FreeList 非空,直接弹出返回。

第三步:本地没有,则向 CentralCache 批量申请

ThreadCache::FetchFromCentralCache() 会根据慢启动阈值和 SizeClass::NumMoveSize(size) 计算本次应该拿多少对象,然后调用 CentralCache::FetchRangeObj()

第四步:CentralCache 查找或创建 Span

  • 如果当前桶有可用 Span,直接切下一段对象链表;
  • 如果没有,则向 PageCache 申请新的页块,再切分成对象。

第五步:PageCache 提供页级内存

  • 优先精确匹配页数
  • 不够则找更大的 Span 拆分
  • 再不够则向系统申请新的大块页内存。

        最后,切好的对象会一路返回到 ThreadCache,线程拿走第一个对象,其余的挂回本地 FreeList,后续再次申请时就可以直接本地命中。

7. 小对象释放时的回收路径

——先局部缓存,再逐层回收,最后在页级做统一整理。

第一步:先通过对象地址找到其所属 Span

ConcurrentFree(ptr) 里先调用 MapObjectToSpan(ptr),拿到该对象所在的 Span,再根据 span->_objsize 判断这是大对象还是小对象。

第二步:小对象优先还给当前线程的 ThreadCache

ThreadCache::Free(obj, size) 把对象头插到本地对应大小的 FreeList 中。

第三步:若本地链表过长,则部分回收给 CentralCache

当某个自由链表长度达到阈值时,通过 ListTooLong() 批量归还给 CentralCache

第四步:CentralCache 归还对象到对应 Span

ReleaseListToSpans() 会逐个找到对象所属 Span,归还到 span->_freeList,并减少 _useCount。如果某个 Span 的 _useCount 减为 0,说明它已经完全空闲,可以从中心层移除并归还给 PageCache

第五步:PageCache 进行页级合并

PageCache 接收到空闲 Span 后,尝试与前后相邻空闲 Span 合并,再按新的页数重新挂回桶中。

8. 映射关系:三级缓存真正串起来的关键

1)ThreadCache 的映射

用户申请大小 sizeRoundUp(size) 得到对齐大小→ HB_Index(size) 得到桶下标→ 访问 _freeLists[index]

2)CentralCache 的映射

对象大小 sizeHB_Index(size)→ 访问 _spanlists[index]

3)CentralCache 到 PageCache 的映射

对象大小 sizePageMoveSize(size)→ 计算出需要的页数 kpage→ 调用 PageCache::NewSpan(kpage)

4)PageCache 的映射

页数 kpage_spanlists[kpage] 管理对应页数的空闲 Span。

5)对象地址到 Span 的映射

对象地址 ptrpageId = (PAGE_ID)ptr >> PAGE_SHIFT_pageid_span_map[pageId]→ 找到所属 Span。

9. 锁设计:为什么三层的加锁策略不同?

ThreadCache:无锁

因为它是线程私有的,通过 TLS 实现,每个线程只访问自己的实例。

CentralCache:每个桶一把锁

因为它是全局共享的,但不同大小类别之间可以独立并发访问,所以采用的是桶锁。每个 SpanList 都有自己的 _mtx

PageCache:只有一把大锁

因为它操作频率相对较低,但逻辑较复杂,涉及切分、回收、前后合并、页号映射表维护等,因此统一用 _pagemtx 保护,简化并发控制。

10. 小结

ThreadCache

解决线程本地小对象快速分配问题,尽量做到无锁。

CentralCache

解决多线程之间小对象共享与批量调度问题,用 Span 作为对象与页之间的桥梁。

PageCache

解决页级大块内存的申请、切分、映射与合并问题,是整个系统的页级资源管理中心。

五、核心流程:内存申请与释放全解析

内存申请路径:

内存释放路径:

六、测试总结:从功能正确到并发稳定,再到性能对比

第一类是功能测试,主要验证程序是否能正确分配和释放内存。

功能测试部分一共分成了 5 小项,主要定义在 FunctionalTest.cpp 中。

1. 基础申请与释放测试

        最基础的测试是 TestBasicAllocFree()。它一次申请几块不同大小的内存,比如 1 字节、8 字节、16 字节、64 字节、128 字节,然后再逐个释放。

        验证的功能是:

  • ConcurrentAlloc(size) 是否能返回非空指针;
  • ConcurrentFree(ptr) 是否能正常回收;
  • 对于不同大小的小对象,内存池最基本的路径是否通畅;

2. 不同大小对象测试

        接下来是 TestDifferentSizeAlloc()。这里不是只测一个固定大小,而是测试多个典型大小,比如 1、2、3、7、8、9、15、16、17、32、64、127、128、129,一直到 8192 字节。

因为项目内部是按大小类进行管理的,所以测试时必须覆盖多个边界值,例如:

  • 7 和 8
  • 8 和 9
  • 127、128、129
  • 1023、1024、1025

这些边界值很关键,因为它们往往对应“是否进入新的对齐区间”或“是否进入新的桶”。

3. 写入/读取测试

        在 TestWriteRead() 中,我不只是申请内存,还对申请到的内存进行了写入和读取验证。代码里通过 memset(p, 0x5A, sz); 把整块内存写成固定值,然后检查首字节和末字节是否仍然是这个值。

验证内容:

  • 分配出来的指针是否真的可用
  • 这块内存是否能正常读写

4. 批量申请与批量释放测试

TestBatchAllocFree() 中,代码会连续申请 10000 次内存,再统一释放。申请大小会在 1 到 1024 字节之间变化。

验证内容:

  • 大量申请时是否稳定
  • 批量释放时是否正常
  • 内存池内部缓存结构是否能承受连续操作

5. 大对象测试

        功能测试里的最后一项是 TestBigObjectAllocFree(),它会申请大于 MAX_BYTES 的对象,比如 MAX_BYTES + 1MAX_BYTES + 1024、512KB、1MB。

        当前项目里,小对象和大对象不是走同一条路径。项目内部把 MAX_BYTES 定义为 256KB,超过这个值就不再走普通的小对象缓存流程,而是走大对象分配路径。所以如果只测小对象,就等于漏掉了一整条逻辑分支。

#include "Test.h"
// ============================================================
// 功能测试 1:基础申请与释放
// 目的:验证最基本的 alloc/free 是否能正常运行
// ============================================================
void TestBasicAllocFree()
{
    PrintTitle("功能测试 1:基础申请与释放");

    void* p1 = ConcurrentAlloc(1);
    void* p2 = ConcurrentAlloc(8);
    void* p3 = ConcurrentAlloc(16);
    void* p4 = ConcurrentAlloc(64);
    void* p5 = ConcurrentAlloc(128);

    assert(p1 != nullptr);
    assert(p2 != nullptr);
    assert(p3 != nullptr);
    assert(p4 != nullptr);
    assert(p5 != nullptr);

    cout << "p1 = " << p1 << endl;
    cout << "p2 = " << p2 << endl;
    cout << "p3 = " << p3 << endl;
    cout << "p4 = " << p4 << endl;
    cout << "p5 = " << p5 << endl;

    ConcurrentFree(p1);
    ConcurrentFree(p2);
    ConcurrentFree(p3);
    ConcurrentFree(p4);
    ConcurrentFree(p5);

    PrintPass("TestBasicAllocFree");
}

// ============================================================
// 功能测试 2:不同大小对象测试
// 目的:覆盖多个 size class,观察不同大小是否都能正常工作
// ============================================================
void TestDifferentSizeAlloc()
{
    PrintTitle("功能测试 2:不同大小对象测试");

    vector<size_t> sizes = {
        1, 2, 3, 7, 8, 9, 15, 16, 17,
        32, 64, 127, 128, 129,
        256, 512, 1023, 1024, 1025,
        2048, 4096, 8192
    };

    vector<void*> ptrs;
    ptrs.reserve(sizes.size());

    for (size_t sz : sizes)
    {
        void* p = ConcurrentAlloc(sz);
        assert(p != nullptr);
        ptrs.push_back(p);

        cout << "alloc size = " << sz << ", ptr = " << p << endl;
    }

    for (void* p : ptrs)
    {
        ConcurrentFree(p);
    }

    PrintPass("TestDifferentSizeAlloc");
}

// ============================================================
// 功能测试 3:写入/读取测试
// 目的:验证申请出来的内存可以正常访问,避免只“分配成功但不能用”
// 注意:这里每块都写入 size 个字节
// ============================================================
void TestWriteRead()
{
    PrintTitle("功能测试 3:写入/读取测试");

    vector<size_t> sizes = { 8, 16, 32, 64, 128, 256, 512, 1024 };
    vector<void*> ptrs;
    ptrs.reserve(sizes.size());

    for (size_t sz : sizes)
    {
        void* p = ConcurrentAlloc(sz);
        assert(p != nullptr);

        // 将整块内存写成固定字节
        memset(p, 0x5A, sz);

        // 简单检查前后若干字节是否写入成功
        unsigned char* bytes = static_cast<unsigned char*>(p);
        assert(bytes[0] == 0x5A);
        assert(bytes[sz - 1] == 0x5A);  

        ptrs.push_back(p);
        cout << "write/read size = " << sz << " success" << endl;
    }

    for (void* p : ptrs)
    {
        ConcurrentFree(p);
    }

    PrintPass("TestWriteRead");
}

// ============================================================
// 功能测试 4:批量申请与批量释放
// 目的:验证批量申请、集中释放是否正常
// ============================================================
void TestBatchAllocFree()
{
    PrintTitle("功能测试 4:批量申请与批量释放");

    const size_t N = 10000;
    vector<void*> ptrs;
    ptrs.reserve(N);

    for (size_t i = 0; i < N; ++i)
    {
        size_t sz = (i % 1024) + 1;
        void* p = ConcurrentAlloc(sz);
        assert(p != nullptr);
        ptrs.push_back(p);
    }

    for (void* p : ptrs)
    {
        ConcurrentFree(p);
    }

    PrintPass("TestBatchAllocFree");
}

// ============================================================
// 功能测试 5:大对象测试
// 目的:覆盖 size > MAX_BYTES 的路径
// 当前项目里 MAX_BYTES = 256KB
// ============================================================
void TestBigObjectAllocFree()
{
    PrintTitle("功能测试 5:大对象测试");

    vector<size_t> bigSizes = {
        MAX_BYTES + 1,
        MAX_BYTES + 1024,
        512 * 1024,
        1024 * 1024
    };

    vector<void*> ptrs;
    ptrs.reserve(bigSizes.size());

    for (size_t sz : bigSizes)
    {
        void* p = ConcurrentAlloc(sz);
        assert(p != nullptr);

        // 为了安全,这里只写少量头部字节,不整块 memset 到 1MB
        memset(p, 0x3C, 64);

        ptrs.push_back(p);
        cout << "big alloc size = " << sz << ", ptr = " << p << endl;
    }

    for (void* p : ptrs)
    {
        ConcurrentFree(p);
    }

    PrintPass("TestBigObjectAllocFree");
}

第二类是并发测试,主要验证在多线程环境下是否会出问题。

1. 多线程同线程申请/释放测试

TestMultiThreadAllocFree() 的逻辑比较直接:创建 4 个线程,每个线程自己申请、自己释放。每轮申请 5000 次,一共做 5 轮。

验证内容:

  • 多线程同时运行时程序会不会崩溃
  • 是否会出现明显死锁
  • 线程本地缓存是否能发挥作用

2. 多线程随机大小压力测试

TestMultiThreadRandomSize() 更接近真实场景。每个线程都会随机申请 1 到 8192 字节的对象,并且不是申请一个就立刻释放,而是先攒到一定数量再统一释放。

验证内容:

  • 不同大小对象混合出现
  • 多个线程同时竞争共享缓存
  • ThreadCache、CentralCache、PageCache 三层之间发生更复杂的交互

3. 长时间压力测试

最后一个并发测试是 StressTest()。它会创建 4 个线程,每个线程进行 50000 轮随机大小申请和释放,并对申请到的内存做简单写入。

验证内容:

  • 长时间运行是否稳定
  • 是否会越跑越慢
  • 是否会出现偶发崩溃
  • 是否存在潜在死锁或资源管理问题
#include "Test.h"
// ============================================================
// 并发测试 1:多线程同线程申请/释放
// 目的:每个线程自己申请、自己释放,验证基本并发稳定性
// ============================================================
void WorkerSameThreadAllocFree(size_t tid, size_t rounds, size_t n)
{
    for (size_t r = 0; r < rounds; ++r)
    {
        vector<void*> ptrs;
        ptrs.reserve(n);

        for (size_t i = 0; i < n; ++i)
        {
            size_t sz = (i % 1024) + 1;
            void* p = ConcurrentAlloc(sz);  
            assert(p != nullptr);
            ptrs.push_back(p);
        }

        for (void* p : ptrs)
        {
            ConcurrentFree(p);
        }
    }

    cout << "thread " << tid << " finished" << endl;
}

void TestMultiThreadAllocFree()
{
    PrintTitle("并发测试 1:多线程同线程申请/释放");

    thread t1(WorkerSameThreadAllocFree, 1, 5, 5000);
    thread t2(WorkerSameThreadAllocFree, 2, 5, 5000);
    thread t3(WorkerSameThreadAllocFree, 3, 5, 5000);
    thread t4(WorkerSameThreadAllocFree, 4, 5, 5000);

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    PrintPass("TestMultiThreadAllocFree");
}

// ============================================================
// 并发测试 2:多线程随机大小压力测试
// 目的:模拟更真实的申请场景,随机申请 1~8192 字节
// ============================================================
void WorkerRandomSizeStress(size_t tid, size_t rounds)
{
    std::mt19937_64 rng(
        (unsigned long long)std::hash<std::thread::id>{}(std::this_thread::get_id())
    );
    std::uniform_int_distribution<size_t> dist(1, 8192);

    vector<void*> hold;
    hold.reserve(1024);

    for (size_t i = 0; i < rounds; ++i)
    {
        size_t sz = dist(rng);
        void* p = ConcurrentAlloc(sz);
        assert(p != nullptr);
        hold.push_back(p);

        // 攒到一定数量再统一释放,模拟缓存/批量回收场景
        if (hold.size() >= 512)
        {
            for (void* ptr : hold)
            {
                ConcurrentFree(ptr);
            }
            hold.clear();
        }
    }

    for (void* ptr : hold)
    {
        ConcurrentFree(ptr);
    }

    cout << "random stress thread " << tid << " finished" << endl;
}

void TestMultiThreadRandomSize()
{
    PrintTitle("并发测试 2:多线程随机大小压力测试");

    thread t1(WorkerRandomSizeStress, 1, 20000);
    thread t2(WorkerRandomSizeStress, 2, 20000);
    thread t3(WorkerRandomSizeStress, 3, 20000);
    thread t4(WorkerRandomSizeStress, 4, 20000);

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    PrintPass("TestMultiThreadRandomSize");
}
// ============================================================
// 并发测试 3:长时间压力测试
// 目的:验证长时间跑是否稳定,是否容易出现死锁/崩溃/明显异常
// ============================================================
void StressTest()
{
    PrintTitle("并发测试 3:长时间压力测试");

    const size_t threadCount = 4;
    const size_t rounds = 50000;

    vector<thread> threads;
    threads.reserve(threadCount);

    for (size_t i = 0; i < threadCount; ++i)
    {
        threads.emplace_back([i, rounds]() {
            std::mt19937_64 rng(
                (unsigned long long)std::hash<std::thread::id>{}(std::this_thread::get_id())
            );
            std::uniform_int_distribution<size_t> dist(1, 4096);

            for (size_t r = 0; r < rounds; ++r)
            {
                size_t sz = dist(rng);
                void* p = ConcurrentAlloc(sz);
                assert(p != nullptr);

                // 写入前几个字节,验证可访问
                memset(p, 0x7F, (sz < 32 ? sz : 32));

                ConcurrentFree(p);
            }

            cout << "stress thread " << i << " finished" << endl;
            });
    }

    for (auto& t : threads)
    {
        t.join();
    }

    PrintPass("StressTest");
}

第三类是性能测试,主要把自定义内存池和系统 malloc/free 进行对比,看看性能是否真的有提升。

1. 固定大小下的系统 malloc/free 测试

BenchmarkMallocFixed() 用多个线程反复执行固定大小 16B 的 malloc/free。它分别统计:

  • alloc 阶段耗时
  • free 阶段耗时
  • 总耗时

2. 固定大小下的内存池测试

        与之对应的是 BenchmarkConcurrentMallocFixed(),测试对象换成了 ConcurrentAlloc(16)ConcurrentFree(ptr)

这样一来,固定大小的小对象测试就可以非常直观地比较:

  • 系统分配器在高频 16B 申请下的表现,固定大小测试可以验证热点路径性能
  • 自定义内存池在同样场景下的表现

3. 随机大小下的系统 malloc/free 测试

BenchmarkMallocRandom() 测试的是随机小对象,大小范围在 1 到 8192 字节之间。

4. 随机大小下的内存池测试

最后的 BenchmarkConcurrentMallocRandom() 用相同的大小分布去测试自定义内存池。

这样做的优点是:

  • 随机大小测试可以验证复杂情况下的整体表现        
#include "Test.h"

// ============================================================
// 性能测试 1:系统 malloc/free 基准测试
// 说明:
// 1. 这里只比较性能,不做 correctness 深挖
// 2. 统计 alloc/free 以及总耗时
// ============================================================
void BenchmarkMallocFixed(size_t ntimes, size_t nworks, size_t rounds)
{
    PrintTitle("性能测试 1:系统 malloc/free(固定大小 16B)");

    vector<thread> vthread(nworks);
    atomic<size_t> malloc_costtime(0);
    atomic<size_t> free_costtime(0);

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = thread([&, k]() {
            vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                v.clear();

                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; ++i)
                {
                    v.push_back(malloc(16));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (void* p : v)
                {
                    free(p);
                }
                size_t end2 = clock();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("[malloc fixed] threads=%zu rounds=%zu ntimes=%zu\n",
        nworks, rounds, ntimes);
    printf("[malloc fixed] alloc cost: %zu ticks\n", malloc_costtime.load());
    printf("[malloc fixed] free  cost: %zu ticks\n", free_costtime.load());
    printf("[malloc fixed] total cost: %zu ticks\n",
        malloc_costtime.load() + free_costtime.load());
}

// ============================================================
// 性能测试 2:内存池 ConcurrentAlloc/ConcurrentFree 固定大小基准
// ============================================================
void BenchmarkConcurrentMallocFixed(size_t ntimes, size_t nworks, size_t rounds)
{
    PrintTitle("性能测试 2:内存池 ConcurrentAlloc/ConcurrentFree(固定大小 16B)");

    vector<thread> vthread(nworks);
    atomic<size_t> malloc_costtime(0);
    atomic<size_t> free_costtime(0);

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = thread([&, k]() {
            vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                v.clear();

                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; ++i)
                {
                    v.push_back(ConcurrentAlloc(16));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (void* p : v)
                {
                    ConcurrentFree(p);
                }
                size_t end2 = clock();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("[pool fixed] threads=%zu rounds=%zu ntimes=%zu\n",
        nworks, rounds, ntimes);
    printf("[pool fixed] alloc cost: %zu ticks\n", malloc_costtime.load());
    printf("[pool fixed] free  cost: %zu ticks\n", free_costtime.load());
    printf("[pool fixed] total cost: %zu ticks\n",
        malloc_costtime.load() + free_costtime.load());
}

// ============================================================
// 性能测试 3:系统 malloc/free 随机小对象基准
// ============================================================
void BenchmarkMallocRandom(size_t ntimes, size_t nworks, size_t rounds)
{
    PrintTitle("性能测试 3:系统 malloc/free(随机小对象 1~8192B)");

    vector<thread> vthread(nworks);
    atomic<size_t> malloc_costtime(0);
    atomic<size_t> free_costtime(0);

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = thread([&, k]() {
            vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                v.clear();

                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; ++i)
                {
                    v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (void* p : v)
                {
                    free(p);
                }
                size_t end2 = clock();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("[malloc random] threads=%zu rounds=%zu ntimes=%zu\n",
        nworks, rounds, ntimes);
    printf("[malloc random] alloc cost: %zu ticks\n", malloc_costtime.load());
    printf("[malloc random] free  cost: %zu ticks\n", free_costtime.load());
    printf("[malloc random] total cost: %zu ticks\n",
        malloc_costtime.load() + free_costtime.load());
}

// ============================================================
// 性能测试 4:内存池随机小对象基准
// ============================================================
void BenchmarkConcurrentMallocRandom(size_t ntimes, size_t nworks, size_t rounds)
{
    PrintTitle("性能测试 4:内存池 ConcurrentAlloc/ConcurrentFree(随机小对象 1~8192B)");

    vector<thread> vthread(nworks);
    atomic<size_t> malloc_costtime(0);
    atomic<size_t> free_costtime(0);

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = thread([&, k]() {
            vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                v.clear();

                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; ++i)
                {
                    v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (void* p : v)
                {
                    ConcurrentFree(p);
                }
                size_t end2 = clock();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
            });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("[pool random] threads=%zu rounds=%zu ntimes=%zu\n",
        nworks, rounds, ntimes);
    printf("[pool random] alloc cost: %zu ticks\n", malloc_costtime.load());
    printf("[pool random] free  cost: %zu ticks\n", free_costtime.load());
    printf("[pool random] total cost: %zu ticks\n",
        malloc_costtime.load() + free_costtime.load());
}

七、进阶优化:关键功能升级与优化

        在 Visual Studio 的性能分析中,在Deallocate函数中,调用ListTooLong函数时消耗的时间是最多的、在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的、在ListTooLong函数中,调用ReleaseListToSpans函数时消耗的时间是最多的,我们发现 MapObjectToSpan 这个函数占用了较多的执行时间。

        进一步分析后发现,主要问题出在 unique_lock 上:由于它会对共享资源加锁,不同线程在访问时容易发生等待和阻塞,从而带来较大的时间开销。为了解决这个问题,我们尝试替换这种加锁方式,在读取操作中采用无锁访问的方法。而在这一改进过程中,就引出了本文要介绍的核心数据结构——基数树

        在三层缓存结构(ThreadCache -> CentralCache -> PageCache)搭起来之后,内存池已经具备了基本可用性:小对象优先走线程缓存,大对象直接按页分配;中央缓存负责批量切分与回收;页缓存负责 Span 的申请、拆分与合并。整个架构已经形成闭环。
        但真正进入“进阶优化”阶段后,我发现项目性能瓶颈并不只在“分配”本身,还出现在一个非常核心、但很容易被忽视的环节:对象地址反查所属 Span

1、“地址反查 Span”成为关键瓶颈

        在 ConcurrentFree 中,释放任意对象前,都会先通过 PageCache::MapObjectToSpan(ptr) 找到该对象所在的 Span,再根据 span->_objsize 判断它是小对象还是大对象,进而走不同释放路径。
        同样,在 CentralCache::ReleaseListToSpans 中,ThreadCache 把一串对象归还回来时,也需要对链表中的每一个对象执行一次“对象地址 -> Span”映射,才能把它重新挂回对应 Span 的 freeList,并维护 _useCount

这意味着:

  • 每次 free 基本都要查一次 Span
  • CentralCache 批量归还时会频繁查 Span
  • PageCache 做前后页合并时也依赖页号映射表

2、 基数树引入:把页号映射从“哈希查找”升级为“分层寻址”

PageCache 中引入了三层基数树结构:

TCMalloc_PageMap3<35> _idSpanMap;

        之所以选择三层基数树,而不是一层大数组或两层结构,原因主要是三层结构是空间和效率之间的折中。如果直接按所有可能页号开一张大数组,访问虽然最快,但空间浪费会非常严重。如果层数太少,单层节点又可能过大,不够灵活。
        而三层结构的优点就在于:

  • 层数不多,查询仍然是固定常数级
  • 节点按需创建,不会一次性占满整片空间
  • 兼顾查找效率和空间利用率

3、三层基数树在当前项目中的具体落点

入基数树之后,它并不是孤立存在的一份代码,而是已经真正接入到了 PageCache 的核心工作流中。

1)申请新 Span 时建立页号映射

PageCache::NewSpan() 中,无论是直接向系统申请的大 Span,还是从更大 Span 中切分出来的小 Span,代码都会对 Span 所覆盖的页号区间建立映射关系。

  • 系统新分配大块页内存后,要把整段页号登记到 _idSpanMap:
bool ok = _idSpanMap.SetRange(bigSpan->_pageId, bigSpan->_npage, bigSpan);
assert(ok);
  • 拆分出 kSpan 和剩余 nSpan 之后,也都会重新登记映射:
bool ok1 = _idSpanMap.SetRange(nSpan->_pageId, nSpan->_npage, nSpan);
bool ok2 = _idSpanMap.SetRange(kSpan->_pageId, kSpan->_npage, kSpan);
assert(ok1 && ok2);

2)对象释放时通过页号快速反查 Span

ConcurrentFree() 中,系统释放一个对象时,首先会调用:

Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

MapObjectToSpan() 内部会先将对象地址转换成页号,再通过 _idSpanMap.get(pageId) 查询对应 Span。

3)CentralCache 批量回收时频繁受益

        在 CentralCache::ReleaseListToSpans() 中,ThreadCache 归还给 CentralCache 的通常不是单个对象,而是一整段对象链表。代码会一边遍历链表,一边不断调用 MapObjectToSpan(start),把对象挂回其所属 Span 的 freeList 中,并更新 _useCount

while (start)
{
	void* next = NextObj(start);
	// 找到对象对应的 Span (通过对象地址找到页号,再通过页号找到 Span)
	Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
	assert(span);
	// 头插——把对象放回 Span 的 freeList 中
	NextObj(start) = span->_freeList;
	span->_freeList = start;

	span->_useCount--; // 引用计数减 1

        当 _useCount == 0 时,整个 Span 会进一步归还给 PageCache

if (span->_useCount == 0)
{
	_spanlists[index].Erase(span); // 从当前桶中移除该 Span
    span->_freeList = nullptr;
    span->_prev = nullptr;
    span->_next = nullptr;
	_spanlists[index]._mtx.unlock(); // 释放桶锁
	PageCache::GetInstance()->_pagemtx.lock(); // 加 PageCache 锁
	PageCache::GetInstance()->ReleaseSpanToPageCache(span); // 归还给 PageCache
	PageCache::GetInstance()->_pagemtx.unlock(); // 解 PageCache 锁
	_spanlists[index]._mtx.lock(); // 重新加桶锁
}

引入三层基数树之后,这部分优化非常直接:

  • 查询路径更短
  • 查找成本更稳定
  • 批量回收时整体开销更可控
Span* PageCache::MapObjectToSpan(void* obj)
{
    // 定位页号
	PAGE_ID pageId = (PAGE_ID)obj >> PAGE_SHIFT;// 除于8k

    std::unique_lock<std::mutex> lock(_pagemtx);

	//auto it = _pageid_span_map.find(pageId);// 查找页号对应的Span;find返回一个迭代器
 //   if(it!= _pageid_span_map.end())
 //   {
 //       return it->second;// 返回对应的Span
	//}
 //   else
 //   {
	//	return nullptr;// 未找到对应的Span,返回空指针
 //   }
    return _idSpanMap.get(pageId);
}
 // 向前合并
 while (true)
 {
     PAGE_ID prevId = span->_pageId - 1; // 谁是我的前一页?

     // 1.1 检查是否存在:前一页属于某个 Span 吗?
     // 如果前一页没映射,说明前面没有被管理的内存了,停止合并
     /*auto ret = _pageid_span_map.find(prevId);*/
     Span* prevSpan = _idSpanMap.get(prevId);
     /*if (ret == _pageid_span_map.end())
     {
         break;
     }*/
     if (prevSpan == nullptr)
     {
         break;
     }

     // 1.2 检查状态:前一个 Span 是空闲的吗?
     //Span* prevSpan = ret->second;
     //if (prevSpan->_isUse == true) // 如果邻居正在被使用,不能抢,停止
     //{
     //    break;
     //}
     if (prevSpan->_isUse == true)
     {
         break;
     }

     // 1. 检查大小:合并后会不会超过 PageCache 能管理的最大限制(比如128页)?
     // 如果合并太大,桶装不下,管理起来反而麻烦,就不合并了
     if (prevSpan->_npage + span->_npage > PAGES_NUM - 1)
     {
         break;
     }

     // --- 开始合并 ---
     span->_pageId = prevSpan->_pageId; // 我的起始页号变成邻居的(因为他在前面)
     span->_npage += prevSpan->_npage;          // 我的大小加上邻居的大小

     // 邻居被我吞并了,把他从原来的哈希桶里删掉,并释放它的结构
//     for (PAGE_ID i = 0; i < span->_npage; i++)
//     {
//_pageid_span_map[span->_pageId + i] = span;// 更新合并后Span的页号映射
//     }
     bool ok = _idSpanMap.SetRange(span->_pageId, span->_npage, span);
     assert(ok);
     _spanlists[prevSpan->_npage].Erase(prevSpan);
     _spanPool.Delete(prevSpan);
 }

4)PageCache 合并 Span 时同样依赖它

ReleaseSpanToPageCache() 中,系统会尝试向前、向后合并相邻的空闲 Span。
而合并的前提,就是先根据相邻页号查询前一个 Span 和后一个 Span:

  • 前向查 prevId = span->_pageId - 1
  • 后向查 nextId = span->_pageId + span->_npage

然后借助 _idSpanMap.get(...) 取出相邻 Span,再判断是否空闲、是否可合并。

PAGE_ID prevId = span->_pageId - 1;
Span* prevSpan = _idSpanMap.get(prevId);

PAGE_ID nextId = span->_pageId + span->_npage;
Span* nextSpan = _idSpanMap.get(nextId);

三层基数树的结构设计

#pragma once
#include "Common.h"
#include <cstring>
template <int BITS>
class TCMalloc_PageMap3
{
private:
    // 将 BITS 大致均分为 3 层
    static const int INTERIOR_BITS = (BITS + 2) / 3;   // 向上取整
    static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;

    static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
    static const int LEAF_LENGTH = 1 << LEAF_BITS;  

    struct Node
    {
        Node* ptrs[INTERIOR_LENGTH];
    };

    struct Leaf
    {
        Span* values[LEAF_LENGTH];
    };

private:
    Node* _root = nullptr;

private:
    static Node* NewNode()
    {
        Node* n = (Node*)SystemAlloc((sizeof(Node) + ((1 << PAGE_SHIFT) - 1)) >> PAGE_SHIFT);
        if (n)
        {
            memset(n, 0, sizeof(Node));
        }
        return n;
    }

    static Leaf* NewLeaf()
    {
        Leaf* leaf = (Leaf*)SystemAlloc((sizeof(Leaf) + ((1 << PAGE_SHIFT) - 1)) >> PAGE_SHIFT);
        if (leaf)
        {
            memset(leaf, 0, sizeof(Leaf));
        }
        return leaf;
    }

public:
    typedef uintptr_t Number;

    TCMalloc_PageMap3()
    {
        _root = NewNode();
        if (_root == nullptr)
        {
            throw std::bad_alloc();
        }
    }

    // 查询 pageId 对应的 Span*
    Span* get(Number k) const
    {
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);// 最高层索引,取前面
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// 中间层索引
		const Number i3 = k & (LEAF_LENGTH - 1);// 叶子层索引

        if ((k >> BITS) != 0 || _root == nullptr)
        {
            return nullptr;
        }

        if (_root->ptrs[i1] == nullptr || _root->ptrs[i1]->ptrs[i2] == nullptr)
        {
            return nullptr;
        }

        Leaf* leaf = reinterpret_cast<Leaf*>(_root->ptrs[i1]->ptrs[i2]);
        return leaf->values[i3];
    }

    // 设置 pageId -> Span*
    // 要求调用前已经 Ensure 过
    void set(Number k, Span* v)
    {
        assert((k >> BITS) == 0);

        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
        const Number i3 = k & (LEAF_LENGTH - 1);

        Leaf* leaf = reinterpret_cast<Leaf*>(_root->ptrs[i1]->ptrs[i2]);
        assert(leaf != nullptr);

        leaf->values[i3] = v;
    }

    // 确保 [start, start+n) 覆盖到的中间节点 / 叶子节点都已创建
    bool Ensure(Number start, size_t n)
    {
        if (n == 0)
            return true;

        Number end = start + n - 1;
        if (end < start) // 溢出保护
            return false;

        for (Number key = start; key <= end;)
        {
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);// 最高层索引,取前面
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// 中间层索引

            if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
            {
                return false;
            }

            // 二级节点不存在则创建
            if (_root->ptrs[i1] == nullptr)
            {
                Node* n2 = NewNode();
                if (n2 == nullptr)
                    return false;
                _root->ptrs[i1] = n2;
            }

            // 叶子不存在则创建
            if (_root->ptrs[i1]->ptrs[i2] == nullptr)
            {
                Leaf* leaf = NewLeaf();
                if (leaf == nullptr)
                    return false;

                _root->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
            }

            // 直接跳到下一个 leaf 覆盖范围
            Number next = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
            if (next == 0 || next <= key)
                break;

            key = next;
        }

        return true;
    }

    // 批量设置 [start, start+n) -> span
    bool SetRange(Number start, size_t n, Span* span)
    {
        if (!Ensure(start, n))
            return false;

        for (size_t i = 0; i < n; ++i)
        {
            set(start + i, span);
        }
        return true;
    }
};

八、项目收尾:源码与扩展

https://gitee.com/yes-qingxin/concurrent-memory-pool.git

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐