引言
在定位项目中内存泄漏问题时,发现RingBuffer占用内存过大,这个是在使用log4j2时引入的jar,以前只知道必须使用这个Disruptor才可以用异步日志,
但是并不清楚Disruptor的一些实现,也没有通过编码的方式使用过,这次正好研究一下。
Disruptor是一个低延迟(low-latency),高吞吐量(high-throughput)的事件发布订阅框架,用于一个JVM中多个线程之间的消息队列,
作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue。
相关概念
RingBuffer
应用需要传递的消息在Disruptor中称为Event(事件)。
RingBuffer是Event的数组,实现了阻塞队列的语义:
如果RingBuffer满了,则生产者会阻塞等待。
如果RingBuffer空了,则消费者会阻塞等待。
Sequence
在上文中,我提到“每个消费者需要自己维护一个指针”。这里的指针就是一个单调递增长整数(及其基于CAS的加法、获取操作),称为Sequence。
除了每个消费者需要维护一个指针外,RingBuffer自身也要维护一个全局指针(如上一节第2点所提到的),记录最后一条可以被消费的消息。
高性能的体现
无锁,无锁就没有锁竞争。当生产者、消费者线程数很高时,意义重大。所以,
往大里说,每个消费者维护自己的Sequence,基本没有跨线程共享的状态。
往小里说,Sequence的加法是CAS实现的。
当生产者需要判断RingBuffer是否已满时,用CAS比较原先RingBuffer的Event个数,和假定放入新Event后Event的个数。
如果CAS返回false,说明在判断期间,别的生产者加入了新Event;或者别的消费者拿走了Event。那么当前判断无效,需要重新判断。
对象的复用,JVM运行时,一怕创建大对象,二怕创建很多小对象。这都会导致JVM堆碎片化、对象元数据存储的额外开销大。这是高性能Java应用的噩梦。
为了解决第二点“很多小对象”,主流开源框架都会自己维护、复用对象池。LMAX Disruptor也不例外。
生产者不是创建新的Event对象,放入到RingBuffer中。而是从RingBuffer中取出一个已有的Event对象,更新它所指向的业务数据,来代表一个逻辑上的新Event。
简单使用
pom
1 2 3 4 5
| <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
|
LogEvent
自定义实体对象,充当“生产者-消费者”模型中的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| import java.util.Date;
public class LogEvent {
private long logId;
private String content;
private Date date;
public long getLogId() { return logId; }
public void setLogId(long logId) { this.logId = logId; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public Date getDate() { return date; }
public void setDate(Date date) { this.date = date; }
@Override public String toString() { return "logId=" + logId + ", content='" + content + '\'' + ", date=" + date; } }
|
LogEventFactory
实现EventFactory的接口,用于生产数据。
1 2 3 4 5 6 7 8 9 10 11 12
| import com.lmax.disruptor.EventFactory;
public class LogEventFactory implements EventFactory<LogEvent> { @Override public LogEvent newInstance() { return new LogEvent(); } }
|
LogEventProducer
自定义生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import java.util.Date;
import com.lmax.disruptor.RingBuffer;
public class LogEventProducer { private RingBuffer<LogEvent> ringBuffer;
public LogEventProducer(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; }
public void onData(long logId, String content, Date date) { long seq = ringBuffer.next(); LogEvent logEvent = ringBuffer.get(seq); logEvent.setLogId(logId); logEvent.setContent(content); logEvent.setDate(date); ringBuffer.publish(seq); } }
|
LogEventProducerWithTranslator
将数据存储到自定义对象中并发布,通过在自定义类中新建EventTranslator类实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.Date;
import com.lmax.disruptor.EventTranslatorVararg; import com.lmax.disruptor.RingBuffer;
public class LogEventProducerWithTranslator {
private EventTranslatorVararg eventTranslatorVararg = (EventTranslatorVararg<LogEvent>) (event, sequence, args) -> { event.setLogId((Long) args[0]); event.setContent((String) args[1]); event.setDate((Date) args[2]); };
private RingBuffer<LogEvent> ringBuffer;
public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; }
public void onData(long logId, String content, Date date) { ringBuffer.publishEvent(eventTranslatorVararg, logId, content, date); } }
|
LogEventConsumer
自定义消费者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer;
public class LogEventConsumer implements EventHandler<LogEvent> {
private String name;
public LogEventConsumer(String name) { this.name = name; }
@Override public void onEvent(LogEvent event, long sequence, boolean endOfBatch) { System.out.println("LogEventConsumer name:" + name + ",sequence:" + sequence + ",endOfBatch:" + endOfBatch + ",logEvent:" + event.toString()); } }
|
LogEventMain
启动项,通过单一生产者,多生产者,单一消费者,多消费者的组合,测试了disruptor的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
| import java.util.Date; import java.util.concurrent.Executors;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType;
public class LogEventMain {
public static void main(String[] args) throws InterruptedException { producer(); producerWithTranslator(); multiConsumer(); multiConsumers(); multiProcedureConsumers();
}
public static void multiProcedureConsumers() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); int ringBufferSize = 2 << 3; Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); LogEventConsumer consumer1 = new LogEventConsumer("1"); LogEventConsumer consumer2 = new LogEventConsumer("2"); LogEventConsumer consumer3 = new LogEventConsumer("3"); LogEventConsumer consumer4 = new LogEventConsumer("4"); LogEventConsumer consumer5 = new LogEventConsumer("5"); disruptor.handleEventsWith(consumer1, consumer2); disruptor.after(consumer1).handleEventsWith(consumer3); disruptor.after(consumer2).handleEventsWith(consumer4); disruptor.after(consumer3, consumer4).handleEventsWith(consumer5); disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); LogEventProducer logEventProducer2 = new LogEventProducer(ringBuffer); LogEventProducer logEventProducer3 = new LogEventProducer(ringBuffer); for (int i = 0; i < 10; i++) { logEventProducer.onData(i, "1-logEventProducer" + i, new Date()); logEventProducer2.onData(i, "2-logEventProducer" + i, new Date()); logEventProducer3.onData(i, "3-logEventProducer" + i, new Date()); } Thread.sleep(1000); disruptor.shutdown(); }
public static void multiConsumers() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); int ringBufferSize = 2 << 3; Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); LogEventConsumer consumer1 = new LogEventConsumer("1"); LogEventConsumer consumer2 = new LogEventConsumer("2"); LogEventConsumer consumer3 = new LogEventConsumer("3"); LogEventConsumer consumer4 = new LogEventConsumer("4"); LogEventConsumer consumer5 = new LogEventConsumer("5"); disruptor.handleEventsWith(consumer1, consumer2); disruptor.after(consumer1).handleEventsWith(consumer3); disruptor.after(consumer2).handleEventsWith(consumer4); disruptor.after(consumer3, consumer4).handleEventsWith(consumer5); disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); for (int i = 0; i < 10; i++) { logEventProducer.onData(i, "logEventProducer" + i, new Date()); } Thread.sleep(1000); disruptor.shutdown(); }
public static void multiConsumer() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); int ringBufferSize = 2 << 3; Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
EventHandlerGroup<LogEvent> eventEventHandlerGroup = disruptor.handleEventsWith(new LogEventConsumer("1"), new LogEventConsumer("2")); eventEventHandlerGroup.then(new LogEventConsumer("3")); disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer); for (int i = 0; i < 10; i++) { producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date()); } Thread.sleep(1000); disruptor.shutdown(); }
public static void producerWithTranslator() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); int ringBufferSize = 2 << 3; Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new LogEventConsumer("1")); disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer); for (int i = 0; i < 10; i++) { producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date()); } Thread.sleep(1000); disruptor.shutdown(); }
public static void producer() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); int ringBufferSize = 2 << 3; Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new LogEventConsumer("1")); disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); for (int i = 0; i < 10; i++) { logEventProducer.onData(i, "logEventProducer" + i, new Date()); } Thread.sleep(1000); disruptor.shutdown(); } }
|
参考:
disruptor的简单介绍及使用
高性能线程间队列 DISRUPTOR 简介
浅谈Disruptor
剖析Disruptor:为什么会这么快?(一)Ringbuffer的特别之处
剖析Disruptor:为什么会这么快?(一)锁的缺点
剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充
剖析Disruptor:为什么会这么快?(三)揭秘内存屏障
高性能队列——Disruptor
还在用BlockingQueue?读这篇文章,了解下Disruptor吧