线程控制[pthread_create() pthread_join()] 线程同步[互斥锁 条件变量 信号量]
x86/Debian Linux/gcc
2 线程控制
此笔记涉及的线程库函数是由POSIX标准定义的,称为POSIXthread或者pthread。其它也有很多关于线程的库如C++ Boost的线程库。
(1) 创建线程函数( man pthread_create )
pthread_create ---- 创建一个新的线程
#include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);
Compile and link with -pthread. |
[1] start_routine和arg
在一个线程中调用pthread_create()创建新的线程后,当前线程从pthread_create()返回继续往下执行,而新的线程从start_routine()开始执行。start_routine()函数接收一个参数,是通过pthread_create的arg参数传递给它的,该参数的类型为void *,这个指针按什么类型解释由调用者自己定义。start_routine()的返回值类型也是void*,这个指针的含义同样由调用者自己定义。
如果需要只终止某个线程如新进程终止则有以下几种方式:
- 在新进程中调用pthread_exit(3)函数,在这个函数中指定的退出状态能被同进程中的其它线程调用pthread_join(3)函数获得,类似于父进程调用wait(2)得到子进程的退出状态[进程控制[fork() exec()wait() waitpid() ]。
- 从start_routine()中返回(return)。这种给return提供返回值的方式相当于在新线程中调用pthread_join(3)函数。
- 新线程被取消( pthread_cancel(3) )。
任何线程调用exit(3)函数,或者主线程从main()函数中返回,会引起统一进程中的所有线程终止。
[2] attr
attr参数指向一个pthread_atrr_t结构体,这个结构体的内容被用在线程创建的时间以决定被创新线程的属性;这个结构体被pthread_init(3)及相关的函数初始化。如果attr为NULL,那么被创建的线程为默认属性。
[3] thread
成功调用pthread_reate()函数,在返回前,将新线程的ID保存在thread指向的缓冲区中;在同一进程中,这个ID可以唯一的标识这个线程,可供其它线程中的函数使用。
[4] 返回值
pthread_create()函数执行成功则返回0;否则返回错误号,此时*thread内容不定。
(2) 线程控制代码[ pthread_create() pthread_join() ]
[1] 代码
man pthread_create下的例子:
#include <pthread.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
#define handle_error(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)
struct thread_info { /* Used as argument to thread_start() */
pthread_t thread_id; /* ID returned by pthread_create() */
int thread_num; /* Application-defined thread # */
char *argv_string; /* From command-line argument */
};
/* Thread start function: display address near top of our stack,
and return upper-cased copy of argv_string */
static void *
thread_start(void *arg)
{
struct thread_info *tinfo = arg;
char *uargv, *p;
printf("Thread %d: top of stack near %p; argv_string=%s\n",
tinfo->thread_num, &p, tinfo->argv_string);
uargv = strdup(tinfo->argv_string);
if (uargv == NULL)
handle_error("strdup");
for (p = uargv; *p != '\0'; p++)
*p = toupper(*p);
return uargv;
}
int
main(int argc, char *argv[])
{
int s, tnum, opt, num_threads;
struct thread_info *tinfo;
pthread_attr_t attr;
int stack_size;
void *res;
/* The "-s" option specifies a stack size for our threads */
stack_size = -1;
while ((opt = getopt(argc, argv, "s:")) != -1) {
switch (opt) {
case 's':
stack_size = strtoul(optarg, NULL, 0);
break;
default:
fprintf(stderr, "Usage: %s [-s stack-size] arg...\n",
argv[0]);
exit(EXIT_FAILURE);
}
}
num_threads = argc - optind;
/* Initialize thread creation attributes */
s = pthread_attr_init(&attr);
if (s != 0)
handle_error_en(s, "pthread_attr_init");
if (stack_size > 0) {
s = pthread_attr_setstacksize(&attr, stack_size);
if (s != 0)
handle_error_en(s, "pthread_attr_setstacksize");
}
/* Allocate memory for pthread_create() arguments */
tinfo = calloc(num_threads, sizeof(struct thread_info));
if (tinfo == NULL)
handle_error("calloc");
/* Create one thread for each command-line argument */
for (tnum = 0; tnum < num_threads; tnum++) {
tinfo[tnum].thread_num = tnum + 1;
tinfo[tnum].argv_string = argv[optind + tnum];
/* The pthread_create() call stores the thread ID into
corresponding element of tinfo[] */
s = pthread_create(&tinfo[tnum].thread_id, &attr,
&thread_start, &tinfo[tnum]);
if (s != 0)
handle_error_en(s, "pthread_create");
}
/* Destroy the thread attributes object, since it is no
longer needed */
s = pthread_attr_destroy(&attr);
if (s != 0)
handle_error_en(s, "pthread_attr_destroy");
/* Now join with each thread, and display its returned value */
for (tnum = 0; tnum < num_threads; tnum++) {
s = pthread_join(tinfo[tnum].thread_id, &res);
if (s != 0)
handle_error_en(s, "pthread_join");
printf("Joined with thread %d; returned value was %s\n",
tinfo[tnum].thread_num, (char *) res);
free(res); /* Free memory allocated by thread */
}
free(tinfo);
exit(EXIT_SUCCESS);
}
[2] 解释
- getopt()的功能是解析程序的命令行参数并查找到其中的可选参数(以“-”开头,如-s)optstring。optind被系统初始化为1,如果getopt()解析到命令行参数中的optstring,就设置变量optind的值为optstring的位置值加1。optstring是用户给的包含命令行中可选参数的字符串,如果在optstring中的可选字符后面跟一个冒号,就说明此可选参数需要一个参数,那么getopt()在解析到这个可选参数之后就会设置optarg指针指向紧跟可选参数后面的那个元素。optstring中的可选字符后跟两个冒号表示可选字符后面跟有可选参数。
- strtoul()函数将可选参数后面的字符串转换为一个unsigned long int型。
- pthread_attr_init()初始化线程的属性参数attr。pthread_attr_setstacksize()函数设置线程属性attr中栈大小属性值为stacksize,那么用attr属性来创建的线程的栈大小至少有stacksize大。
- 在主线程中用pthread_create()函数创建了num_threads个线程。并把线程id号,传递给新线程的参数(来自命令行参数,把命令行参数对应的标记为线程的名字)都记录下来。
- 线程函数thread_start()将在三个线程中运行,局部变量p在函数最后被定义,所以它的地址最接近线程的栈地址(一个C源文件到可执行文件 [反汇编-函数栈帧 编译 链接])。
- 主线程调用pthread_join()等待子线程结束,并在子线程结束时获取其结束状态。通过pthread_join()得到的终止状态是不同的:(1)如果thread线程通过return返回,value_ptr所指向的单元里存放的是thread线程函数的返回值。(2) 如果thread线程被别的线程调用pthread_cancel()异常终止掉,value_ptr所指向的单元里存放的是常数PTHREAD_CANCELED。(3) 如果thread线程是自己调用pthread_exit()终止的,value_ptr所指向的单元存放的是传给pthread_exit()的参数。一般情况下,线程终止后,其终止状态一直保留到其它线程调用pthread_join()获取它的状态为止。但是线程也可以被置为detach状态,这样的线程一旦终止就立刻回收它占用的所有资源,而不保留终止状态。不能对一个已经处于detach状态的线程调用pthread_join(),这样的调用将返回EINVAL。对一个尚未detach的线程调用pthread_join()或pthread_detach()都可以把该线程置为detach状态,也就是说,不能对同一线程调用两次pthread_join(),或者如果已经对一个线程调用了pthread_detach()就不能再调用pthread_join()了。
[3] 一运行结果
编译运行此程序gccpthread_create.c -o pthread_reate -pthread
./pthread -s 0x1000000 you me him
输出结果:
Thread 1: top of stack near 0xb7591384; argv_string=you Thread 3: top of stack near 0xb558f384; argv_string=him Joined with thread 1; returned value was YOU Thread 2: top of stack near 0xb6590384; argv_string=me Joined with thread 2; returned value was ME Joined with thread 3; returned value was HIM |
-s是可选择参数,紧跟其后的0x1000000是可选择参数-s的参数,optind的值为3,optarg指向0x1000000。you me him的数量是pthread_create要创建的线程数量,并以它们为名。
在主线程中调用pthread_join()来获取子线程结束时(return方式)的返回状态。线程1结束时,线程2还未运行。后运行的线程2运行结束后线程3才运行结束。操作系统会在各线程之间调度和切换,就像在多个进程之间调度和切换一样。
3 线程间同步
(1) 互斥锁(mutex)
多个线程同时访问共享数据时可能会冲突。对于多线程的程序,访问冲突的问题是很普遍的,解决的办法是引入互斥锁(Mutex,Mutual Exclusive Lock),获得锁的线程可以完成“读-修改-写”的操作,然后释放锁给其它线程,没有获得锁的线程只能等待而不能访问共享数据,这样“读-修改-写”三步操作组成一个原子操作,要么都执行,要么都不执行,不会执行到中间被打断,也不会在其它处理器上并行做这个操作。
[1] 代码
书中代码(p.663- 664):
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NLOOP 5000
int counter; /* incremented by threads */
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
void *doit(void *);
int main(int argc, char **argv)
{
pthread_t tidA, tidB;
pthread_create(&tidA, NULL, doit, NULL);
pthread_create(&tidB, NULL, doit, NULL);
/* wait for both threads to terminate */
pthread_join(tidA, NULL);
pthread_join(tidB, NULL);
return 0;
}
void *doit(void *vptr)
{
int i, val;
/* * Each thread fetches, prints, and increments the counter NLOOP times.
* The value of the counter should increase monotonically.
*/
for (i = 0; i < NLOOP; i++) {
pthread_mutex_lock(&counter_mutex);
val = counter;
printf("%x: %d\n", (unsigned int)pthread_self(), val + 1);
counter = val + 1;
pthread_mutex_unlock(&counter_mutex);
}
return NULL;
}
- 程序中的互斥变量counter_mutex为全局变量,可以用宏定义PTHREAD_MUTEX_INITIALIZER来初始化,相当于用pthread_mutex_init()初始化并且attr参数为NULL。
- 主线程调用pthread_join()等待子线程的结束。子线程在访问counter的一个原子操作中获取互斥量counter_mutex的锁有权,在一个原子操作后立即释放对互斥量counter_mutex的锁有权,让其它线程有机会获取互斥量的锁有权去对counter进行原子操作,这个可以运行结果中看出来。
[2] 运行结果
gcc mutex.c -o mutex -pthread
./mutex
一个线程可以调用pthread_mutex_lock获得Mutex(操作系统调度执行此线程),如果这时另一个线程已经调用pthread_mutex_lock获得了该Mutex,则当前线程需要挂起等待(操作系统仍可能在调度执行此线程),(操作系统调度切换运行另一线程)直到另一个线程调用pthread_mutex_unlock释放Mutex,(另一线程释放mutex时操作系统调度运行此线程)当前线程被唤醒(即从此线程上次执行的地方继续执行),才能获得该Mutex并继续执行。
Figure2.trylock运行结果
(2) 条件变量(ConditionVariable)
线程间的同步还有这样一种情况:线程A需要等某个条件成立才能继续往下执行,现在这个条件不成立,线程A就阻塞等待,而线程B在执行过程中使这个条件成立了,就唤醒线程A继续执行。在pthread库中通过条件变量(ConditionVariable)来阻塞等待一个条件,或者唤醒等待这个条件的线程。
一个Condition Variable总是和一个Mutex搭配使用的。一个线程可以调用pthread_cond_wait在一个Condition Variable上阻塞等待,这个函数做以下三步操作[结合操作系统对线程的调度理解]:
- 释放Mutex
- 阻塞等待
- 当被唤醒时,重新获得Mutex并返回
pthread_cond_timedwait函数还有一个额外的参数可以设定等待超时,如果到达了abstime所指定的时刻仍然没有别的线程来唤醒当前线程,就返回ETIMEDOUT。一个线程可以调用pthread_cond_signal唤醒在某个Condition Variable上等待的另一个线程,也可以调用pthread_cond_broadcast唤醒在这个Condition Variable上等待的所有线程。
(3) 信号量(Semaphore)
信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1。这种信号量不仅可用于同一进程的线程间同步,也可用于不同进程间的同步。
POSIX semaphore库函数:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); int sem_wait(sem_t *sem); int sem_trywait(sem_t *sem); int sem_post(sem_t * sem); int sem_destroy(sem_t * sem); |
semaphore变量的类型为sem_t,sem_init()初始化一个semaphore变量,value参数表示可用资源的数量,pshared参数为0表示信号量用于同一进程的线程间同步,本节只介绍这种情况。在用完semaphore变量之后应该调用sem_destroy()释放与semaphore相关的资源。
调用sem_wait()可以获得资源,使semaphore的值减1,如果调用sem_wait()时semaphore的值已经是0,则挂起等待。如果不希望挂起等待,可以调用sem_trywait()。调用sem_post()可以释放资源,使semaphore的值加1,同时唤醒挂起等待的线程。
(4) 其它同步方式
用挂起等待的方式解决访问冲突不见得是最好的办法,因为这样毕竟会影响系统的并发性,在某些情况下解决访问冲突的问题可以尽量避免挂起某个线程,例如Linux内核的Seqlock、RCU(read-copy-update)等机制。[可参见UNIX系统高级环境编程]
1 线程介绍
线程在进程的地址空间中,一个进程的中可执行多个线程,每个线程是一个控制流(负责一件事情),多线程的控制流程可以长期并存,操作系统会在各线程之间调度和切换,就像在多个进程之间调度和切换一样。
同一进程的多个线程共享同一地址空间,因此TextSegment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
- 文件描述符表
- 每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义的信号处理函数)
- 当前工作目录
- 用户id和组id
但有些资源是每个线程各有一份的:
- 线程id
- 上下文,包括各种寄存器的值、程序计数器和栈指针
- 栈空间
- lerrno变量
- 信号屏蔽字
- 调度优先级
更多推荐
所有评论(0)