目录

一.项⽬介绍

1.这个项⽬做的是什么?

2.这个项⽬的所需要的知识储备和难度?

二.什么是内存池

1.池化技术

2.内存池

3.内存池主要解决的问题

4.malloc

三.开胃菜--先设计⼀个定⻓的内存池

四.高并发内存池整体框架设计

五.thread cache

1.申请内存:

2.释放内存:

3.TLS--thread local storage:

4.thread cache代码框架:

5.自由链表的哈希桶跟对象大小的映射关系

六.central cache

1.申请内存:

2.释放内存:

3.central cache代码框架:

4.以⻚为单位的⼤内存管理span的定义及spanlist定义

七.page cache

1.申请内存:

2.释放内存:

3.page cache代码框架:

4.windows和Linux下如何直接向堆申请⻚为单位的⼤块内存:

八.多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比

九.使⽤tcmalloc源码中实现基数树进⾏优化


一.项⽬介绍

1.这个项⽬做的是什么?

当前项⽬是实现⼀个⾼并发的内存池,他的原型是google的⼀个开源项⽬tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了⾼效的多线程内存管理,⽤于替代系统的内存分配相关的函数(malloc、free)。
这个项⽬是把tcmalloc最核⼼的框架简化后拿出来,模拟实现出⼀个⾃⼰的⾼并发内存池,⽬的就是学习tcamlloc的精华,这种⽅式有点类似我们之前学习STL容器的⽅式。但是相⽐STL容器部分,tcmalloc的代码量和复杂度上升了很多。

2.这个项⽬的所需要的技术栈?

这个项⽬会⽤到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等⽅⾯的知识。

二.什么是内存池

1.池化技术

所谓“池化技术”,就是程序先向系统申请过量的资源,然后⾃⼰管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较⼤的开销,不如提前申请好了,这样使⽤时就会变得⾮常快捷,⼤ 提⾼程序运⾏效率。
在计算机中,有很多使⽤“池”这种技术的地⽅,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到客⼾端的请求时,唤醒池中某个睡眠的线程,让它来处理客⼾端的请求,当处理完这个请求,线程⼜进⼊睡眠状态。

2.内存池

内存池是指程序预先从操作系统申请⼀块⾜够⼤内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,⽽是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,⽽是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。

3.内存池主要解决的问题

1.效率问题:不用每次都向操作系统申请内存,浪费时间

2.内存碎片化问题

        内碎片:由对齐规则产生,等于实际申请大小与对齐后块大小之间的差值

        外碎片:一些空间连续的内存太小,合计内存满足,但是不连续,所有不满足一些内存分配申请

4.malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的,⽽malloc就是⼀个内存池。malloc() 相当于向操作系统“批发”了⼀块较⼤的内存空间,然后“零售”给程序⽤。当全部“售完”或程序有⼤量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现⽅式有很多种,⼀般不同编译器平台⽤的都是不同的。⽐如windows的vs系列⽤的微软⾃⼰写的⼀套,linux gcc⽤的glibc中的ptmalloc。下⾯有⼏篇关于这块的⽂章,⼤概可以去简单看看了解⼀下,关于ptmalloc,有兴趣⼤家可以去看看他的实现细节。

三.开胃菜--先设计⼀个定长的内存池

作为程序员(C/C++)我们知道申请内存使⽤的是malloc,malloc其实就是⼀个通⽤的⼤众货,什么场景下都可以⽤,但是什么场景下都可以⽤就意味着什么场景下都不会有很⾼的性能,下⾯我们就先来设计⼀个定⻓内存池做个开胃菜,当然这个定⻓内存池在我们后⾯的⾼并发内存池中也是有价值的,所以学习他⽬的有两层,先熟悉⼀下简单内存池是如何控制的,第⼆他会作为我们后⾯内存池的⼀个基础组件。
windows和Linux下如何直接向堆申请⻚为单位的⼤块内存
ObjectPool.h
#pragma once
#include "Common.h"
// 直接去堆上按⻚申请空间
inline static void* SystemAlloc(size_t kpage) //kpage表示页数
{
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage * (1 << 13), MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE); //参数1:0代表系统随机分配内存,参数2:页数*一页(8k),参数3:MEM_RESERVE:预留一段虚拟地址空间(不占用物理内存)

​​​​MEM_COMMIT:为预留的地址空间分配物理内存(并映射到虚拟地址)参数 4:PAGE_READWRITE:设置分配的内存页权限为 “可读可写”;

#else
    // linux下brk mmap等
#endif
    if (ptr == nullptr)
        throw std::bad_alloc();//分配失败时抛出标准异常
    return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);//参数 1:ptr:要释放的内存起始地址,参数 2:0:配合 MEM_RELEASE 使用时,必须设为 0,表示释放整个预留 / 提交的内存块,参数 3:MEM_RELEASE:释放内存块(同时解除物理内存映射和虚拟地址预留)

#else
    // sbrk unmmap等
#endif
}

template<class T>//类型模板参数
class ObjectPool
{
public:
    T* New()
    {
        T* obj = nullptr;

        //优先考虑还回来的内存块对象再次重复利用
        if (_freelist)
        {
            void* next = *(void**)_freelist;//_freelist的下一个节点
            obj = (T*)_freelist;//取头
            _freelist = next;//next为头
        }
        else
        {
            //剩余内存不够一个对象大小时,则重新开大块空间
            if (_remainBytes < sizeof(T))
            {
                _remainBytes = 128 * 1024;//128KB
                //_memory = (char*)malloc(_remainBytes);
                _memory = (char*)SystemAlloc(_remainBytes >> 13);//16页
                if (_memory == nullptr)
                {
                    throw std::bad_alloc();//表示内存分配失败
                }
            }

            obj = (T*)_memory;
            size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);//强制切分出至少指针大小的块,保证释放时能连成链表
            _memory += objSize;//起始地址后移
            _remainBytes -= objSize;//剩余内存减去用了的
        }

        //定位new,显示调用T的构造函数初始化
        new(obj)T;//定位new,对已分配内存的未初始化的指针进行初始化

