并发学习笔记
- 四、并发编程基础
- 1、为什么使用多线程
- 2、如何设置线程优先级
- 3、线程有哪些状态
- 4、如何达到线程的各种状态
- 5、线程各种状态之间如何转换
- 6、Daemon线程有什么特点,如何创建Daemon线程
- 7、线程的中断是什么,如何进行中断相关操作
- 8、中断有什么用?
- 9、suspend()、resume()和stop()作用
- 10、使用 stop/interrupt 结束进程有什么区别?
- 11、synchronized底层实现原理?
- 12、如何实现 等待/通知 机制?
- 13、如何实现线程间的 管道输入/输出流?
- 14、Thread.join()有什么用?
- 15、ThreadLocal如何使用?
- 16、如何实现等待超时模式?
- 17、如何使用线程池?
- 17、如何实现一个简单的线程池?
- 18、如何用线程池实现简单的Web服务器?
- 五、Java中的锁
- 1、synchronized 和 Lock接口 比较优缺点?
- 2、Lock 接口如何使用?
- 3、如何实现独占锁?
- 4、AbstractQueuedSynchronizer 内部如何实现?
- 5、LockSupport的用途?
- 6、LockSupport的park/unpark与 object.wait/notify的区别?
- 7、如何实现TwinsLock?
- 8、什么是『重入锁』?如何实现『重入锁』?
- 9、『公平锁』和『非公平锁』区别?如何实现?
- 10、『读写锁』如何使用?如何实现『读写锁』?
- 11、除了
wait()
+notify()
+synchronize()
,如何实现『等待/通知』模式? - 12、Condition内部如何实现?
四、并发编程基础
1、为什么使用多线程
- 更多的处理器核心
- 更快的响应速度
- 更好的编程模型
2、如何设置线程优先级
- 如何设置线程的优先级
- Thread.setPriority()
- 线程的优先级是否可以保证
- 不同平台下的实现不一样,无法保证
3、线程有哪些状态
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但还没有调用start()方法。 |
RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态统一称为RANNABLE |
BLOCK | 阻塞状态,表示线程阻塞于锁 |
WAITING | 等待状态,表示当前线程需要等待其它线程做出一些待定动作(通知或中断) |
TIME_WAITING | 超时等待状态,类似于WAITING,但可以在指定时间后自行返回 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
4、如何达到线程的各种状态
public class Test {
public static void main(String[] args) {
new Thread(new TimeWaiting(), "TimeWaiting").start();
new Thread(new Waiting(), "Waiting").start();
new Thread(new Blocked(), "Blocked").start();
}
}
class TimeWaiting implements Runnable {
@Override
public void run() {
while (true) {
SleepUtils.second(100);
}
}
}
class Waiting implements Runnable {
@Override
public void run() {
while (true) {
synchronized (Waiting.class) {
try {
Waiting.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
class Blocked implements Runnable {
@Override
public void run() {
synchronized (Blocked.class) {
while (true) {
SleepUtils.second(100L);
}
}
}
}
class SleepUtils {
public static final void second(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
jps
后查看
7827 Jps
998 RemoteMavenServer
7783 Launcher
7784 Test
5341 Launcher
jstack 7784
后查看
......
"Blocked" prio=5 tid=0x00007fcbe4845000 nid=0x5903 waiting on condition [0x00007000048fd000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
at test.SleepUtils.second(Test.java:60)
at test.Blocked.run(Test.java:51)
- locked <0x00000007aad2fb10> (a java.lang.Class for test.Blocked)
at java.lang.Thread.run(Thread.java:745)
"Waiting" prio=5 tid=0x00007fcbe302b000 nid=0x5703 in Object.wait() [0x00007000047fa000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
at java.lang.Object.wait(Object.java:503)
at test.Waiting.run(Test.java:36)
- locked <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
at java.lang.Thread.run(Thread.java:745)
"TimeWaiting" prio=5 tid=0x00007fcbe4090000 nid=0x5503 waiting on condition [0x00007000046f7000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
at test.SleepUtils.second(Test.java:60)
at test.TimeWaiting.run(Test.java:24)
at java.lang.Thread.run(Thread.java:745)
5、线程各种状态之间如何转换
Java线程状态变迁如图所示,
6、Daemon线程有什么特点,如何创建Daemon线程
- Java虚拟机中不存在非Daemon线程时,Java虚拟机会退出。
- 通过
Thread.setDaemon(true)
来指定Daemon线程。
以下线程输出内容是?
public class Daemon {
public static void main(String[] args) {
Thread thread = new Thread(new DaemonRunner());
thread.setDaemon(true);
thread.start();
}
}
class DaemonRunner implements Runnable {
public void run() {
try {
SleepUtils.seconds(100L);
} finally {
System.out.print("Daemon Thread Exit.");
}
}
}
- 结果:什么都不会输出
- 注意:Daemon线程不能依靠finally的内容来进行关闭清理资源所及。
7、线程的中断是什么,如何进行中断相关操作
- 终端是什么
终端可以理解为终端的一个boolean属性,标识运行中的线程是否被其他线程进行终端操作。
- 如何进行终端相关操作
- 中断:通过thread.interrupt()进行中断操作
- 判断:通过isInterrupted()判断是否被中断
- 复位:通过Thread.interrupted()对当前的中断位进行复位
- 复位:抛出InterruptedException的方法会先对中断位进行复位,然后抛Exception
以下两个线程的输出是什么?
public class Interrupted {
public static void main(String[] args) throws Exception {
// sleepThread不停的尝试睡眠
Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
sleepThread.setDaemon(true);
// busyThread不停的运行
Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
busyThread.setDaemon(true);
sleepThread.start();
busyThread.start();
// 休眠5秒,让sleepThread和busyThread充分运行
TimeUnit.SECONDS.sleep(2);
sleepThread.interrupt();
busyThread.interrupt();
System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
// 防止sleepThread和busyThread立刻退出
TimeUnit.SECONDS.sleep(2);
}
static class SleepRunner implements Runnable {
@Override
public void run() {
while (true) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
static class BusyRunner implements Runnable {
@Override
public void run() {
while (true) {
}
}
}
}
程序输出结果:
SleepThread interrupted is false
BusyThread interrupted is true
8、中断有什么用?
中断作为一种简便的线程间交互方式,非常适合用来取消或终止线程。
另一种取消或终止线程的方式是采用一个boolean变量来控制线程。
例1,创建一个线程进行计数,主线程对这个线程取消操作:
public class Test {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new CountTask());
thread.start();
TimeUnit.SECONDS.sleep(1L);
thread.interrupt();
}
}
class CountTask implements Runnable {
private volatile boolean on = true;
private long count = 0L;
@Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()) {
count++;
}
System.out.println(c);
}
}
例2,创建一个生产者,一个消费者和一个BlockingQueue,在任务消费完后结束两个任务:
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> usernameQueue = new LinkedBlockingQueue<>(5);
Thread producer = new Thread(new ProducerTask(usernameQueue));
Thread consumer = new Thread(new ConsumerTask(usernameQueue));
producer.start();
consumer.start();
//取消代码-------start--------
while (true) {
if (!producer.isAlive() && usernameQueue.size()==0) {
//如果生产者已经生产完了,且队列已经消费完了
consumer.interrupt();
break;
} else {
TimeUnit.SECONDS.sleep(1L);
}
}
//取消代码-------end--------
}
}
class ProducerTask implements Runnable {
private BlockingQueue<String> usernameQueue;
public ProducerTask(BlockingQueue<String> usernameQueue) {
this.usernameQueue = usernameQueue;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
usernameQueue.put(i + "");
} catch (InterruptedException e) {
return;
}
}
}
}
class ConsumerTask implements Runnable {
private BlockingQueue<String> usernameQueue;
public ConsumerTask(BlockingQueue<String> usernameQueue) {
this.usernameQueue = usernameQueue;
}
@Override
public void run() {
String username;
try {
while ((username = usernameQueue.take()) != null) {
System.out.println(username);
}
} catch (InterruptedException e) {
return;
}
}
}
9、suspend()、resume()和stop()作用
- 作用:把一个线程的运作比作录音机,这个线程的暂停、继续播放、停止 分别对应 Thread的suspend()、resume()、stop()
- 过时的API:
- suspend()时不会释放锁,而是占着资源入睡,容易死锁。
- stop()时不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。
10、使用 stop/interrupt 结束进程有什么区别?
- stop: 不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。
- interrupt: 线程终止时有机会去清理资源,而不是武断的终止线程,这种做法更加安全和优雅。
11、synchronized底层实现原理?
-
作用:
synchronized可以修饰方法或者修饰方法,用于多线程时只有一个线程在方法或代码块中,保证了线程对变量的可见性和排他性。 -
原理:
public class Test {
public static void main(String[] args) {
//对Test Class对象进行加锁
synchronized (Test.class) {
}
f();
}
//对Test Class对象进行加锁(因为是静态方法)
public static synchronized void f() {
}
}
通过javap -verbose Test.class
进行反编译,得到以下结果:
4: astore_1
5: monitorenter
6: aload_1
7: monitorexit
......
public static synchronized void f();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
同步代码块采用moniterenter
和moniterexit
实现,同步静态方法依靠ACC_SYNCHRONIZED
来实现,而本质上是对一个**监视器(monitor)**的获取。某个线程只有获得了一个对象的监视器,才能执行对应的同步代码块或者同步方法。
对象、对象监视器、同步队列和执行线程之间关系如下:
12、如何实现 等待/通知 机制?
12.1、什么是等待/通知机制?
一个线程(a线程)修改了一个对象,另一个线程(b线程)感知到了变化,然后进行相应操作。
12.2、等待/通知机制有什么好处?
这种模式开始于一个线程,最终执行是另一个线程。这是模式隔离了『何时做(When)』和『怎么做(How)』,功能层面实现了解耦,体系结构层面具备良好的伸缩性。
12.3、如何实现等待/通知机制?
方法1
//B线程
while (value =! desired) {
Thread.sleep(1000);
}
缺点:
- 无法保证响应及时性
- 性能消耗比较高
方法2
采用wait()和notify()来解决该问题:
- b线程调用对象o.wait()进入等待状态,
- a线程调用对象o.notify()来唤醒b线程,
- b线程醒来进行下面的操作。
方法名 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait()返回,返回的前提是获得了对象的锁 |
notifyAll() | 通知所有在对象上等待的线程 |
wait() | 调用该方法的线程进入WAITING状态,接收到其他线程通知才会返回。 调用wait()后会释放对象的锁。 |
wait(long) | 超时等待一段时间,被通知或者超时返回。 单位毫秒。 |
wait(long, int) | 超时等待。 单位可以指定。 |
public class Test {
private static Object lock = new Object();
private static boolean isDesired = false;
public static void main(String[] args) {
new Thread(new Wait(), "Wait").start();
new Thread(new Notify(), "Notify").start();
}
static class Wait implements Runnable {
@Override
public void run() {
synchronized (lock) {
while (!isDesired) {
try {
System.out.println(Thread.currentThread() + " is waiting");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(Thread.currentThread() + " start doing following things");
}
}
static class Notify implements Runnable {
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread() + " make isDesired=true");
isDesired = true;
lock.notifyAll();
}
}
}
}
输出如下:
Thread[Wait,5,main] is waiting
Thread[Notify,5,main] make isDesired=true
Thread[Wait,5,main] start doing following things
注意:
- 调用wait()\notify()\notifyAll()需要首先对对象加锁
- 调用wait()后,线程状态由 RUNNING 变为 WAITING,并把当前线程加入等待队列。
- 调用notify()和notifyAll()后,
- 线程不会直接返回,而是需要获得锁之后才返回。
- 从等待队列移动到同步队列。
- 状态由WAITING变为BLOCKING。
- notify()和notifyAll()区别:前者把一个线程从等待队列移动到同步队列,后者将所有线程从等待队列移动到同步队列。
等待/通知 经典范式:
- 等待方
1)获取对象的锁。
2)如果条件不满足,调用对象的wait(),被通知后仍要检查条件。
3)条件满足则执行相应逻辑。
对应伪代码如下:
synchronized(对象) {
while (条件不满足) {
对象.wait();
}
}
- 通知方
1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
对应伪代码如下:
synchronized(对象) {
改变条件();
对象.notifyAll();
}
13、如何实现线程间的 管道输入/输出流?
通过PipedInputStream/PipedOutputStream/PipedReader/PipedWriter
实现线程之间的数据传输,传输的媒介为内存。
public class Test {
public static void main(String[] args) throws IOException {
//创建管道并绑定
PipedReader pipedReader = new PipedReader();
PipedWriter pipedWriter = new PipedWriter();
pipedWriter.connect(pipedReader);
//创建输入输出线程
new Thread(new PrintTask(pipedReader)).start();
new Thread(new ReadTask(pipedWriter)).start();
}
}
class ReadTask implements Runnable {
private PipedWriter pipedWriter;
public ReadTask(PipedWriter pipedWriter) {
this.pipedWriter = pipedWriter;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
pipedWriter.write(receive);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pipedWriter.close();
} catch (IOException e) {
}
}
}
}
class PrintTask implements Runnable {
private PipedReader pipedReader;
public PrintTask(PipedReader pipedReader) {
this.pipedReader = pipedReader;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = pipedReader.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
pipedReader.close();
} catch (IOException e) {
}
}
}
}
14、Thread.join()有什么用?
如果线程A执行了thread.join()方法,代表thread线程终止之后才从thread.join()返回。
Thread.join() 的源码如下(进行部分调整)
public final synchronized void join()
throws InterruptedException {
while (isAlive()) {
wait(0);
}
//条件符合 方法返回
}
线程终止时,会调用线程资深的notifyAll方法,通知所有wait的线程。
join()的实现也是采用 等待/通知 机制。
15、ThreadLocal如何使用?
ThreadLocal是一个为每个线程单独存储结构。
如,以下分析器可以用来统计调用时间。
public class Test {
public static void main(String[] args) throws IOException, InterruptedException {
Profiler.begin();
Thread.sleep(1);
System.out.println(Profiler.end());
}
}
class Profiler {
private static ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>();
public static void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
}
这一分析器的好处在于两个方法的调用不用在同一个方法或者同一个类中。
16、如何实现等待超时模式?
16.1、什么是 等待超时机制?
调用一个方法等待一段时间,如果该时间段内得到结果则立即返回,否则超时返回默认结果。
16.2、如何实现 等待超时 机制?
在 等待/通知 做一个小小的改动即可。
问题:
当前有一个方法tryGet(),非阻塞获取一个结果,获取到则返回,获取不到返回null。
- 问题1. 将该方法改写为阻塞获取,获取不到则阻塞直至获取到。
- 问题2. 将该方法改写为阻塞获取并等待一段时间,该时间内获取不到则返回null。
问题解决:
- 问题1. 阻塞获取
public synchronized Object get() throws InterruptedException {
Object result = null;
while (((result = tryGet()) == null)) {
wait();
}
return result;
}
-
问题2. 阻塞获取并等待一段时间
- 等待超时 只需要在 等待/通知 做一个小小的改动即可。
- 等待变量:remaing = t
- 超时时间:future = now + remaining
public synchronized Object get(long t) throws InterruptedException {
long remaining = t;
long future = System.currentTimeMillis() + t;
Object result = null;
while (((result = tryGet()) == null)
&& remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
16.3、等待超时应用:数据库连接池
class ConnectionPool {
private List<Connection> pool = new LinkedList<>();
public ConnectionPool(int initSize) {
if (initSize <= 0) {
return;
}
for (int i = 0; i <initSize; i++) {
pool.add(ConnectionDriver.crateConnection());
}
}
public Connection fatchConnection(long timeout) throws InterruptedException {
synchronized (pool) {
if (timeout <= 0) {
//等待通知 机制
while (pool.isEmpty()) {
pool.wait();
}
return pool.remove(0);
} else {
//等待超时 机制
long remaining = timeout;
long future = System.currentTimeMillis() + timeout;
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if (!pool.isEmpty()) {
result = pool.remove(0);
}
return result;
}
}
}
public void releaseConnection(Connection connection) {
if (connection == null) {
return;
}
synchronized (pool) {
pool.add(connection);
pool.notifyAll();
}
}
}
虽然客户端采用等待超时获取方式获取Connection失败,但是这种机制保证客户端不糊一致挂在这个获取连接操作上,则是『按时』返回,并告知客户端获取出现问题,是系统的一种自我保护机制。
17、如何使用线程池?
public class TestH_ThreadPool {
class TestTask implements Runnable {
AtomicInteger count = new AtomicInteger(0);
public void run() {
for (int i = 0; i < 10000; i++) {
count.incrementAndGet();
}
System.out.println(count.get());
}
}
//ThreadPool错误的用法
//输出:
//10000
//10000
//10000
@Test
public void test1() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new TestTask());
executorService.execute(new TestTask());
executorService.execute(new TestTask());
Thread.sleep(2000);
}
//ThreadPool正确的用法
//输出:
//28064
//29221
//30000
@Test
public void test2() throws InterruptedException {
Runnable task = new TestTask();
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(task);
executorService.execute(task);
executorService.execute(task);
Thread.sleep(2000);
}
}
17、如何实现一个简单的线程池?
17.1、为什么使用线程池
- 如果为每个任务创建一个线程,会有两个问题
- 创建和销毁线程是有开销的,大量创建线程浪费资源。
- 线程切换存在上下文切换的开销,线程过多会花费大量实现在上下文切换上。
- 使用线程池有两个好处:
- 节省了上面提到的两个问题的开销。
- 面对过量的任务能够平缓劣化。
17.2、如何实现简单的线程池
- 线程池接口定义
interface ThreadPool<Job extends Runnable> {
//执行一个线程
void execute();
//关闭线程池
void shutdown();
//增加工作线程
void addWorkers(int num);
//减少工作线程
void removeWorkers(int num);
//得到正在等待执行的任务数量
int getJobSize();
}
- 实现
class DefaultThreadPool<T extends Runnable> implements ThreadPool<T> {
private static final int DEFAULT_WORKER_NUM = 5;
private static final int MAX_WORKER_NUM = 10;
private static final int MIN_WORKER_NUM = 1;
private List<T> taskList = new LinkedList<>();
private List<Worker> workerList = Collections.synchronizedList(new ArrayList<Worker>());
private int workerNum = DEFAULT_WORKER_NUM; //线程数目
private AtomicInteger threadNum = new AtomicInteger(); //线程编号
public DefaultThreadPool() {
}
public DefaultThreadPool(int workerNum) {
if (workerNum < MIN_WORKER_NUM) {
workerNum = MIN_WORKER_NUM;
} else if (workerNum > MAX_WORKER_NUM) {
workerNum = MAX_WORKER_NUM;
}
initializeWorkers(workerNum);
}
private void initializeWorkers(int workNum) {
for (int i = 0; i < workNum; i++) {
Worker worker = new Worker();
workerList.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
thread.start();
}
}
@Override
public void execute(T task) {
if (task == null) {
return;
}
synchronized (taskList) {
taskList.add(task);
workerList.notify();
}
}
@Override
public void shutdown() {
for (Worker worker : workerList) {
worker.shutdown();
}
}
@Override
public void addWorkers(int num) {
if (num + workerNum > MAX_WORKER_NUM) {
num = num + workerNum - MAX_WORKER_NUM;
}
initializeWorkers(num);
workerNum += num;
}
@Override
public void removeWorkers(int num) {
if (num > workerNum) {
throw new IllegalArgumentException("blow worker num!");
}
for (int i = 0; i < num; i++) {
Worker worker = workerList.remove(0);
worker.shutdown();
}
workerNum -= num;
}
@Override
public int getJobSize() {
return taskList.size();
}
class Worker implements Runnable {
private volatile boolean running = true;
@Override
public void run() {
while (running) {
synchronized (taskList) {
while (taskList.isEmpty()) {
try {
taskList.wait();
} catch (InterruptedException e) {
//感知到外部对Worker的中断操作,则返回
Thread.currentThread().interrupt();
return;
}
}
T task = taskList.remove(0);
if (task == null) {
continue;
}
try {
task.run();
} catch (Exception e) {
//忽略执行中抛出的Exception
}
}
}
}
public void shutdown() {
running = false;
}
}
}
线程池的本质:
使用一个线程安全的工作队列连接工作线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作线程则不断从工作队列中取出工作并执行。
18、如何用线程池实现简单的Web服务器?
public class Test {
public static void main(String[] args) {
SimpleHttpServer simpleHttpServer = new SimpleHttpServer();
simpleHttpServer.start();
}
}
class SimpleHttpServer {
private ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(20);
private int port = 8080;
private static String basePath = "";
public void setPort(int port) {
this.port = port;
}
public void setBasePath(String basePath) {
this.basePath = basePath;
}
public void start() {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
threadPool.execute(new HttpRequestHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class HttpRequestHandler implements Runnable {
private Socket socket;
public HttpRequestHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream inputStreamOfSocket = null;
Reader readerOfSocket = null;
BufferedReader bufferedReaderOfSocket = null;
OutputStream outputStreamOfSocket = null;
PrintWriter printWriterOfSocket = null;
FileInputStream fileInputStreamOfFile = null;
FileReader fileReaderOfFile = null;
BufferedReader bufferedReaderOfFile = null;
try {
inputStreamOfSocket = socket.getInputStream();
readerOfSocket = new InputStreamReader(inputStreamOfSocket);
bufferedReaderOfSocket = new BufferedReader(readerOfSocket);
outputStreamOfSocket = socket.getOutputStream();
printWriterOfSocket = new PrintWriter(outputStreamOfSocket);
String header = bufferedReaderOfSocket.readLine();
String[] headerArr = header.split(" ");
String relativePath = headerArr.length > 1 ? headerArr[1] : "";
String filePath = basePath + relativePath;
if (filePath.endsWith(".jpg") || filePath.endsWith(".ico")) {
fileInputStreamOfFile = new FileInputStream(filePath);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int i = 0;
while ((i = fileInputStreamOfFile.read()) != -1) {
byteArrayOutputStream.write(i);
}
byte[] bytes = byteArrayOutputStream.toByteArray();
printWriterOfSocket.println("HTTP/1.1 200 OK");
printWriterOfSocket.println("Server: Molly");
printWriterOfSocket.println("Content-Type: image/jpeg");
printWriterOfSocket.println("Content-Length: " + bytes.length);
printWriterOfSocket.println("");
outputStreamOfSocket.write(bytes, 0, bytes.length);
} else {
fileReaderOfFile = new FileReader(filePath);
bufferedReaderOfFile = new BufferedReader(fileReaderOfFile);
String line = null;
printWriterOfSocket.println("HTTP/1.1 200 OK");
printWriterOfSocket.println("Server: Molly");
printWriterOfSocket.println("Content-Type: text/html; charset=UTF-8");
printWriterOfSocket.println("");
while ((line = bufferedReaderOfFile.readLine()) != null) {
printWriterOfSocket.println(line);
}
}
outputStreamOfSocket.flush();
} catch (Exception e) {
try {
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
printWriter.println("HTTP/1.1 500");
printWriter.println("");
printWriter.flush();
} catch (IOException e1) {
e1.printStackTrace();
}
} finally {
close(printWriterOfSocket, bufferedReaderOfSocket, bufferedReaderOfFile);
}
}
}
private static void close (Closeable... closeables) {
if (closeables == null) {
return;
}
for (Closeable closeable : closeables) {
try {
closeable.close();
} catch (IOException e) {
}
}
}
}
五、Java中的锁
1、synchronized 和 Lock接口 比较优缺点?
synchronized 和 Lock接口 都提供锁的功能,提供类似的同步功能。
1.1、synchronized
- 优点:
隐式获取锁,比较便捷
- 缺点:
相比Lock缺少灵活性
1.2、Lock接口
- 优点:
获取锁更加灵活。
拥有获取锁和释放锁的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
比如,针对一个场景,进行锁的获取和释放。
先获得锁A,再获取锁B,
当锁B获得后,释放锁A同时获取锁C,
当锁C获得后,释放锁B同时获取锁D
以此类推。
以上场景synchronized索取不那么容易,而是用Lock却容易许多。
Lock接口提供的 synchronized 所不具别的特性:
特性 | 描述 |
---|---|
非阻塞的获取锁 | 当前线程尝试获取锁, 如果这一时刻锁未被其他线程获取到,则获取并持有锁。 |
可被中断的获取锁 | 获取到锁的线程能响应中断。当获取到锁的线程被中断时,抛出中断异常,释放锁。 |
超时获取锁 | 在截止时间之前获取锁。如果截止时间到了仍未获取到锁,则返回。 |
- 缺点:
使用不如synchronized便捷。
2、Lock 接口如何使用?
Lock lock = new ReentrantLock();
lock.lock();
try {
//业务逻辑
} finally {
lock.unlock();
}
Lock 是一个接口,它定义了锁的获取和释放的基本操作。Lock的API如下所示:
函数定义 | 描述 |
---|---|
void lock() | 阻塞的获取锁。当前线程阻塞,直到获取锁。 |
void lockInterruptibly() throws InterruptedException | 可中断的获取锁。当lock()的区别在于该方法会响应中断。 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法立即返回。 如果获取成功返回true,否则返回false。 |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 超时可中断的获取锁,以下几种情况返回: ①超时时间内获得了锁 ②超时时间内被中断 ③超时时间结束,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件。 该组件和当前的锁绑定,当前线程只有获得了锁,才能调用waite()方法。 调用后,当前线程将释放锁。 |
- 习题:
以下代码输出是什么?
(假设线程名称是 pool-1-thread-1、pool-1-thread-2 …… pool-1-thread-5)
public class TestV {
@Test
public void test() {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Task task = new Task();
for (int i = 0; i < 5; i++) {
executorService.submit(task);
}
executorService.shutdown();
while (!executorService.isTerminated()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Task implements Runnable {
private Lock lock = new ReentrantLock();
@Override
public void run() {
while (true) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
答案:
类似下面
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
每一秒随机选取一个线程打印线程名。
3、如何实现独占锁?
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将状态设置为0
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException(null);
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//返回一个Condition,每个Condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
//将操作代理到Sync上
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
通过 AbstractQueuedSynchronizer来实现(简称AQS)。
- AQS是构建锁或者其他同步组件的基础框架。
- AQS的主要使用方式是『继承+模板方法』来使用。
- AQS使用int变量表示同步状态,通过内置FIFO队列完成线程排队。
- AQS通过以下3个基础方法 访问或修改 同步状态:
函数定义 | 说明 |
---|---|
getState() | 获取当前同步状态 |
setState(int newState) | 设置同步状态 |
compareAndSetState(int expect, int update) | 使用CAS设置状态,可保证设置原子性。 |
- AQS可重写的方法如下:
- 同步器提供的模板方法如下所示:
4、AbstractQueuedSynchronizer 内部如何实现?
即 3、如何实现独占锁?
中从sync.acquire(1);
到sync.tryAcquire()
的流程是什么样的?
4.1、独占锁总体流程
代码实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
解析:
① 调用tryAcqure()
方法获取同步状态
② addWaiter()
获取失败之后生成一个『链表节点』并加入链表尾部
③ acquireQueued()
进入自旋过程。
4.2、队列介绍
4.3、自旋介绍
- 进入自旋
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
① 死循环
② 如果前一个节点是head,则尝试获取同步状态
③ 如果获取失败,则进入『等待/通知』模式
- 退出自旋
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
① 释放同步状态。
② 通过LockSuppor.unpark()
通知后继节点。
4.4、共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
① tryAcquireShared(int)
用来获取同步状态,如果返回>0则说明获取到了同步状态
② 如果获取不到则进入自旋状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 释放同步状态逻辑和『独占式』逻辑基本一致
- 主要区别在于:支持多个线程同时访问的并发组件(如Semaphore),需要保证状态安全释放,一般通过CAS实现。
4.5、独占式超时获取同步状态
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 主要流程和独占式基本类似
- 主要区别:在于未获取到同步状态时,再等待
nanosTimeout
纳秒返回。
5、LockSupport的用途?
LockSupport定义了一组以park
开头的方法用来阻塞当前线程,一组unpark
开头的方法用来唤醒线程。
函数定义 | 描述 |
---|---|
void park() | 阻塞当前线程。 调用unpark(Thread thread)或者中断当前线程才能返回。 |
void parkNanos() | 在park()基础上增加了超时返回。 |
parkUtil(long deadLine) | 阻塞当前线程,直到deadLine时间(1970到deadLine毫秒数) |
void unpark(Thread thread) | 唤醒阻塞的thread |
6、LockSupport的park/unpark与 object.wait/notify的区别?
主要有以下不同:[^park_vs_wait]
-
面向的主体不一样。
LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。 -
实现机制不同。
虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。object.notifyAll()不能唤醒LockSupport的阻塞Thread.
7、如何实现TwinsLock?
public class TwinsLock implements Lock {
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than 0");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
8、什么是『重入锁』?如何实现『重入锁』?
8.1、重入锁
- 支持重进入的锁,表示能够一个线程对资源的重复加锁。
synchronized关键字
隐式支持冲进入。
8.2、如何实现
重入锁需要解决以下两个问题:
- 再次获取锁时识别是否为当前线程占据锁。
- 线程重复获取n次锁,又重复释放n次锁,其它线程才能获取到锁。
通过自定义同步器实现,
同步器的tryAcqire(int acuires)
方法如下:
final boolean noFairAcquire(int acquires) {
find Thread thread = Thread.currentThread();
int c = getState();
if (c == 0) {
if compareAndSet(0, acquires) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nexc);
}
return false;
}
同步器的tryRelease(int releases)
方法如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw IllegalMonitorStateException();
boolean free = false
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return false;
}
9、『公平锁』和『非公平锁』区别?如何实现?
9.1 『公平锁』和『非公平锁』区别?
如果一个锁是公平的,那么锁的获取顺序应该和请求锁的顺序一致,先来先得。
9.2、公平锁如何实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
和上面相比多了!hasQueuedPredecessors()
的判断。
9.3、公平锁和非公平锁比较
非公平锁相比公平锁,
-
优点:
吞吐量更大。因此同样的工作前提下,非公平锁的线程切换次数更小。 -
缺点
会造成线程的饥饿。
10、『读写锁』如何使用?如何实现『读写锁』?
10.1、『读写锁』使用
class Cache {
private Map<String, Object> map = new HashMap();
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private Lock r = reentrantReadWriteLock.readLock();
private Lock w = reentrantReadWriteLock.writeLock();
public Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public void put(String key, Object value) {
w.lock();
try {
map.put(key, value);
} finally {
w.unlock();
}
}
public void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
方法名称 | 描述 |
---|---|
int getReadLockCount() | 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,比如:仅一个线程,它连续获取(重进入)了n次读锁,那么占据读锁的线程数是1,但该方法返回n |
int getReadHoldCount() | 返回当前线程获取读锁的次数。该方法在Java 6 中加入到ReentrantReadWriteLock中,使用ThreadLocal保存当前线程获取的次数,这也使得Java 6 的实现变得更加复杂 |
boolean isWriteLocked() | 判断写锁是否被获取 |
int getWriteHoldCount() | 返回当前写锁被获取的次数 |
10.2 如何实现『读写锁』
因为『读写锁』的自定义同步器需要,用1个int维护多个读线程和一个写线程,
所以就一定要『按位切割使用』。
- 高16位表示读,低16位表示写
- 划分方式如下
写锁的获取与释放
写锁是一个支持重入的排它锁。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
相比可重入锁,添加了对读锁的判断。如果存在读锁则无法获取写锁。
释放流程和可重入锁基本一致。
读锁的获取与释放
读锁是一个支持重入的共享锁。
protected final int tryAcquireShared(int unused) {
for (;;) {
int c = getState();
int netc = c + (1 << 16);
int (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner !- Thread.currentThread())
return -1;
if (compareAndSetState(c, nextc))
return 1;
}
}
如果其他线程获取了写锁,则获取失败。
如果写锁未获取 或者 当前线程获取了锁 ,则成功获取读锁。
wait()
+notify()
+synchronize()
,如何实现『等待/通知』模式?
11、除了 wait()
+notify()
+synchronize()
可以实现『等待/通知』模式,
除此之外Condition
+Lock
配合也可以实现『等待/通知』模式。
- 简单使用如下
class ConditionUseCase {
private Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
}
public void conditionSignal() {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
}
- 实现
BlockingQueue
:
class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[addIndex] = t;
if (++addIndex == items.length) {
addIndex = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T remove(T t) throws InterruptedException {
lock.lock();
try {
while (0 == count) {
notEmpty.await();
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}
12、Condition内部如何实现?
12.1、等待队列
- 一个Condition对象包含一个等待队列
- 调用
Condition.awaite()
会在队列尾部中追加节点 - 调用
Condition.signal()
把队列头部节点移除
- 一个同步器包含一个同步队列、多个等待队列
- 不同于Object的监视器模式拥有一个同步队列和一个等待队列
12.2、等待
- 调用
await()
相当于同步队列的首节点移动到等待队列的尾节点 - 调用
awaite()
和从await()
返回的前提都是需要获取锁
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 构造节点加入等待队列。
- 释放同步状态
- 进入等待队列
12.3、通知
- 调用
signal()
方法会把等待队列的首节点移动到同步节点尾部。 - 调用
signal()
方法的前提是需要获取到锁。 - 调用
signalAll()
会把等待队列所有节点移动到同步队列尾部。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
- 检查当前线程是否获取了锁
- 唤醒首节点