posted in JavaWeb 

1、简介

前段时间有项目有读写分离的需要,因此完成了该类库mybatis-read-write-split来实现读写分离。

  • 特点 支持两种模式的主备分离:
  • 业务透明的读写分离。自动解析sql的读写类型并进行路由转发。
  • 基于注解的读写分离。通过注解中的配置来进行读写分离。

以上两种模式可以混合使用:缺省自动解析sql的读写类型,如果注解有指定数据源,则根据注解进行路由。

2、用法

  • pom.xml 添加依赖
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-read-write-split-core</artifactId>
            <version>2.0-SNAPSHOT</version>
        </dependency>
  • 配置数据源
    <!--替换原本的DataSource-->
    <bean id="dataSource" class="org.mybatis.rw.MultiReadDataSource">
        <property name="masterDataSource" ref="masterDataSource"/>
        <property name="slaveDataSourceList">
            <list>
                <ref bean="slaveDataSource1"></ref>
                <ref bean="slaveDataSource2"></ref>
            </list>
        </property>
    </bean>
    
    <bean id="masterDataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
        <!--各数据源配置-->
    </bean>
    
    <bean id="slaveDataSource1" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
        <!--各数据源配置-->
    </bean>
    
    <bean id="slaveDataSource2" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
        <!--各数据源配置-->
    </bean>

2.1、业务透明区分读写

mybatis自动分析读or写操作,并进行相应的路由操作

  • mybatis配置文件添加interceptor
   <plugins>
        <plugin interceptor="org.mybatis.rw.interceptor.ReadWriteDistinguishInterceptor">
        </plugin>
    </plugins>

2.2、通过注解区分读写

通过方法上的注解显示指定读主库or备库

  • 在目标方法上添加 @DataSource()

    @DataSource(DataSourceType.MASTER)
    public User getUserByIdFromMaster(Integer userId) {
        //some operation
    }

3、内部实现

2.1、业务透明区分读写

  1. Mapper调用MyBatis进行读写
  2. MyBatis分析读写类型,并存入ThreadLocal中
  3. 自定义DataSource从ThreadLocal里获取读写类型,路由给对应的子DataSource
  4. 使用对应的子DataSource进行读写操作

2.2、通过注解实现读写区分

  1. Spring的切面读取注解的内容,分析 读/写 操作
  2. 把分析结果丢到 ThreadLocal中
  3. 自定义DataSource从ThreadLocal里获取DataSource类型,路由给对应的子DataSource
  4. 使用对应的子DataSource进行读写操作

4、项目源码

https://github.com/chenzz/mybatis-read-write-split

posted in JavaWeb 

在进行SOA系统设计,设计微服务的接口形式,会遇到一些问题,

  • 入参是多个入参还是包装在一个对象中
  • 异常返回是通过Exception还是返回码

以下是开发过程中的一些思考与实践:

1、实践

定义成 GetUserInfoResp getUserInfo(GetUserInfoReq req)的形式,

  • 入参

    • GetUserInfoReq继承了AbstractReq
    • AbstractReq保存了公用的请求参数成员变量,如username、ip
  • 出参

    • GetUserInfoResp继承了AbstractResp
    • AbstractResp保存了公用的返回码成员变量,如retCode

代码设计

interface UserProvider {
    GetUserInfoResp getUserInfo(GetUserInfoReq req);
}

class GetUserInfoReq extends AbstractReq {
}

class AbstractReq {
    private String username;
    private String ip;
}

class GetUserInfoResp extends AbstractResp {
    private String school;
    private double weight;
}

class AbstractResp {
    private String retCode;
    private String message;
}

2、入参设计

为什么要把各种成员变量包装在一个Req对象中,而不是做成getUserInfo(String username, String disctrict)的形式呢?

2.1 方便拓展

  • 案例一:

如果这个接口有两个调用方,某一天这个接口新添加了一个入参int gender,其中一个调用方会传这个参数,另一个调用方不传。

如果是当前这个方案,只需要在Req对象中添加一个成员变量int gender即可,然后两个消费方都可以兼容的进行调用。

如果是采用getUserInfo(String username, String disctrict)的方案,那么就只能新加一个方法getUserInfo(String username, String disctrict, int gender)了,造成了代码的冗余。

  • 案例二:

如果某一天,这个系统要支持不同国家的业务,所有的接口都要添加一个参数String nation

那么基于当前的方案,只需要在AbstractReq中添加一个String nation即可,非常方便。

如果基于getUserInfo(String username, String disctrict)的形式,那么可能需要批量修改几百个接口,花费了大量的时间,同时也提高了错误发生的概率。

2.2 方便映射

在SOA的系统设计中,往往会在内部系统的前面添加一个gateway系统来做一些公共逻辑,网关对外提供http服务,而网关和内部系统通过专有协议进行通信,

想想这样一个场景,/user/getUserInfo这个接口提供获取用户信息的服务,用户的请求body可能有以下几种形式:

  • 形式一:
{
    "name": "jason",
    "nation": "america"
}
  • 形式二:
{
    "nation": "america",
    "name": "jason"
}

要把这些形式的请求映射到后端接口上:

2.2.1 如果是当前最佳实践

即如果是getUserInfo(GetUserInfoReq req)的形式

那么网关很好设计,

  1. 提前记录url和接口的映射规则,
  2. 请求来的时候,把url映射成接口,直接把body反序列化成GetUserInfoReq对象即可。
2.2.2 如果是另一种形式

即如果getUserInfo(String name, String nation)的形式

那么网关很不好设计,

  1. 提前记录url和接口的映射规则,

    这个记录会很麻烦,因为需要记录 body中的每个参数和接口中的第几个参数是对应的

  2. 映射也很麻烦,

    需要遍历body中的变量,把body中的某个变量映射成接口第n个参数

究其原因,是因为通过反射可以获取到一个对象的成员变量的名称,却无法获取到一个方法的参数名称。

3、出参设计

为什么通过通过retCode来包装异常情况,而不是直接抛出异常?

3.1 降低系统间的耦合

如果getUserInfo通过异常来表达业务异常情况,那调用方势必要接触到 提供方的各种Exception类型,大大提高了系统的耦合性。

3.2 避免延迟 && 避免占用带宽

Exception会带着一个调用栈,
如果通过网络抛出异常,一方面会占用带宽,一方面会造成延迟。

4、总结

基于以上原因,应该采用GetUserInfoResp getUserInfo(GetUserInfoReq req)的形式定义接口。

posted in JavaWeb 

一、HelloWorld Demo

一个最简单的MyBatis Demo

1、 代码

源码下载:mybatis-demo
对应DB表:mybatis-demo

public class UserService {
    public User getUserById(Integer userId) {
        SqlSession sqlSession = MyBatisUtil.getSqlSessionFactory().openSession();
        try {
            UserMapper userMapper = sqlSession.getMapper(UserMapper.class);
            return userMapper.getUserById(userId);
        } finally {
            sqlSession.close();
        }
    }
}
public class MyBatisUtil
{
    private static SqlSessionFactory factory;

