上一篇文章使用fork函数实现了多进程并发服务器,但是也提到了一些问题:

  1. fork是昂贵的。fork时需要复制父进程的所有资源,包括内存映象、描述字等;
  2. 目前的实现使用了一种写时拷贝(copy-on-write)技术,可有效避免昂贵的复制问题,但fork仍然是昂贵的;
  3. fork子进程后,父子进程间、兄弟进程间的通信需要进程间通信IPC机制,给通信带来了困难;
  4. 多进程在一定程度上仍然不能有效地利用系统资源;
  5. 系统中进程个数也有限制。

  下面就介绍实现并发服务器的另外一种方式,使用多线程实现。多线程有助于解决以上问题。

线程基础

  关于线程的概念就不介绍了,先了解一下linux下线程的一些基本操作。

线程基础函数

  • pthread_create 创建线程

  pthread_create 函数用于创建新线程。当一个程序开始运行时,系统产生一个称为初始线 程或主线程的单个线程。额外的线程需要由 pthread_create 函数创建。 pthread_create 函数原型如下:

#include <pthread.h> 
int pthread_create(pthread_t *tid, const pthread_attr_t *attr, void *(*func)(void *), void *arg); 

  如果新线程创建成功,参数 tid 返回新生成的线程 ID。一个进程中的每个线程都由一个 线程 ID 标识,其类型为 pthread_t。attr 指向线程属性的指针。每个线程有很多属性包括:优 先级、起始栈大小、是否是守护线程等等。通常将 attr 参数的值设为 NULL,这时使用系统 默认的属性。
  但创建完一个新的线程后,需要说明它将执行的函数。函数的地址由参数 func 指定。该函数必须是一个静态函数,它只有一个通用指针作为参数,并返回一个通用指针。该执行函 数的调用参数是由 arg 指定,arg 是一个通用指针,用于往 func 函数中传递参数。如果需要传递多个参数时,必须将它们打包成一个结构,然后让 arg 指向该结构。线程以调用该执行 函数开始。
  如果函数调用成功返回 0,出错则返回非 0。

  常见的返回错误值:

EAGAIN:超过了系统线程数目的限制。
ENOMEN:没有足够的内存产生新的线程。
EINVAL:无效的属性attr值。

  示例代码:

#include <pthread.h>
#include <stdio.h>
pthread_t  tid;
void *ex()
{
    printf("this is a thread");
}
void main()
{
    pthread_create(&tid,NULL,ex,NULL);
}

  给线程传递参数:

void *function(void *arg); 
struct ARG { 
     int connfd; 
     int other;   //other data 
 }; 
 void main()  
 { 
     struct ARG arg; 
     int connfd,sockfd; 
     pthread_t tid; 
     //...
     While(1) 
     {     
         if((connfd = accept(sockfd,NULL,NULL))== -1) 
         { 
              //handle exception                    
          } 
          arg.connfd = connfd; 
          if(pthread_create(&tid, NULL, funtion, (void *)&arg)) 
          { 
              // handle exception 
          } 
      } 
 } 
 void *funtion(void *arg) 
 { 
    struct  ARG info; 
    info.connfd = ((struct ARG *)arg) -> connfd; 
    info.other = ((struct ARG *)arg) -> other; 
    //… 
    close(info.connfd); 
    pthread_exit(NULL); 
 }
  • pthread_join
      看这个函数首先提出一个概念,线程的类型。线程分为两类:可联合的和分离的。

    1. 默认情况下线程都是可联合的。可联合的线程终止 时,其线程 ID 和终止状态将保留,直到线程调用 pthread_join 函数。
    2. 而分离的线程退出后, 系统将释放其所有资源,其他线程不能等待其终止。如果一个线程需要知道另一个线程什么 时候终止,最好保留第二个线程的可联合性。

  pthread_join 函数与进程的 waitpid 函数功能类似,等待一个线程终止。
pthread_join 函数原型如下:

