Java线程

只有部分灵魂,学习阶段复习使用,参考资料写的笔记


进程和线程

进程是资源分配的基本单位,线程是CPU调度的基本单位。进程包含线程,线程共用进程的资源。

进程:是一个处于运行过程中的程序,当一个程序装载到内存中,系统为他分配资源之后运行,就变成一个进程,它有一定的独立性。

线程:线程是进程中的一个执行单元,负责当前进程中程序的运行,一个进程至少有一个线程,一个进程可以有多个线程,这个应用程序也可以称之为多线程程序。

多线程:所有线程轮流使用cpu的使用权。

多线程就一定好吗?快吗?

并发编程的目的就是为了能提高程序的执行效率提高程序运行速度,但是并发编程并不总是能提高程序运行速度的,而且并发编程可能会遇到很多问题,比如:内存泄漏、上下文切换、死锁还有受限于硬件和软件的资源闲置问题。

多线程就是几乎同时执行多个线程(一个处理器在某一个时间点上永远都只能是一个线程!即使这个处理器是多核的,除非有多个处理器才能实现多个线程同时运行)。CPU通过给每个线程分配CPU时间片来实现伪同时运行,因为CPU时间片一般很短很短,所以给人一种同时运行的感觉。

上下文切换

当前任务在执行完CPU时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换会这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。

上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。

Linux相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

上下文切换又分为2种:让步式上下文切换和抢占式上下文切换。前者是指执行线程主动释放CPU,与锁竞争严重程度成正比,可通过减少锁竞争和使用CAS算法来避免;后者是指线程因分配的时间片用尽而被迫放弃CPU或者被其他优先级更高的线程所抢占,一般由于线程数大于CPU可用核心数引起,可通过适当减少线程数和使用协程来避免。

总结一下:

  1. 减少锁的使用。因为多线程竞争锁时会引起上下文切换。
  2. 使用CAS算法。这种算法也是为了减少锁的使用。CAS算法是一种无锁算法。
  3. 减少线程的使用。人物很少的时候创建大量线程会导致大量线程都处于等待状态。
  4. 使用协程。

synchronized关键字

Volatile变量

Volatile 变量具有 synchronized 的可见性特性,但是不具备原子特性和“互斥性”。这就是说线程能够自动发现 volatile 变量的最新值。Volatile 变量可用于提供线程安全,但是只能应用于非常有限的一组用例:多个变量之间或者某个变量的当前值与修改后值之间没有约束。因此,单独使用 volatile 还不足以实现计数器、互斥锁或任何具有与多个变量相关的不变式(Invariants)的类

volatile关键字为实例域的同步访问提供了一种免锁机制。如果声明一个域为volatile,那么编译器和虚拟机就会知道该域是可以能被另一个线程并发更新的。

1
2
3
4
相较于 synchronized 是一种较为轻量级的同步策略。
注意:
1. volatile 不具备“互斥性”
2. volatile 不能保证变量的“原子性”

在 JDK1.2 之前,Java的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下,线程可以把变量保存本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。

006rNwoDgy1fpo4w1256pj307l04m3yf.jpg

在使用volatile修饰之后,都回到主存中读取,解决了数据不一致问题。

006rNwoDgy1fpo4w2fiv9j30d606mmx5.jpg

正确使用 volatile 变量的条件

只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:

  • 对变量的写操作不依赖于当前值。
  • 该变量没有包含在具有其他变量的不变式中。

实际上,这些条件表明,可以被写入 volatile 变量的这些有效值独立于任何程序的状态,包括变量的当前状态。

第一个条件的限制使 volatile 变量不能用作线程安全计数器。虽然增量操作(x++)看上去类似一个单独操作,实际上它是一个由读取-修改-写入操作序列组成的组合操作,必须以原子方式执行,而 volatile 不能提供必须的原子特性。实现正确的操作需要使 x 的值在操作期间保持不变,而 volatile 变量无法实现这点。(然而,如果将值调整为只从单个线程写入,那么可以忽略第一个条件。)

volatile关键字的可见性

