posted in JavaWeb 

四、并发编程基础

1、为什么使用多线程

  1. 更多的处理器核心
  2. 更快的响应速度
  3. 更好的编程模型

2、如何设置线程优先级

  • 如何设置线程的优先级
    • Thread.setPriority()
  • 线程的优先级是否可以保证
    • 不同平台下的实现不一样,无法保证

3、线程有哪些状态

状态名称 说明
NEW 初始状态,线程被构建,但还没有调用start()方法。
RUNNABLE 运行状态,Java线程将操作系统中的就绪和运行两种状态统一称为RANNABLE
BLOCK 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,表示当前线程需要等待其它线程做出一些待定动作(通知或中断)
TIME_WAITING 超时等待状态,类似于WAITING,但可以在指定时间后自行返回
TERMINATED 终止状态,表示当前线程已经执行完毕

4、如何达到线程的各种状态

public class Test {
    public static void main(String[] args) {
        new Thread(new TimeWaiting(), "TimeWaiting").start();
        new Thread(new Waiting(), "Waiting").start();
        new Thread(new Blocked(), "Blocked").start();
    }
}
class TimeWaiting implements Runnable {
    @Override
    public void run() {
        while (true) {
            SleepUtils.second(100);
        }
    }
}
class Waiting implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (Waiting.class) {
                try {
                    Waiting.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
class Blocked implements Runnable {
    @Override
    public void run() {
        synchronized (Blocked.class) {
            while (true) {
                SleepUtils.second(100L);
            }
        }
    }
}
class SleepUtils {
    public static final void second(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • jps后查看
7827 Jps
998 RemoteMavenServer
7783 Launcher
7784 Test
5341 Launcher
  • jstack 7784后查看
......

"Blocked" prio=5 tid=0x00007fcbe4845000 nid=0x5903 waiting on condition [0x00007000048fd000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at test.SleepUtils.second(Test.java:60)
        at test.Blocked.run(Test.java:51)
        - locked <0x00000007aad2fb10> (a java.lang.Class for test.Blocked)
        at java.lang.Thread.run(Thread.java:745)

"Waiting" prio=5 tid=0x00007fcbe302b000 nid=0x5703 in Object.wait() [0x00007000047fa000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
        at java.lang.Object.wait(Object.java:503)
        at test.Waiting.run(Test.java:36)
        - locked <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
        at java.lang.Thread.run(Thread.java:745)

"TimeWaiting" prio=5 tid=0x00007fcbe4090000 nid=0x5503 waiting on condition [0x00007000046f7000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at test.SleepUtils.second(Test.java:60)
        at test.TimeWaiting.run(Test.java:24)
        at java.lang.Thread.run(Thread.java:745)

5、线程各种状态之间如何转换

Java线程状态变迁如图所示,

6、Daemon线程有什么特点,如何创建Daemon线程

  • Java虚拟机中不存在非Daemon线程时,Java虚拟机会退出。
  • 通过Thread.setDaemon(true)来指定Daemon线程。

以下线程输出内容是?

public class Daemon {
    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner());
        thread.setDaemon(true);
        thread.start();
    }
}

class DaemonRunner implements Runnable {
    public void run() {
        try {
            SleepUtils.seconds(100L);
        } finally {
            System.out.print("Daemon Thread Exit.");
        }
    }
}

  • 结果:什么都不会输出
  • 注意:Daemon线程不能依靠finally的内容来进行关闭清理资源所及。

7、线程的中断是什么,如何进行中断相关操作

  • 终端是什么

终端可以理解为终端的一个boolean属性,标识运行中的线程是否被其他线程进行终端操作。

  • 如何进行终端相关操作
    • 中断:通过thread.interrupt()进行中断操作
    • 判断:通过isInterrupted()判断是否被中断
    • 复位:通过Thread.interrupted()对当前的中断位进行复位
    • 复位:抛出InterruptedException的方法会先对中断位进行复位,然后抛Exception

以下两个线程的输出是什么?

public class Interrupted {  
    public static void main(String[] args) throws Exception {  
        // sleepThread不停的尝试睡眠
        Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");  
        sleepThread.setDaemon(true);
        // busyThread不停的运行
        Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
        busyThread.setDaemon(true);
        sleepThread.start();
        busyThread.start();
        // 休眠5秒,让sleepThread和busyThread充分运行
        TimeUnit.SECONDS.sleep(2);
        sleepThread.interrupt();
        busyThread.interrupt();
        System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
        System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());  
        // 防止sleepThread和busyThread立刻退出  
        TimeUnit.SECONDS.sleep(2);
    }
    static class SleepRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    //e.printStackTrace();
                }
            }
        }
    }
    static class BusyRunner implements Runnable {
        @Override
        public void run() {  
            while (true) {
            }
        }
    }
}

程序输出结果:

SleepThread interrupted is false  
BusyThread interrupted is true  

8、中断有什么用?

中断作为一种简便的线程间交互方式,非常适合用来取消或终止线程。
另一种取消或终止线程的方式是采用一个boolean变量来控制线程。

例1,创建一个线程进行计数,主线程对这个线程取消操作:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new CountTask());
        thread.start();

        TimeUnit.SECONDS.sleep(1L);
        thread.interrupt();
    }
}
class CountTask implements Runnable {
    private volatile boolean on = true;
    private long count = 0L;
    
    @Override
    public void run() {
        while (on && !Thread.currentThread().isInterrupted()) {
            count++;
        }
        System.out.println(c);
    }
}

例2,创建一个生产者,一个消费者和一个BlockingQueue,在任务消费完后结束两个任务:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> usernameQueue = new LinkedBlockingQueue<>(5);

        Thread producer = new Thread(new ProducerTask(usernameQueue));
        Thread consumer = new Thread(new ConsumerTask(usernameQueue));
        producer.start();
        consumer.start();

        //取消代码-------start--------
        while (true) {
            if (!producer.isAlive() && usernameQueue.size()==0) {
                //如果生产者已经生产完了,且队列已经消费完了
                consumer.interrupt();
                break;
            } else {
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        //取消代码-------end--------
    }
}
class ProducerTask implements Runnable {
    private BlockingQueue<String> usernameQueue;