        return obj;//返回指向申请内存的指针
    }
    void Delete(T* obj)
    {
        //显示调用析构函数
        obj->~T();//仅调用析构函数清理资源,不释放内存

        //头插
       *(void**)obj = (T*)_freelist;//obj指向旧的第一个节点
        _freelist = obj;//obj充当新的第一个节点
    }
private:
    char* _memory;//指向大块内存的指针
    size_t _remainBytes = 0;//大块内存在切分过程中剩余字节数

    void* _freelist;//还回来过程中链接的自由链表的头指针
};

四.高并发内存池整体框架设计

项目源码项目代码: 关于项目的代码

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本⾝其实已经很优秀,那么我们项⽬的原型tcmalloc就是在多线程⾼并发的场景下更胜⼀筹,所以这次我们实现的内存池需要考虑以下⼏⽅⾯的问题。
1. 性能问题。
2. 多线程环境下,锁竞争问题。
3. 内存碎⽚问题。
concurrent memory pool主要由以下3个部分构成:
        1. thread cache:线程缓存是每个线程独有的,⽤于⼩于256KB的内存的分配,线程从这⾥申请内存不需要加锁,每个线程独享⼀个cache,这也就是这个并发线程池⾼效的地⽅。
        2. central cache:中⼼缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免⼀个线程占⽤了太多的内存,⽽其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的⽬的。central cache是存在竞争的,所以从这⾥取内存对象是需要加锁,⾸先这⾥⽤的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这⾥竞争不会很激烈。
        3. page cache:⻚缓存是在central cache缓存上⾯的⼀层缓存,存储的内存是以⻚为单位存储及分配的,central cache没有内存对象时,从page cache分配出⼀定数量的page,并切割成定⻓⼤⼩的⼩块内存,分配给central cache。当⼀个span的⼏个跨度⻚的对象都回收以后,page cache会回收central cache满⾜条件的span对象,并且合并相邻的⻚,组成更⼤的⻚,缓解内存碎⽚的问题。

五.thread cache

thread cache是哈希桶结构,每个桶是⼀个按桶位置映射⼤⼩的内存块对象的⾃由链表。每个线程都会有⼀个thread cache对象,这样每个线程在这⾥获取对象和释放对象时是⽆锁的。

1.申请内存:

1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶⾃由链表下标i。
2. 如果⾃由链表_freeLists[i]中有对象,则直接Pop⼀个内存对象返回。
3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取⼀定数量的对象,插⼊到⾃由链表并返回⼀个对象。

2.释放内存:

1. 当释放内存⼩于256k时将内存释放回thread cache,计算size映射⾃由链表桶位置i,将对象Push到_freeLists[i]。
2. 当链表的⻓度过⻓,则回收⼀部分内存对象到central cache。

3.TLS--thread local storage:

(18 封私信) Thread Local Storage(线程局部存储)TLS - 知乎

ConcurrentAlloc.h

#pragma once
#include "ThreadCache.h"
#include "PageCache.h"
#include "Common.h"
#include "ObjectPool.h"

//ConcurrentAlloc 是线程安全的入口层,
// 它利用 TLS 为每个线程创建并绑定专属的 ThreadCache,
// 保证了多线程下内存分配的无锁、高效、安全;

static void* ConcurrentAlloc(size_t size)
{
    if (size > MAX_BYTES)
    {
        size_t alignsize = SizeClass::RoundUp(size);
        size_t kpage = alignsize >> PAGE_SHIFT;

        PageCache::GetInstance()->_pagemtx.lock();
        Span* span = PageCache::GetInstance()->NewSpan(kpage);
        span->_objsize = size;
        PageCache::GetInstance()->_pagemtx.unlock();

        void* ptr = (void*)(span->_page << PAGE_SHIFT);//页号 → 物理地址 的转换
        return ptr;
    }
    else
    {
        //通过TLS每个线程无锁的获取自己的专属的ThreadCache对象

        //检查当前线程的本地 ThreadCache 指针是否为空
        if (pTLSThreadCache == nullptr)
        {
            static ObjectPool<ThreadCache> tcpool;
            //如果为空:说明当前线程是第一次申请内存,需要创建专属的对象,并赋值给这个 TLS 指针;
            //pTLSThreadCache = new ThreadCache;//创建 ThreadCache 对象
            pTLSThreadCache = tcpool.New();
        }

        //std::cout << std::this_thread::get_id() << ":" << pTLSThreadCache << std::endl;

        return pTLSThreadCache->Allocate(size);
    }
}

static void ConcurrentFree(void* ptr)
{
    Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//通过地址拿到对应的span
    size_t size = span->_objsize;

    if (size > MAX_BYTES)
    {
        Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);//通过地址拿到对应的span

        PageCache::GetInstance()->_pagemtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->_pagemtx.unlock();
    }
    else
    {
        assert(pTLSThreadCache);
        pTLSThreadCache->Deallocate(ptr, size);
    }
}

4.thread cache代码框架:

首先我们先明白在ThreadCache中的一些名词含义。

1. 数组下标(全局桶下标)

  • 含义:内存池核心管理数组(存储所有自由链表头指针)的索引值,是整个内存池范围内的全局唯一标识
  • 作用:通过下标可直接定位到对应桶,范围从 0 开始(如 0、1、20、72 等);
  • 举例:下标 0 对应 8B 桶,下标 20 对应 208B 桶,下标 72 对应 1152B 桶。

2. 桶

  • 含义:数组下标对应的 “逻辑单元”,是 “固定大小内存块的专属管理单元”;
  • 核心特征:一个桶唯一对应一种固定大小的内存块,且仅管理该大小的内存块;
  • 本质:桶本身不是实体,是对 “数组下标 + 对应链表” 的统称(如下标 0=8B 桶,下标 20=208B 桶)。

3. 对齐粒度

  • 含义:内存大小向上取整的 “最小步长”,是按区间划分的统一规则;
  • 作用:将任意申请的内存大小规整为桶能管理的固定大小,避免碎片化;
  • 区间规则
    • [1,128B]:对齐粒度 8B;
    • (128,1024B]:对齐粒度 16B;
    • (1024,8192B]:对齐粒度 128B;
    • 以此类推。