    private MyBatisUtil() {
    }

    static
    {
        Reader reader = null;
        try {
            reader = Resources.getResourceAsReader("mybatis-config.xml");
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        }
        factory = new SqlSessionFactoryBuilder().build(reader);
    }

    public static SqlSessionFactory getSqlSessionFactory()
    {
        return factory;
    }
}
  • mybatis-config.xml
<?xml version='1.0' encoding='UTF-8' ?>
<!DOCTYPE configuration
        PUBLIC '-//mybatis.org//DTD Config 3.0//EN'
        'http://mybatis.org/dtd/mybatis-3-config.dtd'>
<configuration>
    <properties resource='jdbc.properties'/>
    <typeAliases>
        <typeAlias type='com.sivalabs.mybatisdemo.domain.User' alias='user'></typeAlias>
    </typeAliases>
    <environments default='development'>
        <environment id='development'>
            <transactionManager type='JDBC'/>
            <dataSource type='POOLED'>
                <property name='driver' value='${jdbc.driverClassName}'/>
                <property name='url' value='${jdbc.url}'/>
                <property name='username' value='${jdbc.username}'/>
                <property name='password' value='${jdbc.password}'/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource='com/sivalabs/mybatisdemo/mappers/UserMapper.xml'/>
    </mappers>
</configuration>
  • UserMapper.xml
<?xml version='1.0' encoding='UTF-8' ?>
<!DOCTYPE mapper PUBLIC '-//mybatis.org//DTD Mapper 3.0//EN'
        'http://mybatis.org/dtd/mybatis-3-mapper.dtd'>

<mapper namespace='com.sivalabs.mybatisdemo.mappers.UserMapper'>

    <select id='getUserById' parameterType='int' resultType='com.sivalabs.mybatisdemo.domain.User'>
        SELECT
        user_id as userId,
        email_id as emailId ,
        password,
        first_name as firstName,
        last_name as lastName
        FROM USER
        WHERE USER_ID = #{userId}
    </select>
    <!-- Instead of referencing Fully Qualified Class Names we can register Aliases in mybatis-config.xml and use Alias names. -->
    <resultMap type='com.sivalabs.mybatisdemo.domain.User' id='UserResult'>
        <id property='userId' column='user_id'/>
        <result property='emailId' column='email_id'/>
        <result property='password' column='password'/>
        <result property='firstName' column='first_name'/>
        <result property='lastName' column='last_name'/>
    </resultMap>

    <select id='getAllUsers' resultMap='UserResult'>
        SELECT * FROM USER
    </select>

2、代码逻辑

  • 在执行userMapper.getUserById(userId)之前,会先执行2.1和2.1代码
  • 2.3是userMapper.getUserById(userId)真正执行的代码

2.1 解析配置文件(#hello-world-demo-2-1)

        Reader reader = null;
        try {
            reader = Resources.getResourceAsReader("mybatis-config.xml");
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage());
        }
        factory = new SqlSessionFactoryBuilder().build(reader);

该段代码主要把mybatis-config.xmlUserMapper.xml中的各种配置解析到Configuration实例中去。

2.2 构造UserMapper实例

SqlSession sqlSession = MyBatisUtil.getSqlSessionFactory().openSession();

UserMapper userMapper = sqlSession.getMapper(UserMapper.class);
  • 通过openSession()来进行实例的初始化,此处可暂时略过。
  • 通过getMapper()方法来获取Mapper实例。

此处getMapper()方法的内部实现是为UserMapper生成一个动态代理,并且返回:

  protected T newInstance(MapperProxy<T> mapperProxy) {
    return (T) Proxy.newProxyInstance(mapperInterface.getClassLoader(), new Class[] { mapperInterface }, mapperProxy);
  }

  public T newInstance(SqlSession sqlSession) {
    final MapperProxy<T> mapperProxy = new MapperProxy<T>(sqlSession, mapperInterface, methodCache);
    return newInstance(mapperProxy);
  }

此处的意思为:
MapperProxy这个类的invoke()方法来实现对应的函数调用流程。

2.3 执行对应查询代码

            return userMapper.getUserById(userId);

因为这里的userMapper对象是动态代理对象,所以其实这里执行的是MapperProxy.invoke()方法。
MapperProxy.invoke()方法内容如下:

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (Object.class.equals(method.getDeclaringClass())) {
      try {
        return method.invoke(this, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
    final MapperMethod mapperMethod = cachedMapperMethod(method);
    return mapperMethod.execute(sqlSession, args);
  }

  private MapperMethod cachedMapperMethod(Method method) {
    MapperMethod mapperMethod = methodCache.get(method);
    if (mapperMethod == null) {
      mapperMethod = new MapperMethod(mapperInterface, method, sqlSession.getConfiguration());
      methodCache.put(method, mapperMethod);
    }
    return mapperMethod;
  }

这里生成了一个MapperMethod 对象,并且调用mapperMethod.execute(sqlSession, args)来进行真正的DB查询逻辑。

mapperMethod.execute(sqlSession, args)的内部逻辑如下所示

2.3.1 找到接口对应的sql

DefaultSqlSession#selectList()方法内容如下:

  @Override
  public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    try {
      MappedStatement ms = configuration.getMappedStatement(statement);
      return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

根据接口找到对应的sql,然后交由executor执行

2.3.2 执行sql,封装结果

缺省情况下executor是SimpleExecutorSimpleExecutor.doQuery()方法内容如下:

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }
  1. 执行sql,获得结果
  2. 把结果转换成接口中声明的类型然后返回

二、Plugin Demo

演示 MyBatis Plugin 功能的 Demo。

1、代码

源码下载:mybatis-demo
对应DB表:mybatis-demo

添加Interceptor类:

@Intercepts({@Signature(type = StatementHandler.class, method = "parameterize", args = {Statement.class})})
public class SQLStatsInterceptor implements Interceptor {

    public Object intercept(Invocation invocation) throws Throwable {

        StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
        BoundSql boundSql = statementHandler.getBoundSql();
        String sql = boundSql.getSql();
        System.out.println("mybatis intercept sql:" + sql);
        return invocation.proceed();
    }

    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }

    public void setProperties(Properties properties) {
        String dialect = properties.getProperty("dialect");
        System.out.println("mybatis intercept dialect:" + dialect);
    }
}

添加interceptor配置:

mybatis-config.xml 添加以下配置,

    <plugins>
        <plugin interceptor="com.sivalabs.mybatisdemo.interceptor.SQLStatsInterceptor">
            <property name="dialect" value="mysql"/>
        </plugin>
    </plugins>

2、代码逻辑

2.1 解析配置文件

同上。2.1 解析配置文件

2.2 构造UserMapper实例

SqlSession sqlSession = MyBatisUtil.getSqlSessionFactory().openSession();

UserMapper userMapper = sqlSession.getMapper(UserMapper.class);
2.2.1、通过openSession()来进行实例的初始化。

openSession()的内部内部实现如下:

  private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
    Transaction tx = null;
    try {
      final Environment environment = configuration.getEnvironment();
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
      final Executor executor = configuration.newExecutor(tx, execType);
      return new DefaultSqlSession(configuration, executor, autoCommit);
    } catch (Exception e) {
      closeTransaction(tx); // may have fetched a connection so lets call close()
      throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

主要业务逻辑为

  1. 开启一个事务
  2. 构造一个newExecutor

configuration.newExecutor()的内部实现如下:

    executor = (Executor) interceptorChain.pluginAll(executor);

把所有的插件(Interceptor)嵌入到Executor中,具体实现如下:

  public Object pluginAll(Object target) {
    for (Interceptor interceptor : interceptors) {
      target = interceptor.plugin(target);
    }
    return target;
  }
    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }
  public static Object wrap(Object target, Interceptor interceptor) {
    Map<Class<?>, Set<Method>> signatureMap = getSignatureMap(interceptor);
    Class<?> type = target.getClass();
    Class<?>[] interfaces = getAllInterfaces(type, signatureMap);
    if (interfaces.length > 0) {
      return Proxy.newProxyInstance(
          type.getClassLoader(),
          interfaces,
          new Plugin(target, interceptor, signatureMap));
    }
    return target;
  }

细节已经展现,MyBatis 内部通过 Java 的动态代理来实现 Plugin(Interceptor)的嵌入。

2.2.2、通过getMapper()方法来获取Mapper实例。

同上。

2.3 执行对应查询代码

同上。

2.3.1 找到接口对应的sql

同上。

2.3.2 执行sql,封装结果

缺省情况下executor是SimpleExecutorSimpleExecutor.doQuery()方法内容如下:

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      stmt = prepareStatement(handler, ms.getStatementLog());
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }
  1. 执行sql,获得结果
  2. 把结果转换成接口中声明的类型然后返回