    public ProducerTask(BlockingQueue<String> usernameQueue) {
        this.usernameQueue = usernameQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                usernameQueue.put(i + "");
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

class ConsumerTask implements Runnable {
    private BlockingQueue<String> usernameQueue;

    public ConsumerTask(BlockingQueue<String> usernameQueue) {
        this.usernameQueue = usernameQueue;
    }

    @Override
    public void run() {
        String username;
        try {
            while ((username = usernameQueue.take()) != null) {
                System.out.println(username);
            }
        } catch (InterruptedException e) {
            return;
        }
    }
}

9、suspend()、resume()和stop()作用

  • 作用:把一个线程的运作比作录音机,这个线程的暂停、继续播放、停止 分别对应 Thread的suspend()、resume()、stop()
  • 过时的API:
    • suspend()时不会释放锁,而是占着资源入睡,容易死锁。
    • stop()时不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。

10、使用 stop/interrupt 结束进程有什么区别?

  • stop: 不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。
  • interrupt: 线程终止时有机会去清理资源,而不是武断的终止线程,这种做法更加安全和优雅。

11、synchronized底层实现原理?

  • 作用:
    synchronized可以修饰方法或者修饰方法,用于多线程时只有一个线程在方法或代码块中,保证了线程对变量的可见性和排他性。

  • 原理:

public class Test {
    public static void main(String[] args) {
        //对Test Class对象进行加锁
        synchronized (Test.class) {
        }
        f();
    }
    //对Test Class对象进行加锁(因为是静态方法)
    public static synchronized void f() {
    }
}

通过javap -verbose Test.class进行反编译,得到以下结果:

         4: astore_1
         5: monitorenter
         6: aload_1
         7: monitorexit
         ......
public static synchronized void f();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:

同步代码块采用moniterentermoniterexit实现,同步静态方法依靠ACC_SYNCHRONIZED来实现,而本质上是对一个监视器(monitor)的获取。某个线程只有获得了一个对象的监视器,才能执行对应的同步代码块或者同步方法。

对象、对象监视器、同步队列和执行线程之间关系如下:

12、如何实现 等待/通知 机制?

12.1、什么是等待/通知机制?

一个线程(a线程)修改了一个对象,另一个线程(b线程)感知到了变化,然后进行相应操作。

12.2、等待/通知机制有什么好处?

这种模式开始于一个线程,最终执行是另一个线程。这是模式隔离了『何时做(When)』和『怎么做(How)』,功能层面实现了解耦,体系结构层面具备良好的伸缩性。

12.3、如何实现等待/通知机制?

方法1
//B线程
while (value =! desired) {
    Thread.sleep(1000);
}

缺点:

  1. 无法保证响应及时性
  2. 性能消耗比较高
方法2

采用wait()和notify()来解决该问题:

  1. b线程调用对象o.wait()进入等待状态,
  2. a线程调用对象o.notify()来唤醒b线程,
  3. b线程醒来进行下面的操作。
方法名 描述
notify() 通知一个在对象上等待的线程,使其从wait()返回,返回的前提是获得了对象的锁
notifyAll() 通知所有在对象上等待的线程
wait() 调用该方法的线程进入WAITING状态,接收到其他线程通知才会返回。
调用wait()后会释放对象的锁。
wait(long) 超时等待一段时间,被通知或者超时返回。 单位毫秒。
wait(long, int) 超时等待。 单位可以指定。
public class Test {
    private static Object lock = new Object();
    private static boolean isDesired = false;

    public static void main(String[] args) {
        new Thread(new Wait(), "Wait").start();
        new Thread(new Notify(), "Notify").start();
    }
    static class Wait implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                while (!isDesired) {
                    try {
                        System.out.println(Thread.currentThread() + " is waiting");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(Thread.currentThread() + " start doing following things");
        }
    }
    static class Notify implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " make isDesired=true");
                isDesired = true;
                lock.notifyAll();
            }
        }
    }
}

输出如下:

Thread[Wait,5,main] is waiting
Thread[Notify,5,main] make isDesired=true
Thread[Wait,5,main] start doing following things

注意:

  • 调用wait()\notify()\notifyAll()需要首先对对象加锁
  • 调用wait()后,线程状态由 RUNNING 变为 WAITING,并把当前线程加入等待队列。
  • 调用notify()和notifyAll()后,
    • 线程不会直接返回,而是需要获得锁之后才返回。
    • 从等待队列移动到同步队列。
    • 状态由WAITING变为BLOCKING。
  • notify()和notifyAll()区别:前者把一个线程从等待队列移动到同步队列,后者将所有线程从等待队列移动到同步队列。

等待/通知 经典范式:

  • 等待方

1)获取对象的锁。
2)如果条件不满足,调用对象的wait(),被通知后仍要检查条件。
3)条件满足则执行相应逻辑。
对应伪代码如下:

synchronized(对象) {
    while (条件不满足) {
        对象.wait();
    }
}
  • 通知方

1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
对应伪代码如下:

synchronized(对象) {
    改变条件();
    对象.notifyAll();
}

13、如何实现线程间的 管道输入/输出流?

通过PipedInputStream/PipedOutputStream/PipedReader/PipedWriter实现线程之间的数据传输,传输的媒介为内存。

public class Test {
    public static void main(String[] args) throws IOException {
        //创建管道并绑定
        PipedReader pipedReader = new PipedReader();
        PipedWriter pipedWriter = new PipedWriter();
        pipedWriter.connect(pipedReader);

        //创建输入输出线程
        new Thread(new PrintTask(pipedReader)).start();
        new Thread(new ReadTask(pipedWriter)).start();
    }
}
class ReadTask implements Runnable {
    private PipedWriter pipedWriter;
    public ReadTask(PipedWriter pipedWriter) {
        this.pipedWriter = pipedWriter;
    }
    @Override
    public void run() {
        int receive = 0;
        try {
            while ((receive = System.in.read()) != -1) {
                pipedWriter.write(receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedWriter.close();
            } catch (IOException e) {
            }
        }
    }
}
class PrintTask implements Runnable {
    private PipedReader pipedReader;
    public PrintTask(PipedReader pipedReader) {
        this.pipedReader = pipedReader;
    }
    @Override
    public void run() {
        int receive = 0;
        try {
            while ((receive = pipedReader.read()) != -1) {
                System.out.print((char) receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedReader.close();
            } catch (IOException e) {
            }
        }
    }
}

14、Thread.join()有什么用?

如果线程A执行了thread.join()方法,代表thread线程终止之后才从thread.join()返回。

Thread.join() 的源码如下(进行部分调整)

    public final synchronized void join()
    throws InterruptedException {

        while (isAlive()) {
            wait(0);
        }
        //条件符合 方法返回
    }

线程终止时,会调用线程资深的notifyAll方法,通知所有wait的线程。
join()的实现也是采用 等待/通知 机制。

15、ThreadLocal如何使用?

ThreadLocal是一个为每个线程单独存储结构。

如,以下分析器可以用来统计调用时间。

public class Test {
    public static void main(String[] args) throws IOException, InterruptedException {
        Profiler.begin();
        Thread.sleep(1);
        System.out.println(Profiler.end());
    }
}

class Profiler {
    private static ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>();
    public static void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    public static long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
}

这一分析器的好处在于两个方法的调用不用在同一个方法或者同一个类中。

16、如何实现等待超时模式?

16.1、什么是 等待超时机制?

调用一个方法等待一段时间,如果该时间段内得到结果则立即返回,否则超时返回默认结果。

16.2、如何实现 等待超时 机制?

在 等待/通知 做一个小小的改动即可。

问题:

当前有一个方法tryGet(),非阻塞获取一个结果,获取到则返回,获取不到返回null。