4. 链表(自由链表 / 空闲链表)

  • 含义:每个桶绑定的单向链表,节点是该桶对应大小的空闲内存块;
  • 作用:管理同大小的空闲内存块,实现高效分配 / 释放(分配时头删、释放时头插);
  • 核心特征:一个桶对应且仅对应一个链表,链表中所有节点的内存块大小完全一致。

关键关联逻辑

通过对齐粒度将任意申请的内存大小规整为固定值 → 根据固定值计算出桶的下标(数组下标) → 定位到唯一的 → 该桶通过专属的链表管理所有同大小的空闲内存块。

ThreadCache.h

#pragma once
#include "Common.h"


//ThreadCache本质是由一个哈希映射的对象自由链表构成
//ThreadCache 是线程级内存缓存的外壳,_freelist 是它的内脏——_freelist 
//数组把不同大小的内存块分级管理,ThreadCache 用这套结构实现了无锁、低碎片的内存分配服务。

class ThreadCache
{
public:
    
//申请和释放空间
    void* Allocate(size_t size);
    void Deallocate(void* ptr, size_t size);
  
 //从中心缓存获取对象
    void* FetchFromCentralCache(size_t index, size_t size);

    // 释放对象时,链表过⻓时,回收内存回到中⼼缓存
    void ListTooLong(FreeList& list, size_t size);
private:
    FreeList _freelist[NFREE_LIST];
//自由链表数组定义,数组的每个元素都是一个 FreeList 对象,对应一个哈希桶
};

//TLS thread local storage线程本地存储
//为每一个线程创建独立的 pTLSThreadCache 指针副本,每个线程只能访问自己的副本,线程间互不干扰。

static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;//定义线程本地变量

ThreadCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "ThreadCache.h"
#include "CentralCache.h"

void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
  
 //慢开始反馈调节算法
    //1.最开始不会向centralcache一次批量要太多,可能用不完
    //2.如果不断有这个size大小内存的需求,那么就会不断增长直到上限
    //3.size越大,一次向Central Cache要的batchNum就越小
    //4.size越小,一次向Central Cache要的batchNum就越大

    size_t batchNum=min(_freelist[index].MaxSize(),SizeClass::NumMoveSize(size));//计算一次拿取size大小个数
    if (_freelist[index].MaxSize() == batchNum)
    {
        _freelist[index].MaxSize() += 1;
    }
    void* start = nullptr;
    void* end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start,end,batchNum,size);
//实际获取的数量
    assert(actualNum >= 1);//保证至少有一个

    if (actualNum == 1)
    {
        assert(start == end);
        return start;
//直接返回地址给ptr
    }
    else
    {
        _freelist[index].PushRange(NextObj(start), end, actualNum - 1);
//把从第2个开始的内存块插入到链表
        return start;//直接返回第一个内存块地址给ptr
    }
}

void* ThreadCache::Allocate(size_t size)
{
  
 //空间申请
    assert(size <= MAX_BYTES);
    size_t alignSize = SizeClass::RoundUp(size);
//计算实际所分配的大小
    size_t index = SizeClass::Index(size);//计算是桶的全局下标

    //判断此下标桶是否为空
    if (!_freelist[index].Empty())
    {
     
   //不为空,则Pop出去
        return _freelist[index].Pop(); 
    }
    else
    {
      
 //为空,没有内存块,向中心缓存获取
        return FetchFromCentralCache(index, alignSize);
    }
}

void ThreadCache::Deallocate(void* ptr, size_t size)
{
  
 //空间释放
    assert(size <= MAX_BYTES);
    assert(ptr);

    //找出映射的自由链表桶,对象插入进入
    size_t index = SizeClass::Index(size);
    _freelist[index].Push(ptr);

    //当链表长度大于一次批量申请的内存时就开始还一段list给centralcache
    if (_freelist[index].Size() > _freelist[index].MaxSize())
    {
        ThreadCache::ListTooLong(_freelist[index], size);
    }
}

void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
    void* start = nullptr;
    void* end = nullptr;
    list.PopRange(start, end, list.MaxSize());

    CentralCache::GetInstance()->ReleaseListToSpans(start,size);
}

5.自由链表的哈希桶跟对象大小的映射关系

Common.h

#pragma once

#include <iostream>
#include <vector>
#include <unordered_map>
#include <map>
#include <algorithm>
#include <new> 
#include <thread>
#include <mutex>
#include <assert.h>
#include <time.h>

using std::cout;
using std::endl;

static const size_t MAX_BYTES = 256 * 1024;//桶的内存256KB

static const size_t NFREE_LIST = 208;//桶的数量

#ifdef _WIN64
    typedef unsigned long long PAGE_ID;
//_WIN64下面_WIN64和_WIN32都会被定义
#elif _WIN32
    typedef size_t PAGE_ID;
//_WIN32下面只会定义_WIN32
#endif

#ifdef _WIN32
    #include <Windows.h>
#else

    //linux
#endif

//&是引用,且是 void* 类型的引用,核心作用是让函数返回值可以被赋值(作为左值)
static void*& NextObj(void* obj)
{
    assert(obj != nullptr);
    return *(void**)obj;
}

//管理切分好的小对象的自由链表
//自由链表类---Thread Cache

class FreeList
{
public:
    void Push(void* obj)
    {
        assert(obj);

        //头插
        //*(void**)obj = _freelist;

        NextObj(obj) = _freelist;
        _freelist = obj;
        ++_size;
    }
    void PushRange(void* start, void* end, size_t n)
    {

        //头插入多个
        NextObj(end) = _freelist;
        _freelist = start;
        _size += n;
    }
    void PopRange(void*& start, void*& end, size_t n)
    {

        //头删多个
        assert(n <= _size);
        start = _freelist;
        end = start;
        for (size_t i = 0; i < n - 1; i++)
        {
            if (end == nullptr)
            {
                break;
            }
            end = NextObj(end);
        }
        if(end==nullptr)
        {
            _freelist = nullptr;
        }
        else
        {
            _freelist = NextObj(end);
            NextObj(end) = nullptr;
        }
        _size -= n;
    }