#inlcude <pthread.h>
int pthread_join(pthread_t tid, void **status); 

  参数 tid 指定所等待的线程 ID。该函数必须指定要等待的线程,不能等待任一个线程结束。要求等待的线程必须是当前进程的成员,并且不是分离的线程或守护线程。
  几个线程不 能同时等待一个线程完成,如果其中一个成功调用 pthread_join 函数,则其他线程将返回 ESRCH 错误。
  如果等待的线程已经终止,则该函数立即返回。如果参数 status 指针非空,则 指向终止线程的退出状态值。
  该函数如果调用成功则返回 0,出错时返回正的错误码。

  • pthread_detach
      pthread_detach 函数将指定的线程变成分离的。 pthread_detach 函数原型如下:
#inlcude <pthread.h> 
int pthread_detach(pthread_t tid) ;

  参数 tid 指定要设置为分离的线程 ID。

  • pthread_self
      每一个线程都有一个 ID,pthread_self 函数返回自己的线程 ID。 pthread_self 函数原型如下:
#inlcude <pthread.h> 
pthread_t pthread_self(void); 

  参数 tid 指定要设置为分离的线程 ID。 函数返回调用函数的线程 ID。
  例如,线程可以通过如下语句,将自己设为可分离的:

pthread_detach(pthread_self()); 
  • pthread_exit
      函数 pthread_exit 用于终止当前线程,并返回状态值,如果当前线程是可联合的,则其 退出状态将保留。 pthread_exit函数原型如下:
#include <pthread.h> 
void pthread_exit(void *status);

  参数 status 指向函数的退出状态。这里的 status 不能指向一个局部变量,因为当前线程 终止后,其所有局部变量将被撤销。
  该函数没有返回值。
  还有两种方法可以使线程终止:

  1. 启动线程的函数 pthread_create 的第三个参数返回。该返回值就是线程的终止状态。
  2. 如果进程的 main 函数返回或者任何线程调用了 exit 函数,进程将终止,线程将随之 终止。

  下面可以看一下多线程并发服务器的实例了,需要注意的是,线程建立后,父、子线程不需要关闭任何的描述符,因为线程中使用的描述符是共享进程中的数据。

#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <pthread.h>  

#define PORT 1234  
#define BACKLOG 5  
#define MAXDATASIZE 1000  

void process_cli(int connfd, struct sockaddr_in client);  
void *function(void* arg);  
struct ARG {  
    int connfd;  
    struct sockaddr_in client;  
};  

void main()  
{  
    int listenfd,connfd;  
    pthread_t  tid;  
    struct ARG *arg;  
    struct sockaddr_in server;  
    struct sockaddr_in client;  
    socklen_t  len;  

    if ((listenfd =socket(AF_INET, SOCK_STREAM, 0)) == -1) {  
        perror("Creatingsocket failed.");  
        exit(1);  
    }  

    int opt =SO_REUSEADDR;  
    setsockopt(listenfd,SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));  

    bzero(&server,sizeof(server));  
    server.sin_family=AF_INET;  
    server.sin_port=htons(PORT);  
    server.sin_addr.s_addr= htonl (INADDR_ANY);  
    if (bind(listenfd,(struct sockaddr *)&server, sizeof(server)) == -1) {  
        perror("Bind()error.");  
        exit(1);  
    }  

    if(listen(listenfd,BACKLOG)== -1){  
        perror("listen()error\n");  
        exit(1);  
    }  

    len=sizeof(client);  
    while(1)  
    {  
        if ((connfd =accept(listenfd,(struct sockaddr *)&client,&len))==-1) {  
            perror("accept() error\n");  
            exit(1);  
        }  
        arg = (struct ARG *)malloc(sizeof(struct ARG));  
        arg->connfd =connfd;  
        memcpy((void*)&arg->client, &client, sizeof(client));  

        if(pthread_create(&tid, NULL, function, (void*)arg)) {  
            perror("Pthread_create() error");  
            exit(1);  
        }  
    }  
    close(listenfd);  
}  