其中1.调用了configuration.newStatementHandler()来构造一个 statement,其内部实现如下:

  public StatementHandler newStatementHandler(Executor executor, MappedStatement mappedStatement, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
    StatementHandler statementHandler = new RoutingStatementHandler(executor, mappedStatement, parameterObject, rowBounds, resultHandler, boundSql);
    statementHandler = (StatementHandler) interceptorChain.pluginAll(statementHandler);
    return statementHandler;
  }

我们看到了interceptorChain.pluginAll(statementHandler),这里的逻辑便和2.2.2一致了,用 Java 的动态代理,把Interceptor的代码嵌入到Handler实例中。

这样一来,如 SQLStatsInterceptor 的注解中声明的那样,在调用 StatementHandler.parameterize时,便会回调Interceptor的代码逻辑了。

posted in JavaWeb 

1、问题:

不同层代码之间的异常情况,通过『返回码包装』还是『通过异常捕获』解决?

Java应用中,上层的代码会调用下层的代码:比如Controller层调用Service层,Service层调用Dao层等等。

上层代码调用下层代码会出错,比如UserServer.getUserById()这个方法,根据ID获取用户的详细信息。

2、解决方式:

错误的情况下,下层代码需要告诉上层代码错误发生了。
此时有两种通知上层调用方:
方法一:通过返回码的包装
方法二:通过抛出异常

两种详细说明如下:

2.1、 方法一:通过返回码的包装

UserServer.getUserById()返回一个Result<User>类

  • 函数定义如下:
Result<User> UserServer.getUserById(int id);
  • 其中Result定义如下:
public class Result<T> {
    private RetCodeEnum retCodeEnum;
    private T info;
}
  • 调用方调用姿势如下:
Result<User> userResult = userServer.getUserById(5);
if (RetCodeEnum.OK != userResult.getRetCodeEnum) {
    //给用户相应提示
}

//继续处理业务

2.2、方法二:通过抛出异常

UserServer.getUserById()抛出业务异常,

  • 函数定义如下:
User UserServer.getUserById(int id) throw BusinessException;
  • 其中BusinessException定义如下:
public class RetCodeEnumException extends Exception {
    private RetCodeEnum retCodeEnum;
}
  • 调用方调用姿势如下:
User user = userServer.getUserById(5);
//继续处理业务
  • 最后来一个全局的ExceptionHandler()来处理业务异常:
    protected BaseResp handleException(HttpServletRequest req, Exception e) {

        BaseResp baseResp;
        if (e instanceof BusinessException) {
            BusinessException businessException = (BusinessException) e;
            baseResp = new BaseResp(new Result(BusinessException));
        } else {
            baseResp = new BaseResp(new Result(RetCodeEnum.SYS_BUSY));
            LOGGER.error(e.getMessage(), e);
        }

        return baseResp;
    }

3、两种方式比较

第二种调用方式,把错误情况的处理统一到一个Handler处理,完爆第一种方式,理由如下:

3.1、大大减少了重复代码量。

比如你的Java应用有10层,最底层的一个函数有一种错误的情况,那上面9层调用这个方法的100个函数都要加上这么一句:

if (RetCodeEnum.OK != userResult.getRetCodeEnum) {
    //给用户相应提示
}

3.2、大大提升了安全性 && 健壮性

第一种方式,每次调用都要添加以上几行代码,而一旦忘记,就意味着 安全问题 or 莫名其妙的错误提示!

第二种方式,因为Exception的抛出是强制的,一旦错误情况发生,一定会中断正常的流程。

3.3、简化错误逻辑处理的代码

某天想在所有错误的情况下打印日志,

  • 第一种方式,需要在所有调用的地方打印日志,LOGGER.error("xxx")可能要写几千次;
  • 第二种方式,只需要在ExceptionHandler中添加一次LOGGER.error("xxx")即可。

3.4、错误信息更加 清晰结构化

如果
底层的一个方法a出错,导致方法 b、c、d 都产生问题
底层的一个方法m出错,导致方法 o、p、q 都产生问题

  • 如果用返回码的方式处理

那么日志里看起来是这样的:

a error
b error
c error
d error
m error
o error
p error
q error

很难区分到底出现了几次问题,每次出问题前因后果是什么

  • 如果用异常的方式处理

那么日志里看起来是这样的:

A Exception:
    Statck trace:
        d
        c
        b
        a
    Cause by:
        a
        
B Exception:
    Stack trace:
        q
        p
        o
        m
    Cause by:
        m

容易可以看出产生了几次问题,问题的前因后果是怎样的。

4、结论

鉴于以上种种好处,Java中错误情况的处理尽量通过 Exception 解决,而非通过返回码的包装解决。

posted in JavaWeb 

四、并发编程基础