  • 问题1. 将该方法改写为阻塞获取,获取不到则阻塞直至获取到。
  • 问题2. 将该方法改写为阻塞获取并等待一段时间,该时间内获取不到则返回null。
问题解决:
  • 问题1. 阻塞获取
    public synchronized Object get() throws InterruptedException {

        Object result = null;
        while (((result = tryGet()) == null)) {
            wait();
        }

        return result;
    }
  • 问题2. 阻塞获取并等待一段时间

    • 等待超时 只需要在 等待/通知 做一个小小的改动即可。
    • 等待变量:remaing = t
    • 超时时间:future = now + remaining
    public synchronized Object get(long t) throws InterruptedException {
        long remaining = t;
        long future = System.currentTimeMillis() + t;

        Object result = null;
        while (((result = tryGet()) == null)
                && remaining > 0) {
            wait(remaining);
            remaining = future - System.currentTimeMillis();
        }

        return result;
    }

16.3、等待超时应用:数据库连接池

class ConnectionPool {
    private List<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initSize) {
        if (initSize <= 0) {
            return;
        }

        for (int i = 0; i <initSize; i++) {
            pool.add(ConnectionDriver.crateConnection());
        }
    }

    public Connection fatchConnection(long timeout) throws InterruptedException {
        synchronized (pool) {
            if (timeout <= 0) {

                //等待通知 机制
                while (pool.isEmpty()) {
                    pool.wait();
                }

                return pool.remove(0);
            } else {

                //等待超时 机制
                long remaining = timeout;
                long future = System.currentTimeMillis() + timeout;

                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }

                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.remove(0);
                }
                return result;
            }
        }
    }


    public void releaseConnection(Connection connection) {
        if (connection == null) {
            return;
        }

        synchronized (pool) {
            pool.add(connection);
            pool.notifyAll();
        }
    }
}

虽然客户端采用等待超时获取方式获取Connection失败,但是这种机制保证客户端不糊一致挂在这个获取连接操作上,则是『按时』返回,并告知客户端获取出现问题,是系统的一种自我保护机制。

17、如何使用线程池?

public class TestH_ThreadPool {

    class TestTask implements Runnable {
        AtomicInteger count = new AtomicInteger(0);

        public void run() {
            for (int i = 0; i < 10000; i++) {
                count.incrementAndGet();
            }
            System.out.println(count.get());
        }
    }

    //ThreadPool错误的用法
    //输出:
    //10000
    //10000
    //10000
    @Test
    public void test1() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        executorService.execute(new TestTask());
        executorService.execute(new TestTask());
        executorService.execute(new TestTask());

        Thread.sleep(2000);
    }

    //ThreadPool正确的用法
    //输出:
    //28064
    //29221
    //30000
    @Test
    public void test2() throws InterruptedException {
        Runnable task = new TestTask();

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(task);
        executorService.execute(task);
        executorService.execute(task);

        Thread.sleep(2000);
    }
}

17、如何实现一个简单的线程池?

17.1、为什么使用线程池

  • 如果为每个任务创建一个线程,会有两个问题
    1. 创建和销毁线程是有开销的,大量创建线程浪费资源。
    2. 线程切换存在上下文切换的开销,线程过多会花费大量实现在上下文切换上。
  • 使用线程池有两个好处:
    1. 节省了上面提到的两个问题的开销。
    2. 面对过量的任务能够平缓劣化。

17.2、如何实现简单的线程池

  • 线程池接口定义
interface ThreadPool<Job extends Runnable> {
    //执行一个线程
    void execute();
    //关闭线程池
    void shutdown();
    //增加工作线程
    void addWorkers(int num);
    //减少工作线程
    void removeWorkers(int num);
    //得到正在等待执行的任务数量
    int getJobSize();
}
  • 实现
class DefaultThreadPool<T extends Runnable> implements ThreadPool<T> {

    private static final int DEFAULT_WORKER_NUM = 5;
    private static final int MAX_WORKER_NUM = 10;
    private static final int MIN_WORKER_NUM = 1;

    private List<T> taskList = new LinkedList<>();
    private List<Worker> workerList = Collections.synchronizedList(new ArrayList<Worker>());
    private int workerNum = DEFAULT_WORKER_NUM;             //线程数目
    private AtomicInteger threadNum = new AtomicInteger();  //线程编号

    public DefaultThreadPool() {
    }

    public DefaultThreadPool(int workerNum) {
        if (workerNum < MIN_WORKER_NUM) {
            workerNum = MIN_WORKER_NUM;
        } else if (workerNum > MAX_WORKER_NUM) {
            workerNum = MAX_WORKER_NUM;
        }
        initializeWorkers(workerNum);
    }