    void* Pop()
    {
        assert(_freelist);

        //头删
        void* obj = _freelist;
        _freelist = NextObj(obj);
        --_size;

        return obj;
    }
    bool Empty()
    {
        //判空
        return _freelist == nullptr;
    }

    size_t& MaxSize() { return _maxsize; }
    size_t Size() { return _size; }
private:
    void* _freelist = nullptr;
    size_t _maxsize = 1;
    size_t _size = 0;
};


class SizeClass
{
    // 整体控制在最多10%左右的内碎⽚浪费
    // 内存大小区间 (字节)     对齐粒度                 对应 freelist 下标区间    桶数量
    // [1,128]                         8byte对⻬                 freelist[0,16)                    16
    // [128+1,1024]                16byte对⻬               freelist[16,72)                 56
    // [1024+1,81024]            128byte对⻬              freelist[72,128)              56
    // [8*1024+1,641024]       1024byte对⻬            freelist[128,184)            56
    // [64*1024+1,256*1024]    8*1024byte对⻬      freelist[184,208)            24

    //size_t _RoundUp(size_t size, size_t alignNum)//普通方法
    //{
    //    size_t alignSize;
    //    if (size % alignNum != 0)
    //    {
    //        alignSize = (size / alignNum + 1) * alignNum;
    //        //计算对齐数  size为9,需要分配16, (9/8 + 1)*8=16
    //    }
    //    else
    //    {
    //        alignSize = size;
    //    }
    //    return alignSize;
    //}

public:

    //计算对象大小的对齐映射规则
    static inline size_t _RoundUp(size_t bytes, size_t align)//性能方法
    {
        return (((bytes)+align - 1) & ~(align - 1));
    }

    static inline size_t RoundUp(size_t size)
    {
        if (size <= 128)
        {
            return _RoundUp(size, 8);//8字节对齐
        }
        else if (size <= 1024)
        {
            return _RoundUp(size, 16);//16字节对齐
        }
        else  if (size <= 8 * 1024)
        {
            return _RoundUp(size, 128);//128字节对齐
        }
        else if (size <= 64 * 1024)
        {
            return _RoundUp(size, 1024);//1024字节对齐
        }
        else if (size <= 256 * 1024)
        {
            return _RoundUp(size, 8 * 1024);//8*1024字节对齐
        }
        else
        {
            return _RoundUp(size, 1 << PAGE_SHIFT);//以页为单位对齐
        }
    }

    // 计算映射的哪一个自由链表桶
    //size_t _Index(size_t bytes,size_t alignNum)//普通方法
    //{
    //    if (bytes % alignNum == 0)
    //    {
    //        return  bytes / alignNum - 1;// 8/8-1=0
    //    }
    //    else
    //    {
    //        return bytes / alignNum;//9/8=1
    //    }
    //}

    static inline size_t _Index(size_t bytes, size_t align_shift)//性能方法
    {
        return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
        //以8为例,  (8+1<<3-1) >> 3 -1=(8+7)>>3-1=0;
    }
    static inline size_t Index(size_t bytes)
    {
        assert(bytes <= MAX_BYTES);

        // 每个区间有多少个链(16: [1,128]区间桶数;56: 后续三个区间桶数)
        static int group_array[4] = { 16, 56, 56, 56 };

        if (bytes <= 128) 
        {
            return _Index(bytes, 3);//这里的3是2^3=8
        }
        else if (bytes <= 1024) 
        {
            return _Index(bytes - 128, 4) + group_array[0];
        }
        else if (bytes <= 8192) 
        { 
            return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
        }
        else if (bytes <= 64 * 1024) 
        {
            return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
        }
        else if (bytes <= 256 * 1024) 
        {
            return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
        }
        else 
        {
            assert(false);
        }

        return -1;
    }

    //// ⼀次从中⼼缓存获取多少个
    static size_t NumMoveSize(size_t size)
    {
        assert(size > 0);

        // 核心公式:用最大可处理字节数 / 单个内存块大小 = 理论最大批量数
        int num = MAX_BYTES / size;

        // 下限:至少批量拿2个(避免频繁向CentralCache申请,减少锁竞争)
        if (num < 2)
            num = 2;

        // 上限:最多批量拿512个(避免一次性拿太多,导致内存浪费/其他线程饥饿)
        if (num > 512)
            num = 512;

        return num;
    }
};

六.central cache

central cache也是⼀个哈希桶结构,他的哈希桶的映射关系跟thread cache是⼀样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下⾯的span中的⼤内存块被按映射关系切成了⼀个个⼩内存块对象挂在span的⾃由链表中。

1.申请内存:

1. 当thread cache中没有内存时,就会批量向central cache申请⼀些内存对象,这⾥的批量获取对象的数量使⽤了类似⽹络tcp协议拥塞控制的慢开始算法;central cache也有⼀个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这⾥使⽤的是⼀个桶锁,尽可能提⾼效率。
2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请⼀个新的span对象,拿到span以后将span管理的内存按⼤⼩切好作为⾃由链表链接到⼀起。然后从span中取对象给thread cache。
3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配⼀个对象给thread cache,就++use_count

2.释放内存:

1. 当thread_cache过⻓或者线程销毁,则会将内存释放回central cache中的,释放回来时--
use_count。当use_count减到0时则表⽰所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲⻚进⾏合并。

3.central cache代码框架:

CentralCache.h

#pragma once
#include "Common.h"


////CentralCache 使用单例模式的核心逻辑:
//唯一性:保证内存调度中心全局唯一,避免内存碎片化和分配规则混乱;
//资源效率:避免核心数据结构重复初始化,节省内存和启动时间;
//同步简化:集中管理多线程锁机制,降低并发控制复杂度,契合内存池分层架构设计。

//单例饿汉模式
class CentralCache
{
public:
    //提供一个接口
    static CentralCache* GetInstance()
    {
        return &_sInst;
    }

    // 获取⼀个⾮空的span
    Span* GetOneSpan(SpanList& list, size_t size);

    // 从中⼼缓存获取⼀定数量的对象给thread cache
    size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);

    // 将⼀定数量的对象释放到span跨度
    void ReleaseListToSpans(void* start, size_t size);