1、为什么使用多线程

  1. 更多的处理器核心
  2. 更快的响应速度
  3. 更好的编程模型

2、如何设置线程优先级

  • 如何设置线程的优先级
    • Thread.setPriority()
  • 线程的优先级是否可以保证
    • 不同平台下的实现不一样,无法保证

3、线程有哪些状态

状态名称 说明
NEW 初始状态,线程被构建,但还没有调用start()方法。
RUNNABLE 运行状态,Java线程将操作系统中的就绪和运行两种状态统一称为RANNABLE
BLOCK 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,表示当前线程需要等待其它线程做出一些待定动作(通知或中断)
TIME_WAITING 超时等待状态,类似于WAITING,但可以在指定时间后自行返回
TERMINATED 终止状态,表示当前线程已经执行完毕

4、如何达到线程的各种状态

public class Test {
    public static void main(String[] args) {
        new Thread(new TimeWaiting(), "TimeWaiting").start();
        new Thread(new Waiting(), "Waiting").start();
        new Thread(new Blocked(), "Blocked").start();
    }
}
class TimeWaiting implements Runnable {
    @Override
    public void run() {
        while (true) {
            SleepUtils.second(100);
        }
    }
}
class Waiting implements Runnable {
    @Override
    public void run() {
        while (true) {
            synchronized (Waiting.class) {
                try {
                    Waiting.class.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
class Blocked implements Runnable {
    @Override
    public void run() {
        synchronized (Blocked.class) {
            while (true) {
                SleepUtils.second(100L);
            }
        }
    }
}
class SleepUtils {
    public static final void second(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • jps后查看
7827 Jps
998 RemoteMavenServer
7783 Launcher
7784 Test
5341 Launcher
  • jstack 7784后查看
......

"Blocked" prio=5 tid=0x00007fcbe4845000 nid=0x5903 waiting on condition [0x00007000048fd000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at test.SleepUtils.second(Test.java:60)
        at test.Blocked.run(Test.java:51)
        - locked <0x00000007aad2fb10> (a java.lang.Class for test.Blocked)
        at java.lang.Thread.run(Thread.java:745)

"Waiting" prio=5 tid=0x00007fcbe302b000 nid=0x5703 in Object.wait() [0x00007000047fa000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
        at java.lang.Object.wait(Object.java:503)
        at test.Waiting.run(Test.java:36)
        - locked <0x00000007aad2c388> (a java.lang.Class for test.Waiting)
        at java.lang.Thread.run(Thread.java:745)

"TimeWaiting" prio=5 tid=0x00007fcbe4090000 nid=0x5503 waiting on condition [0x00007000046f7000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)
        at test.SleepUtils.second(Test.java:60)
        at test.TimeWaiting.run(Test.java:24)
        at java.lang.Thread.run(Thread.java:745)

5、线程各种状态之间如何转换

Java线程状态变迁如图所示,

6、Daemon线程有什么特点,如何创建Daemon线程

  • Java虚拟机中不存在非Daemon线程时,Java虚拟机会退出。
  • 通过Thread.setDaemon(true)来指定Daemon线程。

以下线程输出内容是?

public class Daemon {
    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner());
        thread.setDaemon(true);
        thread.start();
    }
}

class DaemonRunner implements Runnable {
    public void run() {
        try {
            SleepUtils.seconds(100L);
        } finally {
            System.out.print("Daemon Thread Exit.");
        }
    }
}

  • 结果:什么都不会输出
  • 注意:Daemon线程不能依靠finally的内容来进行关闭清理资源所及。

7、线程的中断是什么,如何进行中断相关操作

  • 终端是什么

终端可以理解为终端的一个boolean属性,标识运行中的线程是否被其他线程进行终端操作。

  • 如何进行终端相关操作
    • 中断:通过thread.interrupt()进行中断操作
    • 判断:通过isInterrupted()判断是否被中断
    • 复位:通过Thread.interrupted()对当前的中断位进行复位
    • 复位:抛出InterruptedException的方法会先对中断位进行复位,然后抛Exception

以下两个线程的输出是什么?

public class Interrupted {  
    public static void main(String[] args) throws Exception {  
        // sleepThread不停的尝试睡眠
        Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");  
        sleepThread.setDaemon(true);
        // busyThread不停的运行
        Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
        busyThread.setDaemon(true);
        sleepThread.start();
        busyThread.start();
        // 休眠5秒,让sleepThread和busyThread充分运行
        TimeUnit.SECONDS.sleep(2);
        sleepThread.interrupt();
        busyThread.interrupt();
        System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
        System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());  
        // 防止sleepThread和busyThread立刻退出  
        TimeUnit.SECONDS.sleep(2);
    }
    static class SleepRunner implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    //e.printStackTrace();
                }
            }
        }
    }
    static class BusyRunner implements Runnable {
        @Override
        public void run() {  
            while (true) {
            }
        }
    }
}

程序输出结果:

SleepThread interrupted is false  
BusyThread interrupted is true  

8、中断有什么用?

中断作为一种简便的线程间交互方式,非常适合用来取消或终止线程。
另一种取消或终止线程的方式是采用一个boolean变量来控制线程。

例1,创建一个线程进行计数,主线程对这个线程取消操作:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new CountTask());
        thread.start();

        TimeUnit.SECONDS.sleep(1L);
        thread.interrupt();
    }
}
class CountTask implements Runnable {
    private volatile boolean on = true;
    private long count = 0L;
    
    @Override
    public void run() {
        while (on && !Thread.currentThread().isInterrupted()) {
            count++;
        }
        System.out.println(c);
    }
}

例2,创建一个生产者,一个消费者和一个BlockingQueue,在任务消费完后结束两个任务:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> usernameQueue = new LinkedBlockingQueue<>(5);

        Thread producer = new Thread(new ProducerTask(usernameQueue));
        Thread consumer = new Thread(new ConsumerTask(usernameQueue));
        producer.start();
        consumer.start();

        //取消代码-------start--------
        while (true) {
            if (!producer.isAlive() && usernameQueue.size()==0) {
                //如果生产者已经生产完了,且队列已经消费完了
                consumer.interrupt();
                break;
            } else {
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        //取消代码-------end--------
    }
}
class ProducerTask implements Runnable {
    private BlockingQueue<String> usernameQueue;

