项目:C++高并发内存池
目录
4.以⻚为单位的⼤内存管理span的定义及spanlist定义
4.windows和Linux下如何直接向堆申请⻚为单位的⼤块内存:
八.多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比
一.项⽬介绍
1.这个项⽬做的是什么?
当前项⽬是实现⼀个⾼并发的内存池,他的原型是google的⼀个开源项⽬tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了⾼效的多线程内存管理,⽤于替代系统的内存分配相关的函数(malloc、free)。这个项⽬是把tcmalloc最核⼼的框架简化后拿出来,模拟实现出⼀个⾃⼰的⾼并发内存池,⽬的就是学习tcamlloc的精华,这种⽅式有点类似我们之前学习STL容器的⽅式。但是相⽐STL容器部分,tcmalloc的代码量和复杂度上升了很多。
2.这个项⽬的所需要的技术栈?
这个项⽬会⽤到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等⽅⾯的知识。
二.什么是内存池
1.池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤ 提⾼程序运⾏效率。在计算机中,有很多使⽤“池”这种技术的地⽅,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到客⼾端的请求时,唤醒池中某个睡眠的线程,让它来处理客⼾端的请求,当处理完这个请求,线程⼜进⼊睡眠状态。
2.内存池
内存池是指程序预先从操作系统申请⼀块⾜够⼤内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,⽽是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,⽽是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
3.内存池主要解决的问题
1.效率问题:不用每次都向操作系统申请内存,浪费时间
2.内存碎片化问题
内碎片:由对齐规则产生,等于实际申请大小与对齐后块大小之间的差值
外碎片:一些空间连续的内存太小,合计内存满足,但是不连续,所有不满足一些内存分配申请
4.malloc
C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,⽽malloc就是⼀个内存池。malloc() 相当于向操作系统“批发”了⼀块较⼤的内存空间,然后“零售”给程序⽤。当全部“售完”或程序有⼤量的内存需求时,再根据实际需求向操作系统“进货”。malloc的实现⽅式有很多种,⼀般不同编译器平台⽤的都是不同的。⽐如windows的vs系列⽤的微软⾃⼰写的⼀套,linux gcc⽤的glibc中的ptmalloc。下⾯有⼏篇关于这块的⽂章,⼤概可以去简单看看了解⼀下,关于ptmalloc,有兴趣⼤家可以去看看他的实现细节。
三.开胃菜--先设计⼀个定长的内存池
作为程序员(C/C++)我们知道申请内存使⽤的是malloc,malloc其实就是⼀个通⽤的⼤众货,什么场景下都可以⽤,但是什么场景下都可以⽤就意味着什么场景下都不会有很⾼的性能,下⾯我们就先来设计⼀个定⻓内存池做个开胃菜,当然这个定⻓内存池在我们后⾯的⾼并发内存池中也是有价值的,所以学习他⽬的有两层,先熟悉⼀下简单内存池是如何控制的,第⼆他会作为我们后⾯内存池的⼀个基础组件。![]()
windows和Linux下如何直接向堆申请⻚为单位的⼤块内存 :ObjectPool.h#pragma once
#include "Common.h"// 直接去堆上按⻚申请空间
inline static void* SystemAlloc(size_t kpage) //kpage表示页数
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage * (1 << 13), MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE); //参数1:0代表系统随机分配内存,参数2:页数*一页(8k),参数3:MEM_RESERVE:预留一段虚拟地址空间(不占用物理内存)
MEM_COMMIT:为预留的地址空间分配物理内存(并映射到虚拟地址)参数 4:PAGE_READWRITE:设置分配的内存页权限为 “可读可写”;#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();//分配失败时抛出标准异常
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);//参数 1:ptr:要释放的内存起始地址,参数 2:0:配合MEM_RELEASE使用时,必须设为 0,表示释放整个预留 / 提交的内存块,参数 3:MEM_RELEASE:释放内存块(同时解除物理内存映射和虚拟地址预留)#else
// sbrk unmmap等
#endif
}template<class T>//类型模板参数
class ObjectPool
{
public:
T* New()
{
T* obj = nullptr;//优先考虑还回来的内存块对象再次重复利用
if (_freelist)
{
void* next = *(void**)_freelist;//_freelist的下一个节点
obj = (T*)_freelist;//取头
_freelist = next;//next为头
}
else
{
//剩余内存不够一个对象大小时,则重新开大块空间
if (_remainBytes < sizeof(T))
{
_remainBytes = 128 * 1024;//128KB
//_memory = (char*)malloc(_remainBytes);
_memory = (char*)SystemAlloc(_remainBytes >> 13);//16页
if (_memory == nullptr)
{
throw std::bad_alloc();//表示内存分配失败
}
}obj = (T*)_memory;
size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//强制切分出至少指针大小的块,保证释放时能连成链表
_memory += objSize;//起始地址后移
_remainBytes -= objSize;//剩余内存减去用了的
}//定位new,显示调用T的构造函数初始化
new(obj)T;//定位new,对已分配内存的未初始化的指针进行初始化return obj;//返回指向申请内存的指针
}
void Delete(T* obj)
{
//显示调用析构函数
obj->~T();//仅调用析构函数清理资源,不释放内存//头插
*(void**)obj = (T*)_freelist;//obj指向旧的第一个节点
_freelist = obj;//obj充当新的第一个节点
}
private:
char* _memory;//指向大块内存的指针
size_t _remainBytes = 0;//大块内存在切分过程中剩余字节数void* _freelist;//还回来过程中链接的自由链表的头指针
};
四.高并发内存池整体框架设计
项目源码:项目代码: 关于项目的代码
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本⾝其实已经很优秀,那么我们项⽬的原型tcmalloc就是在多线程⾼并发的场景下更胜⼀筹,所以这次我们实现的内存池需要考虑以下⼏⽅⾯的问题。1. 性能问题。2. 多线程环境下,锁竞争问题。3. 内存碎⽚问题。concurrent memory pool主要由以下3个部分构成:1. thread cache:线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存不需要加锁,每个线程独享⼀个cache,这也就是这个并发线程池⾼效的地⽅。2. central cache:中⼼缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免⼀个线程占⽤了太多的内存,⽽其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的⽬的。central cache是存在竞争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这⾥竞争不会很激烈。3. page cache:⻚缓存是在central cache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分配的,central cache没有内存对象时,从page cache分配出⼀定数量的page,并切割成定⻓⼤⼩的⼩块内存,分配给central cache。当⼀个span的⼏个跨度⻚的对象都回收以后,page cache会回收central cache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问题。![]()
五.thread cache
thread cache是哈希桶结构,每个桶是⼀个按桶位置映射⼤⼩的内存块对象的⾃由链表。每个线程都会有⼀个thread cache对象,这样每个线程在这⾥获取对象和释放对象时是⽆锁的。![]()
1.申请内存:
1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶⾃由链表下标i。2. 如果⾃由链表_freeLists[i]中有对象,则直接Pop⼀个内存对象返回。3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取⼀定数量的对象,插⼊到⾃由链表并返回⼀个对象。
2.释放内存:
1. 当释放内存⼩于256k时将内存释放回thread cache,计算size映射⾃由链表桶位置i,将对象Push到_freeLists[i]。2. 当链表的⻓度过⻓,则回收⼀部分内存对象到central cache。
3.TLS--thread local storage:
(18 封私信) Thread Local Storage(线程局部存储)TLS - 知乎
ConcurrentAlloc.h
#pragma once
#include "ThreadCache.h"
#include "PageCache.h"
#include "Common.h"
#include "ObjectPool.h"//ConcurrentAlloc 是线程安全的入口层,
// 它利用 TLS 为每个线程创建并绑定专属的 ThreadCache,
// 保证了多线程下内存分配的无锁、高效、安全;static void* ConcurrentAlloc(size_t size)
{
if (size > MAX_BYTES)
{
size_t alignsize = SizeClass::RoundUp(size);
size_t kpage = alignsize >> PAGE_SHIFT;PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objsize = size;
PageCache::GetInstance()->_pagemtx.unlock();void* ptr = (void*)(span->_page << PAGE_SHIFT);//页号 → 物理地址 的转换
return ptr;
}
else
{
//通过TLS每个线程无锁的获取自己的专属的ThreadCache对象//检查当前线程的本地 ThreadCache 指针是否为空
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache> tcpool;
//如果为空:说明当前线程是第一次申请内存,需要创建专属的对象,并赋值给这个 TLS 指针;
//pTLSThreadCache = new ThreadCache;//创建 ThreadCache 对象
pTLSThreadCache = tcpool.New();
}//std::cout << std::this_thread::get_id() << ":" << pTLSThreadCache << std::endl;
return pTLSThreadCache->Allocate(size);
}
}static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//通过地址拿到对应的span
size_t size = span->_objsize;if (size > MAX_BYTES)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//通过地址拿到对应的spanPageCache::GetInstance()->_pagemtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pagemtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
4.thread cache代码框架:
首先我们先明白在ThreadCache中的一些名词含义。
1. 数组下标(全局桶下标)
- 含义:内存池核心管理数组(存储所有自由链表头指针)的索引值,是整个内存池范围内的全局唯一标识;
- 作用:通过下标可直接定位到对应桶,范围从 0 开始(如 0、1、20、72 等);
- 举例:下标 0 对应 8B 桶,下标 20 对应 208B 桶,下标 72 对应 1152B 桶。
2. 桶
- 含义:数组下标对应的 “逻辑单元”,是 “固定大小内存块的专属管理单元”;
- 核心特征:一个桶唯一对应一种固定大小的内存块,且仅管理该大小的内存块;
- 本质:桶本身不是实体,是对 “数组下标 + 对应链表” 的统称(如下标 0=8B 桶,下标 20=208B 桶)。
3. 对齐粒度
- 含义:内存大小向上取整的 “最小步长”,是按区间划分的统一规则;
- 作用:将任意申请的内存大小规整为桶能管理的固定大小,避免碎片化;
- 区间规则:
- [1,128B]:对齐粒度 8B;
- (128,1024B]:对齐粒度 16B;
- (1024,8192B]:对齐粒度 128B;
- 以此类推。
4. 链表(自由链表 / 空闲链表)
- 含义:每个桶绑定的单向链表,节点是该桶对应大小的空闲内存块;
- 作用:管理同大小的空闲内存块,实现高效分配 / 释放(分配时头删、释放时头插);
- 核心特征:一个桶对应且仅对应一个链表,链表中所有节点的内存块大小完全一致。
关键关联逻辑
通过对齐粒度将任意申请的内存大小规整为固定值 → 根据固定值计算出桶的下标(数组下标) → 定位到唯一的桶 → 该桶通过专属的链表管理所有同大小的空闲内存块。
ThreadCache.h
#pragma once
#include "Common.h"
//ThreadCache本质是由一个哈希映射的对象自由链表构成
//ThreadCache 是线程级内存缓存的外壳,_freelist 是它的内脏——_freelist
//数组把不同大小的内存块分级管理,ThreadCache 用这套结构实现了无锁、低碎片的内存分配服务。
class ThreadCache
{
public:
//申请和释放空间
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
//从中心缓存获取对象
void* FetchFromCentralCache(size_t index, size_t size);// 释放对象时,链表过⻓时,回收内存回到中⼼缓存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freelist[NFREE_LIST];//自由链表数组定义,数组的每个元素都是一个 FreeList 对象,对应一个哈希桶
};//TLS thread local storage线程本地存储
//为每一个线程创建独立的 pTLSThreadCache 指针副本,每个线程只能访问自己的副本,线程间互不干扰。
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;//定义线程本地变量
ThreadCache.cpp
#define _CRT_SECURE_NO_WARNINGS 1
#include "ThreadCache.h"
#include "CentralCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//1.最开始不会向centralcache一次批量要太多,可能用不完
//2.如果不断有这个size大小内存的需求,那么就会不断增长直到上限
//3.size越大,一次向Central Cache要的batchNum就越小
//4.size越小,一次向Central Cache要的batchNum就越大
size_t batchNum=min(_freelist[index].MaxSize(),SizeClass::NumMoveSize(size));//计算一次拿取size大小个数
if (_freelist[index].MaxSize() == batchNum)
{
_freelist[index].MaxSize() += 1;
}
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start,end,batchNum,size);//实际获取的数量
assert(actualNum >= 1);//保证至少有一个if (actualNum == 1)
{
assert(start == end);
return start;//直接返回地址给ptr
}
else
{
_freelist[index].PushRange(NextObj(start), end, actualNum - 1);//把从第2个开始的内存块插入到链表
return start;//直接返回第一个内存块地址给ptr
}
}void* ThreadCache::Allocate(size_t size)
{
//空间申请
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//计算实际所分配的大小
size_t index = SizeClass::Index(size);//计算是桶的全局下标//判断此下标桶是否为空
if (!_freelist[index].Empty())
{
//不为空,则Pop出去
return _freelist[index].Pop();
}
else
{
//为空,没有内存块,向中心缓存获取
return FetchFromCentralCache(index, alignSize);
}
}void ThreadCache::Deallocate(void* ptr, size_t size)
{
//空间释放
assert(size <= MAX_BYTES);
assert(ptr);//找出映射的自由链表桶,对象插入进入
size_t index = SizeClass::Index(size);
_freelist[index].Push(ptr);//当链表长度大于一次批量申请的内存时就开始还一段list给centralcache
if (_freelist[index].Size() > _freelist[index].MaxSize())
{
ThreadCache::ListTooLong(_freelist[index], size);
}
}void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start,size);
}
5.自由链表的哈希桶跟对象大小的映射关系
Common.h
#pragma once
#include <iostream>
#include <vector>
#include <unordered_map>
#include <map>
#include <algorithm>
#include <new>
#include <thread>
#include <mutex>
#include <assert.h>
#include <time.h>using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;//桶的内存256KB
static const size_t NFREE_LIST = 208;//桶的数量
#ifdef _WIN64
typedef unsigned long long PAGE_ID;//_WIN64下面_WIN64和_WIN32都会被定义
#elif _WIN32
typedef size_t PAGE_ID;//_WIN32下面只会定义_WIN32
#endif#ifdef _WIN32
#include <Windows.h>
#else
//linux
#endif//&是引用,且是 void* 类型的引用,核心作用是让函数返回值可以被赋值(作为左值)
static void*& NextObj(void* obj)
{
assert(obj != nullptr);
return *(void**)obj;
}//管理切分好的小对象的自由链表
//自由链表类---Thread Cache
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);//头插
//*(void**)obj = _freelist;
NextObj(obj) = _freelist;
_freelist = obj;
++_size;
}
void PushRange(void* start, void* end, size_t n)
{//头插入多个
NextObj(end) = _freelist;
_freelist = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{//头删多个
assert(n <= _size);
start = _freelist;
end = start;
for (size_t i = 0; i < n - 1; i++)
{
if (end == nullptr)
{
break;
}
end = NextObj(end);
}
if(end==nullptr)
{
_freelist = nullptr;
}
else
{
_freelist = NextObj(end);
NextObj(end) = nullptr;
}
_size -= n;
}void* Pop()
{
assert(_freelist);//头删
void* obj = _freelist;
_freelist = NextObj(obj);
--_size;return obj;
}
bool Empty()
{
//判空
return _freelist == nullptr;
}size_t& MaxSize() { return _maxsize; }
size_t Size() { return _size; }
private:
void* _freelist = nullptr;
size_t _maxsize = 1;
size_t _size = 0;
};
class SizeClass
{
// 整体控制在最多10%左右的内碎⽚浪费
// 内存大小区间 (字节) 对齐粒度 对应 freelist 下标区间 桶数量
// [1,128] 8byte对⻬ freelist[0,16) 16
// [128+1,1024] 16byte对⻬ freelist[16,72) 56
// [1024+1,81024] 128byte对⻬ freelist[72,128) 56
// [8*1024+1,641024] 1024byte对⻬ freelist[128,184) 56
// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208) 24//size_t _RoundUp(size_t size, size_t alignNum)//普通方法
//{
// size_t alignSize;
// if (size % alignNum != 0)
// {
// alignSize = (size / alignNum + 1) * alignNum;
// //计算对齐数 size为9,需要分配16, (9/8 + 1)*8=16
// }
// else
// {
// alignSize = size;
// }
// return alignSize;
//}public:
//计算对象大小的对齐映射规则
static inline size_t _RoundUp(size_t bytes, size_t align)//性能方法
{
return (((bytes)+align - 1) & ~(align - 1));
}static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);//8字节对齐
}
else if (size <= 1024)
{
return _RoundUp(size, 16);//16字节对齐
}
else if (size <= 8 * 1024)
{
return _RoundUp(size, 128);//128字节对齐
}
else if (size <= 64 * 1024)
{
return _RoundUp(size, 1024);//1024字节对齐
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8 * 1024);//8*1024字节对齐
}
else
{
return _RoundUp(size, 1 << PAGE_SHIFT);//以页为单位对齐
}
}// 计算映射的哪一个自由链表桶
//size_t _Index(size_t bytes,size_t alignNum)//普通方法
//{
// if (bytes % alignNum == 0)
// {
// return bytes / alignNum - 1;// 8/8-1=0
// }
// else
// {
// return bytes / alignNum;//9/8=1
// }
//}
static inline size_t _Index(size_t bytes, size_t align_shift)//性能方法
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
//以8为例, (8+1<<3-1) >> 3 -1=(8+7)>>3-1=0;
}
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);// 每个区间有多少个链(16: [1,128]区间桶数;56: 后续三个区间桶数)
static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128)
{
return _Index(bytes, 3);//这里的3是2^3=8
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8192)
{
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else
{
assert(false);
}return -1;
}//// ⼀次从中⼼缓存获取多少个
static size_t NumMoveSize(size_t size)
{
assert(size > 0);// 核心公式:用最大可处理字节数 / 单个内存块大小 = 理论最大批量数
int num = MAX_BYTES / size;// 下限:至少批量拿2个(避免频繁向CentralCache申请,减少锁竞争)
if (num < 2)
num = 2;// 上限:最多批量拿512个(避免一次性拿太多,导致内存浪费/其他线程饥饿)
if (num > 512)
num = 512;return num;
}
};
六.central cache
central cache也是⼀个哈希桶结构,他的哈希桶的映射关系跟thread cache是⼀样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下⾯的span中的⼤内存块被按映射关系切成了⼀个个⼩内存块对象挂在span的⾃由链表中。![]()
1.申请内存:
1. 当thread cache中没有内存时,就会批量向central cache申请⼀些内存对象,这⾥的批量获取对象的数量使⽤了类似⽹络tcp协议拥塞控制的慢开始算法;central cache也有⼀个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这⾥使⽤的是⼀个桶锁,尽可能提⾼效率。2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请⼀个新的span对象,拿到span以后将span管理的内存按⼤⼩切好作为⾃由链表链接到⼀起。然后从span中取对象给thread cache。3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配⼀个对象给thread cache,就++use_count
2.释放内存:
1. 当thread_cache过⻓或者线程销毁,则会将内存释放回central cache中的,释放回来时--use_count。当use_count减到0时则表⽰所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲⻚进⾏合并。
3.central cache代码框架:
CentralCache.h
#pragma once
#include "Common.h"
////CentralCache 使用单例模式的核心逻辑:
//唯一性:保证内存调度中心全局唯一,避免内存碎片化和分配规则混乱;
//资源效率:避免核心数据结构重复初始化,节省内存和启动时间;
//同步简化:集中管理多线程锁机制,降低并发控制复杂度,契合内存池分层架构设计。//单例饿汉模式
class CentralCache
{
public:
//提供一个接口
static CentralCache* GetInstance()
{
return &_sInst;
}// 获取⼀个⾮空的span
Span* GetOneSpan(SpanList& list, size_t size);// 从中⼼缓存获取⼀定数量的对象给thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将⼀定数量的对象释放到span跨度
void ReleaseListToSpans(void* start, size_t size);
private:
SpanList _spanlists[NFREE_LIST];//centralcache里面桶的数量
private:
//防止构造和拷贝构造
CentralCache()
{}
CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
CentralCache.cpp
#define _CRT_SECURE_NO_WARNINGS 1
#include "CentralCache.h"
#include "PageCache.h"CentralCache CentralCache::_sInst;
// 获取⼀个⾮空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
//遍历查找当前span链表是否还有未分配对象的span
Span* it = list.Begin();//拿到该桶链表的头节点
while (it != list.End())
{
if (it->_freelist != nullptr)
{
return it;
}
else
{
it = it->_next;
}
}//先把centralcache的桶锁解掉,这样如果其他线程释放内存回来,不会阻塞
list._centralmtx.unlock();//走到这里说明没有空闲的span,只能找page cache要一个span
PageCache::GetInstance()->_pagemtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isuse = true;//在使用
span->_objsize = size;
PageCache::GetInstance()->_pagemtx.unlock();//对获取的span进行切分,不需要加锁,因为这会其他线程访问不到这个span,还没有挂起
//计算span的大块内存在页内中页号的起始地址
char* start = (char*)(span->_page << PAGE_SHIFT);
//计算页号的内存字节数大小(字节数)
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//把大块内存切成自由链表链接起来,这里链接时尾插(内存连续)
//1.先切一块下来做头,方便尾插
span->_freelist = start;
start += size;//start后移一个节点
void* tail = span->_freelist;//tail和freelist指向最开始的startwhile (start < end)
{
//遍历连接新开辟的sapn
NextObj(tail) = start;//tail指向下一个节点
tail = start;//tail后移一个节点
start += size;//start后移一个节点
}
NextObj(tail) = nullptr;//遍历到最后一个节点置空//切好span以后,需要把span挂到桶里面去的时候,在加锁
list._centralmtx.lock();
list.PushFront(span);return span;
}size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);//计算桶的位置,也就是下标
_spanlists[index]._centralmtx.lock();//加锁
Span* span = CentralCache::GetOneSpan(_spanlists[index], size);
assert(span);
assert(span->_freelist);//从span中获取batchNum个对象,若不够则有多少拿多少
start = span->_freelist;
end = start;
size_t i = 0;
size_t actualNum = 1;//实际获取的数量
while(NextObj(end) != nullptr && i < batchNum - 1)//使用while来防止一个span不够的情况
{
end = NextObj(end);//end后移找到需要被拿取的内存块的位置
++i;
++actualNum;
}
span->_freelist = NextObj(end);//头指向end的下一个位置
NextObj(end) = nullptr;
span->_usecount += actualNum;_spanlists[index]._centralmtx.unlock();//解锁
return actualNum;
}void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
if (start == nullptr) return;
size_t index = SizeClass::Index(size);
_spanlists[index]._centralmtx.lock();
while (start)
{
void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjectToSpan(start);//算出对应的sapn
NextObj(start) = span->_freelist;//头插
span->_freelist = start;
span->_usecount--;
if (span->_usecount == 0)
{
//说明span切分出的小块内存全部回收完,可以这个span就回收给pagecache
_spanlists[index].Erase(span);
span->_freelist = nullptr;
span->_next = nullptr;
span->_prev = nullptr;//解掉桶锁,释放span给pagecache,使用pagecache的锁
_spanlists[index]._centralmtx.unlock();PageCache::GetInstance()->_pagemtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pagemtx.unlock();
_spanlists[index]._centralmtx.lock();
}
start = next;
}_spanlists[index]._centralmtx.unlock();
}
4.以页为单位的大内存管理span的定义及spanlist定义
Common.h
class SizeClass
{static const size_t PAGE_SHIFT = 13;// ⻚⼤⼩转换偏移, 即⼀⻚定义为2^13,也就是8KB,8*1024byte
// 计算⼀次向PageCache⼏个⻚
// 单个对象 8byte num=512 npage=4096byte--1页
// ...
// 单个对象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size); //计算批量拿多少个size大小的块
size_t npage = num * size; //这批块的总字节数npage >>= PAGE_SHIFT;//转换为页
if (npage == 0)
npage = 1;
return npage;
}
};//管理多个连续页的大块内存跨度
struct Span
{
PAGE_ID _page = 0; //大块内存起始页的页号
size_t _n = 0; //页数,也就是数组下标Span* _next = nullptr; //双向页表的结构
Span* _prev = nullptr;size_t _objsize = 0; //切好的小对象的大小
size_t _usecount = 0; //切成小块内存的,被分配给ThreadCache的计数
void* _freelist = nullptr; //切好的小块内存的自由链表bool _isuse; //是否在被使用,默认是false
};//带头双向循环链表---Central Cache
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
void PushFront(Span* span)
{
//头插
Insert(Begin(), span);
}
Span* PopFront()
{
//头删
Span* front = _head->_next;
Erase(front);
return front;
}
bool Empty()
{
//判空
return _head->_next == _head;
}
void Insert(Span* pos, Span* newSpan)
{
//pos位置插入
assert(pos);
assert(newSpan);Span* prev = pos->_prev;
//prev newSpan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
//删除pos位置
assert(pos);
assert(pos != _head);// prev pos next
Span* prev = pos->_prev;
Span* next = pos->_next;prev->_next = next;
next->_prev = prev;
}
~SpanList()
{}
private:
Span* _head;//头节点
public:
std::mutex _centralmtx;//桶锁
};
七.page cache
1.申请内存:
1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更⼤⻚寻找⼀个span,如果找到则分裂成两个。⽐如:申请的是4⻚page,4⻚page后⾯没有挂span,则向后⾯寻找更⼤的span,假设在10⻚page位置找到⼀个span,则将10⻚page span分裂为⼀个4⻚page span和⼀个6⻚page span。2. 如果找到_spanList[128]都没有合适的span,则向系统使⽤mmap、brk或者是VirtualAlloc等⽅式申请128⻚page span挂在⾃由链表中,再重复1中的过程。3. 需要注意的是central cache和page cache 的核⼼结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache⼀样的⼤⼩对⻬关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成⼩块内存的⾃由链表。⽽page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i⻚内存。
2.释放内存:
1. 如果central cache释放回⼀个span,则依次寻找span的前后page id的没有在使⽤的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切⼩的内存合并收缩成⼤的span,减少内存碎⽚。
3.page cache代码框架:
PageCache.h
#pragma once
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInstan;
}// 获取从对象到span的映射
Span * MapObjectToSpan(void* obj);// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span * span);//获取一个k页的span
Span* NewSpan(size_t k);std::mutex _pagemtx;
private:
SpanList _pagelists[NPAGES];
ObjectPool<Span> _spanpool;//std::unordered_map<PAGE_ID, Span*> _idspanmap;//页号和span的映射
//std::map<PAGE_ID, Span*> _idspanmap;//页号和span的映射
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idspanmap;PageCache()
{}
PageCache(const PageCache&) = delete;static PageCache _sInstan;
};
PageCache.cpp
#define _CRT_SECURE_NO_WARNINGS 1
#include "PageCache.h"PageCache PageCache::_sInstan;
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);//大于128页的直接向堆申请
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanpool.New();span->_page = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idspanmap[span->_page] = span;
_idspanmap.set(span->_page, span);
return span;
}//先检查第k个桶里面有没有span,
if (!_pagelists[k].Empty())
{
Span* kspan = _pagelists[k].PopFront();
//建立id和span的映射,方便centralcache回收查找对应的span
for (PAGE_ID i = 0; i < kspan->_n; i++)
{
//_idspanmap[kspan->_page + i] = kspan;//从页号开始起映射_n个页
_idspanmap.set(kspan->_page + i, kspan);
}
return kspan;
}//检查一下后面的桶里面有没有span,如果有可以把他进行切分
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_pagelists[i].Empty())//后面的某个桶不为空
{
//需要进行切分为一个k页的的span和一个n-k页的span
//k页的span返回给cnetral cache
//n-k页的span挂到相应的页位置
Span* nspan = _pagelists[i].PopFront();//取下这个位置的页 起始页号是第 10 页
//Span* kspan = new Span(); //管理连续 5 页:10、11、12、13、14
Span* kspan = _spanpool.New();//在nspan的头部切一个k页下来
kspan->_page = nspan->_page;//kspan->_page = 10
kspan->_n = k;//kspan->_n = 2 → 管理页10、11nspan->_page += k;//nspan->_page = 10 + 2 = 12
nspan->_n -= k; // nspan->_n = 5 - 2 = 3 → 管理页12、13、14_pagelists[nspan->_n].PushFront(nspan);//把剩下的nspan挂到_n下标的位置
//存储nspan的收尾页号跟nspan的映射,方便pagecache回收内存时进行的合并查找
//_idspanmap[nspan->_page] = nspan;
_idspanmap.set(nspan->_page, nspan);//_idspanmap[nspan->_page + nspan->_n - 1] = nspan;
_idspanmap.set(nspan->_page + nspan->_n - 1, nspan);//建立id和span的映射,方便centralcache回收查找对应的span
for (PAGE_ID i = 0; i < kspan->_n; i++)
{
//_idspanmap[kspan->_page + i] = kspan;//从页号开始起映射_n个页
_idspanmap.set(kspan->_page + i, kspan);
}
return kspan;
}
}//走到这个位置就说明后面没有大页的span,这时就去找堆要一个128页的span
//Span* bigspan = new Span;
Span* bigspan = _spanpool.New();void* ptr = SystemAlloc(NPAGES - 1);//向堆申请 NPAGES - 1 个连续的内存页
bigspan->_page = (PAGE_ID)ptr >> PAGE_SHIFT;//计算ptr的起始页
bigspan->_n = NPAGES - 1;//申请的页数也就是_n_pagelists[bigspan->_n].PushFront(bigspan);//插入到对应的桶里面
return NewSpan(k);
}// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//算出页号
//std::unique_lock<std::mutex> lock(_pagemtx);
////auto ret = _idspanmap.find(id);//在建立的映射中查找
//if (ret != _idspanmap.end())
//{
// return ret->second;
//}
//else
//{
// assert(false);
// return nullptr;
//}
auto ret = (Span*)_idspanmap.get(id);
assert(ret != nullptr);
return ret;
}void PageCache::ReleaseSpanToPageCache(Span* span)
{
//大于128页的直接还给堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_page << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanpool.Delete(span);return;
}//对span前后的页尝试进行合并,缓解内存碎片问题
while (1)//向前合并
{
PAGE_ID previd = span->_page - 1;
//auto ret = _idspanmap.find(previd);//是 std::unordered_map<PAGE_ID, Span*>::iterator 类型的迭代器
////前面的页号没有,不进行合并
//if (ret == _idspanmap.end())
//{
// break;
//}
auto ret = (Span*)_idspanmap.get(previd);
if (ret == nullptr)
{
break;
}
//前面相邻的页的span在使用,不合并
Span* prevspan = ret;//前一个相邻span
if (prevspan->_isuse == true)
{
break;
}
//合并出超过128页的span,无法管理也不合并
if (prevspan->_n + span->_n > NPAGES - 1)
{
break;
}span->_page = prevspan->_page;
span->_n += prevspan->_n;_pagelists[prevspan->_n].Erase(prevspan);//删除prevspan未合并之前的位置
//delete prevspan;
_spanpool.Delete(prevspan);
}while (1)//向后合并
{
PAGE_ID nextid = span->_page + span->_n;
/* auto ret = _idspanmap.find(nextid);
if (ret == _idspanmap.end())
{
break;
}*/
auto ret = (Span*)_idspanmap.get(nextid);
if (ret == nullptr)
{
break;
}Span* nextspan = ret;
if (nextspan->_isuse == true)
{
break;
}
if (nextspan->_n + span->_n > NPAGES - 1)
{
break;
}span->_n += nextspan->_n;
_pagelists[nextspan->_n].Erase(nextspan);
//delete nextspan;
_spanpool.Delete(nextspan);
}
_pagelists[span->_n].PushFront(span);
span->_isuse = false;//_idspanmap[span->_page] = span;
_idspanmap.set(span->_page, span);//_idspanmap[span->_page + span->_n - 1] = span;
_idspanmap.set(span->_page + span->_n - 1, span);
}
4.windows和Linux下如何直接向堆申请⻚为单位的⼤块内存:
Linux进程分配内存的两种方式--brk() 和mmap() - VinoZhu - 博客园
Common.h
static const size_t NPAGES = 129;//pagecache中一个桶的最大页数
// 直接去堆上按⻚申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage * (1 << 13), MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
八.多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比
Benchmark.cpp
#define _CRT_SECURE_NO_WARNINGS 1
#include "ConcurrentAlloc.h"//多线程环境下对比malloc性能测试
//ntimes 一轮申请和释放内存的次数
//nworks 线程数
//rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k](){
std::vector<void*> v;
v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(malloc(16));
//v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}for (auto& t : vthread)
{
t.join();
}printf("%u个线程并发执⾏%u轮次,每轮次malloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执⾏%u轮次,每轮次free %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
v.push_back(ConcurrentAlloc(16));
//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}for (auto& t : vthread)
{
t.join();
}printf("%u个线程并发执⾏%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执⾏%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;return 0;
}
九.使用tcmalloc源码中实现基数树进行优化
#pragma once
#include "Common.h"// Single-level array
template <int BITS>
class TCMalloc_PageMap1
{
private:
static const int LENGTH = 1 << BITS;//计算存储页号需要多少位,32位下为2^19,也就是19个位
void** array_;//二级指针,指针数组,数组每个元素都是void*public:
typedef uintptr_t Number;// 页号类型:uintptr_t 兼容32/64位系统,存储整数型页号//构造函数:初始化数组
explicit TCMalloc_PageMap1()
{
size_t size = sizeof(void*) << BITS;
size_t alignsize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignsize >> PAGE_SHIFT);
// 步骤2:初始化数组所有元素为NULL,也就是指针指向空
memset(array_, 0, sizeof(void*) << BITS);
}// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
// get方法:根据页号查询指针
void* get(Number k) const
{
if ((k >> BITS) > 0) // 边界检查:k >> BITS > 0 表示k超出2^BITS范围(无效页号)
{
return NULL;
}
return array_[k];
}// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v)
{
array_[k] = v;
}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2
{
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
// 根节点位宽:固定5位 → 根节点有 2^5=32 个子节点
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;// 根节点数组长度=32// 叶子节点位宽 = 总位宽 - 根节点位宽
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;// 每个叶子节点的存储槽位数量// Leaf node,// 叶子节点:存储实际的页指针映射
struct Leaf
{
//用「动态创建的数组结构体」模拟树的叶子节点
void* values[LEAF_LENGTH];// 每个叶子节点有 LEAF_LENGTH 个槽位
};Leaf* root_[ROOT_LENGTH]; //根节点:32个指针,每个指向一个叶子节点(初始为NULL),指针数组
void* (*allocator_)(size_t); // 内存分配器(函数指针,保存外部分配器)public:
typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t))
explicit TCMalloc_PageMap2()
{
memset(root_, 0, sizeof(root_));// 根节点所有指针初始化为NULL
PreallocateMoreMemory();
}void* get(Number k) const
{
const Number i1 = k >> LEAF_BITS;// 高 LEAF_BITS 位 → 根节点索引(i1)
const Number i2 = k & (LEAF_LENGTH - 1);// 低 LEAF_BITS 位 → 叶子节点索引(i2)// 步骤2:边界检查 + 叶子节点存在性检查
if ((k >> BITS) > 0 || root_[i1] == NULL)
{
return NULL;
}
// 步骤3:访问叶子节点的对应槽位
return root_[i1]->values[i2];
}void set(Number k, void* v)
{
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
assert(i1 < ROOT_LENGTH);// 断言:根节点索引不越界(调试用)
root_[i1]->values[i2] = v;// 赋值(前提:叶子节点已创建)
}//Ensure方法:按需创建叶子节点(核心延迟分配逻辑)
bool Ensure(Number start, size_t n)
{
// 遍历需要确保的页号范围 [start, start+n-1]
for (Number key = start; key <= start + n - 1;)
{
// 步骤1:提取当前key的根节点索引
const Number i1 = key >> LEAF_BITS;// 步骤2:溢出检查(i1 >= 32 则超出根节点范围)
if (i1 >= ROOT_LENGTH)
return false;// 步骤3:创建叶子节点(如果不存在)
if (root_[i1] == NULL)
{
// 分配叶子节点内存
static ObjectPool<Leaf> leafpool;
Leaf* leaf = (Leaf*)leafpool.New();
memset(leaf, 0, sizeof(*leaf));// 叶子节点槽位初始化为NULL
root_[i1] = leaf;// 根节点指向新创建的叶子节点
}// 步骤4:批量跳过当前叶子节点覆盖的页号(优化循环)
// 例如 LEAF_BITS=15 → 叶子节点覆盖32768个页号,key直接跳到下一个叶子节点的起始页号
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}//预分配所有叶子节点
void PreallocateMoreMemory()
{
// Ensure(0, 1<<BITS):确保 0 ~ 2^BITS-1 所有页号对应的叶子节点都被创建
Ensure(0, 1 << BITS);
}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3
{
private:
// INTERIOR_BITS = (BITS + 2)/3:根/中间节点的位宽(向上取整,保证均匀拆分)
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// 根/中间节点的子节点数// LEAF_BITS = BITS - 2*INTERIOR_BITS:叶子节点位宽(总位宽 - 两层内部节点位宽)
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;// 叶子节点槽位数//内部节点(根/中间层):仅存储子节点指针
struct Node
{
Node* ptrs[INTERIOR_LENGTH];// 指向子节点(中间节点或叶子节点)
};// 叶子节点:存储实际的页指针映射
struct Leaf
{
void* values[LEAF_LENGTH];
};Node* root_; // 根节点(三级树的唯一入口)
void* (*allocator_)(size_t);// 内存分配器Node* NewNode()
{
// 分配Node内存
Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
if (result != NULL)
{
memset(result, 0, sizeof(*result));// 子节点指针初始化为NULL
}
return result;
}public:
typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t))
{
allocator_ = allocator;// 保存分配器
root_ = NewNode(); // 创建根节点(仅占 INTERIOR_LENGTH * 8 字节)
}void* get(Number k) const
{
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
{
return NULL;
}
return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
}void set(Number k, void* v)
{
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}bool Ensure(Number start, size_t n)
{
for (Number key = start; key <= start + n - 1;)
{
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL)
{
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL)
{
Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}void PreallocateMoreMemory()
{}
};
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)