    private void initializeWorkers(int workNum) {
        for (int i = 0; i < workNum; i++) {
            Worker worker = new Worker();
            workerList.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    @Override
    public void execute(T task) {
        if (task == null) {
            return;
        }
        synchronized (taskList) {
            taskList.add(task);
            workerList.notify();
        }
    }

    @Override
    public void shutdown() {
        for (Worker worker : workerList) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        if (num + workerNum > MAX_WORKER_NUM) {
            num = num + workerNum - MAX_WORKER_NUM;
        }
        initializeWorkers(num);
        workerNum += num;
    }

    @Override
    public void removeWorkers(int num) {
        if (num > workerNum) {
            throw new IllegalArgumentException("blow worker num!");
        }

        for (int i = 0; i < num; i++) {
            Worker worker = workerList.remove(0);
            worker.shutdown();
        }
        workerNum -= num;
    }

    @Override
    public int getJobSize() {
        return taskList.size();
    }

    class Worker implements Runnable {
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running) {
                synchronized (taskList) {
                    while (taskList.isEmpty()) {
                        try {
                            taskList.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对Worker的中断操作,则返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    T task = taskList.remove(0);
                    if (task == null) {
                        continue;
                    }
                    try {
                        task.run();
                    } catch (Exception e) {
                        //忽略执行中抛出的Exception
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}

线程池的本质:
使用一个线程安全的工作队列连接工作线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作线程则不断从工作队列中取出工作并执行。

18、如何用线程池实现简单的Web服务器?

public class Test {
    public static void main(String[] args) {
        SimpleHttpServer simpleHttpServer = new SimpleHttpServer();
        simpleHttpServer.start();
    }
}


class SimpleHttpServer {
    private ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(20);
    private int port = 8080;
    private static String basePath = "";

    public void setPort(int port) {
        this.port = port;
    }

    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    public void start() {

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            Socket socket;

            while ((socket = serverSocket.accept()) != null) {
                threadPool.execute(new HttpRequestHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    static class HttpRequestHandler implements Runnable {

        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {

            InputStream inputStreamOfSocket = null;
            Reader readerOfSocket = null;
            BufferedReader bufferedReaderOfSocket = null;
            OutputStream outputStreamOfSocket = null;
            PrintWriter printWriterOfSocket = null;
            FileInputStream fileInputStreamOfFile = null;
            FileReader fileReaderOfFile = null;
            BufferedReader bufferedReaderOfFile = null;
            try {
                inputStreamOfSocket = socket.getInputStream();
                readerOfSocket = new InputStreamReader(inputStreamOfSocket);
                bufferedReaderOfSocket = new BufferedReader(readerOfSocket);

                outputStreamOfSocket = socket.getOutputStream();
                printWriterOfSocket = new PrintWriter(outputStreamOfSocket);

                String header = bufferedReaderOfSocket.readLine();
                String[] headerArr = header.split(" ");
                String relativePath = headerArr.length > 1 ? headerArr[1] : "";
                String filePath = basePath + relativePath;


                if (filePath.endsWith(".jpg") || filePath.endsWith(".ico")) {

                    fileInputStreamOfFile = new FileInputStream(filePath);
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                    int i = 0;
                    while ((i = fileInputStreamOfFile.read()) != -1) {
                        byteArrayOutputStream.write(i);
                    }
                    byte[] bytes = byteArrayOutputStream.toByteArray();
                    printWriterOfSocket.println("HTTP/1.1 200 OK");
                    printWriterOfSocket.println("Server: Molly");
                    printWriterOfSocket.println("Content-Type: image/jpeg");
                    printWriterOfSocket.println("Content-Length: " + bytes.length);
                    printWriterOfSocket.println("");
                    outputStreamOfSocket.write(bytes, 0, bytes.length);

                } else {

                    fileReaderOfFile = new FileReader(filePath);
                    bufferedReaderOfFile = new BufferedReader(fileReaderOfFile);

                    String line = null;
                    printWriterOfSocket.println("HTTP/1.1 200 OK");
                    printWriterOfSocket.println("Server: Molly");
                    printWriterOfSocket.println("Content-Type: text/html; charset=UTF-8");
                    printWriterOfSocket.println("");
                    while ((line = bufferedReaderOfFile.readLine()) != null) {
                        printWriterOfSocket.println(line);
                    }

                }
                outputStreamOfSocket.flush();

            } catch (Exception e) {
                try {
                    PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
                    printWriter.println("HTTP/1.1 500");
                    printWriter.println("");
                    printWriter.flush();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            } finally {
                close(printWriterOfSocket, bufferedReaderOfSocket, bufferedReaderOfFile);
            }
        }
    }
    private static void close (Closeable... closeables) {
        if (closeables == null) {
            return;
        }

        for (Closeable closeable : closeables) {
            try {
                closeable.close();
            } catch (IOException e) {
            }
        }
    }
}

五、Java中的锁

1、synchronized 和 Lock接口 比较优缺点?

synchronized 和 Lock接口 都提供锁的功能,提供类似的同步功能。

1.1、synchronized

  • 优点:

隐式获取锁,比较便捷

  • 缺点:

相比Lock缺少灵活性

1.2、Lock接口

  • 优点:

获取锁更加灵活。
拥有获取锁和释放锁的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

比如,针对一个场景,进行锁的获取和释放。

先获得锁A,再获取锁B,
当锁B获得后,释放锁A同时获取锁C,
当锁C获得后,释放锁B同时获取锁D
以此类推。

以上场景synchronized索取不那么容易,而是用Lock却容易许多。

Lock接口提供的 synchronized 所不具别的特性:

特性 描述
非阻塞的获取锁 当前线程尝试获取锁, 如果这一时刻锁未被其他线程获取到,则获取并持有锁。
可被中断的获取锁 获取到锁的线程能响应中断。当获取到锁的线程被中断时,抛出中断异常,释放锁。
超时获取锁 在截止时间之前获取锁。如果截止时间到了仍未获取到锁,则返回。
  • 缺点:

使用不如synchronized便捷。

2、Lock 接口如何使用?

Lock lock = new ReentrantLock();
lock.lock();

try {
    //业务逻辑
} finally {
    lock.unlock();
}

Lock 是一个接口,它定义了锁的获取和释放的基本操作。Lock的API如下所示:

函数定义 描述
void lock() 阻塞的获取锁。当前线程阻塞,直到获取锁。
void lockInterruptibly() throws InterruptedException 可中断的获取锁。当lock()的区别在于该方法会响应中断。
boolean tryLock() 尝试非阻塞的获取锁,调用该方法立即返回。 如果获取成功返回true,否则返回false。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 超时可中断的获取锁,以下几种情况返回:
①超时时间内获得了锁
②超时时间内被中断
③超时时间结束,返回false
void unlock() 释放锁
Condition newCondition() 获取等待通知组件。
该组件和当前的锁绑定,当前线程只有获得了锁,才能调用waite()方法。
调用后,当前线程将释放锁。
  • 习题:

以下代码输出是什么?
(假设线程名称是 pool-1-thread-1、pool-1-thread-2 …… pool-1-thread-5)

public class TestV {
    @Test
    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Task task = new Task();

        for (int i = 0; i < 5; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Task implements Runnable {
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

答案:
类似下面

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

每一秒随机选取一个线程打印线程名。

3、如何实现独占锁?

public class Mutex implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer {
        //是否处于占用状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        //当状态为0的时候获取锁
        @Override
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //释放锁,将状态设置为0
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException(null);
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //返回一个Condition,每个Condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    //将操作代理到Sync上
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

通过 AbstractQueuedSynchronizer来实现(简称AQS)。

  • AQS是构建锁或者其他同步组件的基础框架。
  • AQS的主要使用方式是『继承+模板方法』来使用。
  • AQS使用int变量表示同步状态,通过内置FIFO队列完成线程排队。
  • AQS通过以下3个基础方法 访问或修改 同步状态:
函数定义 说明
getState() 获取当前同步状态
setState(int newState) 设置同步状态
compareAndSetState(int expect, int update) 使用CAS设置状态,可保证设置原子性。
  • AQS可重写的方法如下:
  • 同步器提供的模板方法如下所示:

4、AbstractQueuedSynchronizer 内部如何实现?

3、如何实现独占锁?中从sync.acquire(1);sync.tryAcquire()的流程是什么样的?

4.1、独占锁总体流程

代码实现如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

解析:
① 调用tryAcqure()方法获取同步状态
addWaiter()获取失败之后生成一个『链表节点』并加入链表尾部
acquireQueued()进入自旋过程。

4.2、队列介绍

4.3、自旋介绍

  • 进入自旋
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

① 死循环
② 如果前一个节点是head,则尝试获取同步状态
③ 如果获取失败,则进入『等待/通知』模式

  • 退出自旋
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

① 释放同步状态。
② 通过LockSuppor.unpark()通知后继节点。

4.4、共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
        private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int)用来获取同步状态,如果返回>0则说明获取到了同步状态
② 如果获取不到则进入自旋状态

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 释放同步状态逻辑和『独占式』逻辑基本一致
  • 主要区别在于:支持多个线程同时访问的并发组件(如Semaphore),需要保证状态安全释放,一般通过CAS实现。

4.5、独占式超时获取同步状态

    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 主要流程和独占式基本类似
  • 主要区别:在于未获取到同步状态时,再等待nanosTimeout纳秒返回。

5、LockSupport的用途?

LockSupport定义了一组以park开头的方法用来阻塞当前线程,一组unpark开头的方法用来唤醒线程。

函数定义 描述
void park() 阻塞当前线程。 调用unpark(Thread thread)或者中断当前线程才能返回。
void parkNanos() 在park()基础上增加了超时返回。
parkUtil(long deadLine) 阻塞当前线程,直到deadLine时间(1970到deadLine毫秒数)
void unpark(Thread thread) 唤醒阻塞的thread

6、LockSupport的park/unpark与 object.wait/notify的区别?

主要有以下不同:1

  1. 面向的主体不一样。
    LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

  2. 实现机制不同。
    虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

7、如何实现TwinsLock?

public class TwinsLock implements Lock {

    private static final class Sync extends AbstractQueuedSynchronizer {
        public Sync(int count) {

            if (count <= 0) {
                throw new IllegalArgumentException("count must large than 0");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;

                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }

        }
    }


    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }


    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

8、什么是『重入锁』?如何实现『重入锁』?

8.1、重入锁

  • 支持重进入的锁,表示能够一个线程对资源的重复加锁。
  • synchronized关键字隐式支持冲进入。

8.2、如何实现

重入锁需要解决以下两个问题:

  • 再次获取锁时识别是否为当前线程占据锁。
  • 线程重复获取n次锁,又重复释放n次锁,其它线程才能获取到锁。

通过自定义同步器实现,

同步器的tryAcqire(int acuires)方法如下:

final boolean noFairAcquire(int acquires) {
    find Thread thread = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if compareAndSet(0, acquires) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nexc);
    }
    return false;
}

同步器的tryRelease(int releases)方法如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw IllegalMonitorStateException();
    
    boolean free = false
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return false;
}

9、『公平锁』和『非公平锁』区别?如何实现?

9.1 『公平锁』和『非公平锁』区别?

如果一个锁是公平的,那么锁的获取顺序应该和请求锁的顺序一致,先来先得。

9.2、公平锁如何实现

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

和上面相比多了!hasQueuedPredecessors()的判断。

9.3、公平锁和非公平锁比较

非公平锁相比公平锁,

  • 优点:
    吞吐量更大。因此同样的工作前提下,非公平锁的线程切换次数更小。

  • 缺点
    会造成线程的饥饿。

10、『读写锁』如何使用?如何实现『读写锁』?

10.1、『读写锁』使用

class Cache {
    private Map<String, Object> map = new HashMap();
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    private Lock r = reentrantReadWriteLock.readLock();
    private Lock w = reentrantReadWriteLock.writeLock();

    public Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    public void put(String key, Object value) {
        w.lock();
        try {
            map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    public void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}
方法名称 描述
int getReadLockCount() 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,比如:仅一个线程,它连续获取(重进入)了n次读锁,那么占据读锁的线程数是1,但该方法返回n
int getReadHoldCount() 返回当前线程获取读锁的次数。该方法在Java 6 中加入到ReentrantReadWriteLock中,使用ThreadLocal保存当前线程获取的次数,这也使得Java 6 的实现变得更加复杂
boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

10.2 如何实现『读写锁』

因为『读写锁』的自定义同步器需要,用1个int维护多个读线程和一个写线程,
所以就一定要『按位切割使用』。

  • 高16位表示读,低16位表示写
  • 划分方式如下
写锁的获取与释放

写锁是一个支持重入的排它锁。

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

相比可重入锁,添加了对读锁的判断。如果存在读锁则无法获取写锁。

释放流程和可重入锁基本一致。

读锁的获取与释放

读锁是一个支持重入的共享锁。

protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int netc = c + (1 << 16);
        int (nextc < c) 
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner !- Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

如果其他线程获取了写锁,则获取失败。
如果写锁未获取 或者 当前线程获取了锁 ,则成功获取读锁。

11、除了 wait()+notify()+synchronize(),如何实现『等待/通知』模式?

wait()+notify()+synchronize()可以实现『等待/通知』模式,
除此之外Condition+Lock配合也可以实现『等待/通知』模式。

  • 简单使用如下
class ConditionUseCase {
    private Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }
    public void conditionSignal() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}
  • 实现BlockingQueue
class BoundedQueue<T> {

    private Object[] items;
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();

    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T remove(T t) throws InterruptedException {
        lock.lock();
        try {
            while (0 == count) {
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

12、Condition内部如何实现?

12.1、等待队列

  • 一个Condition对象包含一个等待队列
  • 调用Condition.awaite()会在队列尾部中追加节点
  • 调用Condition.signal()把队列头部节点移除

  • 一个同步器包含一个同步队列、多个等待队列
  • 不同于Object的监视器模式拥有一个同步队列和一个等待队列

12.2 等待

  • 调用await()相当于同步队列的首节点移动到等待队列的尾节点
  • 调用awaite()和从await()返回的前提都是需要获取锁
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  • 构造节点加入等待队列。
  • 释放同步状态
  • 进入等待队列

12.3、通知

  • 调用signal()方法会把等待队列的首节点移动到同步节点尾部。
  • 调用signal()方法的前提是需要获取到锁。
  • 调用signalAll()会把等待队列所有节点移动到同步队列尾部。
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
  • 检查当前线程是否获取了锁
  • 唤醒首节点

1、安装与启动

如何安装

brew install redis

配置Redis-Server

#设置成Daemon
daemonize yes

#绑定非localhost网卡,
#确保server和sentinel在同一台机器时的注册ip不是127
bind 192.168.20.100

#这样可以通过密码访问(如果没有密码只能限制本地访问)
requirepass abc123

启动服务端

redis-server /usr/local/etc/redis.con

启动客户端

redis-cli
redis-cli -h host -p port -a password

#检查redis是否启动
>PING
PONG

相关脚本

ps -ef | grep redis-sentinel | grep -v grep | awk '{pring $2}' | xargs kill

cd redis-4.0.1
src/redis-sentinel sentinel.conf
cd ..

cd redis-4.0.1-2
src/redis-sentinel sentinel.conf
cd ..

cd redis-4.0.1-3
src/redis-sentinel sentinel.conf
cd ..

2、配置

2.1、查看配置

  • 语法
CONFIG GET CONFIG_SETTING_NAME
  • 实例
#查看loglevel
CONFIG GET loglevel
#查看所有
CONFIG GET *
  • 常用命令
说明 命令
获取所有的配置信息 CONFIG GET *
获取loglevel的配置信息 CONFIG GET loglevel

2.2、修改配置

  • 语法
CONFIG SET CONFIG_SETTING_NAME NEW_CONFIG_VALUE
  • 实例
#设置loglevel
CONFIG SET loglevel "notice"

2.3、常用配置项

redis.conf 配置项说明如下:

  • 设置是否守护进程
daemonize no
  • 监听端口
port 6379
  • 绑定IP
bind 127.0.0.1

注意:如果没有指定,则监听所有网卡的IP

  • 客户端关闭超时时间

客户端闲置多长时间后关闭连接

timeout 300

如果指定为0,表示关闭该功能

  • 日志记录方式 默认为标准输出,如果配置Redis为守护进程方式运行 且这里配置为标准输出,则日志将会发送给/dev/null
logfile stdout
  • 指定持久化策略

指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合

save <seconds> <changes>

Redis默认配置文件中提供了三个条件:

save 900 1
save 300 10
save 60 10000

分别表示900秒(15分钟)内有1个更改,300秒(5分钟)内有10个更改以及60秒内有10000个更改。

  • 本地数据库文件名
dbfilename dump.rdb

默认值为dump.rdb

  • 本地数据库存放目录
dir ./
  • Redis最大内存限制
maxmemory <bytes>

Redis在启动时会把数据加载到内存中,达到最大内存后,Redis会先尝试清除已到期或即将到期的Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis新的vm机制,会把Key存放内存,Value会存放在swap区

  • 是否在每次更新操作后进行日志记录

指定是否在每次更新操作后进行日志记录,Redis在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis本身同步数据文件是按上面save条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为no

appendonly no
  • 更新日志文件名

默认为appendonly.aof

appendfilename appendonly.aof
  • 更新日志条件
appendfsync everysec

共有3个可选值:
no:表示等操作系统进行数据缓存同步到磁盘(快)
always:表示每次更新操作后手动调用fsync()将数据写到磁盘(慢,安全)
everysec:表示每秒同步一次(折衷,默认值)

  • 指定包含其它的配置文件
include /path/to/local.conf

可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

3、数据类型

Redis支持五种数据类型:string(字符串),hash(哈希),list(列表),set(集合)及zset(sorted set:有序集合)。

String(字符串)

string是redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
string类型是二进制安全的。意思是redis的string可以包含任何数据。比如jpg图片或者序列化的对象 。
string类型是Redis最基本的数据类型,一个键最大能存储512MB。

>SET name "runoob"
>GET name

Hash(哈希)

Redis hash 是一个键名对集合。
Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。

#设置一个Map,键为user:1,并插入3个kv
>HMSET user:1 username runoob password runoob points 200
#获取一个Map所有值
>HGETALL user:1

List(列表)

Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。

#左侧插入三个值
>lpush runoob redis
>lpush runoob mongodb
>lpush runoob rabitmq
#列出0-10个元素
>lrange runoob 0 10

Set(集合)

Redis的Set是string类型的无序集合。
集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。

Zset (有序集合)

Redis zset 和 set 一样也是string类型元素的集合,且不允许重复的成员。
不同的是每个元素都会关联一个double类型的分数。
redis正是通过分数来为集合中的成员进行从小到大的排序。
zset的成员是唯一的,但分数(score)却可以重复。

4、Redis命令1

键(Key)

  • 语法

    COMMAND KEY_NAME

  • 实例
    ```

    SET runoobkey redis
    DEL runoobkey
    ```

  • 常用命令

命令 说明
DEL key 该命令用于在 key 存在时删除 key。
DUMP key 序列化给定 key ,并返回被序列化的值。
EXISTS key 检查给定 key 是否存在。
EXPIRE key seconds 为给定 key 设置过期时间。
KEYS pattern 查找所有符合给定模式( pattern)的 key 。 (查看所有key: KEYS *)
RENAME key newkey 修改 key 的名称
TYPE key 返回 key 所储存的值的类型。

字符串(String)

  • 语法 redis 127.0.0.1:6379> COMMAND KEY_NAME
  • 常用命令
序号 命令及描述
1 [SET key value]  设置指定 key 的值
2 [GET key] 获取指定 key 的值。
3 [GETRANGE key start end]  返回 key 中字符串值的子字符
4 [GETSET key value] 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。

哈希(Hash)

列表(List)

集合(Set)

有序集合(Sorted Set)

发布订阅

发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

  • 实例

订阅频道redisChat:

SUBSCRIBE redisChat

向某个频道发布消息

PUBLISH redisChat "Redis is a great caching technique"
  • 常用命令
序号 命令 描述
1 PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。
2 PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态。
3 PUBLISH channel message 将信息发送到指定的频道。
4 PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道。
5 SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。
6 UNSUBSCRIBE [channel [channel ...]] 指退订给定的频道。

5、理论

5.1、Redis持久化

http://redisdoc.com/topic/persistence.html

6、高级特性

6.1、主从分离2

  • 所有redis配置 每个redis实例的redis.conf文件配置以下内容
# 端口配置
port 6379

# 修改pidfile
pidfile /var/run/redis_6379.pid
  • Slave Redis配置 Slave实例除了以上配置之外还要配置以下内容
# 设置master
slaveof 127.0.0.1 6379
# 设置master的密码
masterauth <master-password>
  • 启动所有server实例
> src/redis-server redis.conf
  • 确认 主从角色
> src/redis-cli -p 6379 -a abc123
> info

...
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6380,state=online,offset=322,lag=1
slave1:ip=127.0.0.1,port=6381,state=online,offset=322,lag=0

6.2、读写分离3

  • 配置文件 在master的配置文件中配置:
slave-read-only yes

6.3、Sentinel相关

6.3.1、Sentinel有什么用

  • Monitoring. Sentinel constantly checks if your master and slave instances are working as expected.
  • Notification. Sentinel can notify the system administrator, another computer programs, via an API, that something is wrong with one of the monitored Redis instances.
  • Automatic failover. If a master is not working as expected, Sentinel can start a failover process where a slave is promoted to master, the other additional slaves are reconfigured to use the new master, and the applications using the Redis server informed about the new address to use when connecting.
  • Configuration provider. Sentinel acts as a source of authority for clients service discovery: clients connect to Sentinels in order to ask for the address of the current Redis master responsible for a given service. If a failover occurs, Sentinels will report the new address.

6.3.2、Sentinel配置

  • 配置文件配置4

每个sentinel.conf 中添加以下配置:

# 确保从非localhost网卡可以访问
#protected-mode no

# 绑定非localhost网卡
# 确保server和sentinel在同一台机器时的注册ip不是127
bind 192.168.20.100

# 端口
port 26379

# redis集群的master
# 不需要配置slave,因为可以自动获取
# sentinel monitor <master-group-name> <ip> <port> <quorum>
# The quorum is the number of Sentinels that need to agree about the fact the master is not reachable, in order for really mark the slave as failing, and eventually start a fail over procedure if possible.
# 注意:
# 绑定非localhost网卡
# 确保server和sentinel在同一台机器时的注册ip不是127
sentinel monitor mymaster 192.168.20.100 6379 2

# 以及所有 mymaster 出现的地方添加每组redis配置

# 设置成守护进程
daemonize yes

6.3.3、Sentinel相关命令

  • 启动多个sentinel实例
> src/redis-sentinel sentinel.conf
  • 查看sentinel状态
src/redis-cli -p 26379
127.0.0.1:26379> SENTINEL get-master-addr-by-name mymaster

6.4、分片

6.4.1、如何选择分片方案5

RedisSharding

RedisSharding.gliffy

6.4.2、Redis Sentinel Sharding 配置

参考[6.3.2、Sentinel配置][6.3.2、Sentinel配置],在sentinel配置中添加多个sentinel monitor

7、算法

7.1、一致性哈希

1、定义6

一致哈希 是一种特殊的哈希算法。在使用一致哈希算法后,哈希表槽位数(大小)的改变平均只需要对K/n个关键字重新映射,其中K是关键字的数量,n是槽位数量。然而在传统的哈希表中,添加或删除一个槽位的几乎需要对所有关键字进行重新映射。

2、评判标准7

  • 平衡性(Balance):平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。
  • 单调性(Monotonicity):单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。
  • 分散性(Spread):在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性。
  • 负载(Load):负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同 的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。

3、如何实现7

3.1、如何实现单调性?

在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布式集群管理最基本的功能。如果采用常用的hash(object)%N算法,那么在有机器添加或者删除后,很多原有的数据就无法找到了,这样严重的违反了单调性原则。接下来主要讲解一下一致性哈希算法是如何设计的:

  • 环形Hash空间
    按照常用的hash算法来将对应的key哈希到一个具有2^32次方个桶的空间中,即0~(2^32)-1的数字空间中。现在我们可以将这些数字头尾相连,想象成一个闭合的环形。如下图

  • 把数据通过一定的hash算法处理后映射到环上
    现在我们将object1、object2、object3、object4四个对象通过特定的Hash函数计算出对应的key值,然后散列到Hash环上。如下图:
    Hash(object1) = key1;
    Hash(object2) = key2;
    Hash(object3) = key3;
    Hash(object4) = key4;

  • 将机器通过hash算法映射到环上
    在采用一致性哈希算法的分布式集群中将新的机器加入,其原理是通过使用与对象存储一样的Hash算法将机器也映射到环中(一般情况下对机器的hash计算是采用机器的IP或者机器唯一的别名作为输入值),然后以顺时针的方向计算,将所有对象存储到离自己最近的机器中。
    假设现在有NODE1,NODE2,NODE3三台机器,通过Hash算法得到对应的KEY值,映射到环中,其示意图如下:
    Hash(NODE1) = KEY1;
    Hash(NODE2) = KEY2;
    Hash(NODE3) = KEY3;

    通过上图可以看出对象与机器处于同一哈希空间中,这样按顺时针转动object1存储到了NODE1中,object3存储到了NODE2中,object2、object4存储到了NODE3中。在这样的部署环境中,hash环是不会变更的,因此,通过算出对象的hash值就能快速的定位到对应的机器中,这样就能找到对象真正的存储位置了。

  • 机器的删除与添加
    普通hash求余算法最为不妥的地方就是在有机器的添加或者删除之后会照成大量的对象存储位置失效,这样就大大的不满足单调性了。下面来分析一下一致性哈希算法是如何处理的。

  1. 节点(机器)的删除
    以上面的分布为例,如果NODE2出现故障被删除了,那么按照顺时针迁移的方法,object3将会被迁移到NODE3中,这样仅仅是object3的映射位置发生了变化,其它的对象没有任何的改动。如下图:

  2. 节点(机器)的添加
    如果往集群中添加一个新的节点NODE4,通过对应的哈希算法得到KEY4,并映射到环中,如下图:

    通过按顺时针迁移的规则,那么object2被迁移到了NODE4中,其它对象还保持这原有的存储位置。通过对节点的添加和删除的分析,一致性哈希算法在保持了单调性的同时,还是数据的迁移达到了最小,这样的算法对分布式集群来说是非常合适的,避免了大量数据迁移,减小了服务器的的压力。

3.2、如何实现平衡性?

根据上面的图解分析,一致性哈希算法满足了单调性和负载均衡的特性以及一般hash算法的分散性,但这还并不能当做其被广泛应用的原由,因为还缺少了平衡性。下面将分析一致性哈希算法是如何满足平衡性的。hash算法是不保证平衡的,如上面只部署了NODE1和NODE3的情况(NODE2被删除的图),object1存储到了NODE1中,而object2、object3、object4都存储到了NODE3中,这样就照成了非常不平衡的状态。在一致性哈希算法中,为了尽可能的满足平衡性,其引入了虚拟节点。

“虚拟节点”( virtual node )是实际节点(机器)在 hash 空间的复制品( replica ),一实际个节点(机器)对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以hash值排列。

以上面只部署了NODE1和NODE3的情况(NODE2被删除的图)为例,之前的对象在机器上的分布很不均衡,现在我们以2个副本(复制个数)为例,这样整个hash环中就存在了4个虚拟节点,最后对象映射的关系图如下:

根据上图可知对象的映射关系:object1->NODE1-1,object2->NODE1-2,object3->NODE3-2,object4->NODE3-1。通过虚拟节点的引入,对象的分布就比较均衡了。那么在实际操作中,正真的对象查询是如何工作的呢?对象从hash到虚拟节点到实际节点的转换如下图:

“虚拟节点”的hash计算可以采用对应节点的IP地址加数字后缀的方式。例如假设NODE1的IP地址为192.168.1.100。引入“虚拟节点”前,计算 cache A 的 hash 值:
Hash(“192.168.1.100”);
引入“虚拟节点”后,计算“虚拟节”点NODE1-1和NODE1-2的hash值:
Hash(“192.168.1.100#1”); // NODE1-1
Hash(“192.168.1.100#2”); // NODE1-2

posted in JavaWeb 

最近工作中涉及到两个对批量用户进行离线处理的工作:

  1. 对若干用户进行打标记。
  2. 对若干用户进行消息推送。

联想到之前也有很多这种类似的工作,索性把其中共用的部分抽离出来做成了框架——取名叫绿萝。

通过使用该容器,使用者编写处理业务相关的代码,业务无关的部分交给容器来解决。好比 Servlet和Tomcat之间的关系,绿萝作为一个容器来运行业务代码。

1、有什么用

具体来说,绿萝主要为使用者处理以下逻辑:

  1. 从文件中读取用户列表进行后续处理
  2. 把读取的用户列表分发给若干现成进行进行处理,线程数可指定
  3. 可以通过参数对消费的QPS进行限制,防止影响线上系统
  4. 对所有用户的处理结果进行返回码的统计
  5. 把业务参数解析成Map供使用者进行使用

2、怎么用

绿萝使用起来很简单,需要以下几步:

2.1 安装lvluo到本地repo

在lvluo目录下执行

mvn clean install -Dmaven.test.skip

2.2、引入依赖

        <dependency>
            <groupId>com.netease</groupId>
            <artifactId>lvluo-worker</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

2.3、实现接口

写一个类继承AbstractConsumerTask,并添加@Component注解。
(注:该类需要放置在com.netease路径下)

例如,要对1亿用户进行消息推送,那么编写以下类即可:

@Component
public class PushConsumerTask extends AbstractConsumerTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(OneConsumerTask.class);
    private static final Logger SUCCESS_LOGGER = LoggerFactory.getLogger("successLogger");
    private static final Logger FAIL_LOGGER = LoggerFactory.getLogger("failLogger");

    private AtomicLong successCount = new AtomicLong(0);
    private AtomicLong failCount = new AtomicLong(0);

    @Override
    public int doService(String username, Map<String, String> requestMap) {

        //获取请求参数,进行参数校验
        String message = requestMap.get("message");
        if (StringUtils.isEmpty(indexStr)) {
            System.out.println("message 参数为空");
            System.exit(-1)
        }

        //业务逻辑处理
        int retCode = pushMessage(username, message);

        //打印需要的结果到日志
        if (retCode == 0) {
            SUCCESS_LOGGER.info(username);
            LOGGER.info(MessageFormat.format("成功:第{0}个逻辑处理完成, 用户名为{1}", successCount.addAndGet(1), username));
        } else {
            FAIL_LOGGER.info(username);
            LOGGER.info(MessageFormat.format("失败:第{0}个逻辑处理完成, 用户名为{1}", failCount.addAndGet(1), username));
        }

        return retCode;
    }

2.4、打包

pom.xml中加入以下代码:
其中finalName要指定成想要的名字。


    <build>
        <finalName>demo</finalName>
        <sourceDirectory>src/main/java</sourceDirectory>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.netease.Main</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2.5、运行

java -jar demo.jar 消费任务 文件路径 业务参数 系统参数
  • 消费任务 - 编写的业务类的名称,首字母替换成小写
  • 文件路径 - 用户名文件对应的文件路径
  • 业务参数 - 你的业务中需要用的参数 (如果有特殊字符或者中文需要进行urlencode)
  • 系统参数 - 如threadCount 指定线程数,qps指定限制的频率

如,

java -jar demo.jar pushConsumerTask usernameList.txt message=xx threadCount=10&qps=100

2.6、输出结果

控制台输出:

10:32:05,096  INFO DemoConsumerTask:51 - 失败:第504个逻辑处理完成, 用户名为urstest_czz994
10:32:05,096  INFO DemoConsumerTask:48 - 成功:第492个逻辑处理完成, 用户名为urstest_czz995
10:32:05,096  INFO AbstractConsumerTask:92 - 本次请求返回码1,用户名为urstest_czz994。总共请求995次,当前返回码统计结果:{0:491,1:504}
10:32:05,097  INFO AbstractConsumerTask:92 - 本次请求返回码0,用户名为urstest_czz995。总共请求996次,当前返回码统计结果:{0:492,1:504}

logs/success.log:

urstest_czz3
urstest_czz4
urstest_czz6
urstest_czz9
urstest_czz10
urstest_czz11

logs/fail.log

urstest_czz0
urstest_czz1
urstest_czz2
urstest_czz5
urstest_czz7

3、注意事项

  1. ConsumerTask类放在com.netease路径下
  2. Spring版本用4及以上版本,推荐4.2.9.RELEASE

4、代码

如果对代码有兴趣欢迎围观吐槽~
https://github.com/czzshr/lvluo

工程中包含了两个module:
- lvluo-demo 是lvluo的使用demo
- lvluo-worker 是lvluo的具体实现

posted in Mac相关 

HelloDpi 用来设置屏幕HiDpi
当Mac外接高分辨率的屏幕(大于1080P)时,设置HiDpi可以使得画面更加细腻。

一、使用步骤

0、关闭系统SIP

macOS 10.10及以下略过该步骤哦!
* 重启电脑,按住Command+R直到出现苹果Logo。
这样进入了recovery mode
* 点击实用工具,进入终端Terminal
* 在终端里输入csrutil disable关闭SIP
* 正常重启电脑

1、添加自定义的分辨率

打开HelloDpi,添加自定义分辨率。

如我的屏幕是2560x1440分辨率,添加1600x900的HiDpi分辨率。

2、重启系统

3、切换分辨率

  • 下载RDM
  • 安装RDM

右键点击『RDM-2.2.pkg』- 打开

  • 使用RDM切换分辨率

Done!

二、最新版本

v1.0.0下载

三、项目源码

项目用C++和Qt实现,如有兴趣欢迎围观:

https://github.com/czzshr

最早是在CSDN上写博客的,可是渐渐发现CSDN写博客有诸多限制,而且CSDN博客时不时会挂掉,因此后面用Hexo在github上搭了一个新博客,也就是现在这个。然而这样一来很多文章就留在CSDN上,而在新博客上找不到了。

因此前几天用Java写了一个爬虫,把CSDN上的博客内容爬了下来,并解析成markdown格式存储在本地,这样就可以方便的迁移到新的博客了。

1、效果

旧博客地址 新博客地址

1.1、旧博客目录

旧博客目录

1.2、旧博客文章

旧博客文章

1.3、新博客目录

新博客目录

1.4、新博客文章

新博客文章

2、需求分析

CSDN博客迁移,把CSDN博客上的内容爬取下来,存为本地的md文件,方便部署到hexo。

具体需求如下:
* 1. 博文内容要转换成MarkDown源码格式存储。
* 2. 支持CSDN博客内容是MarkDown渲染或者非MarkDown渲染的。
* 3. 除了文章正文,其他信息(如标题、发布时间、分类、标签、是否转载等)也要保存下来。
* 4. 生成文章描述。

3、实现思路

  • 用爬虫把需要的内容爬取下来
  • 然后解析相应的内容
  • 存储在本地
  • 发布在新博客

4、如何使用

  • 下载jar包 点击下载
  • 执行命令 java -jar csdn-blog-migration-crawler.jar 博客地址 本地存储路径 如, java -jar csdn-blog-migration-crawler.jar timberwolf_2012 /Users/chenzz/Desktop/blog/

5、项目主页

进入项目主页