private:
    SpanList _spanlists[NFREE_LIST];
//centralcache里面桶的数量
private:
  
 //防止构造和拷贝构造
    CentralCache()
    {}
    CentralCache(const CentralCache&) = delete;

    static CentralCache _sInst;
};

CentralCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "CentralCache.h"
#include "PageCache.h"

CentralCache CentralCache::_sInst;

// 获取⼀个⾮空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
  
 //遍历查找当前span链表是否还有未分配对象的span
    Span* it = list.Begin();//拿到该桶链表的头节点
    while (it != list.End())
    {
        if (it->_freelist != nullptr)
        {
            return it;
        }
        else
        {
            it = it->_next;
        }
    }

    //先把centralcache的桶锁解掉,这样如果其他线程释放内存回来,不会阻塞
    list._centralmtx.unlock();

    //走到这里说明没有空闲的span,只能找page cache要一个span
    PageCache::GetInstance()->_pagemtx.lock();
    Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
    span->_isuse = true;
//在使用
    span->_objsize = size;
    PageCache::GetInstance()->_pagemtx.unlock();

    //对获取的span进行切分,不需要加锁,因为这会其他线程访问不到这个span,还没有挂起

    //计算span的大块内存在页内中页号的起始地址
    char* start = (char*)(span->_page << PAGE_SHIFT);
  
 //计算页号的内存字节数大小(字节数)
    size_t bytes = span->_n << PAGE_SHIFT;
    char* end = start + bytes;
    
//把大块内存切成自由链表链接起来,这里链接时尾插(内存连续)
    //1.先切一块下来做头,方便尾插
    span->_freelist = start;
    start += size;
//start后移一个节点
    void* tail = span->_freelist;//tail和freelist指向最开始的start

    while (start < end)
    {
        
//遍历连接新开辟的sapn
        NextObj(tail) = start;//tail指向下一个节点
        tail = start;//tail后移一个节点
        start += size;//start后移一个节点
    }
    NextObj(tail) = nullptr;//遍历到最后一个节点置空

    //切好span以后,需要把span挂到桶里面去的时候,在加锁
    list._centralmtx.lock();
    list.PushFront(span);

    return span;
}

size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
    size_t index = SizeClass::Index(size);
//计算桶的位置,也就是下标
    _spanlists[index]._centralmtx.lock();//加锁
    Span* span = CentralCache::GetOneSpan(_spanlists[index], size);
    assert(span);
    assert(span->_freelist);

    //从span中获取batchNum个对象,若不够则有多少拿多少
    start = span->_freelist;
    end = start;
    size_t i = 0;
    size_t actualNum = 1;
//实际获取的数量
    while(NextObj(end) != nullptr && i < batchNum - 1)//使用while来防止一个span不够的情况
    {
        end = NextObj(end);
//end后移找到需要被拿取的内存块的位置
        ++i;
        ++actualNum;
    }
    span->_freelist = NextObj(end);
//头指向end的下一个位置
    NextObj(end) = nullptr;
    span->_usecount += actualNum;

    _spanlists[index]._centralmtx.unlock();//解锁

    return actualNum;
}

void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
    if (start == nullptr) return;
    size_t index = SizeClass::Index(size);
    _spanlists[index]._centralmtx.lock();
    while (start)
    {
        void* next = NextObj(start);

        Span* span = PageCache::GetInstance()->MapObjectToSpan(start);//算出对应的sapn
        NextObj(start) = span->_freelist;//头插
        span->_freelist = start;
        span->_usecount--;
        if (span->_usecount == 0)
        {
          
 //说明span切分出的小块内存全部回收完,可以这个span就回收给pagecache
            _spanlists[index].Erase(span);
            span->_freelist = nullptr;
            span->_next = nullptr;
            span->_prev = nullptr;

            //解掉桶锁,释放span给pagecache,使用pagecache的锁
            _spanlists[index]._centralmtx.unlock();

            PageCache::GetInstance()->_pagemtx.lock();
            PageCache::GetInstance()->ReleaseSpanToPageCache(span);
            PageCache::GetInstance()->_pagemtx.unlock();
        
            _spanlists[index]._centralmtx.lock();
        }
        start = next;
    }

    _spanlists[index]._centralmtx.unlock();
}

4.以页为单位的大内存管理span的定义及spanlist定义

Common.h

class SizeClass
{

     static const size_t PAGE_SHIFT = 13;// ⻚⼤⼩转换偏移, 即⼀⻚定义为2^13,也就是8KB,8*1024byte

    // 计算⼀次向PageCache⼏个⻚
    // 单个对象 8byte   num=512   npage=4096byte--1页
    // ...
    // 单个对象 256KB

    static size_t NumMovePage(size_t size)
    {
        size_t num = NumMoveSize(size);
//计算批量拿多少个size大小的块
        size_t npage = num * size; //这批块的总字节数

        npage >>= PAGE_SHIFT;//转换为页
        if (npage == 0)
            npage = 1;
        return npage;
    }
};

//管理多个连续页的大块内存跨度
struct Span
{
    PAGE_ID _page = 0;            
//大块内存起始页的页号
    size_t _n = 0;                //页数,也就是数组下标

    Span* _next = nullptr;        //双向页表的结构
    Span* _prev = nullptr;

    size_t _objsize = 0;            //切好的小对象的大小
    size_t _usecount = 0;        //切成小块内存的,被分配给ThreadCache的计数
    void* _freelist = nullptr;    //切好的小块内存的自由链表

    bool _isuse;                //是否在被使用,默认是false
};

//带头双向循环链表---Central Cache
class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_next = _head;
        _head->_prev = _head;
    }


    Span* Begin()
    {
        return _head->_next;
    }


    Span* End()
    {
        return _head;
    }


    void PushFront(Span* span)
    {
  
     //头插
        Insert(Begin(), span);
    }


    Span* PopFront()
    {

        //头删
        Span* front = _head->_next;
        Erase(front);
        return front;
    }


    bool Empty()
    {

        //判空
        return _head->_next == _head;
    }


    void Insert(Span* pos, Span* newSpan)
    {

        //pos位置插入
        assert(pos);
        assert(newSpan);

        Span* prev = pos->_prev;
        //prev newSpan pos
        prev->_next = newSpan;
        newSpan->_prev = prev;
        newSpan->_next = pos;
        pos->_prev = newSpan;
    }


    void Erase(Span* pos)
    {

        //删除pos位置
        assert(pos);
        assert(pos != _head);

        // prev pos next
        Span* prev = pos->_prev;
        Span* next = pos->_next;

        prev->_next = next;
        next->_prev = prev;
    }


    ~SpanList()
    {}
