本文翻译自:https://computing.llnl.gov/tutorials/pthreads/
1. 基本概念
在共享内存的多处理器架构体系中,线程可以用来实现并行性。在历史上,硬件供应商已经实现了自己的专有版本的线程模型,这使得可移植性成为了软件开发者的关注点。对于UNIX系统,一个标准的C语言线程编程接口由IEEE POSIX 1003.1C标准定义了。坚持这个标准的实现被称为POSIX线程或者Pthreads。
本教程会先介绍使用Pthread的概念、动机以及设计注意事项。然后会介绍Pthreads API中的三个主要类的例程:线程管理、互斥变量以及条件变量。示例代码用于演示如何使用大部分Pthread编程初学者会用到的Pthreads接口。这个教程最后会讨论LLNL的细节,以及如何混合MPI和pthreads。当然还包含一些使用C语言编写的练习题。
水平/先决条件:本教程是为那些打算使用pthreads进行并行编程的初学者而写。初学者需要了解基本的使用C语言进行并行编程的概念。对于那些完全不了解并行编程的初学者,下面链接中的资料将会非常有帮助:https://computing.llnl.gov/tutorials/parallel_comp/
2. Pthreads概览
什么是线程?
- 从技术上讲,线程被定义为一个独立地指令流,可以被调度为由操作系统运行。但是这是什么意思呢?
- 对于软件开发者,独立于主程序运行的“过程”的改变就是一个对线程很好的描述
- 进一步定义,想象这样一个主程序(a.out)包含一系列的执行过程。然后想象所有的这些过程都能够同时和/或者独立地被操作系统调用运行。那么这样一个程序就能称为是多线程程序。
- 这是如何完成的呢?
- 在理解一个线程之前,我们需要先了解UNIX进程。进程是由 操作系统创建的,并需要相当数量的资源。通常包含程序资源和程序执行语句等信息,包括:
- 进程ID,进程组ID,用户ID和组ID
- 环境
- 工作目录
- 程序执行流程
- 注册环境
- 堆
- 栈
- 文件句柄
- 信号处理
- 共享库
- 进程同步工具(例如消息队列、管道、信号量或者共享内存)。
上图左边是UNIX系统中进程的示意图,右边是UNIX进程中线程的示意图。
- 线程使用存在于进程中的资源,但是是能够被操作系统调用并作为独立实体运行,这是因为他们只复制能使他们作为可执行代码存在的基本资源。
- 这个独立地控制流程能够完成,是因为线程保持它们自己的:
- 栈指针
- 注册环境
- 调度属性(例如调度策略和调度优先级)
- 一系列挂起和阻塞的信号
- 线程特定的数据
- 因此,总结的说,在UNIX环境中,一个线程
- 存在于一个进程中,并且使用进程的资源
- 有自己独立地控制流程,只要它的父进程存在并且支持它
- 仅仅复制它能够独立运行所需要的资源
- 可以与其他同样的独立(或存在依赖关系的)线程共享进行资源
- 如果父进程死亡,线程也跟着死亡
- 是“轻量级的”,因为大部分开销已经通过创建进程时已经完成了。
- 因为在同一个进程里面的线程共享相同的资源
- 其他所有的线程都会看到其中一个线程对共享系统资源(如关闭文件)所作的更改。
- 指向同一个地方的指针拥有相同的数据
- 读写同一块内存区域是可能的,因此需要程序员做显式的同步数据
3. Pthreads概览
3.1 什么是Pthreads?
- 历史上,硬件供应商已经实现了自己的专有版本的线程。 这些实现彼此大相径庭,使程序员难以开发可移植的线程应用程序。
- 为了充分利用线程提供的功能,需要一个标准化的编程接口
- 对于UNIX系统,编程接口已经由IEEE POSIX 1003.1C标准定义了
- 遵守这个标准的实现被称为POSIX线程或Pthreads。
- 除了专有的API以外,大多数硬件供应商现在都提供Pthreads
- POSIX标准,包括Pthreads规范在内,在不断发展并经过修订。
- 一些有用的链接
- standards.ieee.org/findstds/standard/1003.1-2008.html
- www.opengroup.org/austin/papers/posix_faq.html
- www.unix.org/version3/ieee_std.html
- Pthread被定义为一组C语言编程类型和过程调用,用pthread.h头文件/include文件和一个线程库来实现,虽然在某些实现中,这个库可能是另一个库的一部分,比如libc。
3.2 为什么使用Pthreads?
轻量型
- 和创建以及管理一个进程相比,线程的创建所需的系统开销更少,而且管理线程相对管理进程所需的资源更少。
- 例如,下面的表格中列出了fork()子调用和pthread_create()子调用所需要的时间对比。时间反映了50000个进程/线程的创建,使用time函数进行计时,单位为秒,没有使用任何的优化技术
执行所用的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
============================================================================== C Code for fork() creation test ============================================================================== #include <stdio.h> #include <stdlib.h> #define NFORKS 50000 void do_nothing() { int i; i= 0; } int main(int argc, char *argv[]) { int pid, j, status; for (j=0; j < NFORKS; j++) { /*** error handling ***/ if ((pid = fork()) < 0 ) { printf ("fork failed with error code= %d\n", pid); exit(0); } /*** this is the child of the fork ***/ else if (pid ==0) { do_nothing(); exit(0); } /*** this is the parent of the fork ***/ else { waitpid(pid, status, 0); } } } ============================================================================== C Code for pthread_create() test ============================================================================== #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NTHREADS 50000 void *do_nothing(void *null) { int i; i=0; pthread_exit(NULL); } int main(int argc, char *argv[]) { int rc, i, j, detachstate; pthread_t tid; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (j=0; j < NTHREADS; j++) { rc = pthread_create(&tid, &attr, do_nothing, NULL); if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); } /* Wait for the thread */ rc = pthread_join(tid, NULL); if (rc) { printf("ERROR; return code from pthread_join() is %d\n", rc); exit(-1); } } pthread_attr_destroy(&attr); pthread_exit(NULL); } |
高效的通信/数据交换
- 考虑在高性能计算环境中使用Pthreads的主要原因是为了获取到最佳的性能。特别是,如果一个程序使用MPI进行节点通信,则有可能通过使用Pthreads来提高性能。
- MPI共享库进程通过共享内存来实现节点间任务通信,其中涉及至少一个内存复制操作(进程到进程)
- 对于Pthreads,就不需要中间内存复制了,因为线程在同一个进程中共享相同的地址空间。因此不需要传递数据本身,而可以高效的通过传递指向数据的指针
- 在最坏的场景下,Pthread通信将更多的是cache到CPU或者内存到CPU的带宽问题。这些通信的速度都是远远高于MPI共享内存通信的速度的
- 例如,一些本地场景的比较,过去和现在,如下所示:
其他原因
- 在其他几种场景下,线程化的应用程序比非线程化的应用程序提供更多潜在的性能提升和实际优势
- 重叠的CPU和I/O工作,例如一个程序可能有部分会长时间执行I/O操作。当一个线程等待I/O系统调度完成时,可以由其他线程来完成CPU密集型的工作
- 优先级/实时调度:更重要的任务能够取代或者中断低优先级的任务
- 异步事件处理:以不同频率和持续时间进行的服务可以交叉执行。例如一个web服务器能够同时和先前的连接进行数据传输以及管理新连接的到来
- 一个完美的例子应该是网页浏览器,在浏览器运行时,同时存在多个交叉任务而且任务的优先级不同
- 另外一个好的例子就是现代操作系统了,这使得线程被广泛的使用。MS Windows操作系统和应用程序使用多线程的屏幕快照如下如所示:
3.3 设计线程化的程序
串行化编程
- 在现代,在多核心处理器的机器上,pthreads非常适合并行编程。并且在一般情况下适用于并行编程的场景都能够适用于Pthreads进行并行编程。
- 进行并行程序设计,有很多注意实现:
- 适用何种并行编程模型
- 问题的细分
- 负载均衡
- 线程通信
- 数据依赖/同步
- 同步和竞争条件
- 内存问题
- I/O问题
- 程序的复杂性
- 程序员的努力/成本/时间
- ……
- 设计这些话题已经超出了本教程的范围,但是如果对此感兴趣的读者通过下面的教程获得一个简单的概述:https://computing.llnl.gov/tutorials/parallel_comp/
- 一般来说,为了使一个程序能够利用Pthreads,它必须能够被组织成分散的、可以并发执行的独立任务。例如,如果子例程1和子例程2能够实时的相互轮转、交叉和重叠的话,那么他们就适合线程。
- 具有以下特性的程序可能非常适合使用Pthreads:
- 能够被多个任务同时执行的工作或者同时操作的数据
- 长时间的I/O阻塞
- 在某些地方占用太多的CPU周期但是不执行有用的内容
- 必须响应异步事件
- 某些工作比其他工作更加重要(优先级中断)
- 存在几种常见的线程程序模型:
- 管理者/执行者:管理者线程将任务分配给具体的执行者线程,通常管理者处理所有的输入并将其工作打包成任务分发。最终有两种常见的管理者/执行者模式:静态执行线程池/动态执行线程池。
- 管道:一个任务被分解成一些列的子操作,每个子操作由不同的线程同时处理。汽车组装流水线就是这种模式最好的描述了。
- 对端:类似于管理者/执行者模式,但是主线程在创建完其他线程后,它仍旧参与工作
共享内存模型
- 所有线程都能够访问相同的全局共享内存。
- 线程同时拥有其私有数据
- 程序员有责任对访问到的全局共享数据进行同步(保护)
线程安全
- 线程安全:简而言之是指,能够同时执行多个线程的应用程序不会破坏共享数据或者产生竞争条件
- 举个例子,假设你的应用程序创建了几个线程,每个子线程都调用同一个共享库子例程:
- 这个共享库子例程访问/修改一个全局的结构体或者内存中的位置
- 当每个线程调用该子例程的时候,有可能这些线程会同时尝试修改这个全局结构体/内存位置
- 如果例程没有采用某种同步结构来防止数据损坏,那么它就不是线程安全的。
- 对于外部用户库例程,如果用户无法100%确保例程是线程安全的,那么就有可能遇到偶发的问题
- 建议:如果你的应用程序是否是线程安全的库或者其他对象,请小心。当存在疑问时,应当假设其实线程不安全的,直到能够证明其线程安全为止。这可以通过序列化的调用该例程来证明。
线程的限制
- 尽管Pthreads API是ANSI/IEEE标准的。但是实现通常会以标准未指定的方式变化。
- 正是因为这个原因,所以一个程序在一个平台上运行良好,但是到其他平台上可能无法运行或者执行后可能得到错误的结果
- 例如,允许的最大线程数和默认线程堆栈大小是设计程序时要考虑的两个重要限制。
- 本教程稍后会讨论几个线程的限制
4 Pthreads API
- 原来的Pthreads API是在ANSI / IEEE POSIX 1003.1 – 1995标准中定义的。后来包括Pthreads规范在内的POSIX标准还在发展并经过修订。
- 该标准的副本可以从IEEE处购买或者从其他网上上免费在线下载
- 包括Pthreads API的子例程大概可以分成四个部分:
- 线程管理:直接在线程上工作的例程 – 创建,分离,连接等。它们还包括设置/查询线程属性的功能(可连接,调度等)
- 互斥:处理同步的子例程称为互斥(Mutex),它是“mutual exclusion”的缩写。互斥功能提供创建、销毁、锁定和解锁互斥锁操作。这些由互斥属性函数来补充,这些函数设置或修改与互斥量相关的属性
- 条件变量:基于程序员指定的条件,解决共享互斥体例程之间的通信问题。这一组包含基于指定的变量值创建、销毁、等待和发送信号的功能。同时还包含设置/查询条件变量属性的函数
- 同步:管理读写锁和屏障的例程
- 命名约定:线程库中的所有标识符都以pthread_开头。 一些例子如下所示。
- 不透明对象的概念遍及API的设计。 基本调用的工作是创建或修改不透明的对象 – 不透明的对象可以通过调用属性函数来修改,这些属性函数处理不透明的属性
- Pthreads API包含大约100个子例程。 本教程将重点介绍其中的一部分 – 具体来说,这些部分Pthreads程序员最有可能会立刻用到。
- 为了便于移植,pthread.h文件必须包含在每个使用Pthreads库的源文件中
- 当前POSIX标准仅仅为C语言定义的。Fortan程序员可以调用这些C函数的封装包,某些Fortran编译器可能会提供一个Fortran pthreads API。
5 编译多线程程序
下表列出了几个用于编译pthreads代码的命令的例子:
6 线程管理
6.1 创建和终止例程
子例程
1 2 3 4 5 6 7 |
pthread_join (threadid,status) pthread_detach (threadid) pthread_attr_setdetachstate (attr,detachstate) pthread_attr_getdetachstate (attr,detachstate) |
创建线程
- 在初始状态下,你的main()程序包含一个单一的默认的线程,所有其他的线程都必须由程序员显示的创建
- pthread_create创建一个新线程并使其可执行,该子例程可以在你的代码中的任何地方调用任意次数。
- pthread_create的参数:
- thread:由子例程返回的新线程的不透明唯一标识符。
- attr:可用于设置线程属性的不透明属性对象。 您可以指定一个线程属性对象,或者为默认值为NULL。
- start_routine:线程一旦创建就执行的C例程
- arg:可以传递给start_routine的单一参数。它必须是通过void指针类型来传递。如果不需要传递参数,可以将该参数设置为NULL
- 一个进程能够创建的最大线程数是由实现决定的。任何试图超出限制的程序可能会失败或者产生错误的结果。
- 查询或者设置你的线程实现-Linux示例如下。演示查询默认(软)限制,然后将最大进程数(包括线程)设置为硬限制。 然后验证限制已被覆盖
- 一旦创建,线程是对等的,并可能创建其他线程。 线程之间没有隐含的层次结构或依赖关系
线程属性
- 默认的,一个线程被创建时都会有带有特定的属性。程序员可以通过线程属性对象来修改线程的部分属性。
- pthread_attr_init和pthread_attr_destroy是用来初始化/销毁线程属性对象的
- 然后使用其他子例程用来查询/设置线程属性对象中的特定属性。 属性包括:
- 分离或者连接的状态
- 调度策略
- 调度优先级
- 调度参数
- 调度争用作用域
- 栈大小
- 栈地址
- 栈防护(溢出)大小
- 部分属性会在后面讨论到。
线程绑定和调度
1 2 3 |
<b>问题</b>:一个线程被创建后,你是如何知道 a)它何时会被操作系统调用运行;以及 b)它会在那个处理器核心上运行? <b>答案</b>:除非你使用它Pthreads调度机制,否则线程何时以及在何处运行将取决于实现和/或者操作系统。鲁棒性强的程序不应该依赖于特定的执行顺序和执行在特定的处理器上 |
- Pthreads API提供了几个子例程,可以用来指定如何安排线程执行。 例如,可以指定以FIFO(先进先出),RR(循环)或OTHER(操作系统确定)调度方式来调用线程。 它还提供了设置线程调度优先级值的功能。
- 这些主题不会在这里展开,但是在sched_setscheduler的man页面中可以找到关于Linux下“工作原理”的很好的概述。
- 另外,本地操作系统可能会提供一种方法来执行此操作。 例如,Linux提供了sched_setaffinity子例程
终止线程于pthread_exit()
- 存在几种终止线程的方式:
- 线程从其启动例程中正常返回。表示它的工作已经完成了。
- 线程调度了pthread_exit子例程,不管其动作是否已经完成
- 该线程被其他线程通过pthread_cancel子例程终止
- 进程调用了exec()或者exit()导致其被终止
- 如果main()首先完成,而没有显示调用pthread_exit
- pthread_exit()例程允许程序员指定一个可选的终止状态参数。 这个可选参数通常返回给“连接到”终止线程的线程中(稍后介绍)。
- 在正常执行完成的子例程中,通常可以不用调用pthread_exit()-当然,除非要传递返回可选的状态码。
- 清理:pthread_exit()例程不关闭文件; 在线程内部打开的任何文件将在线程终止后保持打开状态。
- 讨论在main()方法中调用pthread_exit()
- 这是一个已经明确的问题,如果main()方法在其创建的其创建的线程之前完成,并且main()方法没有显式的调用pthread_exit(),那么其创建的所有线程都会终止。这是因为main()方法已经完成,那么就无法再支持线程了。
- 通过在main()方法中显式调用pthread_exit()作为其最后要做的事情,main()方法会阻塞并保持活动,以支持它创建的线程直到它们完成为止。
示例程序:线程创建和终止
这个示例程序通过pthread_create()子例程创建5个线程。每个线程打印一句“Hello World!”信息,然后通过调用pthread_exit()来终止
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
/****************************************************************************** * FILE: hello.c * DESCRIPTION: * A "hello world" Pthreads program. Demonstrates thread creation and * termination. * AUTHOR: Blaise Barney * LAST REVISED: 08/09/11 ******************************************************************************/ #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 5 void *PrintHello(void *threadid) { long tid; tid = (long)threadid; printf("Hello World! It's me, thread #%ld!\n", tid); pthread_exit(NULL); } int main(int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; int rc; long t; for(t=0;t < NUM_THREADS;t++){ printf("In main: creating thread %ld\n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t); if (rc){ printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); } } /* Last thing that main() should do */ pthread_exit(NULL); } |
6.2 传递参数给线程
- pthread_create()例程允许程序员将一个参数传递给线程启动例程。 对于必须传递多个参数的情况,通过创建一个包含所有参数的结构体,然后在pthread_create()例程中传递一个指向该结构的指针,可以很容易地克服这个限制。
- 所有参数必须通过引用传递并转换为(void *)。
1 2 3 |
<b>问题:</b>鉴于线程的不确定的启动和调度,如何安全的将数据传递给新创建的线程呢? <b>答案:</b>确保所有传递的数据都是线程安全的-即它们不能被其他线程修改。下面的三个例子说明了什么不应该做而什么应该做 |
Example1
此代码片段演示了如何将简单的整数传递给每个线程。 调用线程为每个线程使用一个唯一的数据结构,确保每个线程的参数在程序中保持不变。
1 2 3 4 5 6 7 8 9 |
long taskids[NUM_THREADS]; for(t=0; t < NUM_THREADS; t++) { taskids[t] = t; printf("Creating thread %ld\n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *) taskids[t]); ... } |
Example2
这个例子展示了如何通过一个结构来设置/传递多个参数。 每个线程接收一个唯一的结构实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
struct thread_data{ int thread_id; int sum; char *message; }; struct thread_data thread_data_array[NUM_THREADS]; void *PrintHello(void *threadarg) { struct thread_data *my_data; ... my_data = (struct thread_data *) threadarg; taskid = my_data->thread_id; sum = my_data->sum; hello_msg = my_data->message; ... } int main (int argc, char *argv[]) { ... thread_data_array[t].thread_id = t; thread_data_array[t].sum = sum; thread_data_array[t].message = messages[t]; rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &thread_data_array[t]); ... } |
Example3
这个例子执行不正确的参数传递。 它传递变量t的地址,这个变量是共享内存空间,对所有线程都是可见的。 循环迭代时,可能在创建的线程可以访问它之前,此内存位置的值将发生更改。
1 2 3 4 5 6 7 8 9 |
int rc; long t; for(t=0; t < NUM_THREADS; t++) { printf("Creating thread %ld\n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &t); ... } |
6.3 连接和分离线程
子例程
1 2 3 4 5 6 7 |
pthread_join (threadid,status) pthread_detach (threadid) pthread_attr_setdetachstate (attr,detachstate) pthread_attr_getdetachstate (attr,detachstate) |
连接
- 连接是实现线程同步的一种方法,例如:
- pthread_join()子例程阻塞调用的线程,直到传递给pthread_join()的threadid的线程终止为止
- 如果在目标线程对pthread_exit()的调用中指定了该目标线程的终止返回状态,程序员将能够获得目标线程的终止返回状态。
- 加入的线程可以匹配一个pthread_join()调用。 在同一个线程上尝试多个连接是一个逻辑错误。
- 其他两种同步方法,互斥锁和条件变量,将在后面讨论。
是否可连接的?
- 当一个线程被创建时,它的一个属性定义了它是可以连接的还是分离的。 只有被创建为可连接的线程才能被连接。 如果一个线程被创建为分离的,它永远不能被连接。
- POSIX标准的最终草案指定线程应该创建为可连接的。
- 要显式创建一个线程为可连接或分离的,要使用到pthread_create()例程中的attr参数。 典型的4个步骤是:
- 声明pthread_attr_t数据类型的线程属性变量
- 使用pthread_attr_init()初始化属性变量
- 使用pthread_attr_setdetachstate()设置线程的分离状态
- 当完成设置后,使用pthread_attr_destroy()来释放属性变量占用的内容
分离
- pthread_detach()子例程可以用来显式的分离一个线程,即时该线程以可连接形式创建的。
- 没有执行反操作的子例程。
建议
- 如果一个线程需要连接,请考虑明确地将其创建为可连接。 这提供了可移植性,因为并不是所有的实现都可以创建线程默认为可连接的。
- 如果你事先知道线程永远不需要与另一个线程连接,请考虑以分离状态创建线程。因为一些系统资源能够被自动释放。
例子:线程连接
- 此示例演示如何通过使用Pthread join子例程“等待”线程执行完成。
- 由于Pthreads的某些实现可能不会创建处于可连接状态的线程,因此本示例中的线程将以可连接的状态显式创建,以便稍后可以连接它们。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
#include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <math.h> #define NUM_THREADS 4 void *BusyWork(void *t) { int i; long tid; double result=0.0; tid = (long)t; printf("Thread %ld starting...\n",tid); for (i=0; i < 1000000; i++) { result = result + sin(i) * tan(i); } printf("Thread %ld done. Result = %e\n",tid, result); pthread_exit((void*) t); } int main (int argc, char *argv[]) { pthread_t thread[NUM_THREADS]; pthread_attr_t attr; int rc; long t; void *status; /* Initialize and set thread detached attribute */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for(t=0; t < NUM_THREADS; t++) { printf("Main: creating thread %ld\n", t); rc = pthread_create(&thread[t], &attr, BusyWork, (void *)t); if (rc) { printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); } } /* Free attribute and wait for the other threads */ pthread_attr_destroy(&attr); for(t=0; t < NUM_THREADS; t++) { rc = pthread_join(thread[t], &status); if (rc) { printf("ERROR; return code from pthread_join() is %d\n", rc); exit(-1); } printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status); } printf("Main: program completed. Exiting.\n"); pthread_exit(NULL); } |
6.4 栈管理
子例程
1 2 3 4 5 6 7 |
pthread_attr_getstacksize (attr, stacksize) pthread_attr_setstacksize (attr, stacksize) pthread_attr_getstackaddr (attr, stackaddr) pthread_attr_setstackaddr (attr, stackaddr) |
防止堆栈问题
- POSIX标准没有指定线程栈的大小,这个取决于实现,因此导致不同的实现其线程栈大小各不相同
- 超出默认栈限制是非常容易的,而超出栈限制,通常的结果是程序终止或者数据破坏
- 安全和可移植的程序不依赖于默认的栈限制,而是通过调用pthread_attr_setstacksize子例程来为每个线程分配足够的栈空间
- 当线程的栈必须放置在内存中某个特定的区域时,可以通过调用pthread_attr_getstackaddr和pthread_attr_setstackaddr子例程来实现
一些测试结果
- 默认的线程栈大小因实现而存在很大的不同。可以获得的最大尺寸也会有很大的变化,并且可能取决于每个节点的线程数量。
- 过去和现在的体系结构都显示了默认线程堆栈大小的巨大差别
示例:栈大小管理
- 该示例用于演示如何查询并设置线程栈大小
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
#include <pthread.h> #include <stdio.h> #define NTHREADS 4 #define N 1000 #define MEGEXTRA 1000000 pthread_attr_t attr; void *dowork(void *threadid) { double A[N][N]; int i,j; long tid; size_t mystacksize; tid = (long)threadid; pthread_attr_getstacksize (&attr, &mystacksize); printf("Thread %ld: stack size = %li bytes \n", tid, mystacksize); for (i=0; i < N; i++) for (j=0; j < N; j++) A[i][j] = ((i*j)/3.452) + (N-i); pthread_exit(NULL); } int main(int argc, char *argv[]) { pthread_t threads[NTHREADS]; size_t stacksize; int rc; long t; pthread_attr_init(&attr); pthread_attr_getstacksize (&attr, &stacksize); printf("Default stack size = %li\n", stacksize); stacksize = sizeof(double)*N*N+MEGEXTRA; printf("Amount of stack needed per thread = %li\n",stacksize); pthread_attr_setstacksize (&attr, stacksize); printf("Creating threads with stack size = %li bytes\n",stacksize); for(t=0; t < NTHREADS; t++){ rc = pthread_create(&threads[t], &attr, dowork, (void *)t); if (rc){ printf("ERROR; return code from pthread_create() is %d\n", rc); exit(-1); } } printf("Created %ld threads.\n", t); pthread_exit(NULL); } |
6.5 其余子例程
1 2 3 |
pthread_self () pthread_equal (thread1,thread2) |
- pthread_self返回唯一的,系统赋予调用该子例程的线程的线程ID
- pthread_equal比较两个线程ID。如果两个ID值不同则返回0,否则返回非零值。
- 注意,对于这两个例程,线程标识符对象是不透明对象和不容易被检查的。因为线程ID是不透明对象,多有C语言的相等判断符==不应该被用来比较两个线程ID或者将单一线程ID和另一个值比较
1 |
pthread_once (once_control, init_routine) |
- pthread_once在进程中只执行一次init_routince。进程中的任何线程对该例程的第一次调用将执行给定的init_routine,不带参数,任何后续的调用都将不起作用。
- init_routine例程通常是一个初始化例程
- once_control是一个同步控制结构体,需要在调用pthread_once之前进行初始化,例如:
1 |
pthread_once_t once_control = PTHREAD_ONCE_INIT; |
7 互斥变量
7.1 概览
- mutex是‘mutual exclusion’的缩写。互斥变量是实现线程同步和当数据发生多次写入时保护共享数据的主要手段之一。
- 一个互斥变量就像一个保护访问共享数据资源的“锁”。Pthreads中使用的互斥体的基本概念是,在任何给定的时间只有一个线程可以锁定(或拥有)互斥体变量。 因此,即使有几个线程试图锁定一个互斥锁,也只有一个线程会成功。 没有其他线程可以拥有该互斥体,直到拥有的线程解锁该互斥体。 线程必须“轮流”访问受保护的数据
- 互斥体可以用来防止“竞争”情况。一个关于竞争情况的例子是银行交易系统:
- 在上面的例子中,当一个线程正在使用这个共享数据资源时,应该使用互斥来锁定“Balance”。
- 一个拥有互斥锁的线程执行的动作通常是更新全局变量。 这是一个安全的方法来确保当多个线程更新同一个变量时,最终值与只有一个线程执行更新时的值相同。 正在更新的变量属于“临界区变量”。
- 使用互斥体的典型顺序如下:
- 创建并初始化一个互斥体变量
- 多个线程尝试锁住互斥体
- 仅仅一个线程成功锁住互斥体,并且该线程持有互斥体
- 持有锁的线程执行一系列操作
- 持有锁的线程解锁互斥体
- 其他线程尝试获取互斥体并执行重复相同的步骤
- 最后销毁互斥体
- 当多个线程竞争一个互斥锁时,失败者会阻塞在该调用上,非阻塞的调用时“trylock”而不是“lock”
- 当保护共享数据时,程序员有责任确保每个需要使用互斥锁的线程都这样做。 例如,如果4个线程正在更新相同的数据,但只有一个使用互斥体,则数据仍然可能被破坏。
7.2 创建和销毁互斥体
子例程
1 2 3 4 5 6 7 |
pthread_mutex_init (mutex,attr) pthread_mutex_destroy (mutex) pthread_mutexattr_init (attr) pthread_mutexattr_destroy (attr) |
使用方法
- 互斥量变量必须声明为pthread_mutex_t类型,并且在使用之前必须初始化。 有两种方法来初始化互斥变量:
- 当它被声明时进行静态初始化,例如
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
- 使用pthread_mutex_init()子例程进行动态初始化。该方法允许设置互斥体对象的属性,attr
- 当它被声明时进行静态初始化,例如
- 互斥体初始化为解锁状态的
- attr对象用于建立互斥对象的属性,如果使用的话,必须是pthread_attr_t类型(可以指定为NULL来表示默认值)。Pthreads标准定义了三个可选的互斥量属性
- 协议:指定用于防止互斥优先反转的协议。
- Prioceiling:指定互斥量优先级的上限
- 进程共享:指定互斥量是进程共享的
- 需要注意的是,并不是所有的实现都提供这三个可选的互斥属性
- pthread_mutexattr_init()和pthread_mutexattr_destroy()子例程是用来创建和销毁互斥量属性对象的。
- 当一个互斥量不在需要时,需要调用pthread_mutex_destroy()来释放该互斥量对象
7.3 加锁和解锁互斥量
子例程
1 2 3 4 5 |
pthread_mutex_lock (mutex) pthread_mutex_trylock (mutex) pthread_mutex_unlock (mutex) |
用法
- pthread_mutex_lock()子例程是线程用来在特定的互斥体变量上请求加锁是调用的。如果该互斥体变量已经被其他的线程加锁了,那么该调用会阻塞调用的线程,直到请求的互斥体变量解锁。
- pthread_mutex_trylock()子例程会尝试去锁住互斥体。但是,如果互斥体已经被锁住了,该子例程会立刻返回一个“繁忙”的错误码。该子例程可能有助于防止死锁的情况,如在优先级反转的情况下。
- 如果用有锁的线程调用pthread_mutex_unlock(),它将解锁一个互斥锁。如果其他线程要获取互斥锁来完成对受保护数据的访问,则在线程完成受保护数据的访问后需要调用此例程。如果出现如下情况,将会返回错误码:
- 如果互斥量已经被解锁了
- 如果该互斥体是其他线程持有的
- 互斥体变量并没有什么“神奇的”地方,实际上他们更像是参与线程之间的“绅士协议”。代码编写者需要确保参与线程都正确的加锁和解锁。以下方案演示逻辑错误:
1 2 3 4 |
Thread 1 Thread 2 Thread 3 Lock Lock A = 2 A = A+1 A = A*B Unlock Unlock |
1 2 3 |
<b>问题:</b>当多个线程正在等待一个锁定的互斥体时,哪个线程会在互斥体释放之后首先被授予锁定? <b>答案:</b>除非使用线程优先级调度(未覆盖),否则分配将留给本地系统调度程序,可能看起来或多或少是随机的。 |
示例:使用互斥量
- 本示例程序演示了在执行点积的线程程序中使用互斥变量。
- 主要数据通过全局可访问的结构体提供给所有线程,
- 每个线程都工作在不同的数据部分。
- 主线程等待所有的线程完成他们的计算,然后打印结果总和。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
/***************************************************************************** * FILE: dotprod_mutex.c * DESCRIPTION: * This example program illustrates the use of mutex variables * in a threads program. This version was obtained by modifying the * serial version of the program (dotprod_serial.c) which performs a * dot product. The main data is made available to all threads through * a globally accessible structure. Each thread works on a different * part of the data. The main thread waits for all the threads to complete * their computations, and then it prints the resulting sum. * SOURCE: Vijay Sonnad, IBM * LAST REVISED: 01/29/09 Blaise Barney ******************************************************************************/ #include <pthread.h> #include <stdio.h> #include <stdlib.h> /* The following structure contains the necessary information to allow the function "dotprod" to access its input data and place its output into the structure. This structure is unchanged from the sequential version. */ typedef struct { double *a; double *b; double sum; int veclen; } DOTDATA; /* Define globally accessible variables and a mutex */ #define NUMTHRDS 4 #define VECLEN 100000 DOTDATA dotstr; pthread_t callThd[NUMTHRDS]; pthread_mutex_t mutexsum; /* The function dotprod is activated when the thread is created. As before, all input to this routine is obtained from a structure of type DOTDATA and all output from this function is written into this structure. The benefit of this approach is apparent for the multi-threaded program: when a thread is created we pass a single argument to the activated function - typically this argument is a thread number. All the other information required by the function is accessed from the globally accessible structure. */ void *dotprod(void *arg) { /* Define and use local variables for convenience */ int i, start, end, len ; long offset; double mysum, *x, *y; offset = (long)arg; len = dotstr.veclen; start = offset*len; end = start + len; x = dotstr.a; y = dotstr.b; /* Perform the dot product and assign result to the appropriate variable in the structure. */ mysum = 0; for (i=start; i < end ; i++) { mysum += (x[i] * y[i]); } /* Lock a mutex prior to updating the value in the shared structure, and unlock it upon updating. */ pthread_mutex_lock (&mutexsum); dotstr.sum += mysum; printf("Thread %ld did %d to %d: mysum=%f global sum=%f\n",offset,start,end,mysum,dotstr.sum); pthread_mutex_unlock (&mutexsum); pthread_exit((void*) 0); } /* The main program creates threads which do all the work and then print out result upon completion. Before creating the threads, The input data is created. Since all threads update a shared structure, we need a mutex for mutual exclusion. The main thread needs to wait for all threads to complete, it waits for each one of the threads. We specify a thread attribute value that allow the main thread to join with the threads it creates. Note also that we free up handles when they are no longer needed. */ int main (int argc, char *argv[]) { long i; double *a, *b; void *status; pthread_attr_t attr; /* Assign storage and initialize values */ a = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double)); b = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double)); for (i=0; i < VECLEN*NUMTHRDS; i++) { a[i]=1; b[i]=a[i]; } dotstr.veclen = VECLEN; dotstr.a = a; dotstr.b = b; dotstr.sum=0; pthread_mutex_init(&mutexsum, NULL); /* Create threads to perform the dotproduct */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for(i=0;i < NUMTHRDS;i++) { /* Each thread works on a different set of data. * The offset is specified by 'i'. The size of * the data for each thread is indicated by VECLEN. */ pthread_create(&callThd[i], &attr, dotprod, (void *)i); } pthread_attr_destroy(&attr); /* Wait on the other threads */ for(i=0;i < NUMTHRDS;i++) { pthread_join(callThd[i], &status); } /* After joining, print out the results and cleanup */ printf ("Sum = %f \n", dotstr.sum); free (a); free (b); pthread_mutex_destroy(&mutexsum); pthread_exit(NULL); } |
8. 条件变量
8.1 概览
- 条件变量为线程同步提供了另一种方式。虽然互斥锁通过控制线程对数据的访问来实现同步,但是条件变量运行线程根据数据的实际值进行同步。
- 如果没有条件变量,程序员需要连续轮询线程(可能在关键部分),检查是否满足条件。 这可能是非常耗费资源的,因为线程会在这个活动中持续繁忙。 条件变量是一种无需轮询即可实现相同目标的方法。
- 一个条件变量总是和一个互斥锁一起使用。
- 下面显示使用条件变量的具有代表性的步骤
- 主线程:
- 声明并初始化需要同步的全局数据/变量(例如“count”)
- 声明并初始化一个条件变量对象
- 申明并初始化一个辅助的互斥体
- 创建线程A和线程B来完成工作
- 线程A
- 执行工作到指定的条件发生(例如“count”到达指定的值)
- 锁住辅助的互斥体并检查全局变量的值
- 调用pthread_cond_wait()来进入阻塞状态,等待从线程B中发来的信号。需要注意的是调用pthread_cond_wait()的线程会自动并原子性的解锁辅助的互斥体变量,因此该辅助互斥体就能被线程B使用的。
- 当信号来临,线程唤醒。辅助的互斥体会再次自动的和原子性的加锁
- 显式的解锁互斥体变量
- 继续完成剩余的工作
- 线程B
- 执行工作
- 对辅助的互斥体变量加锁
- 改变线程A等待的全局变量的值
- 检查线程A等待的全局变量的值,如果到达期望的值,发送信号给线程A
- 解锁互斥体
- 继续完成剩余的工作
- 主线程
- 连接线程/完成剩余的工作
- 主线程:
8.2 创建和销毁条件变量
子例程
1 2 3 4 5 6 7 |
pthread_cond_init (condition,attr) pthread_cond_destroy (condition) pthread_condattr_init (attr) pthread_condattr_destroy (attr) |
用法
- 条件变量必须声明为pthread_cond_t类型,并且必须在其使用前进行初始化。有两种方法初始化一个条件变量:
- 在其声明时执行静态初始化,例如
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER;
- 使用pthread_cond_init()子例程进行动态初始化。创建的条件变量的ID将通过条件参数返回给调用线程。该方法允许设置条件变量对象属性,attr
- 在其声明时执行静态初始化,例如
- 可选的attr对象是用来设置条件变量的属性的。仅有一个属性是为条件变量定义的:进程共享,它允许条件变量被其他进程中的线程看到。属性对象,如果使用的话,必须是pthread_condattr_t类型(也可以定义为NULL来作为默认值)
- 需要注意的是,并非所有的实现都提供进程共享这个属性
- pthread_condattr_init()和pthread_condattr_destroy()子例程是用来创建和销毁条件变量属性对象的
- pthread_cond_destroy()是用来释放不再需要的条件变量的
8.3 在条件变量上等待和发出信号
子例程
1 2 3 4 5 |
pthread_cond_wait (condition,mutex) pthread_cond_signal (condition) pthread_cond_broadcast (condition) |
用法
- pthread_cond_wait()阻塞调用的线程直到被通知指定的条件已经达成。该子例程应该在互斥量被锁住的情况下被调用并且它会在其等待条件变量的时候自动释放互斥锁。在接收到指定的信号后,线程被唤醒并自动的加锁线程所使用的互斥量。程序员有责任在线程完成其工作后释放互斥锁。
- 建议:使用while循环而不是if语句(看下面的watch_count例子)来检查等待的条件,能够有助于解决几个潜在的问题,例如
- 如果有几个线程正在等待相同的唤醒信号,它们将轮流获取互斥锁,并且其中任何一个线程都可以修改所有线程等待的条件。
- 如果线程由于程序错误而收到错误的信号。
- Pthreads库允许在不违反标准的情况下向虚拟线程发出虚假唤醒信号。
- pthread_cond_signal()子例程是用来通知(或者唤醒)另一个在等待特定条件变量的线程。该子例程应该在互斥锁被锁定后调用,并且解锁互斥体才能让pthread_cond_wait()子例程完成
- 在多于一个线程在阻塞等待信号量时,应该调用pthread_cond_breadcast()子例程而不是pthread_cond_signal()
- 在调用pthread_cond_wait()之前调用pthread_cond_signal()会产生逻辑错误
- 当使用这些子例程的时候,适当的锁定的解锁互斥体变量就变得至关重要了,例如:
- 在调用pthread_cond_wait()之前失败的加锁互斥体变量可能导致线程不阻塞
- 在调用pthread_cond_signal()之后失败的解锁互斥体变量,可能无法让匹配的pthread_cond_wait()子例程完成(它将保持阻塞状态)
例子:使用条件变量
- 这个简单的示例代码演示了几个Pthread条件变量例程的使用。
- 主线程创建三个线程
- 其中两个线程执行工作并更新“count”变量。
- 第三个线程等待计数变量达到指定的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
/****************************************************************************** * FILE: condvar.c * DESCRIPTION: * Example code for using Pthreads condition variables. The main thread * creates three threads. Two of those threads increment a "count" variable, * while the third thread watches the value of "count". When "count" * reaches a predefined limit, the waiting thread is signaled by one of the * incrementing threads. The waiting thread "awakens" and then modifies * count. The program continues until the incrementing threads reach * TCOUNT. The main program prints the final value of count. * SOURCE: Adapted from example code in "Pthreads Programming", B. Nichols * et al. O'Reilly and Associates. * LAST REVISED: 03/07/17 Blaise Barney ******************************************************************************/ #include <pthread.h> #include <stdio.h> #include <stdlib.h> #define NUM_THREADS 3 #define TCOUNT 10 #define COUNT_LIMIT 12 int count = 0; pthread_mutex_t count_mutex; pthread_cond_t count_threshold_cv; void *inc_count(void *t) { int i; long my_id = (long)t; for (i=0; i < TCOUNT; i++) { pthread_mutex_lock(&count_mutex); count++; /* Check the value of count and signal waiting thread when condition is reached. Note that this occurs while mutex is locked. */ if (count == COUNT_LIMIT) { printf("inc_count(): thread %ld, count = %d Threshold reached. ", my_id, count); pthread_cond_signal(&count_threshold_cv); printf("Just sent signal.\n"); } printf("inc_count(): thread %ld, count = %d, unlocking mutex\n", my_id, count); pthread_mutex_unlock(&count_mutex); /* Do some work so threads can alternate on mutex lock */ sleep(1); } pthread_exit(NULL); } void *watch_count(void *t) { long my_id = (long)t; printf("Starting watch_count(): thread %ld\n", my_id); /* Lock mutex and wait for signal. Note that the pthread_cond_wait routine will automatically and atomically unlock mutex while it waits. Also, note that if COUNT_LIMIT is reached before this routine is run by the waiting thread, the loop will be skipped to prevent pthread_cond_wait from never returning. */ pthread_mutex_lock(&count_mutex); while (count < COUNT_LIMIT) { printf("watch_count(): thread %ld Count= %d. Going into wait...\n", my_id,count); pthread_cond_wait(&count_threshold_cv, &count_mutex); printf("watch_count(): thread %ld Condition signal received. Count= %d\n", my_id,count); } printf("watch_count(): thread %ld Updating the value of count...\n", my_id); count += 125; printf("watch_count(): thread %ld count now = %d.\n", my_id, count); printf("watch_count(): thread %ld Unlocking mutex.\n", my_id); pthread_mutex_unlock(&count_mutex); pthread_exit(NULL); } int main(int argc, char *argv[]) { int i, rc; long t1=1, t2=2, t3=3; pthread_t threads[3]; pthread_attr_t attr; /* Initialize mutex and condition variable objects */ pthread_mutex_init(&count_mutex, NULL); pthread_cond_init (&count_threshold_cv, NULL); /* For portability, explicitly create threads in a joinable state */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_create(&threads[0], &attr, watch_count, (void *)t1); pthread_create(&threads[1], &attr, inc_count, (void *)t2); pthread_create(&threads[2], &attr, inc_count, (void *)t3); /* Wait for all threads to complete */ for (i = 0; i < NUM_THREADS; i++) { pthread_join(threads[i], NULL); } printf ("Main(): Waited and joined with %d threads. Final value of count = %d. Done.\n", NUM_THREADS, count); /* Clean up and exit */ pthread_attr_destroy(&attr); pthread_mutex_destroy(&count_mutex); pthread_cond_destroy(&count_threshold_cv); pthread_exit (NULL); } |