Eric 发布的文章

多线程情况下不正确的编码引起的内存泄漏

那年冬天, 新上线了一个处理在线消息的应用, 这个应用上线没多久, 就被发现有内存泄漏的问题. 从verbose GC log 来看, 是内存 heap 被用光了. 于是进一步做了一个 heap dump, 发现里面有个 ConcurrentHashMap 占用了大部分 heap 还不释放. 看上去是这个 ConcurrentHashMap 引起的问题.

这个 ConcurrentHashMap 是这么定义的:

Map<Runnable, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

它的 key 是一个将要执行的 task, value 是一个已经被 scheduled 的 Future 对象. 仔细观察这个 futures 里面的 ScheduledFuture 元素, 发现里面有一大部分全部都是已经被执行过的(通过Future对象的 status 字段可以判断它当前的状态), 不过按照代码的思路, 如果被执行过之后, 它就会被从这个 futures 里面移除, 可是这里却有大量应该被移除, 却没有被移除的 ScheduledFuture 对象.

代码稍微有点复杂, 那个类大概1千行左右代码. 不过这里我们只把跟问题相关的代码描述一下, 另外加了一个main函数做测试, 代码如下:

public class TaskProcessor {
    
    private TaskProcessor(){};
    private static final TaskProcessor instance = new TaskProcessor();
    public static TaskProcessor getInstance() {//单例
        return instance;
    }
    
    private static final ScheduledExecutorService ses = Executors.newScheduledThreadPool(3);
    private static final Map<Runnable, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    
    public void submitTask(Runnable task, int latency) {//提交任务, 并且放入 map
        futures.put(task, ses.schedule(new TaskProcessor.Request(task), latency, TimeUnit.MILLISECONDS));
    }
    
    @PreDestroy
    public void shutdown() {//善后
        ses.shutdown();
    }
    
    /**
     *  internal class wrapper a Runnable 封装任务
     */
    private final class Request implements Runnable {
        
        private final Runnable runnable;

        private Request(final Runnable runnable) { this.runnable = runnable;}
        
        public void run() {//真正执行之前先移除
            ScheduledFuture<?> future = futures.remove(runnable);//remove the task from futures queue
            if (null == future) {
                System.out.println("running in a task");
            }
        }
    }
    
    public static void main(String[] args) {//做测试
        TaskProcessor processor = TaskProcessor.getInstance();
        
        for (int i = 0; i < 50000; i++) {//添加5万个任务
            Runnable r = new Runnable() {

                @Override
                public void run() {//做点打印的任务, 防止代码被优化掉
                    System.out.print("-");
                }
            };
            
            processor.submitTask(r, i % 2);//延迟要么0, 要么1
        }
        
        System.out.println(futures.size());//先打印一次队列长度
        
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(futures.size());//8s之后在打印, 正常已经全部执行完
        
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(futures.size());//再8s之后在打印, 正常已经全部执行完
        
        processor.shutdown();
    }
}

这段代码主要描述这么一个事情: 有任务来的时候, 把任务封装到一个 Request 里面, 然后把它 schedule 将来的某个时间去执行. schedule 动作同步返回一个 ScheduledFuture 实例, 为了统计或反映将要执行的task 数量, 把返回的 ScheduledFuture 实例放到了一个 futures 的ConcurrentHashMap 里面. 当真的要执行的时候, 再把它从 futures 这个map 里面移除. 看上去很简单的一个操作, 就是为了算一下在将要执行队列里面的任务, 加了一个这么 futures.

问题也出现在这个 futures 里面. 进一步对 heap 分析, 那些执行过的 ScheduledFuture, 已经都从 scheduler 内部的队列中移除了, 可是没有从这个futures Map 中移除.

仔细看一下这个代码, 如果说 schedule 一个task的时候, 把延迟设置成0或者非常短, 那么这个问题就很有可能出现. 写代码的人期望的顺序是:

ScheduledFuture future = ses.schedule(new TaskProcessor.Request(task), 0, TimeUnit.MILLISECONDS);  //添加将来任务 在schedule task的线程执行
futures.put(task, future);  //放入map, 在schedule task的线程执行
futures.remove(task) //在map中移除, 在scheduler的worker 执行

而实际执行的时间先后顺序可能是:

ScheduledFuture future = ses.schedule(new TaskProcessor.Request(task), 0, TimeUnit.MILLISECONDS);  //1 schedule task的线程执行
futures.remove(task) //3 在scheduler的worker 执行, 可是这里还没有加到Map
futures.put(task, future);  //2 schedule task的线程执行, 加到map

所以, 这个是没有考虑到多线程情况下, 代码同步可能引起的问题.

java volatile 只能保证可见行, 不能操作保证一致性

前段时间, 有个来面试的Java工程师说: 由volatile修饰的变量相当于synchronized修饰的块, 能保证一致性.
这个错误的结论其实很容易被证明是错误的, 以下代码由两个线程对同一个 volatile 变量进行操作, 一个顺序加100000, 另外一个顺序减100000, 为了防止JVM 对于for loop 进行优化, 每一次循环都打印当前的i变量, 结果有时候不是0.