void process_cli(int connfd, struct sockaddr_in client)  
{  
    int num;  
    char recvbuf[MAXDATASIZE], sendbuf[MAXDATASIZE], cli_name[MAXDATASIZE];  

    printf("Yougot a connection from %s. \n ",inet_ntoa(client.sin_addr) );  
    num = recv(connfd,cli_name, MAXDATASIZE,0);  
    if (num == 0) {  
        close(connfd);  
        printf("Clientdisconnected.\n");  
        return;  
    }  
    cli_name[num - 1] ='\0';  
    printf("Client'sname is %s.\n",cli_name);  

    while (num =recv(connfd, recvbuf, MAXDATASIZE,0)) {  
        recvbuf[num] ='\0';  
        printf("Receivedclient( %s ) message: %s",cli_name, recvbuf);  
        int i;  
        for (i = 0; i <num - 1; i++) {  
            if((recvbuf[i]>='a'&&recvbuf[i]<='z')||(recvbuf[i]>='A'&&recvbuf[i]<='Z'))  
            {  
                recvbuf[i]=recvbuf[i]+ 3;  
                if((recvbuf[i]>'Z'&&recvbuf[i]<='Z'+3)||(recvbuf[i]>'z'))  
                recvbuf[i]=recvbuf[i]- 26;  
            }  
            sendbuf[i] =recvbuf[i];  
        }  
        sendbuf[num -1] = '\0';  
        send(connfd,sendbuf,strlen(sendbuf),0);  
    }  
    close(connfd);  
}  

void *function(void* arg)  
{  
    struct ARG *info;  
    info = (struct ARG*)arg;  
    process_cli(info->connfd,info->client);  
    free (arg);  
    pthread_exit(NULL);  
}  

线程安全性

  上面的示例代码服务器端的业务逻辑都比较简单,没有涉及到共享数据产生的同步问题。在某些情况下,我们需要多个线程共享全局数据,在访问这些数据时就需要用到同步锁机制。而在共享线程内的全局数据时,可以使用Linux提供的线程特定数据TSD解决。

同步机制

  在linux系统中,提供一种基本的进程同步机制—互斥锁,可以用来保护线程代码中共享数据的完整性。
  操作系统将保证同时只有一个线程能成功完成对一个互斥锁的加锁操作。
  如果一个线程已经对某一互斥锁进行了加锁,其他线程只有等待该线程完成对这一互斥锁解锁后,才能完成加锁操作。

互斥锁函数
pthread_mutex_lock(pthread_mutex_t  *mptr)

参数说明:

mptr:指向互斥锁的指针。
该函数接受一个指向互斥锁的指针作为参数并将其锁定。如果互斥锁已经被锁定,调用者将进入睡眠状态。函数返回时,将唤醒调用者。
如果互斥锁是静态分配的,就将mptr初始化为常值PTHREAD_MUTEX_INITIALIZER。 

  锁定成功返回0,否则返回错误码。

pthread_mutex_unlock(pthread_mutex_t  *mptr);

  用于互斥锁解锁操作。成功返回0,否则返回错误码。

示例代码:

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
int myglobal;
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
void *thread_function(void *arg) {
    int i, j;
    for (i = 0; i < 5; i++) {
        pthread_mutex_lock(&mymutex);
        j = myglobal;
        j = j + 1;
        printf(".");
        fflush(stdout);
        sleep(1);
        myglobal = j;
        pthread_mutex_unlock(&mymutex);
    }
    return NULL;
}
int main(void) {
    pthread_t mythread;
    int i;
    if (pthread_create(&mythread, NULL, thread_function, NULL)) {
        printf("error creating thread.");
        abort();
    }
    for (i = 0; i < 5; i++) {
        pthread_mutex_lock(&mymutex);
        myglobal = myglobal + 1;
        pthread_mutex_unlock(&mymutex);
        printf("o");
        fflush(stdout);
        sleep(1);
    }
    if (pthread_join(mythread, NULL)) {
        printf("error joining thread.");
        abort();
    }
    printf("\nmyglobal equals %d\n", myglobal);
    exit(0);
}

运行效果