private:
    Span* _head;
//头节点
public:
    std::mutex _centralmtx;
//桶锁
};

七.page cache

1.申请内存:

1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更⼤⻚寻找⼀个span,如果找到则分裂成两个。⽐如:申请的是4⻚page,4⻚page后⾯没有挂span,则向后⾯寻找更⼤的span,假设在10⻚page位置找到⼀个span,则将10⻚page span分裂为⼀个4⻚page span和⼀个6⻚page span。
2. 如果找到_spanList[128]都没有合适的span,则向系统使⽤mmap、brk或者是VirtualAlloc等⽅式申请128⻚page span挂在⾃由链表中,再重复1中的过程。
3. 需要注意的是central cache和page cache 的核⼼结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache⼀样的⼤⼩对⻬关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成⼩块内存的⾃由链表。⽽page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i⻚内存。

2.释放内存:

1. 如果central cache释放回⼀个span,则依次寻找span的前后page id的没有在使⽤的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切⼩的内存合并收缩成⼤的span,减少内存碎⽚。

3.page cache代码框架:

PageCache.h

#pragma once
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"

class PageCache
{
public:
    static PageCache* GetInstance()
    {
        return &_sInstan;
    }

    // 获取从对象到span的映射
    Span * MapObjectToSpan(void* obj);

    // 释放空闲span回到Pagecache,并合并相邻的span
    void ReleaseSpanToPageCache(Span * span);

    //获取一个k页的span
    Span* NewSpan(size_t k);

    std::mutex _pagemtx;
private:
    SpanList _pagelists[NPAGES];
    ObjectPool<Span> _spanpool;

    //std::unordered_map<PAGE_ID, Span*> _idspanmap;//页号和span的映射
    //std::map<PAGE_ID, Span*> _idspanmap;//页号和span的映射

    TCMalloc_PageMap1<32 - PAGE_SHIFT> _idspanmap;

    PageCache()
    {}
    PageCache(const PageCache&) = delete;

    static PageCache _sInstan;
};

PageCache.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "PageCache.h"

PageCache PageCache::_sInstan;

Span* PageCache::NewSpan(size_t k)
{
    assert(k > 0);

    //大于128页的直接向堆申请
    if (k > NPAGES - 1)
    {
        void* ptr = SystemAlloc(k);
        //Span* span = new Span;
        Span* span = _spanpool.New();

        span->_page = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = k;
        //_idspanmap[span->_page] = span;
        _idspanmap.set(span->_page, span);
        return span;
    }

    //先检查第k个桶里面有没有span,
    if (!_pagelists[k].Empty())
    {
        Span* kspan = _pagelists[k].PopFront();
        //建立id和span的映射,方便centralcache回收查找对应的span
        for (PAGE_ID i = 0; i < kspan->_n; i++)
        {
            //_idspanmap[kspan->_page + i] = kspan;//从页号开始起映射_n个页
            _idspanmap.set(kspan->_page + i, kspan);
        }
        return kspan;
    }

    //检查一下后面的桶里面有没有span,如果有可以把他进行切分
    for (size_t i = k + 1; i < NPAGES; ++i)
    {
        if (!_pagelists[i].Empty())//后面的某个桶不为空
        {
            //需要进行切分为一个k页的的span和一个n-k页的span
            //k页的span返回给cnetral cache
            //n-k页的span挂到相应的页位置

            Span* nspan = _pagelists[i].PopFront();//取下这个位置的页     起始页号是第 10 页
            //Span* kspan = new Span();                            //管理连续 5 页:10、11、12、13、14

            Span* kspan = _spanpool.New();

            //在nspan的头部切一个k页下来
            kspan->_page = nspan->_page;//kspan->_page = 10
            kspan->_n = k;//kspan->_n = 2 → 管理页10、11

            nspan->_page += k;//nspan->_page = 10 + 2 = 12
            nspan->_n -= k; // nspan->_n = 5 - 2 = 3 → 管理页12、13、14

            _pagelists[nspan->_n].PushFront(nspan);//把剩下的nspan挂到_n下标的位置
            //存储nspan的收尾页号跟nspan的映射,方便pagecache回收内存时进行的合并查找

            //_idspanmap[nspan->_page] = nspan;
            _idspanmap.set(nspan->_page, nspan);

            //_idspanmap[nspan->_page + nspan->_n - 1] = nspan;
            _idspanmap.set(nspan->_page + nspan->_n - 1, nspan);

            //建立id和span的映射,方便centralcache回收查找对应的span
            for (PAGE_ID i = 0; i < kspan->_n; i++)
            {
                //_idspanmap[kspan->_page + i] = kspan;//从页号开始起映射_n个页
                _idspanmap.set(kspan->_page + i, kspan);
            }
            return kspan;
        }
    }

    //走到这个位置就说明后面没有大页的span,这时就去找堆要一个128页的span
    //Span* bigspan = new Span;

    Span* bigspan = _spanpool.New();

    void* ptr = SystemAlloc(NPAGES - 1);//向堆申请 NPAGES - 1 个连续的内存页
    bigspan->_page = (PAGE_ID)ptr >> PAGE_SHIFT;//计算ptr的起始页
    bigspan->_n = NPAGES - 1;//申请的页数也就是_n

    _pagelists[bigspan->_n].PushFront(bigspan);//插入到对应的桶里面

    return NewSpan(k);
}

