分类 Troubleshooting 相关 下的文章

有依赖的多层子任务共享固定数值的线程池导致业务卡死的问题

最近遇到多次有依赖的多层子任务共享固定数值的线程池导致业务卡死的问题, 这里简单描述一下这个场景.

如下图所示, 这里有一个请求进来, 它被拆分成 3 个子任务(task0, task1, task2) 分别去执行, 这 3 个子任务都是由一个包含 3 个线程的线程池 Pool_A 去执行. Tomcat 主线程分发完任务之后, 就等待 3 个子任务的完成结果.

同时, 这 3 个子任务由分别每个都派发出 3 个新任务, 所以我们看到 9 个 level 2 层的 sub-task. 同时这 9 个 sub-task 也是交由线程池 Pool_A 去执行.

一种可能产生的如下图的结果是:

  1. 当主线程产生 3 个子任务之后, task0, 和 task1 分别获得一个线程来执行, 并且产生了二层的 6 个子任务, 分别是 task0-0, task0-1, task0-2, task1-0, task1-1, task1-2.
  2. 这时共有 9 个任务, 其中一层任务 task0, task1 分别在执行, 并且在等他们的下层任务的结果, task3 没有被执行(我们假设任务并不是 FIFO 被执行).
  3. 第三个线程从已有的 7 个任务中先后选取了 task0-0, task0-1, task1-0, task1-2 去执行.
  4. 接着第三个线程选取了 第一层任务 task2, 然后接着也产生 3 个子任务, 分别是 task2-0, task2-1, task2-2. 然后它也 block 在了等它的 3 个二层子任务完成.
  5. 到此, 所有线程池 Pool_A 里面的线程全部被用光, 剩下的子任务没有线程在去执行,大家都变成了死等.

threadShares.png

Java thread dump 里面的 re-lock 行

网上有些web工具, 可以分析 Java thread dump, 通常是根据栈的情况作出火焰图, 指出死锁, 根据线程优先级, 线程组的名字做分类. 我在公司内部也做了一个这么一个工具. 基本思路是通过正则表达式匹配各种行, 然后重新组装成栈的模型. 然后做分析报告.
最近 2 天有人抱怨说, 我写的这个工具在分析他们的 thread dump 的时候, 总是报 5xx 内部错误. 于是, 根据提供的 thread dump 去分析原因. 原来这个工具在分析下面一个线程栈的时候, 出错了:

"Finalizer" #3 daemon prio=8 os_prio=0 cpu=4.41ms elapsed=1059.54s tid=0x00007f072ce76000 nid=0x151d in Object.wait()  [0x00007f0704976000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(java.base@11.0.11/Native Method)
    - waiting on <0x00000006c0001478> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(java.base@11.0.11/ReferenceQueue.java:155)
    - waiting to re-lock in wait() <0x00000006c0001478> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(java.base@11.0.11/ReferenceQueue.java:176)
    at java.lang.ref.Finalizer$FinalizerThread.run(java.base@11.0.11/Finalizer.java:170)

   Locked ownable synchronizers:
    - None

具体的出错在识别下面这行的处理中:

- waiting to re-lock in wait() <0x00000006c0001478> (a java.lang.ref.ReferenceQueue$Lock)

为什么处理这行会出错呢?

根据这个栈的信息, 我们可以看到, 栈顶说正在等待这个锁: 0x00000006c0001478, 而下面一行又说 在 wait 里面要 relock 这个锁. 并且是在 remove 方法里面. 这里到底发生了什么事情呢?

重新认识一下最基础的 java.lang.Object 的 wait(), wait(timeout), notify(), notifyAll() 方法

wait 方法表示当前线程主动让出 cpu, 等待其它线程通过 notify()/notifyAll() 方法唤醒, 或者被 inerrupt 唤醒, 或者设置 timeout等着超时唤醒
notify/notifyAll 方法告诉之前让出 CPU 进入 wait 的线程, 你可以醒了.
那么 wait 和 notify 机制的载体是什么呢? 就是 wait 方法和 notify 方法所在的实体对象(object/instance). 更具体一点就是这个实体对象的锁(Monitor).
所以要调用一个实体对象的 wait 方法, 你必须先获得这个实现对象的锁, 否则虽然编译通过, 运行时也会报错. 比如:

try {
  lockObj.wait();
} catch (InterruptedException e) {
  e.printStackTrace();
}

运行时出错:

Exception in thread "main" java.lang.IllegalMonitorStateException
  at java.base/java.lang.Object.wait(Native Method)
  at java.base/java.lang.Object.wait(Object.java:328)

正确的做法是这样:

synchronized (lock) {//获得锁, 如果不能获得, 进入等待队列
  try {
    lock.wait();//释放锁, 进入 sleep 队列, 被 notify 之后, 重新尝试去获得锁
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

必须在另外一个线程 notify 上面的等待线程, 否则上面的线程将死等在这. 除非设置timeout 或者被 interrupt. 唤醒代码(这里的 lock 对象就是上面的同一个 lock 对象):

lock.notify(); // 或者 lock.notifyAll()

关于这里的 2 个队列

这里涉及 2 个队列: 1) 该锁对象的等待队列(wait queue); 2) 该锁对象的睡眠队列 (sleep queue);
当一个线程尝试获得某个锁, 而不能获得的时候, 就会进入一个这个锁的等待队列(逻辑上,实际代码实现不一定是队列). 每当等待队列里面的线程获得 CPU 时间片的时候, 它就尝试再去获得一次, 直到获得锁.
当一个线程执行 wait 方法的时候, 它肯定已经获得了这个锁. 虽然获得了这个锁, 可能业务上某个其它条件不满足, 必须等在这里, 而又不能阻碍其它线程获得之前这个锁, 所以, 它进入 wait 方法之后的第一件事就是释放之前已经获得这个锁, 同时当前线程进入一个逻辑上的这个锁的睡眠队列(sleep)队列, 之后这个线程不会被分配 CPU 时间, 直到有 1)有其他线程调用这个锁的 notify/notifyAll 方法, 唤醒它 2) 被 interrupt, 3) 设置了 timeout, 到了超时时间. 当这个线程被唤醒之后, 它首先尝试去获得之前释放的锁, 如果立马获得, 则退出 wait 方法, 继续执行. 否则进入这个锁的等待队列, 在里面排队, 直到再次获得这个锁, 然后从 wait 方法退出, 继续执行.

关于重新获得锁 re-lock

根据上面的描述, 当一个线程被唤醒(notify/notifyAll, 被 interrupt, 或 timeout 到), 它要再次获得这个锁. 所以在这个JDK 的 task (https://bugs.openjdk.java.net/browse/JDK-8130448) 当中, 有人就想把第一次想获得这个锁和进入 wait 方法之后再次获得锁的这 2 种情况区分开来, 于是在 thread dump 中, 就有了 re-lock 这行.

为什么我们遇到的栈里面的 re-lock 不对?

我们可以找出我们看到的情况的源代码(java.lang.ref.ReferenceQueue.remove(long)方法):

    public Reference<? extends T> remove(long timeout)
        throws IllegalArgumentException, InterruptedException
    {
        if (timeout < 0) {
            throw new IllegalArgumentException("Negative timeout value");
        }
        synchronized (lock) {
            Reference<? extends T> r = reallyPoll();
            if (r != null) return r;
            long start = (timeout == 0) ? 0 : System.nanoTime();
            for (;;) {
                lock.wait(timeout);
                r = reallyPoll();
                if (r != null) return r;
                if (timeout != 0) {
                    long end = System.nanoTime();
                    timeout -= (end - start) / 1000_000;
                    if (timeout <= 0) return null;
                    start = end;
                }
            }
        }
    }

从上面的代码可以看出, 该线程已经在 synchronized (lock) { 这行获得了锁, 可是我们从 thread dump 里面读出的是: "waiting to re-lock in wait", 这明显是与事实不符的. 这行应该是获得了锁, 虽然之后释放了.
同时从 lock.wait(timeout) 这行, 我们可以知道, 等到 timeout 或者被唤醒, 这个线程应该要重新获得锁, 所以这里标记为 re-lock 比较合适.
根据这个逻辑, 我们确实找到了有人早就上报了这个问题, 并且已经修复: https://bugs.openjdk.java.net/browse/JDK-8150689. 不过从这个 bug 给出的信息看, 这个 bug 由于是低优先级, 要到 JDK 12 里面才有 fix. 所以我们公司大部分 JVM 都无法有这个修复.

关于 HandshakeCompletedNotify-Thread 线程

最近诊断一个线程泄漏问题的时候, 发现大量的 HandshakeCompletedNotify-Thread 线程, 观察它的代码栈, 发现源自于 JDK 代码 sun.security.ssl.TransportContext.finishHandshake.

每次 SSL 连接建立都创建一个新的线程去做通知, 显然是有性能问题. 可是这个问题尚没有修复, 看上去为了兼容老的 API没人想修复: https://bugs.openjdk.java.net/browse/JDK-8246039

所以尽量连接重用, 减少性能开销.

创建线程栈:

HandshakeCompletedNotify-Thread
java.lang.Thread.start(Thread.java)
sun.security.ssl.TransportContext.finishHandshake(TransportContext.java:620)
sun.security.ssl.Finished$T12FinishedConsumer.onConsumeFinished(Finished.java:546)
sun.security.ssl.Finished$T12FinishedConsumer.consume(Finished.java:515)
sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
sun.security.ssl.SSLTransport.decode(SSLTransport.java:156)
sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1409)
sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1315)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:439)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:410)
sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:197)
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1570)
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(HttpsURLConnectionImpl.java:268)