volatile 修饰的成员变量在每次被线程访问时,都强迫从主存(共享内存)中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到主存(共享内存)。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值,这样也就保证了同步数据的可见性

synchronized和volatile比较

  • volatile关键字是线程同步的轻量级实现,所以volatile性能肯定比synchronized关键字要好。但是volatile关键字只能用于变量而synchronized关键字可以修饰方法以及代码块。synchronized关键字在JavaSE1.6之后进行了主要包括为了减少获得锁和释放锁带来的性能消耗而引入的偏向锁和轻量级锁以及其它各种优化之后执行效率有了显著提升,实际开发中使用synchronized关键字还是更多一些。
  • 多线程访问volatile关键字不会发生阻塞,而synchronized关键字可能会发生阻塞
  • volatile关键字能保证数据的可见性,但不能保证数据的原子性。synchronized关键字两者都能保证。
  • volatile关键字用于解决变量在多个线程之间的可见性,而ynchronized关键字解决的是多个线程之间访问资源的同步性。

悲观锁与乐观锁

悲观锁

总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronizedReentrantLock等独占锁就是悲观锁思想的实现。

乐观锁

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。

两种锁的使用场景

从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。

乐观锁常见的两种实现方式

乐观锁一般会使用版本号机制或CAS算法实现。

版本号机制

一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。

CAS算法

  1. CAS(Compare-And-Swap) 算法保证数据变量的原子性
    • CAS 算法是硬件对于并发操作的支持 (是一种硬件对并发的支持,针对多处理器 操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。)
    • CAS 是一种无锁的非阻塞算法的实现(效率较高)
    • CAS 包含了三个操作数:
      • 需要读写的内存值 V
      • 进行比较的值 A
      • 拟写入的新值 B
      • 当且仅当 V == A 时, V = B; 否则,不会执行任何操作。

JAVA 如何实现原子操作

在 java 中可以通过锁和循环 CAS 的方式来实现原子操作。

  • 使用循环CAS实现原子性

自旋 CAS 实现的基本思路就是循环进行 CAS 操作直到成功为止。

在 java 并发包中有一些并发框架也使用了自旋 CAS 的方式来实现原子操作,比如 LinkedTransferQueue 类的 Xfer 方法。CAS 虽然很高效的解决原子操作,但是 CAS 仍然存在三大问题。ABA 问题,循环时间长开销大和只能保证一个共享变量的原子操作。

三大问题

ABA 问题

因为 CAS 需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA 问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么 A-B-A 就会变成 1A-2B-3A。

从 Java1.5 开始 JDK 的 atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。这个类的 compareAndSet 方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

1
2
3
4
5
public boolean compareAndSet
(V expectedReference,// 预期引用
V newReference,// 更新后的引用
int expectedStamp, // 预期标志
int newStamp) // 更新后的标志
循环时间长开销大

自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令那么效率会有一定的提升,pause 指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline), 使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起 CPU 流水线被清空(CPU pipeline flush),从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作。

当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java1.5 开始 JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作。

CAS与synchronized的使用情景

简单的来说CAS适用于写比较少的情况下(多读场景,冲突一般较少),synchronized适用于写比较多的情况下(多写场景,冲突一般较多)

  1. 对于资源竞争较少(线程冲突较轻)的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源;而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能。
  2. 对于资源竞争严重(线程冲突严重)的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized。

三大特性

在并发编程中,线程安全问题的本质其实就是 原子性、有序性、可见性;接下来主要围绕这三个问题进行展开分析其本质,彻底了解可见性的特性

  • 原子性 和数据库事务中的原子性一样,满足原子性特性的操作是不可中断的,要么全部执行成功要么全部执行失败
  • 有序性 编译器和处理器为了优化程序性能而对指令序列进行重排序,也就是你编写的代码顺序和最终执行的指令顺序是不一致的,重排序可能会导致多线程程序出现内存可见性问题
  • 可见性 多个线程访问同一个共享变量时,其中一个线程对这个共享变量值的修改,其他线程能够立刻获得修改以后的值

AQS

AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,由大师 Doug Lea 所创作。AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。AQS 的使用方式通常都是通过内部类继承 AQS 实现同步功能,通过继承 AQS,可以简化同步器的实现。

在 AQS 内部,通过维护一个FIFO 队列来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。

当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

闭锁

CountDownLatch介绍

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

CountDownLatch原理

CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

1
CountDownLatch :闭锁,在完成某些运算是,只有其他所有线程的运算全部完成,当前运算才继续执行

CyclicBarrier(循环栅栏)

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier和CountDownLatch的区别

CountDownLatch是计数器,只能使用一次,而CyclicBarrier的计数器提供reset功能,可以多次使用。javadoc是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

countdownlatch和cyclicbarrier.png

并发容器

JDK提供的这些容器大部分在 java.util.concurrent 包中。

  • ConcurrentHashMap: 线程安全的HashMap

  • CopyOnWriteArrayList: 线程安全的List,在读多写少的场合性能非常好,远远好于Vector.

  • ConcurrentLinkedQueue:高效的并发队列,使用链表实现。可以看做一个线程安全的 LinkedList,这是一个非阻塞队列。

  • BlockingQueue: 这是一个接口,JDK内部通过链表、数组等方式实现了这个接口。表示阻塞队列,非常适合用于作为数据共享的通道。

  • ConcurrentSkipListMap: 跳表的实现。这是一个Map,使用跳表的数据结构进行快速查找。

CopyOnWriteArrayList/CopyOnWriteArraySet : “写入并复制”

  • 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。

Lock锁

锁是用于通过多个线程控制对共享资源的访问的工具。通常,锁提供对共享资源的独占访问:一次只能有一个线程可以获取锁,并且对共享资源的所有访问都要求首先获取锁。 但是,一些锁可能允许并发访问共享资源,如ReadWriteLock的读写锁。

在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的。JDK1.5之后并发包中新增了Lock接口以及相关实现类来实现锁功能。

虽然synchronized方法和语句的范围机制使得使用监视器锁更容易编程,并且有助于避免涉及锁的许多常见编程错误,但是有时您需要以更灵活的方式处理锁。例如,用于遍历并发访问的数据结构的一些算法需要使用“手动”或“链锁定”:您获取节点A的锁定,然后获取节点B,然后释放A并获取C,然后释放B并获得D等。在这种场景中synchronized关键字就不那么容易实现了,使用Lock接口容易很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Ticket implements Runnable{

private int ticket = 100;

private Lock lock = new ReentrantLock();


@Override
public void run() {
while (true){
lock.lock();
try {
if(ticket > 0){

System.out.println(Thread.currentThread().getName() + "当前正在卖第" + ticket + "票");
ticket--;
}
}finally {
lock.unlock();
}
}
}

// int ticket = 100;
// @Override
// public void run() {
//
// while (true){
// synchronized (Ticket.class){
// if(ticket > 0){
// System.out.println(Thread.currentThread().getName() + "当前正在卖第" + ticket + "票");
// ticket--;
// }
// }
//
// }
// }
}

生产者消费者

synchronized方式的生产者和消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* @author kokio
* @create 2018-12-24 18:30
* 生产者和消费者
*/
public class TestProductorAndConsumer {

public static void main(String[] args) {

Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

Thread tproductor = new Thread(productor,"生产者");
Thread tconsumer = new Thread(consumer,"消费者");

Thread tproductor1 = new Thread(productor,"生产者");
Thread tconsumer1 = new Thread(consumer,"消费者");

tproductor.start();
tconsumer.start();
tproductor1.start();
tconsumer1.start();

}


}

//店员
class Clerk {
private int product = 0;

//进货
public synchronized void get(){
while (product >= 1){ //为了避免虚假唤醒问题,应该总是使用在循环中
System.out.println("产品已满");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":"+ ++product);
this.notifyAll();

}

//卖货
public synchronized void sale(){
while (product <= 0){
System.out.println("没有产品");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
this.notifyAll();

}
}
//生产者
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk){
this.clerk = clerk;
}