// 获取从对象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
    PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//算出页号
    //std::unique_lock<std::mutex> lock(_pagemtx);
    ////auto ret = _idspanmap.find(id);//在建立的映射中查找
    //if (ret != _idspanmap.end())
    //{
    //    return ret->second;
    //}
    //else
    //{
    //    assert(false);
    //    return nullptr;
    //}

    auto ret = (Span*)_idspanmap.get(id);
    assert(ret != nullptr);
    return ret;
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
    //大于128页的直接还给堆
    if (span->_n > NPAGES - 1)
    {
        void* ptr = (void*)(span->_page << PAGE_SHIFT);
        SystemFree(ptr);
        //delete span;
        _spanpool.Delete(span);

        return;
    }

    //对span前后的页尝试进行合并,缓解内存碎片问题

    while (1)//向前合并
    {
        PAGE_ID previd = span->_page - 1;
        //auto ret = _idspanmap.find(previd);//是 std::unordered_map<PAGE_ID, Span*>::iterator 类型的迭代器
        ////前面的页号没有,不进行合并
        //if (ret == _idspanmap.end())
        //{
        //    break;
        //}

        auto ret = (Span*)_idspanmap.get(previd);
        if (ret == nullptr)
        {
            break;
        }


        //前面相邻的页的span在使用,不合并
        Span* prevspan = ret;//前一个相邻span
        if (prevspan->_isuse == true)
        {
            break;
        }
        //合并出超过128页的span,无法管理也不合并
        if (prevspan->_n + span->_n > NPAGES - 1)
        {
            break;
        }

        span->_page = prevspan->_page;
        span->_n += prevspan->_n;

        _pagelists[prevspan->_n].Erase(prevspan);//删除prevspan未合并之前的位置
        //delete prevspan;

        _spanpool.Delete(prevspan);
    }

    while (1)//向后合并
    {
        PAGE_ID nextid = span->_page + span->_n;
    /*    auto ret = _idspanmap.find(nextid);
        if (ret == _idspanmap.end())
        {
            break;
        }*/

        auto ret = (Span*)_idspanmap.get(nextid);
        if (ret == nullptr)
        {
            break;
        }

        Span* nextspan = ret;
        if (nextspan->_isuse == true)
        {
            break;
        }
        if (nextspan->_n + span->_n > NPAGES - 1)
        {
            break;
        }

        span->_n += nextspan->_n;

        _pagelists[nextspan->_n].Erase(nextspan);
        //delete nextspan;
        _spanpool.Delete(nextspan);
    }
    _pagelists[span->_n].PushFront(span);
    span->_isuse = false;

    //_idspanmap[span->_page] = span;
    _idspanmap.set(span->_page, span);

    //_idspanmap[span->_page + span->_n - 1] = span;
    _idspanmap.set(span->_page + span->_n - 1, span);
}

4.windows和Linux下如何直接向堆申请⻚为单位的⼤块内存:

VirtualAlloc_百度百科

Linux进程分配内存的两种方式--brk() 和mmap() - VinoZhu - 博客园

Common.h

static const size_t NPAGES = 129;//pagecache中一个桶的最大页数

// 直接去堆上按⻚申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage * (1 << 13), MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else

    // linux下brk mmap等
#endif
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else

    // sbrk unmmap等
#endif
}

八.多线程并发环境下,对比malloc和ConcurrentAlloc申请和释放内存效率对比

Benchmark.cpp

#define _CRT_SECURE_NO_WARNINGS 1
#include "ConcurrentAlloc.h"

//多线程环境下对比malloc性能测试


//ntimes 一轮申请和释放内存的次数
//nworks 线程数
//rounds 轮次

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k](){
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
            size_t begin1 = clock();
            for (size_t i = 0; i < ntimes; i++)
            {
                v.push_back(malloc(16));
             
   //v.push_back(malloc((16 + i) % 8192 + 1));
            }
            size_t end1 = clock();

            size_t begin2 = clock();
            for (size_t i = 0; i < ntimes; i++)
            {
                free(v[i]);
            }
            size_t end2 = clock();
            v.clear();

            malloc_costtime += (end1 - begin1);
            free_costtime += (end2 - begin2);
            }
        });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%u个线程并发执⾏%u轮次,每轮次malloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());

    printf("%u个线程并发执⾏%u轮次,每轮次free %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime.load());

    printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                v.push_back(ConcurrentAlloc(16));
              
 //v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    ConcurrentFree(v[i]);
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%u个线程并发执⾏%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());
    printf("%u个线程并发执⾏%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime.load());

    printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime + free_costtime);
}

int main()
{
    size_t n = 10000;
    cout << "==========================================================" << endl;
    BenchmarkConcurrentMalloc(n, 4, 10);
    cout << endl << endl;

    BenchmarkMalloc(n, 4, 10);
    cout << "==========================================================" << endl;

    return 0;
}

九.使用tcmalloc源码中实现基数树进行优化

#pragma once
#include "Common.h"

// Single-level array
template <int BITS>
class TCMalloc_PageMap1
{
private:
    static const int LENGTH = 1 << BITS;
//计算存储页号需要多少位,32位下为2^19,也就是19个位
    void** array_;//二级指针,指针数组,数组每个元素都是void*

public:
    typedef uintptr_t Number;
// 页号类型:uintptr_t 兼容32/64位系统,存储整数型页号

    //构造函数:初始化数组
    explicit TCMalloc_PageMap1()
    {
        size_t size = sizeof(void*) << BITS;
        size_t alignsize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
        array_ = (void**)SystemAlloc(alignsize >> PAGE_SHIFT);
     
  // 步骤2:初始化数组所有元素为NULL,也就是指针指向空
        memset(array_, 0, sizeof(void*) << BITS);
    }

    // Return the current value for KEY. Returns NULL if not yet set,
    // or if k is out of range.
    // get方法:根据页号查询指针

    void* get(Number k) const
    {
        if ((k >> BITS) > 0)
// 边界检查:k >> BITS > 0 表示k超出2^BITS范围(无效页号)
        {
            return NULL;
        }
        return array_[k];
    }

    // REQUIRES "k" is in range "[0,2^BITS-1]".
    // REQUIRES "k" has been ensured before.
    //
    // Sets the value 'v' for key 'k'.