这段代码:

    HandshakeStatus finishHandshake() {
        if (protocolVersion.useTLS13PlusSpec()) {
            outputRecord.tc = this;
            inputRecord.tc = this;
            cipherSuite = handshakeContext.negotiatedCipherSuite;
            inputRecord.readCipher.baseSecret =
                    handshakeContext.baseReadSecret;
            outputRecord.writeCipher.baseSecret =
                    handshakeContext.baseWriteSecret;
        }

        handshakeContext = null;
        outputRecord.handshakeHash.finish();
        inputRecord.finishHandshake();
        outputRecord.finishHandshake();
        isNegotiated = true;

        // Tell folk about handshake completion, but do it in a separate thread.
        if (transport instanceof SSLSocket &&
                sslConfig.handshakeListeners != null &&
                !sslConfig.handshakeListeners.isEmpty()) {
            HandshakeCompletedEvent hce =
                new HandshakeCompletedEvent((SSLSocket)transport, conSession);
            Thread thread = new Thread(
                null,
                new NotifyHandshake(sslConfig.handshakeListeners, hce),
                "HandshakeCompletedNotify-Thread",
                0,
                false);
            thread.start();
        }

        return HandshakeStatus.FINISHED;
    }

btrace 常见的代码

btrace 官方教程 https://github.com/btraceio/btrace/blob/develop/docs/BTraceTutorial.md

通常一个 btrace 代码长这样:

import org.openjdk.btrace.core.annotations.*;
import static org.openjdk.btrace.core.BTraceUtils.*;
 
import org.openjdk.btrace.core.BTraceUtils.Strings;
 
@BTrace
public class ClosedByInterruptExceptionTracer {
 
    @OnMethod( clazz="/java\\.nio\\.channels\\.ClosedByInterruptException/", method="<init>" )
    public static void createException() {
        println(Strings.strcat("current thread: ", name(currentThread())));
        println(jstackStr());
    }

    @OnMethod( clazz="/com\\.tianxiaohui\\.Example/", method="/methodA/" )
    public static void createException(@ProbeClassName String probeClass, @ProbeMethodName String probeMethod, Object param1) {
        println(Strings.strcat("praram1 is: ", BTraceUtils.classOf(param1)));
        BTraceUtils.printFields(param1);
    }
}