线程私有数据

  在多线程环境里,应避免使用静态变量。在 Linux 系统中提供 了线程特定数据(TSD)来取代静态变量。它类似于全局变量,但是,是各个线程私有的, 它以线程为界限。TSD 是定义线程私有数据的惟一方法。同一进程中的所有线程,它们的同 一特定数据项都由一个进程内惟一的关键字 KEY 来标志。用这个关键字,线程可以存取线程私有数据。 在线程特定数据中通常使用四个函数。

  • pthread_key_create
#include <pthread.h> 
int pthread_key_create(pthread_key_t *key, void (* destructor)(void *value)); 

  pthread_key_create 函数在进程内部分配一个标志 TSD 的关键字。
  参数 key 指向创建的关 键字,该关键字对于一个进程中的所有线程是惟一的。所以在创建 key 时,每个进程只能调 用一次创建函数 pthread_key_create。在 key 创建之前,所有线程的关键字值是 NULL。一旦 关键字被建立,每个线程可以为该关键字绑定一个值。这个绑定的值对于线程是惟一的,每 个线程独立维护。
   参数 destructor 是一个可选的析构函数,可以和每个关键字联系起来。如果一个关键字 的 destructor 函数不为空,且线程为该关键字绑定了一个非空值,那么在线程退出时,析构函 数将会被调用。对于所有关键字的析构函数,执行顺序是不能指定的。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_once
#include <pthread.h> 
int pthread_once(pthread_once_t *once, void (*init) (void)); 

  pthread_once 函数使用 once 参数所指的变量,保证每个进程只调用一次 init 函数。通常 once 参数取常量 PTHREAD_ONCE_INIT,它保证每个进程只调用一次 init 函数。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_setspecific
#include <pthread.h> 
int pthread_setspecific(pthread_key_t key, const void *value);

  pthread_setspecific 函数为 TSD 关键字绑定一个与本线程相关的值。
  参数 key 是 TSD 关 键字。
  参数 value 是与本线程相关的值。value 通常指向动态分配的内存区域。
  该函数正常执行后返回值为 0,否则返回错误码。

  • pthread_getspecific
#include <pthread.h> 
void * pthread_getspecific(pthread_key_t key); 

  pthread_getspecific 函数获取与调用线程相关的 TSD 关键字所绑定的值。
  参数 key 是 TSD 关键字。
  该函数正常执行后返回与调用线程相关的 TSD 关键字所绑定的值。否则返回 NULL。

  线程安全性代码示例:

#include <stdio.h>
#include <pthread.h>
pthread_key_t   key;
void echomsg(int t)
{
    printf("destructor excuted in thread %d,param=%d\n", pthread_self(), t);
}
void * child1(void *arg)
{
    int tid = pthread_self();
    printf("thread1 %d enter\n", tid);
    pthread_setspecific(key, (void *)tid);
    sleep(2);
    printf("thread1 %d key’s %d\n", tid, pthread_getspecific(key));
    sleep(5);
}
void * child2(void *arg)
{
    int tid = pthread_self();
    printf("thread2 %d enter\n", tid);
    pthread_setspecific(key, (void *)tid);
    sleep(1);
    printf("thread2 %d key’s %d\n", tid, pthread_getspecific(key));
    sleep(5);
}
int main(void)
{
    pthread_t tid1, tid2;

    printf("hello\n");
    pthread_key_create(&key, (void *)echomsg);
    pthread_create(&tid1, NULL, child1, NULL);
    pthread_create(&tid2, NULL, child2, NULL);
    sleep(10);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    pthread_key_delete(key);
    printf("main thread exit\n");
    return 0;
}

运行结果

小结

  使用多线程实现并发服务器的优点是线程的开销小,切换容易。但是由于线程共享相同 的内存区域,所以在对共享数据的进行操作时,要注意同步问题。其中线程特定数据虽然实现起来比较烦琐,但是它是将一个非线程安全 函数转换成线程安全函数的常用方法。
  除此之外,还可以通过改变调用函数参变量的方式实现线程的安全性,这里不作介绍。
  下一篇文章将介绍另外一种实现并发服务器的方法:I/O 多路复用。

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