1、线上问题
提醒单手动发送短信发送不了,没有日志定位
核心代码:
2、问题猜想
线上复现发送异常, 查询到对应时间日志, 消息配置为空, 现实是消息配置没有问题
线上发送偶尔有问题, 偶尔正常, pre 则完全正常
查看日志合理怀疑, 异步时, 获取 groupId 异常, F6static 使用了 TransmittableThreadLocal 获取
按正常道理来说, 使用 TransmittableThreadLocal 应该不存在这个问题, 查看 TransmittableThreadLocal 文档, 使用线程池对象需要使用 TtlExecutors.getTtlExecutorService 修饰
怀疑 @Async 注解, 有默认线程池对象, 当时写代码, 没有添加 async 单独使用的线程池, 所以更没有将 异步线程池使用 TtlExecutors 修饰
3、验证猜想 3.1、验证线程池修饰问题 3.1.1、未修饰程池, 导致 ttl 获取上下文异常
添加测试文件, 再两个主线程内, 添加普通线程池异步, 线程池对象没有使用 TtlExecutors.getTtlExecutorService 修饰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class NormalExecutorsTest { private static ExecutorService executorService = Executors.newFixedThreadPool(2 ); private static ThreadLocal tl = new TransmittableThreadLocal <>(); public static void main (String[] args) { new Thread (() -> { String mainThreadName = "main_01" ; tl.set(1 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L ); tl.set(2 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { String mainThreadName = "main_02" ; tl.set(3 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L ); tl.set(4 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); } private static void sleep (long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
从上述测试发现, 两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果显示确实获取上下文异常,发生错乱
3.1.2、修饰线程池, 获取上下文信息正常 添加测试文件, 区别只有线程池使用了 TtlExecutors.getTtlExecutorService 修饰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class TtlExecutorsTest { private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2 )); private static ThreadLocal tl = new TransmittableThreadLocal <>(); public static void main (String[] args) { new Thread (() -> { String mainThreadName = "main_01" ; tl.set(1 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L ); tl.set(2 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { String mainThreadName = "main_02" ; tl.set(3 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); sleep(1L ); tl.set(4 ); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); executorService.execute(() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); } private static void sleep (long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
不难发现,两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果展示一切正常,由此可以知道 TTL 在使用线程池的情况下,只要使用 TtlExecutors 对应方法修饰线程池, 也可以很好的完成传递,而且不会发生错乱。
3.1.3、普通异步请求不使用线程池, 正常获取上下文信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class NormalAsyncTest { private static ThreadLocal tl = new TransmittableThreadLocal <>(); public static void main (String[] args) { new Thread (() -> { String mainThreadName = "main_01" ; tl.set(1 ); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); sleep(1L ); tl.set(2 ); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { String mainThreadName = "main_02" ; tl.set(3 ); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); sleep(1L ); tl.set(4 ); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); new Thread (() -> { sleep(1L ); System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s" , mainThreadName, Thread.currentThread().getName(), tl.get())); }).start(); System.out.println(String.format("线程名称-%s, 变量值=%s" , Thread.currentThread().getName(), tl.get())); }).start(); } private static void sleep (long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试结果显示和使用了线程池修饰的测试保持一致, 普通线程异步直接获取 ttl 上下文信息没有问题
3.2、验证 @Async 使用了默认线程池 3.2.1、断点异步方法, 查看线程池相关
很明显可以看到默认线程池配置
4、源码解析 4.1、TransmittableThreadLocal
接入文档显示的很清楚, 使用线程池需要 TtlExecutors 对应修饰
先来看 TTL 里面的几个重要属性及方法
TTL 定义:
1 public class TransmittableThreadLocal extends InheritableThreadLocal
可以看到,TTL 继承了 ITL,意味着 TTL 首先具备 ITL 的功能。
再来看看一个重要属性 holder:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal <Map<TransmittableThreadLocal<?>, ?>>() { @Override protected Map<TransmittableThreadLocal<?>, ?> initialValue() { return new WeakHashMap <TransmittableThreadLocal<?>, Object>(); } @Override protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) { return new WeakHashMap <TransmittableThreadLocal<?>, Object>(parentValue); } };
再来看下 set 和 get:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override public final void set (T value) { super .set(value); if (null == value) removeValue(); else addValue(); } @Override public final T get () { T value = super .get(); if (null != value) addValue(); return value; } private void removeValue () { holder.get().remove(this ); } private void addValue () { if (!holder.get().containsKey(this )) { holder.get().put(this , null ); } }
TTL 里先了解上述的几个方法及对象,可以看出,单纯的使用 TTL 是达不到支持线程池本地变量的传递的,通过第一部分的例子,可以发现,除了要启用 TTL,还需要通过 TtlExecutors.getTtlExecutorService 包装一下线程池才可以,那么,下面就来看看在程序即将通过线程池异步的时候,TTL 帮我们做了哪些操作(这一部分是 TTL 支持线程池传递的核心部分):
首先打开包装类,看下 execute 方法在执行时做了些什么。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Override public void execute (@Nonnull Runnable command) { executor.execute(TtlRunnable.get(command)); } @Nullable public static TtlRunnable get (@Nullable Runnable runnable) { return get(runnable, false , false ); } @Nullable public static TtlRunnable get (@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null ; if (runnable instanceof TtlEnhanced) { if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException ("Already TtlRunnable!" ); } return new TtlRunnable (runnable, releaseTtlValueReferenceAfterRun); } private TtlRunnable (@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this .capturedRef = new AtomicReference <Object>(capture()); this .runnable = runnable; this .releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; } @Nonnull public static Object capture () { Map<TransmittableThreadLocal<?>, Object> captured = new HashMap <TransmittableThreadLocal<?>, Object>(); for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { captured.put(threadLocal, threadLocal.copyValue()); } return captured; } private T copyValue () { return copy(get()); } protected T copy (T parentValue) { return parentValue; }
结合上述代码,大致知道了在线程池异步之前需要做的事情,其实就是把当前父线程里的本地变量取出来,然后赋值给 Rannable 包装类里的 capturedRef 属性,到此为止,下面会发生什么,我们大致上可以猜出来了,接下来大概率会在 run 方法里,将这些捕获到的值赋给子线程的 holder 赋对应的 TTL 值,那么我们继续往下看 Rannable 包装类里的 run 方法是怎么实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void run () { Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null )) { throw new IllegalStateException ("TTL value reference is released after run!" ); } Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } }
根据上述代码,我们看到了 TTL 在异步任务执行前,会先进行赋值操作(就是拿着异步发生时捕获到的父线程的本地变量,赋给自己),当任务执行完,就恢复原生的自己本身的线程变量值。
下面来具体看这俩方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 @Nonnull public static Object replay (@Nonnull Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; Map<TransmittableThreadLocal<?>, Object> backup = new HashMap <TransmittableThreadLocal<?>, Object>(); for (Iterator<? extends Map .Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); backup.put(threadLocal, threadLocal.get()); if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(capturedMap); doExecuteCallback(true ); return backup; } public static void restore (@Nonnull Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; doExecuteCallback(false ); for (Iterator<? extends Map .Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } setTtlValuesTo(backupMap); } private static void setTtlValuesTo (@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } }
ok,到这里基本上把 TTL 比较核心的代码看完了,下面整理下整个流程,这是官方给出的时序图:
4.1、@Async 注解 这个类的 value 参数简直太友好了:
五处调用的地方,其中四处都是注释。
有效的调用就这一个地方,直接先打上断点再说:
org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier
发起调用之后,果然跑到了断点这个地方:
顺着断点往下调试,就会来到这个地方:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
这个代码结构非常的清晰。
编号为 ① 的地方,是获取对应方法上的 @Async 注解的 value 值。这个值其实就是 bean 名称,如果不为空则从 Spring 容器中获取对应的 bean。
如果 value 是没有值的,也就是我们 Demo 的这种情况,会走到编号为 ② 的地方。
这个地方就是我要找的默认的线程池。
最后,不论是默认的线程池还是 Spring 容器中我们自定义的线程池。
都会以方法为维度,在 map 中维护方法和线程池的映射关系。
也就是编号为 ③ 的这一步,代码中的 executors 就是一个 map:
所以,我要找的东西,就是编号为 ② 的这个地方的逻辑。
这里面主要是一个 defaultExecutor 对象:
最终你会调试到这个地方来:
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor
这个代码就有点意思了,就是从 BeanFactory 里面获取一个默认的线程池相关的 Bean 出来。流程很简单,日志也打印的很清楚,就不赘述了。
5、修改方案 添加异步线程池, 使用 TtlExecutors.getTtlExecutor 修饰, 其余不变
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Component public class AsyncThreadPoolConfiguration implements AsyncConfigurer { @Override public Executor getAsyncExecutor () { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor (); threadPool.setCorePoolSize(2 ); threadPool.setMaxPoolSize(5 ); threadPool.setQueueCapacity(20 ); threadPool.setKeepAliveSeconds(60 ); threadPool.setThreadNamePrefix("AsyncThread: " ); threadPool.initialize(); return TtlExecutors.getTtlExecutor(threadPool); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler () { return new SimpleAsyncUncaughtExceptionHandler (); } }
本地验证, 虽然没有指定 value 但是还是进到了 配置的异步线程池中 会被 TtlExecutors 方法修饰, 获取上线文信息应该不会再有问题, 上线添加相关日志
6、总结
上线接口, 需要添加关键性日志, 以方便排查问题, 但是要主要日志数量控制
使用 @Asnyc 注解前需要关注到其余组件的配合使用
使用 ttl 获取上下文信息异常时, 不能简单传参解决问题, 以免后续 dubbo 等其他调用是其他字段信息异常
参考文档 TransmittableThreadLocal github : 传送门
ThreadLocal 系列(三)-TransmittableThreadLocal 的使用及原理解析 : 传送门
别问了,我真的不喜欢这个注解 : 传送门