    public ProducerTask(BlockingQueue<String> usernameQueue) {
        this.usernameQueue = usernameQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                usernameQueue.put(i + "");
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

class ConsumerTask implements Runnable {
    private BlockingQueue<String> usernameQueue;

    public ConsumerTask(BlockingQueue<String> usernameQueue) {
        this.usernameQueue = usernameQueue;
    }

    @Override
    public void run() {
        String username;
        try {
            while ((username = usernameQueue.take()) != null) {
                System.out.println(username);
            }
        } catch (InterruptedException e) {
            return;
        }
    }
}

9、suspend()、resume()和stop()作用

  • 作用:把一个线程的运作比作录音机,这个线程的暂停、继续播放、停止 分别对应 Thread的suspend()、resume()、stop()
  • 过时的API:
    • suspend()时不会释放锁,而是占着资源入睡,容易死锁。
    • stop()时不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。

10、使用 stop/interrupt 结束进程有什么区别?

  • stop: 不保证资源的正常释放,没有给线程释放资源机会,导致线程运行在不确定状态下。
  • interrupt: 线程终止时有机会去清理资源,而不是武断的终止线程,这种做法更加安全和优雅。

11、synchronized底层实现原理?

  • 作用:
    synchronized可以修饰方法或者修饰方法,用于多线程时只有一个线程在方法或代码块中,保证了线程对变量的可见性和排他性。

  • 原理:

public class Test {
    public static void main(String[] args) {
        //对Test Class对象进行加锁
        synchronized (Test.class) {
        }
        f();
    }
    //对Test Class对象进行加锁(因为是静态方法)
    public static synchronized void f() {
    }
}

通过javap -verbose Test.class进行反编译,得到以下结果:

         4: astore_1
         5: monitorenter
         6: aload_1
         7: monitorexit
         ......
public static synchronized void f();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:

同步代码块采用moniterentermoniterexit实现,同步静态方法依靠ACC_SYNCHRONIZED来实现,而本质上是对一个监视器(monitor)的获取。某个线程只有获得了一个对象的监视器,才能执行对应的同步代码块或者同步方法。

对象、对象监视器、同步队列和执行线程之间关系如下:

12、如何实现 等待/通知 机制?

12.1、什么是等待/通知机制?

一个线程(a线程)修改了一个对象,另一个线程(b线程)感知到了变化,然后进行相应操作。

12.2、等待/通知机制有什么好处?

这种模式开始于一个线程,最终执行是另一个线程。这是模式隔离了『何时做(When)』和『怎么做(How)』,功能层面实现了解耦,体系结构层面具备良好的伸缩性。

12.3、如何实现等待/通知机制?

方法1
//B线程
while (value =! desired) {
    Thread.sleep(1000);
}

缺点:

  1. 无法保证响应及时性
  2. 性能消耗比较高
方法2

采用wait()和notify()来解决该问题:

  1. b线程调用对象o.wait()进入等待状态,
  2. a线程调用对象o.notify()来唤醒b线程,
  3. b线程醒来进行下面的操作。
方法名 描述
notify() 通知一个在对象上等待的线程,使其从wait()返回,返回的前提是获得了对象的锁
notifyAll() 通知所有在对象上等待的线程
wait() 调用该方法的线程进入WAITING状态,接收到其他线程通知才会返回。
调用wait()后会释放对象的锁。
wait(long) 超时等待一段时间,被通知或者超时返回。 单位毫秒。
wait(long, int) 超时等待。 单位可以指定。
public class Test {
    private static Object lock = new Object();
    private static boolean isDesired = false;

    public static void main(String[] args) {
        new Thread(new Wait(), "Wait").start();
        new Thread(new Notify(), "Notify").start();
    }
    static class Wait implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                while (!isDesired) {
                    try {
                        System.out.println(Thread.currentThread() + " is waiting");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(Thread.currentThread() + " start doing following things");
        }
    }
    static class Notify implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " make isDesired=true");
                isDesired = true;
                lock.notifyAll();
            }
        }
    }
}

输出如下:

Thread[Wait,5,main] is waiting
Thread[Notify,5,main] make isDesired=true
Thread[Wait,5,main] start doing following things

注意:

  • 调用wait()\notify()\notifyAll()需要首先对对象加锁
  • 调用wait()后,线程状态由 RUNNING 变为 WAITING,并把当前线程加入等待队列。
  • 调用notify()和notifyAll()后,
    • 线程不会直接返回,而是需要获得锁之后才返回。
    • 从等待队列移动到同步队列。
    • 状态由WAITING变为BLOCKING。
  • notify()和notifyAll()区别:前者把一个线程从等待队列移动到同步队列,后者将所有线程从等待队列移动到同步队列。

等待/通知 经典范式:

  • 等待方

1)获取对象的锁。
2)如果条件不满足,调用对象的wait(),被通知后仍要检查条件。
3)条件满足则执行相应逻辑。
对应伪代码如下:

synchronized(对象) {
    while (条件不满足) {
        对象.wait();
    }
}
  • 通知方

1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
对应伪代码如下:

synchronized(对象) {
    改变条件();
    对象.notifyAll();
}

13、如何实现线程间的 管道输入/输出流?

通过PipedInputStream/PipedOutputStream/PipedReader/PipedWriter实现线程之间的数据传输,传输的媒介为内存。

public class Test {
    public static void main(String[] args) throws IOException {
        //创建管道并绑定
        PipedReader pipedReader = new PipedReader();
        PipedWriter pipedWriter = new PipedWriter();
        pipedWriter.connect(pipedReader);

        //创建输入输出线程
        new Thread(new PrintTask(pipedReader)).start();
        new Thread(new ReadTask(pipedWriter)).start();
    }
}
class ReadTask implements Runnable {
    private PipedWriter pipedWriter;
    public ReadTask(PipedWriter pipedWriter) {
        this.pipedWriter = pipedWriter;
    }
    @Override
    public void run() {
        int receive = 0;
        try {
            while ((receive = System.in.read()) != -1) {
                pipedWriter.write(receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedWriter.close();
            } catch (IOException e) {
            }
        }
    }
}
class PrintTask implements Runnable {
    private PipedReader pipedReader;
    public PrintTask(PipedReader pipedReader) {
        this.pipedReader = pipedReader;
    }
    @Override
    public void run() {
        int receive = 0;
        try {
            while ((receive = pipedReader.read()) != -1) {
                System.out.print((char) receive);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                pipedReader.close();
            } catch (IOException e) {
            }
        }
    }
}

14、Thread.join()有什么用?

如果线程A执行了thread.join()方法,代表thread线程终止之后才从thread.join()返回。

Thread.join() 的源码如下(进行部分调整)

    public final synchronized void join()
    throws InterruptedException {

        while (isAlive()) {
            wait(0);
        }
        //条件符合 方法返回
    }

线程终止时,会调用线程资深的notifyAll方法,通知所有wait的线程。
join()的实现也是采用 等待/通知 机制。

15、ThreadLocal如何使用?

ThreadLocal是一个为每个线程单独存储结构。

如,以下分析器可以用来统计调用时间。

public class Test {
    public static void main(String[] args) throws IOException, InterruptedException {
        Profiler.begin();
        Thread.sleep(1);
        System.out.println(Profiler.end());
    }
}

class Profiler {
    private static ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>();
    public static void begin() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    public static long end() {
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }
}

这一分析器的好处在于两个方法的调用不用在同一个方法或者同一个类中。

16、如何实现等待超时模式?

16.1、什么是 等待超时机制?

调用一个方法等待一段时间,如果该时间段内得到结果则立即返回,否则超时返回默认结果。

16.2、如何实现 等待超时 机制?

在 等待/通知 做一个小小的改动即可。

问题:

当前有一个方法tryGet(),非阻塞获取一个结果,获取到则返回,获取不到返回null。

