C语言简单实现线程池

0 前言

网上关于线程池的例子还是不少,简单明了的倒是比较少,看了网上的资料,打算借鉴网上的一些例子,自己实现以下。

线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

那么为什么又需要线程池呢?

我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些"池化资源"技术就这样的产生了。

 

1 线程池优点

下面使用网上资源验证线程池如何提高服务器性能的。

我所提到服务器程序是指能够接受客户请求并能处理请求的程序,而不只是指那些接受网络客户请求的网络服务器程序。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:

假设在一台服务器完成一项任务的时间为T

     T1 创建线程的时间

      T2 在线程中执行任务的时间,包括线程间同步所需时间

      T3 线程销毁的时间              

显然T = T1+T2+T3。注意这是一个极度简化的假设。

可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

这些都是假设,不能充分说明问题,下面我将讨论线程池的简单实现并对该程序进行对比测试,以说明线程技术优点及应用领域。

 

2 线程池的简单实现

一般一个简单线程池至少包含下列组成部分。

  1. 线程池管理器(ThreadPoolManager):用于创建并管理线程池
  2. 工作线程(WorkThread): 线程池中线程
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

 

下面是代码:

全局文件:

/********************************** 
 * @author     wallwind@yeah.net
 * @date        2012/06/13
 * Last update: 2012/06/13
 * License:     LGPL
 * 
 **********************************/
 
 #ifndef _GLOBAL_H_
 #define _GLOBAL_H_
 
 #include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>             /* */
#include <stdarg.h>
#include <stddef.h>             /* offsetof() */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <pwd.h>
#include <grp.h>
#include <dirent.h>
#include <glob.h>
#include <sys/vfs.h>            /* statfs() */

#include <sys/uio.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sched.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>        /* TCP_NODELAY, TCP_CORK */
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/un.h>

#include <time.h>               /* tzset() */
#include <malloc.h>             /* memalign() */
#include <limits.h>             /* IOV_MAX */
#include <sys/ioctl.h>
#include <sys/sysctl.h>
#include <crypt.h>
#include <sys/utsname.h>        /* uname() */
#include <semaphore.h>

#include <sys/epoll.h>
#include <poll.h>
#include <sys/syscall.h>
#include <pthread.h>
 #endif


thread.c文件

/********************************** 
 * @author      wallwind@yeah.net
 * @date        2012/06/13
 * Last update: 2012/06/13
 * License:     LGPL
 * 
 **********************************/
 
 
 
 #ifndef _THPOOL_
 #define _THPOOL_
 
 #include "global.h"
 /**
	定义一个任务节点
 **/
 typedef void* (*FUNC)(void* arg);
 
 
 typedef struct _thpool_job_t{
//	void* (*function)(void* arg);    //函数指针
	FUNC 			 function;
	void*                   arg;     //函数参数。
	struct _thpool_job_t* prev;     // 指向上一个节点
	struct _thpool_job_t* next;	    //指向下一个节点
 } thpool_job_t;
 
 /**
	定义一个工作队列
 **/
 
typedef struct _thpool_job_queue{
	thpool_job_t*    head;            //队列头指针 
	thpool_job_t*    tail;			   // 队列末尾指针
	int              jobN;					  //任务数
	sem_t*           queueSem;			  //x信号量
}thpool_jobqueue; 
 
 /**
	线程池
 **/
 
 typedef struct _thpool_t{
	pthread_t*      threads;    线程指针数
	int 		    threadsN;     线程数
	thpool_jobqueue* jobqueue;   // 指向队列指针
 }thpool_t;
 
 typedef struct thread_data{                            
	pthread_mutex_t *mutex_p;
	thpool_t        *tp_p;
}thread_data;

 //初始化线程池内部的线程数
thpool_t*  thpool_init(int threadN);

void thpool_thread_do(thpool_t* tp_p);

int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p);

void thpool_destroy(thpool_t* tp_p);



int thpool_jobqueue_init(thpool_t* tp_p);



void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p);

int thpool_jobqueue_removelast(thpool_t* tp_p);

thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p);

void thpool_jobqueue_empty(thpool_t* tp_p);

 #endif

 

thread.c