@Override
public void run() {

for(int i = 0; i < 20; i++){

try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

clerk.get();
}
}

}

class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk){
this.clerk = clerk;
}

@Override
public void run() {
for(int i = 0; i < 20; i++){
clerk.sale();
}
}
}

使用Lock的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//店员
class Clerk {
private int product = 0;

private Lock lock = new ReentrantLock();

private Condition condition = lock.newCondition();

//进货
public void get(){
lock.lock();
try {
while (product >= 1){ //为了避免虚假唤醒问题,应该总是使用在循环中
System.out.println("产品已满");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":"+ ++product);
condition.signalAll();
}finally {
lock.unlock();
}

}

//卖货
public void sale(){

lock.lock();

try {

while (product <= 0){
System.out.println("没有产品");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ":" + --product);
condition.signalAll();
}finally {
lock.unlock();
}


}
}

//生产者
class Productor implements Runnable {

private Clerk clerk;

public Productor(Clerk clerk){
this.clerk = clerk;
}

@Override
public void run() {

for(int i = 0; i < 20; i++){

try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}

clerk.get();
}
}

}

class Consumer implements Runnable {

private Clerk clerk;

public Consumer(Clerk clerk){
this.clerk = clerk;
}

@Override
public void run() {
for(int i = 0; i < 20; i++){
clerk.sale();
}
}
}

线程按序交替

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/**
* @author kokio
* @create 2018-12-24 22:06
*/
public class TestAlternate {

public static void main(String[] args) {
AlternateDemo alternateDemo = new AlternateDemo();
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;i < 10;i++){
alternateDemo.loopA(i);

}
}
},"A").start();

new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;i < 10;i++){
alternateDemo.loopB(i);

}
}
},"B").start();

new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;i < 10;i++){
alternateDemo.loopC(i);

}
}
},"C").start();
}

}


class AlternateDemo {

private int number = 1;

private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();




public void loopA(int totalLoop){
lock.lock();

try {

if(number != 1){
condition1.await();
}
for(int i = 0; i < 1; i++){
System.out.println(Thread.currentThread().getName() + ":A");
}
number = 2;
condition2.signal();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopB(int totalLoop){
lock.lock();

try {

if(number != 2){
condition2.await();
}
for(int i = 0; i < 1; i++){
System.out.println(Thread.currentThread().getName() + ":B");
}
number = 3;
condition3.signal();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopC(int totalLoop){
lock.lock();

try {

if(number != 3){
condition3.await();
}
for(int i = 0; i < 1; i++){
System.out.println(Thread.currentThread().getName() + ":C");
}
number = 1;
condition1.signal();

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

}

读写锁(ReadWriterLock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* @author kokio
* @create 2018-12-24 22:47
* 写和写之间互斥 写和读之间互斥
* 读读之间不需要互斥
* A ReadWriteLock维护一对关联的locks ,一个用于只读操作,一个用于写入。
* read lock可以由多个线程同时进行,只要没有写。 write lock是独家的。
*/
public class TestReadWriteLock {

public static void main(String[] args) {
ReadWriteLockDemo readWriteLockDemo = new ReadWriteLockDemo();
new Thread(new Runnable() {
@Override
public void run() {
readWriteLockDemo.set(new Random().nextInt(10));
}
},"Writer").start();


for(int i = 0; i < 100; i ++){
new Thread(new Runnable() {
@Override
public void run() {

readWriteLockDemo.get();
}
},"Read" + i).start();
}

}


}
class ReadWriteLockDemo {

private int number = 0;

private ReadWriteLock lock = new ReentrantReadWriteLock();


//读
public void get(){
lock.readLock().lock();//上锁

try {

System.out.println(Thread.currentThread().getName() + " :" +number);

}finally {
lock.readLock().unlock();//解锁
}
}

//写锁
public void set(int number){

lock.writeLock().lock();

try {

System.out.println(Thread.currentThread().getName());
this.number = number;

}finally {
lock.writeLock().unlock();
}
}


}

线程八锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* @author kokio
* @create 2018-12-24 23:44
* 1.两个普通同步方法,两个线程,打印:one two
* 2.新增Thread.sleep() 给getOne,打印 :3秒后出one two
* 3.新增普通方法 getThree() , 打印:立刻打印three 3秒后 one two
* 4. 两个普通同步方法,两个 Number 对象, //two 3秒后 one
* 5. 修改 getOne() 为静态同步方法, //two 3秒后 one
* 6. 修改两个方法均为静态同步方法,一个 Number 对象 //one two
* 7. 一个静态同步方法,一个非静态同步方法,两个 Number 对象 //two one
* 8. 两个静态同步方法,两个 Number 对象? //3秒后 one two
* 线程八锁的关键:
* - 非静态方法的锁默认为 this, 静态方法的锁为 对应的 Class 实例
* - 某一个时刻内,只能有一个线程持有锁,无论几个方法。
*/
public class TestThread8Monitor {

public static void main(String[] args) {
Number number = new Number();
Number number1 = new Number();


new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
// number.getTwo();
number1.getTwo();
}
}).start();

// new Thread(new Runnable() {
// @Override
// public void run() {
// number.getThree();
// }
// }).start();

}


}

class Number {

public static synchronized void getOne(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("one");
}
public static synchronized void getTwo(){
System.out.println("two");
}

public void getThree(){
System.out.println("three");
}

}

Fork/Join框架

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务 的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些 原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中, 如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了 线程的等待时间,提高了性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class TestForkJoinPool {

public static void main(String[] args) {
Instant start = Instant.now();

ForkJoinPool pool = new ForkJoinPool();

ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);

Long sum = pool.invoke(task);

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());
}

@Test
public void test1(){
Instant start = Instant.now();

long sum = 0L;

for (long i = 0L; i <= 50000000000L; i++) {
sum += i;
}

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());
}

//java8 新特性
@Test
public void test2(){
Instant start = Instant.now();

Long sum = LongStream.rangeClosed(0L, 50000000000L)
.parallel()
.reduce(0L, Long::sum);

System.out.println(sum);

Instant end = Instant.now();

System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());
}

}