  • 问题1. 将该方法改写为阻塞获取,获取不到则阻塞直至获取到。
  • 问题2. 将该方法改写为阻塞获取并等待一段时间,该时间内获取不到则返回null。
问题解决:
  • 问题1. 阻塞获取
    public synchronized Object get() throws InterruptedException {

        Object result = null;
        while (((result = tryGet()) == null)) {
            wait();
        }

        return result;
    }
  • 问题2. 阻塞获取并等待一段时间

    • 等待超时 只需要在 等待/通知 做一个小小的改动即可。
    • 等待变量:remaing = t
    • 超时时间:future = now + remaining
    public synchronized Object get(long t) throws InterruptedException {
        long remaining = t;
        long future = System.currentTimeMillis() + t;

        Object result = null;
        while (((result = tryGet()) == null)
                && remaining > 0) {
            wait(remaining);
            remaining = future - System.currentTimeMillis();
        }

        return result;
    }

16.3、等待超时应用:数据库连接池

class ConnectionPool {
    private List<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initSize) {
        if (initSize <= 0) {
            return;
        }

        for (int i = 0; i <initSize; i++) {
            pool.add(ConnectionDriver.crateConnection());
        }
    }

    public Connection fatchConnection(long timeout) throws InterruptedException {
        synchronized (pool) {
            if (timeout <= 0) {

                //等待通知 机制
                while (pool.isEmpty()) {
                    pool.wait();
                }

                return pool.remove(0);
            } else {

                //等待超时 机制
                long remaining = timeout;
                long future = System.currentTimeMillis() + timeout;

                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }

                Connection result = null;
                if (!pool.isEmpty()) {
                    result = pool.remove(0);
                }
                return result;
            }
        }
    }


    public void releaseConnection(Connection connection) {
        if (connection == null) {
            return;
        }

        synchronized (pool) {
            pool.add(connection);
            pool.notifyAll();
        }
    }
}

虽然客户端采用等待超时获取方式获取Connection失败,但是这种机制保证客户端不糊一致挂在这个获取连接操作上,则是『按时』返回,并告知客户端获取出现问题,是系统的一种自我保护机制。

17、如何使用线程池?

public class TestH_ThreadPool {

    class TestTask implements Runnable {
        AtomicInteger count = new AtomicInteger(0);

        public void run() {
            for (int i = 0; i < 10000; i++) {
                count.incrementAndGet();
            }
            System.out.println(count.get());
        }
    }

    //ThreadPool错误的用法
    //输出:
    //10000
    //10000
    //10000
    @Test
    public void test1() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        executorService.execute(new TestTask());
        executorService.execute(new TestTask());
        executorService.execute(new TestTask());

        Thread.sleep(2000);
    }

    //ThreadPool正确的用法
    //输出:
    //28064
    //29221
    //30000
    @Test
    public void test2() throws InterruptedException {
        Runnable task = new TestTask();

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(task);
        executorService.execute(task);
        executorService.execute(task);

        Thread.sleep(2000);
    }
}

17、如何实现一个简单的线程池?

17.1、为什么使用线程池

  • 如果为每个任务创建一个线程,会有两个问题
    1. 创建和销毁线程是有开销的,大量创建线程浪费资源。
    2. 线程切换存在上下文切换的开销,线程过多会花费大量实现在上下文切换上。
  • 使用线程池有两个好处:
    1. 节省了上面提到的两个问题的开销。
    2. 面对过量的任务能够平缓劣化。

17.2、如何实现简单的线程池

  • 线程池接口定义
interface ThreadPool<Job extends Runnable> {
    //执行一个线程
    void execute();
    //关闭线程池
    void shutdown();
    //增加工作线程
    void addWorkers(int num);
    //减少工作线程
    void removeWorkers(int num);
    //得到正在等待执行的任务数量
    int getJobSize();
}
  • 实现
class DefaultThreadPool<T extends Runnable> implements ThreadPool<T> {

    private static final int DEFAULT_WORKER_NUM = 5;
    private static final int MAX_WORKER_NUM = 10;
    private static final int MIN_WORKER_NUM = 1;

    private List<T> taskList = new LinkedList<>();
    private List<Worker> workerList = Collections.synchronizedList(new ArrayList<Worker>());
    private int workerNum = DEFAULT_WORKER_NUM;             //线程数目
    private AtomicInteger threadNum = new AtomicInteger();  //线程编号

    public DefaultThreadPool() {
    }

    public DefaultThreadPool(int workerNum) {
        if (workerNum < MIN_WORKER_NUM) {
            workerNum = MIN_WORKER_NUM;
        } else if (workerNum > MAX_WORKER_NUM) {
            workerNum = MAX_WORKER_NUM;
        }
        initializeWorkers(workerNum);
    }

    private void initializeWorkers(int workNum) {
        for (int i = 0; i < workNum; i++) {
            Worker worker = new Worker();
            workerList.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            thread.start();
        }
    }

    @Override
    public void execute(T task) {
        if (task == null) {
            return;
        }
        synchronized (taskList) {
            taskList.add(task);
            workerList.notify();
        }
    }