    void set(Number k, void* v)
    {
        array_[k] = v;
    }
};

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 
{
private:

    // Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
    // 根节点位宽:固定5位 → 根节点有 2^5=32 个子节点

    static const int ROOT_BITS = 5;
    static const int ROOT_LENGTH = 1 << ROOT_BITS;
// 根节点数组长度=32

    // 叶子节点位宽 = 总位宽 - 根节点位宽
    static const int LEAF_BITS = BITS - ROOT_BITS;
    static const int LEAF_LENGTH = 1 << LEAF_BITS;
// 每个叶子节点的存储槽位数量

    // Leaf node,// 叶子节点:存储实际的页指针映射
    struct Leaf 
    {

        //用「动态创建的数组结构体」模拟树的叶子节点
        void* values[LEAF_LENGTH];// 每个叶子节点有 LEAF_LENGTH 个槽位
    };

    Leaf* root_[ROOT_LENGTH]; //根节点:32个指针,每个指向一个叶子节点(初始为NULL),指针数组
    void* (*allocator_)(size_t); // 内存分配器(函数指针,保存外部分配器)

public:
    typedef uintptr_t Number;

    //explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) 
    explicit TCMalloc_PageMap2()
    {
        memset(root_, 0, sizeof(root_));
// 根节点所有指针初始化为NULL
        PreallocateMoreMemory();
    }

    void* get(Number k) const 
    {
        const Number i1 = k >> LEAF_BITS;
// 高 LEAF_BITS 位 → 根节点索引(i1)
        const Number i2 = k & (LEAF_LENGTH - 1);// 低 LEAF_BITS 位 → 叶子节点索引(i2)

        // 步骤2:边界检查 + 叶子节点存在性检查
        if ((k >> BITS) > 0 || root_[i1] == NULL) 
        {
            return NULL;
        }

        // 步骤3:访问叶子节点的对应槽位
        return root_[i1]->values[i2];
    }

    void set(Number k, void* v) 
    {
        const Number i1 = k >> LEAF_BITS;
        const Number i2 = k & (LEAF_LENGTH - 1);
        assert(i1 < ROOT_LENGTH);
// 断言:根节点索引不越界(调试用)
        root_[i1]->values[i2] = v;// 赋值(前提:叶子节点已创建)
    }

    //Ensure方法:按需创建叶子节点(核心延迟分配逻辑)
    bool Ensure(Number start, size_t n) 
    {
 
      // 遍历需要确保的页号范围 [start, start+n-1]
        for (Number key = start; key <= start + n - 1;) 
        {
 
          // 步骤1:提取当前key的根节点索引
            const Number i1 = key >> LEAF_BITS;

            // 步骤2:溢出检查(i1 >= 32 则超出根节点范围)
            if (i1 >= ROOT_LENGTH)
                return false;

            // 步骤3:创建叶子节点(如果不存在)
            if (root_[i1] == NULL) 
            {

                // 分配叶子节点内存
                static ObjectPool<Leaf> leafpool;
                Leaf* leaf = (Leaf*)leafpool.New();
                memset(leaf, 0, sizeof(*leaf));
// 叶子节点槽位初始化为NULL
                root_[i1] = leaf;// 根节点指向新创建的叶子节点
            }

            // 步骤4:批量跳过当前叶子节点覆盖的页号(优化循环)
            // 例如 LEAF_BITS=15 → 叶子节点覆盖32768个页号,key直接跳到下一个叶子节点的起始页号

            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
        }
        return true;
    }

    //预分配所有叶子节点
    void PreallocateMoreMemory() 
    {

        // Ensure(0, 1<<BITS):确保 0 ~ 2^BITS-1 所有页号对应的叶子节点都被创建
        Ensure(0, 1 << BITS);
    }
};

// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 
{
private:

    // INTERIOR_BITS = (BITS + 2)/3:根/中间节点的位宽(向上取整,保证均匀拆分)
    static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
    static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// 根/中间节点的子节点数

    // LEAF_BITS = BITS - 2*INTERIOR_BITS:叶子节点位宽(总位宽 - 两层内部节点位宽)
    static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
    static const int LEAF_LENGTH = 1 << LEAF_BITS;
// 叶子节点槽位数

    //内部节点(根/中间层):仅存储子节点指针
    struct Node 
    {
        Node* ptrs[INTERIOR_LENGTH];// 指向子节点(中间节点或叶子节点)
    };

    // 叶子节点:存储实际的页指针映射
    struct Leaf 
    {
        void* values[LEAF_LENGTH];
    };

    Node* root_; // 根节点(三级树的唯一入口)
    void* (*allocator_)(size_t);// 内存分配器

    Node* NewNode() 
    {
 
      // 分配Node内存
        Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));
        if (result != NULL) 
        {
            memset(result, 0, sizeof(*result));
// 子节点指针初始化为NULL
        }
        return result;
    }

public:
    typedef uintptr_t Number;

    explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) 
    {
        allocator_ = allocator;
// 保存分配器
        root_ = NewNode(); // 创建根节点(仅占 INTERIOR_LENGTH * 8 字节)
    }

    void* get(Number k) const 
    {
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
        const Number i3 = k & (LEAF_LENGTH - 1);
        if ((k >> BITS) > 0 ||
            root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) 
        {
            return NULL;
        }
        return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];
    }

    void set(Number k, void* v) 
    {
        ASSERT(k >> BITS == 0);
        const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
        const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
        const Number i3 = k & (LEAF_LENGTH - 1);
        reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
    }

    bool Ensure(Number start, size_t n) 
    {
        for (Number key = start; key <= start + n - 1;) 
        {
            const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
            const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
         
  // Check for overflow
            if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
                return false;
         
  // Make 2nd level node if necessary
            if (root_->ptrs[i1] == NULL) 
            {
                Node* n = NewNode();
                if (n == NULL) return false;
                root_->ptrs[i1] = n;
            }
     
      // Make leaf node if necessary
            if (root_->ptrs[i1]->ptrs[i2] == NULL) 
            {
                Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));
                if (leaf == NULL) return false;
                memset(leaf, 0, sizeof(*leaf));
                root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
            }
         
  // Advance key past whatever is covered by this leaf node
            key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
        }
        return true;
    }

    void PreallocateMoreMemory() 
    {}
};

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