其它常用的代码:

  1. 获得当前线程

    println(Strings.strcat("current thread name: ", name(currentThread())));
    println(str(currentThread()));
  2. 打印某个对象的自身属性

    str(obj);
  3. 打印返回值

    @BTrace
    public class CaptureReturn {
    
    @OnMethod(clazz = "/com\\.tianxiaohui\\.ProcessToken/", method = "/validate/", location=@Location(Kind.RETURN) )
    public static void capture(@Return Object result) {
        BTraceUtils.printFields(result);
        println(jstackStr());
    }
    }
  4. 打印当前时间

    println("current time " + BTraceUtils.timestamp("yyyy-MM-dd' 'HH:mm:ss"));
  5. 对字符串操作可以用 BTraceUtils.Strings 的方法, 比如

    if (Strings.startsWith(fileName, "MIME")) {
  6. 创建很多 class 造成 metaspace 满 https://github.com/btraceio/btrace/blob/4f9f1c446da2105cd7e83dba63e26761a629103e/btrace-dist/src/main/resources/samples/Classload.java
  7. 创建新线程: https://github.com/btraceio/btrace/blob/4f9f1c446da2105cd7e83dba63e26761a629103e/btrace-dist/src/main/resources/samples/ThreadStart.java
  8. 更多例子: https://github.com/btraceio/btrace/tree/4f9f1c446da2105cd7e83dba63e26761a629103e/btrace-dist/src/main/resources/samples
  9. 另外一个例子:

    import org.openjdk.btrace.core.annotations.*;
    import static org.openjdk.btrace.core.BTraceUtils.*;
    
    import org.openjdk.btrace.core.BTraceUtils.Strings;
    
    @BTrace
    public class ThreadInterruptTrace {
    
    @OnMethod( clazz="/java\\.nio\\.channels\\.ClosedByInterruptException/", method="<init>" )
    public static void createException() {
      println(Strings.strcat(name(currentThread()), " --- get ClosedByInterruptException "));
      println(jstackStr());
    }
    
    @OnMethod( clazz="/java\\.lang\\.Thread1/", method="/interrupt/" )
    public static void t1r(@ProbeClassName String probeClass, @Self Thread self) {
      if (Strings.startsWith(name(self), "RxIoScheduler")) {
          println(Strings.strcat(Strings.strcat(name(currentThread()), " try to interrupt ->  "), str(self)));
          println(jstackStr());
      }
    }
    }
  10. 打印方法执行时间例子

    import org.openjdk.btrace.core.annotations.*;
    import static org.openjdk.btrace.core.BTraceUtils.*;
    
    import org.openjdk.btrace.core.BTraceUtils.Strings;
    
    @BTrace
    public class ExecDuration {
     
    @OnMethod(clazz = "/org\\.monstor\\.client\\.impl\\.CallControllerBase/", method = "/execute/", location = @Location(Kind.RETURN))
    public static void executeDuration(@Duration long duration) {
      if (duration / 1000000 > 100) {
          println("CallControllerBase.execute:" + duration);
      }
    }
    
    @OnMethod(clazz = "/org\\.monstor\\.client\\.impl\\.MonstorService\\$ServiceCallImpl/", method = "/call/", location = @Location(Kind.RETURN))
    public static void callDuration(@Duration long duration) {
      if (duration / 1000000 > 100) {
          println("ServiceCallImpl.call:" + duration);
      }
    }
    }

记录一次tcp连接持续断掉的排查过程

有人上报说他们的 app 连接另外一个微服务的时候, 连接经常断掉, 不知道什么原因.

查看网络方面的监控, 网络一切安好.

查看日志, 看到一些异常信息: java.nio.channels.ClosedByInterruptException. 顺着这个信息, 我们找到了根本原因.

  1. 既然是 java.nio.channels.ClosedByInterruptException, 那么我们可以用 btrace 查看为什么这个异常被创建, 得到结果如下:

    java.nio.channels.ClosedByInterruptException.<init>(ClosedByInterruptException.java:51)
    java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:477)
    com.tianxiaohui.java.BufferedWriteChannel.flush(BufferedWriteChannel.java:281)
    com.tianxiaohui.java.BufferedWriteChannel.write(BufferedWriteChannel.java:223)
    com.tianxiaohui.java.BaseClient.sendBytesActual(BaseClient.java:339)
    com.tianxiaohui.java.BaseClient.sendBytesOverChannel(BaseClient.java:203)
    ...
    rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    rx.Observable.unsafeSubscribe(Observable.java:10327) 
    rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.call(OperatorSubscribeOn.java:100)
    rx.xinternal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:230)
    rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ...
     java.lang.Thread.run(Thread.java:748)
  2. 既然是 interrupt 异常, 那么就一定有其它线程给这个线程发 interrupt, 于是通过下面的 btrace 代码, 我们就能找到发 interrupt 的线程

    import org.openjdk.btrace.core.annotations.*;
    import static org.openjdk.btrace.core.BTraceUtils.*;
    
    import org.openjdk.btrace.core.BTraceUtils.Strings;
    
    @BTrace
    public class ThreadInterruptTrace {
    
     @OnMethod( clazz="/java\\.nio\\.channels\\.ClosedByInterruptException/", method="<init>" )
     public static void createException() {
       println(Strings.strcat(name(currentThread()), " --- get ClosedByInterruptException "));
       println(jstackStr());
     }
    
     @OnMethod( clazz="/java\\.lang\\.Thread1/", method="/interrupt/" )
     public static void t1r(@ProbeClassName String probeClass, @Self Thread self) {
       if (Strings.startsWith(name(self), "RxIoScheduler")) {
           println(Strings.strcat(Strings.strcat(name(currentThread()), " try to interrupt ->  "), str(self)));
             println(jstackStr());
         }
     }
    }
  3. 找到发 interrupt 的线程, 那么就去看代码, 找了发生 interrupt 的原因, 最终原因是下面的代码. 每次 create 一个 Observable 都要新起一个线程池做为 scheduler, 其实不需要. 到时每次新建一个 scheduler 线程池导致创建太多线程池, 消耗大量内存, 之后又自动销毁. 然后这些 scheduler 销毁的时候, 还要通知 I/O 线程(这里的 I/O 线程池用的默认的), 也就是给 I/O 线程发一个 interrupt. 最终导致了我们看到的问题.

    Observable.create(new Observable.OnSubscribe<List<T>>() {
     @Override
     public void call(Subscriber<? super List<T>> subscriber) {
            //some code here
       }
     }).}).observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .timeout(threadTimeout, TimeUnit.MILLISECONDS);