    @Override
    public void shutdown() {
        for (Worker worker : workerList) {
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        if (num + workerNum > MAX_WORKER_NUM) {
            num = num + workerNum - MAX_WORKER_NUM;
        }
        initializeWorkers(num);
        workerNum += num;
    }

    @Override
    public void removeWorkers(int num) {
        if (num > workerNum) {
            throw new IllegalArgumentException("blow worker num!");
        }

        for (int i = 0; i < num; i++) {
            Worker worker = workerList.remove(0);
            worker.shutdown();
        }
        workerNum -= num;
    }

    @Override
    public int getJobSize() {
        return taskList.size();
    }

    class Worker implements Runnable {
        private volatile boolean running = true;
        @Override
        public void run() {
            while (running) {
                synchronized (taskList) {
                    while (taskList.isEmpty()) {
                        try {
                            taskList.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对Worker的中断操作,则返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    T task = taskList.remove(0);
                    if (task == null) {
                        continue;
                    }
                    try {
                        task.run();
                    } catch (Exception e) {
                        //忽略执行中抛出的Exception
                    }
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }
}

线程池的本质:
使用一个线程安全的工作队列连接工作线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作线程则不断从工作队列中取出工作并执行。

18、如何用线程池实现简单的Web服务器?

public class Test {
    public static void main(String[] args) {
        SimpleHttpServer simpleHttpServer = new SimpleHttpServer();
        simpleHttpServer.start();
    }
}


class SimpleHttpServer {
    private ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(20);
    private int port = 8080;
    private static String basePath = "";

    public void setPort(int port) {
        this.port = port;
    }

    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    public void start() {

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            Socket socket;

            while ((socket = serverSocket.accept()) != null) {
                threadPool.execute(new HttpRequestHandler(socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    static class HttpRequestHandler implements Runnable {

        private Socket socket;

        public HttpRequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {

            InputStream inputStreamOfSocket = null;
            Reader readerOfSocket = null;
            BufferedReader bufferedReaderOfSocket = null;
            OutputStream outputStreamOfSocket = null;
            PrintWriter printWriterOfSocket = null;
            FileInputStream fileInputStreamOfFile = null;
            FileReader fileReaderOfFile = null;
            BufferedReader bufferedReaderOfFile = null;
            try {
                inputStreamOfSocket = socket.getInputStream();
                readerOfSocket = new InputStreamReader(inputStreamOfSocket);
                bufferedReaderOfSocket = new BufferedReader(readerOfSocket);

                outputStreamOfSocket = socket.getOutputStream();
                printWriterOfSocket = new PrintWriter(outputStreamOfSocket);

                String header = bufferedReaderOfSocket.readLine();
                String[] headerArr = header.split(" ");
                String relativePath = headerArr.length > 1 ? headerArr[1] : "";
                String filePath = basePath + relativePath;


                if (filePath.endsWith(".jpg") || filePath.endsWith(".ico")) {

                    fileInputStreamOfFile = new FileInputStream(filePath);
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                    int i = 0;
                    while ((i = fileInputStreamOfFile.read()) != -1) {
                        byteArrayOutputStream.write(i);
                    }
                    byte[] bytes = byteArrayOutputStream.toByteArray();
                    printWriterOfSocket.println("HTTP/1.1 200 OK");
                    printWriterOfSocket.println("Server: Molly");
                    printWriterOfSocket.println("Content-Type: image/jpeg");
                    printWriterOfSocket.println("Content-Length: " + bytes.length);
                    printWriterOfSocket.println("");
                    outputStreamOfSocket.write(bytes, 0, bytes.length);

                } else {

                    fileReaderOfFile = new FileReader(filePath);
                    bufferedReaderOfFile = new BufferedReader(fileReaderOfFile);

                    String line = null;
                    printWriterOfSocket.println("HTTP/1.1 200 OK");
                    printWriterOfSocket.println("Server: Molly");
                    printWriterOfSocket.println("Content-Type: text/html; charset=UTF-8");
                    printWriterOfSocket.println("");
                    while ((line = bufferedReaderOfFile.readLine()) != null) {
                        printWriterOfSocket.println(line);
                    }

                }
                outputStreamOfSocket.flush();

            } catch (Exception e) {
                try {
                    PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
                    printWriter.println("HTTP/1.1 500");
                    printWriter.println("");
                    printWriter.flush();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            } finally {
                close(printWriterOfSocket, bufferedReaderOfSocket, bufferedReaderOfFile);
            }
        }
    }
    private static void close (Closeable... closeables) {
        if (closeables == null) {
            return;
        }

        for (Closeable closeable : closeables) {
            try {
                closeable.close();
            } catch (IOException e) {
            }
        }
    }
}

五、Java中的锁

1、synchronized 和 Lock接口 比较优缺点?

synchronized 和 Lock接口 都提供锁的功能,提供类似的同步功能。

1.1、synchronized

  • 优点:

隐式获取锁,比较便捷

  • 缺点:

相比Lock缺少灵活性

1.2、Lock接口

  • 优点:

获取锁更加灵活。
拥有获取锁和释放锁的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

比如,针对一个场景,进行锁的获取和释放。

先获得锁A,再获取锁B,
当锁B获得后,释放锁A同时获取锁C,
当锁C获得后,释放锁B同时获取锁D
以此类推。

以上场景synchronized索取不那么容易,而是用Lock却容易许多。

Lock接口提供的 synchronized 所不具别的特性:

特性 描述
非阻塞的获取锁 当前线程尝试获取锁, 如果这一时刻锁未被其他线程获取到,则获取并持有锁。
可被中断的获取锁 获取到锁的线程能响应中断。当获取到锁的线程被中断时,抛出中断异常,释放锁。
超时获取锁 在截止时间之前获取锁。如果截止时间到了仍未获取到锁,则返回。
  • 缺点:

使用不如synchronized便捷。

2、Lock 接口如何使用?

Lock lock = new ReentrantLock();
lock.lock();

try {
    //业务逻辑
} finally {
    lock.unlock();
}

Lock 是一个接口,它定义了锁的获取和释放的基本操作。Lock的API如下所示:

函数定义 描述
void lock() 阻塞的获取锁。当前线程阻塞,直到获取锁。
void lockInterruptibly() throws InterruptedException 可中断的获取锁。当lock()的区别在于该方法会响应中断。
boolean tryLock() 尝试非阻塞的获取锁,调用该方法立即返回。 如果获取成功返回true,否则返回false。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 超时可中断的获取锁,以下几种情况返回:
①超时时间内获得了锁
②超时时间内被中断
③超时时间结束,返回false
void unlock() 释放锁
Condition newCondition() 获取等待通知组件。
该组件和当前的锁绑定,当前线程只有获得了锁,才能调用waite()方法。
调用后,当前线程将释放锁。
  • 习题:

以下代码输出是什么?
(假设线程名称是 pool-1-thread-1、pool-1-thread-2 …… pool-1-thread-5)

public class TestV {
    @Test
    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Task task = new Task();

        for (int i = 0; i < 5; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        while (!executorService.isTerminated()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Task implements Runnable {
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        while (true) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}

答案:
类似下面

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3

每一秒随机选取一个线程打印线程名。

3、如何实现独占锁?

public class Mutex implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer {
        //是否处于占用状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        //当状态为0的时候获取锁
        @Override
        public boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //释放锁,将状态设置为0
        @Override
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException(null);
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        //返回一个Condition,每个Condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    //将操作代理到Sync上
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

通过 AbstractQueuedSynchronizer来实现(简称AQS)。

  • AQS是构建锁或者其他同步组件的基础框架。
  • AQS的主要使用方式是『继承+模板方法』来使用。
  • AQS使用int变量表示同步状态,通过内置FIFO队列完成线程排队。
  • AQS通过以下3个基础方法 访问或修改 同步状态:
函数定义 说明
getState() 获取当前同步状态
setState(int newState) 设置同步状态
compareAndSetState(int expect, int update) 使用CAS设置状态,可保证设置原子性。
  • AQS可重写的方法如下:
  • 同步器提供的模板方法如下所示:

4、AbstractQueuedSynchronizer 内部如何实现?

3、如何实现独占锁?中从sync.acquire(1);sync.tryAcquire()的流程是什么样的?

4.1、独占锁总体流程

代码实现如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

解析:
① 调用tryAcqure()方法获取同步状态
addWaiter()获取失败之后生成一个『链表节点』并加入链表尾部
acquireQueued()进入自旋过程。

4.2、队列介绍

4.3、自旋介绍

  • 进入自旋
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

① 死循环
② 如果前一个节点是head,则尝试获取同步状态
③ 如果获取失败,则进入『等待/通知』模式

  • 退出自旋
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

① 释放同步状态。
② 通过LockSuppor.unpark()通知后继节点。

4.4、共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
        private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared(int)用来获取同步状态,如果返回>0则说明获取到了同步状态
② 如果获取不到则进入自旋状态

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
  • 释放同步状态逻辑和『独占式』逻辑基本一致
  • 主要区别在于:支持多个线程同时访问的并发组件(如Semaphore),需要保证状态安全释放,一般通过CAS实现。

4.5、独占式超时获取同步状态

    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 主要流程和独占式基本类似
  • 主要区别:在于未获取到同步状态时,再等待nanosTimeout纳秒返回。

5、LockSupport的用途?

LockSupport定义了一组以park开头的方法用来阻塞当前线程,一组unpark开头的方法用来唤醒线程。

函数定义 描述
void park() 阻塞当前线程。 调用unpark(Thread thread)或者中断当前线程才能返回。
void parkNanos() 在park()基础上增加了超时返回。
parkUtil(long deadLine) 阻塞当前线程,直到deadLine时间(1970到deadLine毫秒数)
void unpark(Thread thread) 唤醒阻塞的thread

6、LockSupport的park/unpark与 object.wait/notify的区别?

主要有以下不同:1

  1. 面向的主体不一样。
    LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。

  2. 实现机制不同。
    虽然LockSuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。object.notifyAll()不能唤醒LockSupport的阻塞Thread.

7、如何实现TwinsLock?

public class TwinsLock implements Lock {

    private static final class Sync extends AbstractQueuedSynchronizer {
        public Sync(int count) {

            if (count <= 0) {
                throw new IllegalArgumentException("count must large than 0");
            }
            setState(count);
        }

        public int tryAcquireShared(int reduceCount) {
            for (;;) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        public boolean tryReleaseShared(int returnCount) {
            for (;;) {
                int current = getState();
                int newCount = current + returnCount;

                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }

        }
    }


    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }


    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

8、什么是『重入锁』?如何实现『重入锁』?

8.1、重入锁

  • 支持重进入的锁,表示能够一个线程对资源的重复加锁。
  • synchronized关键字隐式支持冲进入。

8.2、如何实现

重入锁需要解决以下两个问题:

  • 再次获取锁时识别是否为当前线程占据锁。
  • 线程重复获取n次锁,又重复释放n次锁,其它线程才能获取到锁。

通过自定义同步器实现,

同步器的tryAcqire(int acuires)方法如下:

final boolean noFairAcquire(int acquires) {
    find Thread thread = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if compareAndSet(0, acquires) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nexc);
    }
    return false;
}

同步器的tryRelease(int releases)方法如下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw IllegalMonitorStateException();
    
    boolean free = false
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return false;
}

9、『公平锁』和『非公平锁』区别?如何实现?

9.1 『公平锁』和『非公平锁』区别?

如果一个锁是公平的,那么锁的获取顺序应该和请求锁的顺序一致,先来先得。

9.2、公平锁如何实现

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

和上面相比多了!hasQueuedPredecessors()的判断。

9.3、公平锁和非公平锁比较

非公平锁相比公平锁,

  • 优点:
    吞吐量更大。因此同样的工作前提下,非公平锁的线程切换次数更小。

  • 缺点
    会造成线程的饥饿。

10、『读写锁』如何使用?如何实现『读写锁』?

10.1、『读写锁』使用

class Cache {
    private Map<String, Object> map = new HashMap();
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    private Lock r = reentrantReadWriteLock.readLock();
    private Lock w = reentrantReadWriteLock.writeLock();

    public Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    public void put(String key, Object value) {
        w.lock();
        try {
            map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    public void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}
方法名称 描述
int getReadLockCount() 返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,比如:仅一个线程,它连续获取(重进入)了n次读锁,那么占据读锁的线程数是1,但该方法返回n
int getReadHoldCount() 返回当前线程获取读锁的次数。该方法在Java 6 中加入到ReentrantReadWriteLock中,使用ThreadLocal保存当前线程获取的次数,这也使得Java 6 的实现变得更加复杂
boolean isWriteLocked() 判断写锁是否被获取
int getWriteHoldCount() 返回当前写锁被获取的次数

10.2 如何实现『读写锁』

因为『读写锁』的自定义同步器需要,用1个int维护多个读线程和一个写线程,
所以就一定要『按位切割使用』。

  • 高16位表示读,低16位表示写
  • 划分方式如下
写锁的获取与释放

写锁是一个支持重入的排它锁。

        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

相比可重入锁,添加了对读锁的判断。如果存在读锁则无法获取写锁。

释放流程和可重入锁基本一致。

读锁的获取与释放

读锁是一个支持重入的共享锁。

protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int netc = c + (1 << 16);
        int (nextc < c) 
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner !- Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

如果其他线程获取了写锁,则获取失败。
如果写锁未获取 或者 当前线程获取了锁 ,则成功获取读锁。

11、除了 wait()+notify()+synchronize(),如何实现『等待/通知』模式?

wait()+notify()+synchronize()可以实现『等待/通知』模式,
除此之外Condition+Lock配合也可以实现『等待/通知』模式。

  • 简单使用如下
class ConditionUseCase {
    private Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }
    public void conditionSignal() {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}
  • 实现BlockingQueue
class BoundedQueue<T> {

    private Object[] items;
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();

    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public T remove(T t) throws InterruptedException {
        lock.lock();
        try {
            while (0 == count) {
                notEmpty.await();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

12、Condition内部如何实现?

12.1、等待队列

  • 一个Condition对象包含一个等待队列
  • 调用Condition.awaite()会在队列尾部中追加节点
  • 调用Condition.signal()把队列头部节点移除

  • 一个同步器包含一个同步队列、多个等待队列
  • 不同于Object的监视器模式拥有一个同步队列和一个等待队列

12.2 等待

  • 调用await()相当于同步队列的首节点移动到等待队列的尾节点
  • 调用awaite()和从await()返回的前提都是需要获取锁
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  • 构造节点加入等待队列。
  • 释放同步状态
  • 进入等待队列

12.3、通知

  • 调用signal()方法会把等待队列的首节点移动到同步节点尾部。
  • 调用signal()方法的前提是需要获取到锁。
  • 调用signalAll()会把等待队列所有节点移动到同步队列尾部。
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
  • 检查当前线程是否获取了锁
  • 唤醒首节点