class ForkJoinSumCalculate extends RecursiveTask<Long> {




private long start;
private long end;

private static final long THERSHOLD = 10000L; //临界值

public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long length = end - start;

if(length <= THERSHOLD){
long sum = 0L;

for (long i = start; i <= end; i++) {
sum += i;
}

return sum;
}else{
long middle = (start + end) / 2;

ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork(); //进行拆分,同时压入线程队列

ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
right.fork(); //

return left.join() + right.join();
}
}
}

创建线程的四种方式

一、继承Thread

  1. 定义一个类继承Thread
  2. 重写run方法
  3. 创建子类对象,就是创建线程对象
  4. 调用start方法,开启线程并让线程执行,同时还会告诉jvm去调用run方法

注意:

  • 线程对象调用run方法和调用start方法区别

线程对象调用run方法仅仅只是像调用普通方法一样,线程调用start方法,开启线程,并让jvm去调用run方法在开启的线程中执行。

  • 创建线程的目的

为了建立程序单独的执行路径,让多部分代码实现同时执行,线程创建并执行,需要给定线程要执行的任务。

Thread类的run方法中的任务并不是我们所需要的,只有重写这个run方法。

二、实现Runnable接口

创建线程的另一种方法是声明实现Runnable接口的类,该类然后实现run方法,创建Runnable的子类对象,传入到某个线程的构造方法中,开启线程。

  1. 定义实现Runnable接口
  2. 重写Runnable接口的中的run方法,将线程运行的代码放入run方法中
  3. 通过Thread类建立线程对象
  4. 将Runnable接口的子类对象作为实际参数,传递给Thread类构造方法
  5. 调用Thread类的start方法开启线程,会调用Runnable接口子类的run方法

为什么要将Runnable接口的子类对象传递给Thread的构造函数:

自定义的run方法所属对象是Runnable接口的子类对象,所以要让线程去执行指定对象的run方法。

实现Runnable的原理

为什么需要定一个类去实现Runnable接口呢,继承Thread类和实现Runnable接口有啥区别

