[ 项目 ] tcmalloc简化版—高并发内存池
目录
前言
本篇文章讲解了本人完成高并发内存池项目实现的过程,此项目的原型是google一个开源项目——tcmalloc(ThreadCaching Malloc),用于替代在多线程场景下的内存分配,实现了多线程下内存的高效分配,也就是说在多线程场景下内存分配释放的效率要比系统的内存分配相关函数(malloc、free)的效率高。
tcmalloc的源码代码量与复杂多很高,实现也是十分优秀与庞大,甚至go语言将其当作自己的内存分配器,因此并不会将其完全实现,究其精华与核心框架实现一个简化版,在这个过程中锻炼自己的编码能力与面向对象的工程实现逻辑。在本项目中,会用到面向对象、链表、哈希桶、内存管理、单例模式、TLS无锁访问、多线程、互斥锁等知识,知识点之间相互联系耦合,若不是很熟悉以上知识点,学习起来会有些费劲,可以提前补习一下,但是我也会在关键点处详细讲解,尽量将实现过程一五一十地讲解明白,快往下看看吧。
基本介绍
-
高并发
由于用户数量庞大,用户同时访问或操作系统的行为称为并发。高并发性能表现在系统能够快速响应每个并发请求,并能够保持系统的稳定性和可用性,而不会因为并发请求过多而导致系统崩溃、响应时间延长或资源耗尽等问题,因此高并发往往是高负载应用所面临的挑战。为了实现高并发,系统需要优化应用的实现架构和设计,构思合理的实现思想,相关策略来提高系统的并发处理能力。
在本项目中,高并发体现在当多个线程并行地申请释放内存时,系统并不会因此崩溃,准确稳定的响应每个申请释放过程,并且响应效率相比之下较高。
-
内存池
首先说说池化技术,程序先向系统申请过量的资源管理起来,当每次申请该资源时,可以快速稳定地拿到该资源,不需要反复访问系统申请,要知道,频繁访问系统会使得整机效率降低,这往往是解决资源管理的一大挑战。这种技术在很多应用场景下很常见,比如对象池,线程池,内存池,连接池等,在本项目中当然主要讲解的是内存池。
针对于内存池,一方面不仅要解决效率的问题,还要解决内存碎片的问题,那么内存碎片是什么呢?内存碎片分为外碎片和内碎片,外碎片是内存空间不连续,无法满足一个大块内存的申请,那我们就需要将不连续的内存拼接在一起提供给大块内存申请,因此外碎片问题是可以解决的;而内碎片是由于内存对齐的需求,申请的内存并没有充分利用,还会剩余相比之下并不是很大的内存,但是这很难解决或者说这是硬性条件,无法解决,在后续讲解中可以见得。
外碎片与内碎片举例如图所示:
定长内存池
-
基本介绍
我们知道,平时所用的malloc、free是适用于所有场景的,也就是什么场景下都可以用,但是相比于专门针对某场景所设计的申请释放内存程序肯定是逊色的。本小节先简单实现一个定长内存池,也就是申请释放一个固定内存大小的内存池,作为项目进行前的一个开胃菜,目的之一在于再次巩固以下内存池的概念,以降低整体项目的理解复杂度,另一个目的也是因为在高并发内存池中会用到这个定长内存池。
-
框架设计
首先,根据面向对象的思想,我们将定长内存池实现成一个对象——ObjectPool,并且使用类模板实现,在定义一个定长内存池时就必须传入一个对象,后面在使用这个内存池申请内存时只能申请这个对象内存大小的内存,属性成员有_memory、_freeList、_remainSize,_memory为指向提前向系统申请到的大块内存的指针,申请内存就是从这大块的内存中分割成小块通过New函数传出去,_freeList为自由链表,通过Delete函数回收申请出去的内存块,拿回来的小内存块一个个挂在后面,_remainSize是记录申请的大块内存的剩余内存大小,以字节为单位。
代码:
//template<size_t N> //非类型模板参数也可以,N就是sizeof(T)
template<class T>
class ObjectPool
{
public:
T* New()
{
}
void Delete(T* obj)
{
}
private:
char* _memory = nullptr; //总内存的起始指针,使用char*是因为分割内存块可以以一个字节为单位
size_t _remainSize = 0; //剩余内存字节数
void* _freeList = nullptr; //回收“释放”回来的小内存块,链成单链表,此为链表头指针
//这里为啥void*?因为不需要加1减1,指针类型无所谓
};
-
具体实现
在定长内存池内部实现了两个主要接口——New、Delete,分别是对外提供的申请和释放内存的函数。
对于New函数,我们先查看自由链表中是否有回收回来的内存块,因为内存块大小是固定的,所以可以直接传出来;其次若自由链表不存在内存块,那么可以从_memory大内存中切一个小内存块返回出去,但在此之前需要判断_memory是否为空,若为空就需要从系统申请一大块内存(默认申请128kb),若不为空则可以直接切下一块传出去。
对于Delete函数,将返回来的内存块头插到到自由链表_freeList的后面。
注意:
①在自由链表中,我们使用一个内存块的前4/8个字节(32位机下是4byte,64位机下是8byte)来存储下一个内存块的地址,这也是为什么最低申请的内存大小是8byte。那么如何拿到一个内存块的前4/8个字节呢?比如说一个内存块是obj,则*(void**)obj就是obj的前4/8个字节,因为(void**)obj是将一个内存地址强转成一个指针的地址(很巧妙,32位下是4byte,64位机下是8byte),再解引用就拿到了前4/8个字节的内容,就是下一个内存块的地址。所以取一个内存块的前4/8个字节无论是读还是写下一个内存块的地址都很方便了。
②在New函数中,需要向系统申请内存我们使用系统调用函数SystemAlloc(size_t kpages),以页为单位从系统取内存的接口。
③在New函数中,从大内存块切取内存时要注意,对象内存大小小于8byte时,是按照8byte进行切取,不然无法存放下一个内存块的地址,这就是内碎片的问题所在,解决办法是在设计哈希桶的内存划分时尽可能降低内碎片的产生。
④New函数中申请内存完毕后,调用定位new为对象初始化;Delete函数中链入自由链表之前需要调用析构函数释放对象内部资源。
全部代码:
#ifdef _WIN32
#include <windows.h>
#else
//Linux下所需头文件
#endif
// 跳过malloc函数,直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpages)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpages << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
//template<size_t N> //非类型模板参数也可以,N就是sizeof(T)
template<class T>
class ObjectPool
{
public:
T* New()
{
T* obj; //指向所要申请内存块的指针
//1.从回收链表的内存中申请
if (_freeList)
{
obj = (T*)_freeList; //拿走链表的一个节点内存
_freeList = *(void**)_freeList;
new(obj)T; //定位new会调用T的构造函数初始化
return obj;
}
//2.使用总内存池
//判断总内存池剩余内存大小
if (_remainSize < sizeof(T))
{
_remainSize = 128 * 1024;
//_memory = (char*)malloc(_remainSize);
_memory = (char*)SystemAlloc(_remainSize >> 13); //原本剩余内存直接丢了,进程结束会自动回收
if (_memory == nullptr) //检查申请空间是否申请成功
throw std::bad_alloc();
}
//从总内存块中申请
size_t sz= sizeof(T);
if (sz < sizeof(void*)) //当类型大小不足以存下一个指针(比如T是char),我们也申请一个指针大小的内存
sz = sizeof(void*);
obj = (T*)_memory;
_memory += sz;
_remainSize -= sz;
new(obj)T; //定位new会调用T的构造函数初始化
return obj;
}
void Delete(T* obj)
{
obj->~T();
//头插
*(void**)obj = _freeList;
_freeList = obj;
}
private:
char* _memory = nullptr; //总内存的起始指针,使用char*是因为分割内存块可以以一个字节为单位
size_t _remainSize = 0; //剩余内存字节数
void* _freeList = nullptr; //回收“释放”回来的小内存块,链成单链表,此为链表头指针
//这里为啥void*?因为不需要加1减1,指针类型无所谓
};
-
性能测试
我们定义一个测试对象TreeNode,是一个自定义类,而不是内置类型,更有说服力。在性能测试函数中,分别使用定长内存池和malloc、free对TreeNode对象申请释放8轮,每轮分别10万次,对比总体所花费的时间,结果如下,可见定长内存池的效率较高一些,大概是malloc、free的四倍多。
代码:
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
//测试性能
void TestObjectPool()
{
// 申请释放的轮次
const size_t Rounds = 8;
// 每轮申请释放多少次
const size_t N = 100000;
size_t begin1 = clock();
std::vector<TreeNode*> v1;
v1.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
ObjectPool<TreeNode> TNPool;
size_t begin2 = clock();
std::vector<TreeNode*> v2;
v2.reserve(N);
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
整体框架介绍
在当前主流开发环境中,很多都是多线程场景,同时申请内存时,必定存在激烈的锁竞争问题,其实malloc、free在面对这些场景下表现都是非常优秀的,但是会稍逊于准们针对多线程设计的高并发内存池。针对要实现的多线程内存池需要考虑到:
①在多线程下的锁竞争问题;
②性能问题,因此申请释放锁的性能开销很大,因此如何优化锁竞争才是解决性能问题的关键;
③内存碎片,这是内存管理方面的通病,如何满足需求又能解决内存碎片是一项挑战。
高并发内存池主要由以下三个部分组成:
- 线程缓存threadcache:线程缓存为每个线程独有一份,一个线程对应一个线程缓存,用于小于256KB的内存的分配,申请内存不需要加锁,这也正是这个并发线程池高效的设计之处;
- 中心缓存centralcache:中心缓存是所有线程所共享,所有线程缓存对应这一个中心缓存,每个线程缓存中的内存过剩或过少时都会通过这个中心缓存来达到平衡,可见这个中心缓存是需要锁的,但是可根据每个线程缓存申请或释放的内存大小不同设置不同的区域锁,降低锁竞争概率,同时中心缓存的内存被线程缓存取完时,会向下一层的页缓存拿去内存,来达到自己的内存平衡;
- 页缓存pagecache:页缓存的内存以页为单位存储及分配,通过分配与合并大块内存的机制来缓解内存碎片的问题。
整体结构图:
申请内存过程
从上面的整体框架可以看出,threadcache、centralcache、pagecache三者层层相连,环环相扣,申请内存的过程如同递归一样一层一层向下取,释放内存也如此,因此我们先讲解申请内存的过程,当出现一个线程第一次申请内存时,该程序就会创建一个线程缓存threadcache,能够满足用户所需内存则直接返回出去,若不能,threadcache就会向下一层的centralcache申请过量的内存放在自己那里,但是若centralcache的内存也不能够,则centralcache就会向系统申请大块内存,为centralcache提供所需内存。下面我们先看threadcache。
-
threadcache
1.基本介绍
首先,根据面向对象思想,threadcache是一个类,属性包括一个哈希桶结构,哈希桶元素为FreeList——封装自由链表的一个类,FreeList类内包含指向自由链表的指针、链表节点个数以及增删节点的接口,其中属性MaxSize在后面需要时会谈及到,这里用不到不做讲解。
ThreadCache类内的Allocate函数与Deallocate函数是提供给上层拿取和返回内存的接口,而FetchFromCentralCache函数和ListTooLong函数则是从下层拿取和返回内存的接口。
值得注意的是,类外实现了一个静态的TLS无锁访问指针,在一个线程内使用此指针去new一个ThreadCache类,就代表创建出了一个线程缓存。什么是TLS?TLS是一种变量的存储方法,这个变量在其所在线程内是全局可访问的,但不能被其他线程访问,而普通的全局变量,是所有线程都可以访问到的,这样就不可避免需要锁来控制,要知道加锁会大大降低效率。正常情况下,普通的全局变量在一个进程中只会有一份,即使创建多个线程看到的也是同一份,但TLS定义的全局变量,每个线程看到的是不同的一份,也就是每创建一个线程,都会生成一份变量。
此外,我们将取内存前4/8个字节的功能封装成一个函数NextObj,方便使用。
threadcache逻辑图:
框架:
//每个线程都有一个ThreadCache
class ThreadCache
{
public:
//申请释放内存对象
void* Allocate(size_t size);
//当无内存时,从中心缓存获取内存
void* FetchFromCentralCache(size_t index, size_t size);
void Deallocate(void* ptr, size_t size); //通过size计算该内存块属于哪个桶
//当内存链表太长时,还部分给CentralCache
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
static __declspec(thread) ThreadCache* pTLSThreadCache = nullptr; //TLS无锁访问
//返回一块内存的前4/8个字节
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
//将一个freeList链表封装成一个类,方便插入删除
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
//头插
NextObj(obj) = _freeList;
_freeList = obj;
_size++;
}
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void* Pop()
{
assert(_freeList);
void* obj = _freeList;
_freeList = NextObj(_freeList);
_size--;
return obj;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n <= _size);
start = end = _freeList;
for (size_t i = 0; i < n - 1; i++)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1; //用于threadcache的每个桶中,表示从centralcache获取内存的批量数
//含义:一开始可以取1个,之后不断增多,代表这个内存数的数量的需求在增加,为了提高效率,
//每一次centralcache给threadcache的数量都比上一次多
//maxsize还有个作用就是作为链表内存对象过长的一个标志
size_t _size = 0;
};
2.具体实现
首先,先来讲解一下Roundup函数和Index函数(代码如下),如下图是256kb以下内存的内存划分范围以及所对应的对齐数和对应桶下标,ThreadCache的哈希桶结构设计成208个桶,从0号桶到207号桶划分不同区域,其中,[1,128]按照8byte对齐,128/8=16,对应[0,15]号桶;[129,1024]按照16byte对齐,(1024-128)/16=56,对应[16,71]号桶......以此类推。这样的划分方法可以使得整体的内碎片浪费控制在最多10%左右,达到最好内碎片降低效果。
而Roundup函数和Index函数就是据此所得出的计算对齐数和对应桶下标的函数,比如给你7byte,Roundup函数会返回8,向上对齐后为8byte,Index函数会返回0,对应0号下标的桶;再比如给你129byte,Roundup函数会返回144,向上对齐后为144byte,Index函数会返回16,对应16号下标的桶。
对于Allocate函数,根据所传入的字节数,计算出对齐之后的大小和对应桶下标,,先看对应桶是否存在内存块,有则直接pop出来传出去,没有则调用FetchFromCentralCache函数向下层取内存。
对于FetchFromCentralCache函数,我们需要先知道一次向CebtralCahce拿多少个内存块,这里我们使用慢开始反馈调节算法计算批量数(batchnum),一方面NumMoveSize函数可以给出批量数的最大值,一方面FreeList类内的MaxSize属性给出上一次取的批量数,而这进行比较之后取较小值为目标批量数,其中对于NumMoveSize函数,提供对应内存的批量数的上下限,保证内存小的上限值高(给多些),内存大的上限值低(给少点)。在计算出目标批量数之后,就可以调用CentralCache提供的FetchRangeObj函数从中取内存块,这个函数会返回真实拿到的批量数,这个数并不一定是目标批量数,但也至少保证必须有一个内存块,若是1个正好返回出去,若是多个,将其中1个返回出去,其他的放到对应的桶中。
代码:
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignNum = SizeClass::Roundup(size);
size_t index = SizeClass::Index(size);
if (!_freeLists[index].Empty())
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignNum);
}
}
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
//慢开始反馈调节算法
//1.取较小值保证了一次不会要太多,导致用不完浪费掉
//2.当不断有申请同一size大小的需求,那么batchNum就会不断增加,直到上限
//3.size越大,batch越小;反之,size越小,batch就越大
size_t batchNum = (std::min)(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size)); //第一个参数为慢开始增多的数量,第二个参数是数量上限
//这里min加上括号:因为window头文件中有min宏定义,std中min函数,发生冲突导致这里优先选择宏定义,加上括号表示阻止宏替换
if (batchNum == _freeLists[index].MaxSize())
_freeLists[index].MaxSize()++;
//上面是计算获取size大小的批量数,因为反复获取不好,如果你需要,我直接给你一批,上面的batchNum就是这一批多少个
//下面是真正从CentralCache获取内存的过程
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
//actualNum是真实拿到的内存数量,必须一个以上
if (actualNum == 1)
{
return start;
}
else
{
_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);
return start;
}
}
// 整体控制在最多10%左右的内碎片浪费
// 内存大小范围 对齐数 对应桶
// [1,128] 按照8byte对齐 freeList[0,16)
// [128+1,1024] 按照16byte对齐 freeList[16,72)
// [1024+1,8*1024] 按照128byte对齐 freeList[72,128)
// [8*1024+1,64*1024] 按照1024byte对齐 freeList[128,184)
// [64*1024+1,256*1024] 按照8*1024byte对齐 freeList[184,208)
/*static inline size_t _RoundUp(size_t bytes, size_t alignNum) //我写
{
return bytes / alignNum * alignNum + alignNum;
}*/
static inline size_t _RoundUp(size_t bytes, size_t alignNum) //大佬写法
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
//对齐大小计算,也就是给定一个大小,可以返回其对齐之后的内存大小,比如给5byte,返回8byte;给129,返回144
static inline size_t Roundup(size_t bytes)
{
if (bytes <= 128)
{
return _RoundUp(bytes, 8); //此范围按照8字节对齐
}
else if (bytes <= 1024)
{
return _RoundUp(bytes, 16); //按照16字节对齐
}
else if (bytes <= 8 * 1024)
{
return _RoundUp(bytes, 128);
}
else if (bytes <= 64 * 1024)
{
return _RoundUp(bytes, 1024);
}
else if (bytes <= 256 * 1024)
{
return _RoundUp(bytes, 8 * 1024);
}
else //超过256kb
{
return _RoundUp(bytes, 1 << PAGE_SHIFT);
}
return -1;
}
static inline size_t _Index(size_t bytes, size_t alignShift) //alignShift就是:2^alignShift=对齐数
{
return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}
//获取给定(对齐过后的)内存大小所在桶下标,比如5通过Randup函数计算得到对齐过后的内存大小为8,8对应0号桶,即freelist[0]
//比如129通过Randup函数计算得到对齐过后的内存大小为144,144对应16号桶,即freelist[16]
static inline size_t Index(size_t bytes) //大佬写法
{
assert(bytes < MAX_BYTES);
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
{
return _Index(bytes, 3);
}
else if (bytes <= 1024)
{
return _Index(bytes - 128, 4) + group_array[0];
}
else if (bytes <= 8 * 1024)
{
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024)
{
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1]
+ group_array[0];
}
else if (bytes <= 256 * 1024)
{
return _Index(bytes - 64 * 1024, 13) + group_array[3] +
group_array[2] + group_array[1] + group_array[0];
}
else
{
assert(false);
}
return -1;
}
// 计算一次thread cache从中心缓存获取多少个的上限值
// 根据size(对齐过后)大小:越大,给少点;越小,给多点,但同时也有一个上下限
// 比如size为8,上限为256*1024 / 8=32768,取512;size为8*1024,上限为256*1024 / 8*1024=32
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移动多少个对象的(慢启动)上限值
// 小对象一次批量上限高
// 小对象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
-
centralcache
1.基本介绍
centralcache也是一个哈希桶结构,并且映射关系和threadcache中哈希桶是一样的,也就是说也可以用Index函数计算一个内存大小所在的桶下标,但是不同的是,哈希桶元素是SpanList,这是一个Span链表,节点是一个Span类。
SpanList:最重要的是SpanList是一个带头双向链表,属性包括一个Span指针和一个锁,这就是所谓的桶锁,在线程访问响应桶时,需要加锁防止线程安全问题,此外就是一些增删Span节点的接口。
Span:管理一个跨度的大块空间(以页为单位)的类,具体地,_pageId就是起始地址((void*)_pageId*8k),_n就是总大小(_n*8k就是总字节数),所谓的大块切分就是从起始地址开始切成小块挂在_freeList,小块合并就是将_freeList置空,即不看_freeList了,只看页号和页数。此外,Span类还存在指向自由链表的指针_freeList(用来指向一个个被切分的小内存块)、指向前后Span的指针、_useCount(所切分的小内存块的使用情况,一个内存块被使用了,值就加1)、_objSize(记录当前Span切分的小内存块的大小)、_isUse(记录当前Span是否被使用,也就是是否被切分成小内存块),后三者会在后面使用,目前只需要维护好其值。
CentralCache类除了哈希桶结构外,被实现成了单例模式(饿汉模式),此外,FetchRangeObj函数和GetOneSpan函数就是提供给上层拿取内存的接口,ReleaseListToSpans函数就是将内存返还给下层的接口。
centralcache逻辑图:
代码:
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 获取一个非空的span
Span* GetOneSpan(SpanList& list, size_t size);
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 将一定数量的内存块释放到对应span
void ReleaseListToSpans(void* start, size_t size);
private:
CentralCache()
{}
CentralCache(const CentralCache& obj) = delete;
CentralCache& operator=(const CentralCache& obj) = delete;
SpanList _spanLists[NFREELIST];
static CentralCache _sInst;
};
struct Span
{
PAGE_ID _pageId = 0; //页号
size_t _n = 0; //页的数量
Span* _next = nullptr; //前后指针 //双向链表适应于任何位置的插入删除
Span* _prev = nullptr;
void* _freeList = nullptr; //将大块空间切分对应多个小块,此指针就是将其链接起来
size_t _useCount = 0; //小块使用情况,分配出去就+1,回收回来就-1
size_t _objSize = 0; //记录当前span切分的小内存块的大小
//作用:在ConcurrentFree中优化掉需要传内存大小size的参数
bool _isUse = false; //此span是否在被使用,即span在pagecache就是false,在centralcache就是true
};
//带头双向循环
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
//这里并不delete掉pos指向的span,只是从此链表中解掉,因为还要交给下一层处理
}
private:
Span* _head;
public:
std::mutex _mtx; //桶锁
};
2.具体实现
对于FetchRangeObj函数,提供给上层拿取目标批量的小内存块,通过Index函数计算对应桶下标,紧接着调用GetOneSpan函数从桶中获取一个非空Span,使得start指向Span中的自由链表的第一个节点,之后向后遍历出目标批量数个小内存块,但是其中可能并不会存在那么多小内存块,我们只要保证尽可能拿到多个(至少一个),让end指向最后一个内存块,然后返回真实拿取的内存块的个数。
对于GetOneSpan函数,从一个桶中获取一个非空Span并不简单,非空是指Span指针不为空且其中的自由链表指针不为空,遍历所有Span,若存在这样的Span就直接Pop出来再返回,若不存在就得去找PageCache要(调用PageCache的NewSpan函数),但是要过来的是一个未被切分的Span,接下来我们需要按照所需内存大小去切分这个Span,之后将所有小内存块挂在自由链表中,再将此Span挂在相应桶中即可。
值得注意的是,在GetOneSpan函数中调用Pagecache的NewSpan函数向PageCache要内存时,我们需要指定要多少页的Span,那么这里就需要一个函数NumMovePage(size)来根据所需内存块大小(size)计算需要向PageCache要多少页的Span,其实这个函数也是简单粗暴,直接计算threadcache向centralcache要的内存的批量数,然后根据内存块大小,计算总共需要的内存数,再转为页数,即为需要传入NewSpan函数的参数,注意至少也是要为1页。
代码:
//中心缓存对外提供的拿取内存的接口,保证提供一个或以上数量的内存大小,start和end是这多个内存块链表的头尾指针
//batchNum是外界需要的内存块的数量,size是内存块大小,返回真实提供的内存块的数量。其中start和end是输入型变量
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
Span* span = GetOneSpan(_spanLists[index], size); //从桶中获取一个非空span以从中拿到一个或以上的内存数量返回给threadcache
assert(span);
assert(span->_freeList);
start = end = span->_freeList;
size_t actualNum = 1; //记录真实拿到的内存数量
while (actualNum < batchNum && NextObj(end) != nullptr)
{
end = NextObj(end);
actualNum++;
}
span->_freeList = NextObj(end);
span->_useCount += actualNum;
//cout << span->_useCount << endl;
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return actualNum;
}
// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList)
{
return it;
}
it = it->_next;
}
//先把central_cache的桶锁解掉,以便于其他线程释放内存对象回来,不会阻塞
list._mtx.unlock();
//向PageCache申请内存
PageCache::GetInstance()->_mtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;
span->_objSize = size;
PageCache::GetInstance()->_mtx.unlock();
//计算此span这个大块内存的起始地址和总字节数
char* start = (char*)(span->_pageId << PAGE_SHIFT);
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
//分割成size大小的小块内存链到自由链表(尾插)
span->_freeList = start;
start += size;
//void* tail = start; //写错,无语
void* tail = span->_freeList;
while (start < end)
{
NextObj(tail) = start;
tail = NextObj(tail);
start += size;
}
NextObj(tail) = nullptr; //注意切好以后让末尾指向空(这句不能忘,调试好久)
list._mtx.lock(); //上面解锁以后,中间是分割不需要加锁,那这里访问桶需要加锁
list.PushFront(span);
return span;
}
//根据所需内存块大小计算需要向PageCache要多少页的Span
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size); //计算threadcache向centralcache要的内存块数量的上限值,保证给予的页数能够切分这些
size_t npage = num * size;
npage >>= PAGE_SHIFT; //字节数转页数
if (npage == 0)
npage = 1; //最低也是一页
return npage;
}
-
pagecache
1.基本介绍
从下面PageCache类实现的代码可以看出,PageCache也是一个单例模式(饿汉模式)实现的类,同时也有哈希桶结构,哈希桶元素也是一个SpanList,但是其中的Span节点不允许切分,以一个Span为单位拿取与返还,而且桶下标映射与threadcache和cebtralcache中的不同,这里是直接映射,并且以页为单位,也就是说1页的Span对应1号桶,2页的Span对应2号桶......,总共128个桶,对饮128号桶,即最大桶Span是128页。
此外,属性中包括一个对象为Span的定长内存池,这是因为在类函数中我们需要定义Span,这个资源就可以借助定长内存池,否则就得使用malloc/free,会与其产生耦合。还包括一个存储页号与Span* 映射关系的map容器,在合并Span时需要页号页号找到对应Span拼接起来。并且,可以看到PageCache单独一把锁,注意这里并不是桶锁,这个需要一把“全局”锁,为什么?
①虽然pagecache上层只有一个centralcache会向它申请内存(span),但是在我们处理申请大于256kb大小的内存块时是直接向pagecache申请的,这也涉及到线程安全问题;
②在pagecache中涉及到切割和合并span的操作,也就是访问一个桶的同时也是需要访问其他桶的。可以加桶锁,但是这样申请释放锁的频率会有所增加,相比之下使用大锁将整个哈希表都锁起来,来一个线程获得span时,其他线程只能等着,效率会更可观一些(申请释放一次全局锁要比多次申请释放局部锁的效率高)PageCache有三个接口——NewSpan函数为提供给上层拿取k页Span的接口,ReleaseSpanToPageCache函数是上层返还Span到PageCache中,PageCache去合并相邻的Span(原本大页Span切分出去的小页Span)为大页Span,而MapObjectToSpan函数则是针对于记录页号与Span* 映射关系的map容器,给定一个内存块首地址返回其所在Span。
pagecache逻辑图:
代码:
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 提供给上层拿取k页Span的接口
Span* NewSpan(size_t k);
// 获取从对象到span的映射
Span* MapObjectToSpan(void* obj);
// 释放空闲span回到Pagecache,并合并相邻的span
void ReleaseSpanToPageCache(Span* span);
private:
PageCache()
{}
PageCache(const PageCache& obj) = delete;
PageCache& operator=(const PageCache& obj) = delete;
ObjectPool<Span> _spanPool;
SpanList _spanLists[PAGES];
std::unordered_map<PAGE_ID, Span*> _idSpanMap; //两者都可以
//std::map<PAGE_ID, Span*> _idSpanMap;
static PageCache _sInst;
public:
std::mutex _mtx; //使用全局锁,而不是桶锁
};
2.具体实现
对于NewSpan函数,我们需要返回出去一个k页Span,首先先看k号桶下是否存在Span节点,若存在直接将其pop出来返回出去,与此同时记录好每个页号与此Span的映射关系,若不存在则向下查找是否存在大于k页的Span,若存在,此时我们就需要分割这个大于k页的Span,比如说这个Span是n页的,将其分割成一个k页的Span和一个n-k页的Span,k页Span返回出去,n-k页的Span挂在n-k页的桶中,所谓的分割其实就是修改起始页号和对应页数,注意也不能忘记记录好每个页号与此Span的映射关系,但是若连大于k页的Span都不存在,此时就需要向系统申请,申请128页的内存放置在最后一个桶中,然后递归调用NewSpan函数,再次去查找分割,但是这一次肯定会在最后一个桶停下,将其分割成所需Span和剩下页数的Span挂在对应位置,并将所需Span返回出去。但是!!!若k大于最大页数怎么办(即申请大于128页的内存),所以在以上逻辑之前,加一个申请大于128页内存直接像系统申请。
对于MapObjectToSpan函数,并不是简单的给定一个页号然后返回对应所在Span,而是给定一个内存块的首地址,返回其所在Span,这其实是有难度的。我们知道,地址其实就是所在页号加上一定的偏移量转成16进制数而成,那如果我们将地址除以一页的大小,得到就是所在页号,比如说一个地址为0x1234ff12,将其除以8*1024(即8k,也可以右移13位)得到37287页,那么根据这个页号就可以通过map容器得到所在Span。
代码:
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
if (k > PAGES - 1) //申请128页以上的内存直接向系统申请
{
void* ptr = SystemAlloc(k);
//Span* span = new Span();
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
_idSpanMap[span->_pageId] = span;
return span;
}
//先看k桶有无span
if (!_spanLists[k].Empty())
{
//注意:不仅从大于k桶找的span需要去记录表id与span的映射,k桶也需要(不能忘,调试很久)
Span* kspan = _spanLists[k].PopFront();
for (PAGE_ID i = 0; i < kspan->_n; i++)
{
_idSpanMap[kspan->_pageId + i] = kspan;
}
return kspan;
}
//没有再看后续桶有无span
size_t i = k + 1;
while (i < PAGES)
{
if (!_spanLists[i].Empty())
{
Span* ispan = _spanLists[i].PopFront();
//分割成两个span,一个span返回出去,另一个插入到相应桶中
//Span* kspan = new Span;
Span* kspan = _spanPool.New();
kspan->_pageId = ispan->_pageId;
kspan->_n = k;
ispan->_pageId += k;
ispan->_n -= k;
_spanLists[ispan->_n].PushFront(ispan);
//存储ispan的首尾页号与ispan的映射关系,用处:pagecache回收内存时进行查找合并
//因为合并前后span时只需要用到span首尾页号
_idSpanMap[ispan->_pageId] = ispan;
_idSpanMap[ispan->_pageId + ispan->_n - 1] = ispan;
//建立kspan中所有页号和此span的映射,用处:threadcache中的小内存还给centralcache时
//能找到哪个小内存对应是哪个span中 && pagecache合并前后span
for (PAGE_ID i = 0; i < kspan->_n; i++)
{
_idSpanMap[kspan->_pageId + i] = kspan;
}
return kspan;
}
i++;
}
//再没有就向系统申请内存放入最后一个桶中
void* ptr = SystemAlloc(PAGES - 1);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = PAGES - 1;
_spanLists[PAGES - 1].PushFront(span);
return NewSpan(k); //再递归去分割,也就是再次调用就会走到第二步返回
}
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
auto it = _idSpanMap[id];
assert(it != nullptr);
return it;
}
-
申请内存连通
针对于用户的使用,我们在定义一个申请内存函数ConcurrentAlloc,用户可传入所需申请内存的大小,若内存大小低于256k属于一个普遍情况,则是创建threadcache向centralcache申请,若内存大小高于256k,则直接向pagecache申请,而在pagecache中,若内存大小低于最大页数,则可以拿取对应桶中的Span,若高于则是直接向系统申请,也就是高于256kb大小的内存交给PageCache处理。对普遍情况补充一点,我们需要定义出一个threadcache类,使用定长内存池可以摆脱对malloc的依赖,紧接着就是调用ThreadCache提供的申请内存函数Allocate拿取内存。
代码:
static void* ConcurrentAlloc(size_t size)
{
//对于申请大于256kb大小的内存块,找pagecache
//大于256kb的内存块:①256k<size<=128页 ②size>128页
//①可以从pagecache的桶中拿,②在newspan函数中实现向堆申请
if (size > MAX_BYTES)
{
size_t alignSize = SizeClass::Roundup(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_mtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size;
PageCache::GetInstance()->_mtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
if (pTLSThreadCache == nullptr)
{
static ObjectPool<ThreadCache> tcPool;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
return pTLSThreadCache->Allocate(size);
}
}
释放内存过程
综上,申请内存过程实现连通,用户调用ConcurrentAlloc函数可以申请所需内存。释放内存的过程也是一样,需要释放的内存块放回入ThreadCache,若一直释放会导致桶的链表太长(“长”的标准是可以自己设置的),下一步则需要拿下一部分的内存块还给centralcache,当CentralCache中某个Span的内存块全部还回来了(Span.usecount==0),代表这个Span暂时用不到了,可以还给下一层PageCache了,PageCache拿到这个Span并不是直接将其挂在对应桶中,而是合并此Span的相邻Span,发现相邻Span也用不到了,就可以合并成大页的Span挂在更大的桶中,以便于需要更大的内存时,可以提供出去,这就是解决外碎片问题的关键所在。具体先看看ThreadCache怎么做的吧。
-
threadcache
对于Deallocate函数,传入所需释放的内存块首地址,据此通过Index函数算出ThreadCache的桶下标,将内存块push进去即可。但是一直push总有过长的时候,设置策略——当桶长度大于当时向CentralCache申请的内存块批量数上限值时(也可以设置为上限值的2/3或70%),就将一部分内存块pop出来返还给CentralCache,这依赖于ListTooLong函数。
对于ListTooLong函数,指定好是哪一个桶的链表过长了,函数体内只需要将上限值大小长度的内存块pop出来,交给CentralCache的ReleaseListToSpans函数。
代码:
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//如果自由链表长度过长,就将一部分还给centralcache
//“过长”的判定可自由设置,tcmalloc中“过长”=长度达到一次向centralcache申请内存块的数量(MaxSize) || 总内存达到一定量
//这里做了简化,仅看第一个判定
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
void* start = nullptr;
void* end = nullptr;
list.PopRange(start, end, list.MaxSize());
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
-
centralcache
将一串ThreadCache传过来的内存块,对应找到所在Span这一步是不容易的,因为ThreadCache的每一个内存块我们是不知道属于CentralCache的哪一个Span中,但是我们又必须将每个内存块放到所在的Span的对应自由链表中,此时这就可以见得PageCache中的map容器的重要性了。遍历所有内存块,根据内存块首地址通过MapObjectToSpan函数得到其所在Span,将其插入到自由链表中即可,插入之后我们就需要判断当前Span切分出去的内存块是否还回来完了,若还回来完了(Span->usecount==0),就需要将此Span pop出来还给下一层PageCache,即交给PageCache的ReleaseSpanToPageCache函数。
代码:
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* tmp = NextObj(start);
//找到每个内存块对应的span
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//将内存块插入到对应span的freeList中
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
//cout << span->_useCount << endl;
if (span->_useCount == 0)
{
//说明这一span的内存全部归还回来了,可以将其还给pagecache
_spanLists[index].Erase(span);
span->_next = span->_prev = nullptr;
span->_freeList = nullptr; //置空表示合并掉freelist的小内存块
//还给pagecache,此时已经不再用此桶了,先解掉,防止其他线程不能访问此桶
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_mtx.unlock();
_spanLists[index]._mtx.lock(); //回来再锁上
}
start = tmp; //遍历start到end所有内存块
}
_spanLists[index]._mtx.unlock();
}
-
pagecache
对于ReleaseSpanToPageCache函数,拿到CentralCache释放的Span,我们并不直接将其挂在相应桶中,应当合并其相邻Span,什么意思?当初CentralCache向PageCache拿取内存时,是将一个大页Span切成所需Span和剩下页的Span,当CentralCache还回来一个Span时,我们也应当找出其相邻Span进行合并成为大页Span,防止再次需要大页Span时没有,这也印证了外碎片的解决办法。
函数体内,因为PageCache的NewSpan函数承担了申请超大块内存(128页以上)的功能,因此这里也需要具备释放超大块内存的功能,所以一开始我们先根据Span的页数判断是否是向系统直接申请的内存,若是则直接释放给系统,若不是就进入到该函数的主要功能。
从当前Span开始,循环向前合并,根据该Span的起始页数,减一得到相邻的前一Span的末尾页数,调用map容器查找其所在Span即为相邻的前一Span,若不存在则说明该Span前无Span了,若存在则查看前一Span的isUse属性得到其是否在被使用(是否被切分出去使用),若正在被使用,则合并中断,若未被使用则需要再查看前一Span和当前Span的页数之和是否大于最大桶页数,若大于则说明前一Span并不是PageCache申请的(可能是直接向系统申请的),只是恰好内存前后相邻了,若不大于则可以合并了,合并其实也就是留下一个Span,然后页数加在一起,因此循环向前合并直到以上某条件不满足中断合并。从该Span开始循环向后合并也是一样,最终该Span会形成一个较大的Span,将其挂在相应桶中即可。
代码:
void PageCache::ReleaseSpanToPageCache(Span* span)
{
if (span->_n > PAGES - 1) //根据span页数来判断是否是向系统直接申请
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
//对span前后的页进行尝试合并,缓解内存碎片问题
while (1) //向前合并
{
PAGE_ID prevId = span->_pageId - 1;
auto ret = _idSpanMap.find(prevId);
//三种情况退出循环
//前一页并不在申请内存的范围内
if (ret == _idSpanMap.end())
{
break;
}
//前一页所在span在使用中
Span* prevSpan = ret->second;
if (prevSpan->_isUse == true)
{
break;
}
//当前span与前一span合并的总页数超过了桶能存放的最大页数(128)
//一开始从128页开始申请,怎么可能会超过128呢?
//因为有可能申请了两个及以上的128页的span并且正好后一个span与前一个span地址上是连续的,不加以防止就会两个span混合合并
if (span->_n + prevSpan->_n > PAGES - 1)
{
break;
}
//走到这就可以合并了
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
while (1) //向后合并
{
PAGE_ID nextId = span->_pageId + span->_n;
auto ret = _idSpanMap.find(nextId);
//三种情况退出循环
//前一页并不在申请内存的范围内
if (ret == _idSpanMap.end())
{
break;
}
//前一页所在span在使用中
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
//当前span与前一span合并的总页数超过了桶页数的最大范围
if (span->_n + nextSpan->_n > PAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
//前后合并之后形成一个大的span,再链到桶中、设置当前状态、以及存储此span的首尾id和span的映射关系
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
_idSpanMap[span->_pageId] = span;
_idSpanMap[span->_pageId + span->_n - 1] = span;
}
-
释放内存连通
针对于用户的使用,我们也定义一个申请内存函数ConcurrentFree,用户传入释放内存的首地址,若内存块大小大于高并发内存池的最大内存(128页),则说明此内存是在ConcurrentAlloc函数中向系统直接申请的,因此这里也是直接将内存块还给系统(同样地,也是交给PageCache的ReleaseSpanToPageCache来做);若内存块大小不大于最大内存,则是调用ThreadCache的Deallocate函数进行释放。
值得注意的是,按理来说提供给用户释放内存的接口不该要求用户提供内存的大小,但是ThreadCache的Deallocate函数又需要,所以代码块后半部分的函数是针对ConcurrentFree函数的优化,即去除参数size,办法其实就是在每个Span中加上一个属性_objSize记录此Span被切分的小块内存块的大小,在通过内存块首地址找到所在Span即可得到小内存块的大小,进而传入Deallocate函数。
代码:
static void ConcurrentFree(void* ptr, size_t size)
{
if (size > MAX_BYTES) //大块内存是向系统申请的,因此释放是返给系统
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); //单独加锁
PageCache::GetInstance()->_mtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_mtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
//通过span中的_objsize优化掉size参数:由地址得到页号,映射到span得到_objSize
//static void ConcurrentFree(void* ptr)
//{
// Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr); //单独加锁
// size_t size = span->_objSize;
//
// if (size > MAX_BYTES) //大块内存是向系统申请的,因此释放是返给系统
// {
// PageCache::GetInstance()->_mtx.lock();
// PageCache::GetInstance()->ReleaseSpanToPageCache(span);
// PageCache::GetInstance()->_mtx.unlock();
// }
// else
// {
// assert(pTLSThreadCache);
// pTLSThreadCache->Deallocate(ptr, size);
// }
//}
多线程环境下对比malloc
针对于多线程环境下ConcurrentAlloc与malloc的对比,我们写一个测试函数(如下代码块)分别测试nworks个线程下rounds轮次申请释放ntimes次随机内存大小的内存,这里我们选择4个线程下10轮申请释放100000次内存,对比结果如下:
通过结果可以发现,多线程下ConcurrentAlloc(我们的方法)确实比malloc的效率要高些,性能相对好一些。但是,这并不是我们的最终目标,我的意思是还可以在优化,具体可见下一小结的讲解,这里先贴出优化后的高并发内存池与malloc的对比结果,可见效率差距又扩大了一些。
代码:
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector<std::thread> vthread(nworks);
std::atomic<size_t> malloc_costtime = 0;
std::atomic<size_t> free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector<void*> v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(ConcurrentAlloc(16));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
nworks, rounds, ntimes, free_costtime.load());
printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}
int main()
{
size_t n = 10000;
cout << "==========================================================" <<endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" <<
endl;
return 0;
}
基数树优化
基于最终的性能对比结果所得,多线程环境下高并发内存池的性能优势并没有太明显,因此需要对该程序进行优化,但是可优化之处并不是凭感觉所得,而是借助性能评测工具(由于是在Visual Studio 2019开发完成,因此借助VS2019中的性能评测工具)。评测结果显示,多处消耗性能的地方在于PageCache中的map容器(通过页号查找所在Span),因为不仅这个功能需多次使用,并且访问此容器需要加锁(要知道加锁释放锁的性能消耗是非常大的)。
查阅tcmalloc源码以后,其实可以发现tcmalloc使用的是【基数树】代替PageCache的map容器,不仅可以快速查询,更重要的是访问无需加锁,这就使得性能得到很大的改善。为什么不需要加锁呢?因为基数树的内部结构在增加新的键值对时是不动的,不像map或unordered_map那样,比如说map内部的红黑树,当写入map容器时,红黑树的结构是很大概率发生变化的,所以需要加锁保护,防止结构变化导致其他线程读取的内容失效,但是基数树的内部结构原本就会固定好,所以无需加锁保护。
基数树代码如下,将PageCache中的map换成TCMalloc_PageMap1<32 - PAGE_SHIFT>,且将PageCache的接口函数实现中的容器读写函数换成基数树的对应读写函数即可。
基数树代码:
// 使用单层数组
template <int BITS>
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_=nullptr;
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap1() {
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// key存在返回对应value,否则返回NULL
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// 设置键值对 <k,v>
void set(Number k, void* v) {
array_[k] = v;
}
};
后记
事实上,tcmalloc的源码代码量是以万行为单位的,我们实现的简化版本只以千行为单位,可见tcmalloc的很多细节是并没有在此项目中展示出来,还需要进一步的学习扩充这个项目,这将作为下一步的目标。这里贴出tcmalloc的源码链接【tcmalloc源码】以及大佬对tcmalloc的学习理解经验【tcmalloc参考学习】,以供大家共同学习,有疑惑的也可在评论区写出我们共同商讨,OVER!
更多推荐
所有评论(0)