package com.tianxiaohui.art;

public class Main {

public static volatile int count = 0;

public static void main(String[] args) {
    Thread t1 = new Thread(new Runnable(){

        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                System.out.print(i);
                count++;
            }
        }
    });
    
    Thread t2 = new Thread(new Runnable(){

        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                System.out.print(i);
                count--;
            }
        }
    });
    
    t2.start();
    t1.start();
    try {
        t1.join();
        t2.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    System.out.println();
    System.out.println(count);
}
}

运行结果:
01234567891001234....
1

为什么会出现这种情况?
使用 volatile 修饰的变量在进行写操作的时候, 会发生两件事情:

  1. 当前缓存行数据写回主存, 也有书上说当前线程workspace的数据写回主存;
  2. 写回主存的操作会使其它CPU里缓存该内存地址的缓存无效.
    也就是说一旦有一个CPU里的volatile的变量有写操作, 它会立即写回主内存, 并且使其它CPU的缓存里有个变量的值都无效. 效果类似于对于这个volatile 修饰的变量没有缓存, 全部直接使用内存操作.

上面的代码里面对于 volatile 变量的操作 ++ 其实不是一个原子操作, 尽管它能保证每个CPU 拿到count值时候都是最新的值, 但是对它的加1, 再写回内存并不是原子操作. 举例来说, 某时间点CPU-0 和 CPU-1要对它操作之前同时拿到count的值都是5, 然后CPU-0对5加1, 得到6, CPU-1对5减1, 得到4, 那么赋值回去的时候, CPU-0先写回, 主内存值变成6, 同时导致CPU-1里面的值失效, 不过这个时候CPU-1已经在做加操作,之后CPU-1把加的结果4赋值给count, 写回主内存, 变成4. 更细一点讲是一个+操作包含好几条内存指令, 从内存load 它的值只是最早一步.

volatile变量能保证每个线程读到的数据都是主内存最新的值. 一旦有个写操作, 就会导致其它CPU里面的缓存的该值无效, 对它的read都会重新从主内存读取. 但是对它的操作不能保证一致性. 操作一致性要通过同步块或者CAS来保证.

volatile最适合的使用场景是作为一个开关, 一旦有一个线程对这个开关做了操作, 其它线程立马就感知到.

读懂 thread heap

  1. 第一行是线程基本信息, 分别是 线程名字, 是否是Daemon线程(如果不是就不显示), 线程优先级, 线程id, OS native 线程id, 当前运行的状态[当前在那个对象对象].

  2. 当前线程的状态

  3. 下面就是当前线程的Stack;

    "DefaultThreadPool-89" daemon prio=10 tid=0x00007f3974036000 nid=0xb4a waiting on condition [0x00007f393e7e4000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000007ac99baf8> (a com.ebay.raptor.orchestration.impl.FutureCallableTask)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:425)
    at java.util.concurrent.FutureTask.get(FutureTask.java:187)

  4. Java 线程有6种状态: New, Runnable, Blocked, Waiting, Timed Waiting, Terminated. 对应到 thread dump 里面: 到现在做的 thread dump 里面没有看到状态是 New的. 也没有看到 Terminated 的. 其它都看到过:
    java.lang.Thread.State: RUNNABLE

java.lang.Thread.State: BLOCKED (on object monitor)

java.lang.Thread.State: TIMED_WAITING (sleeping)
java.lang.Thread.State: TIMED_WAITING (parking)
java.lang.Thread.State: TIMED_WAITING (on object monitor)

java.lang.Thread.State: WAITING (parking)
java.lang.Thread.State: WAITING (on object monitor)

  1. JVM 6种线程定义:

    A thread state. A thread can be in one of the following states:
    NEW
    A thread that has not yet started is in this state.
    RUNNABLE
    A thread executing in the Java virtual machine is in this state.
    BLOCKED
    A thread that is blocked waiting for a monitor lock is in this state.
    WAITING
    A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
    TIMED_WAITING
    A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
    TERMINATED
    A thread that has exited is in this state.
    A thread can be in only one state at a given point in time. These states are virtual machine states which do not reflect any operating system thread states.

  2. 什么情况下会进入 blocked 状态?
    根据 Thread.State 类的描述: 1. 当一个线程在准备进入Synchronized的块/方法的时候, 2. 或者该线程之前已经进入synchronized 块/方法, 之后又call 了 Object.wait, 这个时候, 该线程进入 Waiting状态 或者 timed_waiting 状态, 之后又被 notify 或notifyAll 唤醒, 等待重新进入Synchronized同步块/方法, 这时候又进入blocked 状态.

  3. 什么情况会进入 waiting 状态?
    等待某种事件发生.
    Object.wait with no timeout -> 等待notify 或 notifyAll
    Thread.join with no timeout -> 等待特定线程终止
    LockSupport.park -> 等待 unpark

  4. 什么情况会进入 timed_waiting 状态?
    虽然等待某种特殊事件发生, 不过最多只等待特定时间
    Thread.sleep
    Object.wait with timeout
    Thread.join with timeout
    LockSupport.parkNanos
    LockSupport.parkUntil