/********************************** 
 * @author     wallwind@yeah.net
 * @date        2012/06/13
 * Last update: 2012/06/13
 * License:     LGPL
 * 
 **********************************/
 #include "global.h"
 #include "Thread.h"
 #include <errno.h>
 
 static int thpool_keepalive = 1;
 
 /* 创建互斥量,并初始化 */
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
 
thpool_t*  thpool_init(int threadN)
{
	thpool_t* thpool;
	if(!threadN || threadN < 1)
		threadN = 1;
	///分配线程池内存
	thpool = (thpool_t*) malloc(sizeof(thpool_t));
	if(thpool ==NULL)
	{
		printf("malloc thpool_t error");
		return NULL;
	}
	//分配线程数
	thpool->threadsN = threadN;
	thpool->threads =(pthread_t*) malloc(threadN*sizeof(pthread_t));
	if(thpool->threads == NULL)
	{
		printf("malloc thpool->threads error");
		return NULL;
	}
	if(thpool_jobqueue_init(thpool))
		return -1;

	thpool->jobqueue->queueSem =(sem_t*)malloc(sizeof(sem_t));
	sem_init(thpool->jobqueue->queueSem,0,1);
	int t;
	for(t = 0;t< threadN ;t++)
	{
		pthread_create(&(thpool->threads[t]),NULL,(void *)thpool_thread_do,(void*)thpool);
	}
	
	return thpool;
}

void thpool_destroy(thpool_t* tp_p)
{
	int i ;
	thpool_keepalive = 0;
	
	for(i = 0;i < (tp_p->threadsN); i++)
	{
		if(sem_post(tp_p->jobqueue->queueSem))
		{
			fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
		}

	}
	if(sem_post(tp_p->jobqueue->queueSem)!=0)
	{
		fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
	}
	for(i = 0;i < (tp_p->threadsN); i++)
	{
		pthread_join(tp_p->threads[i],NULL);
	}
	thpool_jobqueue_empty(tp_p);
	
	free(tp_p->threads);
	free(tp_p->jobqueue->queueSem);
	free(tp_p->jobqueue);
	free (tp_p);
	
}
对双向队列初始化
/* Initialise queue */
int thpool_jobqueue_init(thpool_t* tp_p){
	tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue));      /* MALLOC job queue */
	if (tp_p->jobqueue==NULL) return -1;
	tp_p->jobqueue->tail=NULL;
	tp_p->jobqueue->head=NULL;
	tp_p->jobqueue->jobN=0;
	return 0;
}


void thpool_thread_do(thpool_t* tp_p)
{
	while(thpool_keepalive ==1)
	{
		if(sem_wait(tp_p->jobqueue->queueSem)) ///线程阻塞,等待通知 直到消息队列有数据
		{
			perror("thpool_thread_do(): Waiting for semaphore");
			exit(1);
		}
		if(thpool_keepalive)
		{
			//(void*)(*function)(void *arg);
			FUNC function;
			void* arg_buff;
			thpool_job_t*  job_p;
			
			pthread_mutex_lock(&mutex);
			 job_p = thpool_jobqueue_peek(tp_p);
			function = job_p->function;
			arg_buff = job_p->arg;
			if(thpool_jobqueue_removelast(tp_p))
				return ;
			pthread_mutex_unlock(&mutex);
			function(arg_buff);   //运行 你的方法。
			free(job_p);         释放掉。
		}
		else
		{
			return ;
		}
			
	}
	return ;
}

//得到第一个队列的一个节点
thpool_job_t* thpool_jobqueue_peek(thpool_t* tp_p)
{
	return tp_p->jobqueue->tail;
}
/删除队列的最后一个节点
int thpool_jobqueue_removelast(thpool_t* tp_p)
{
	if(tp_p ==NULL)
		return -1;
	thpool_job_t* theLastJob;
	theLastJob = tp_p->jobqueue->tail;
	switch(tp_p->jobqueue->jobN)
	{
		case 0:
			return -1;
		case 1:
			tp_p->jobqueue->head =NULL;
			tp_p->jobqueue->tail =NULL;
			break;
		default:
			theLastJob->prev->next = NULL;
			tp_p->jobqueue->tail = theLastJob->prev;
				
	}
	(tp_p->jobqueue->jobN)--;
	int reval;
	sem_getvalue(tp_p->jobqueue->queueSem,&reval);
	return 0;	
}