实现Runnable接口,避免了继承Thread类的单继承局限性,覆盖Runnable接口中的run方法,将线程任务代码定义到run方法中。

创建Thread类的对象,只有创建Thread类的对象才可以创建线程。

线程任务已被封装到Runnable接口的run方法中,而这个run方法所属于Runnable接口的子类对象,所以将这个子类对对象作为参数传递给Thread的构造函数,这样,线程对象创建时就可以明确要运行的线程任务。

Thread和Runnable的比较

实现Runnable接口避免了单继承的局限性

实现Runnable接口的方式,更加符合面向对象,线程分为两部分,一部分是线程对象,一部分线程任务。继承Thread类,线程对象和线程任务耦合在一起。一旦创建Thread类的子类对象,既是线程对象,又是线程任务。实现Runnable接口,将线程任务单独分离出来封装成对象,类型就是Runnable接口类型。Runnable接口对线程对象和线程任务进行解耦。

三、实现Callable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/*
* 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
*
* 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask 是 Future 接口的实现类
*/
public class TestCallable {

public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();

//1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
FutureTask<Integer> result = new FutureTask<>(td);

new Thread(result).start();

//2.接收线程运算后的结果
try {
Integer sum = result.get(); //FutureTask 可用于 闭锁
System.out.println(sum);
System.out.println("------------------------------------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

}

class ThreadDemo implements Callable<Integer>{

@Override
public Integer call() throws Exception {
int sum = 0;

for (int i = 0; i <= 100000; i++) {
sum += i;
}

return sum;
}

}

四、线程池

线程池提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提交了响应速度。

  • 降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池.jpg

Executor框架使用

  • 主线程首先要创建实现Runnable或者Callable接口的任务对象。
    备注: 工具类Executors可以实现Runnable对象和Callable对象之间的相互转换。(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。

  • 然后可以把创建完成的Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象提交给ExecutorService执行(ExecutorService.submit(Runnable task)或ExecutorService.submit(Callable task))。

执行execute()方法和submit()方法的区别是什么呢?
1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

  • 如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(我们刚刚也提到过了执行execute()方法和submit()方法的区别,到目前为止的JDK中,返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。

  • 最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* @author kokio
* @create 2018-12-25 1:33
* 线程池的体系结构:
* java.util.concurrent.Executor : 负责线程的使用与调度的根接口
* ExecutorService 子接口: 线程池的主要接口
* ThreadPoolExecutor 线程池的实现类
* ScheduledExecutorService 子接口:负责线程的调度
* ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
*
* 工具类 : Executors
* ExecutorService newFixedThreadPool() : 创建固定大小的线程池
* ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
* ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程
*
* ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
*/
public class TestThreadPool {

public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);

List<Future<Integer>> futures = new ArrayList<>();

for(int i = 0; i < 10; i++){
Future<Integer> future = pool.submit(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
});


futures.add(future);
}


for(Future<Integer> future : futures){
System.out.println(future.get());
}


// ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();

// //为线程池中的线程分配任务
// for(int i = 0; i < 10; i++){
// pool.submit(threadPoolDemo);
//
// }

//关闭线程池
pool.shutdown();
// pool.shutdownNow()//立即关闭
}
}

class ThreadPoolDemo implements Runnable {

private int i = 10;

@Override
public void run() {
while (i <= 100){
System.out.println(Thread.currentThread().getName() + ":" + i++);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* @author kokio
* @create 2018-12-25 1:55
* ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。调度
*/
public class TestScheduleThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledExecutorService pool =
Executors.newScheduledThreadPool(6);


for(int i = 0; i < 10; i++){
ScheduledFuture<Integer> future = pool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num = new Random().nextInt(190);
System.out.println(Thread.currentThread().getName() + ":" + num);
return num;
}
}, 1, TimeUnit.SECONDS); //延迟1秒,自己可以设置

System.out.println(future.get());
}

pool.shutdown();



}
}

有部分也没有我灵魂,尤其是AQS方面,引用的地方都以附上连接

------------- 感谢阅读-------------