void thpool_jobqueue_add(thpool_t* tp_p, thpool_job_t* newjob_p)
{
	newjob_p->next = NULL;
	newjob_p->prev = NULL;
	thpool_job_t* oldFirstJob;
	oldFirstJob = tp_p->jobqueue->head;
	
	switch(tp_p->jobqueue->jobN)
	{
		case 0:
			tp_p->jobqueue->head = newjob_p;
			tp_p->jobqueue->tail = newjob_p;
			break;
		default:
			oldFirstJob->prev = newjob_p;
			newjob_p->next = oldFirstJob;
			tp_p->jobqueue->head = newjob_p;
	
	}
	(tp_p->jobqueue->jobN)++;
	sem_post(tp_p->jobqueue->queueSem);
	
	int reval;
	sem_getvalue(tp_p->jobqueue->queueSem,&reval);
	return;
}

/将消息加入线程池
int thpool_add_work(thpool_t* tp_p, void* (*function_p)(void*), void* arg_p)
{
	thpool_job_t* newjob;
	newjob = (thpool_job_t*) malloc(sizeof(thpool_job_t));
	
	if(newjob ==NULL)
	{
		fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
		exit(1);
	}
	newjob ->function = function_p;
	newjob ->arg      = arg_p;
	pthread_mutex_lock(&mutex);
	thpool_jobqueue_add(tp_p,newjob);
	pthread_mutex_unlock(&mutex);     
	return 0;
}

///清空队列
void thpool_jobqueue_empty(thpool_t* tp_p)
{
	thpool_job_t* curjob;
	curjob = tp_p->jobqueue->tail;
	
	while(tp_p->jobqueue->jobN)
	{
		tp_p->jobqueue->tail = curjob->prev;
		free (curjob);
		curjob = tp_p->jobqueue->tail;
		(tp_p->jobqueue->jobN)--;
	}
	tp_p->jobqueue->head = NULL;
	tp_p->jobqueue->tail = NULL;
}



下面是mian函数文件

/********************************** 
 * @author      wallwind@yeah.net
 * @date        2012/06/13
 * Last update: 2012/06/13
 * License:     LGPL
 * 
 **********************************/

#include "global.h"
#include "Thread.h"

	void* task1()
	{
		printf("# Thread working: %u\n", (int)pthread_self());
		printf("  Task 1 running..\n");
	}


    /* Some arbitrary task 2 */
	void* task2(int a)
	{
		printf("# Thread working: %u\n", (int)pthread_self());
		printf("  Task 2 running..\n");
		printf("%d\n", a);
	}
int main()
{
	printf("~~~~~~~~~~~");
	thpool_t* thpool;
	int i;
	thpool = thpool_init(5);
	puts("Adding 20 tasks to threadpool");
	int a=54;
	for (i=0; i<20; i++){
		thpool_add_work(thpool, (void*)task1, NULL);
		thpool_add_work(thpool, (void*)task2, (void*)a);
	};


    puts("Will kill threadpool");
	thpool_destroy(thpool);
	
}


在linux下写程序少不了makefile文件。于是我自己写了一个比较通用的makefile文件。仅供大家参考

makefile 代码

SRCS=$(wildcard *.c)

OBJS=$(SRCS:.c=.o)

CC=gcc

INCLUDES=-I/

LIBS=-L/ -lpthread

CCFLAGS = -g -Wall -O0

cThreadPool : $(OBJS)

        $(CC) $^ -o $@ $(INCLUDES) $(LIBS)

%.o : %.cpp

        $(CC) -c $<$(CCFLAGS)


clean:

        rm *.o

        .PHONY:clean

运行效果如下图

./test
Created thread 0 in pool 
Created thread 1 in pool 
Created thread 2 in pool 
Created thread 3 in pool 
Adding 20 tasks to threadpool
# Thread working: 3086773136
  Task 1 running..
# Thread working: 3076283280
  Task 2 running..
54
# Thread working: 3086773136
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
# Thread working: 3076283280
  Task 1 running..
# Thread working: 3086773136
  Task 2 running..
54
Will kill threadpool


线程池也是参考了别人的。

 

更多文章欢迎访问:http://blog.csdn.net/wallwind 



 

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