引言

在定位项目中内存泄漏问题时,发现RingBuffer占用内存过大,这个是在使用log4j2时引入的jar,以前只知道必须使用这个Disruptor才可以用异步日志,
但是并不清楚Disruptor的一些实现,也没有通过编码的方式使用过,这次正好研究一下。

Disruptor是一个低延迟(low-latency),高吞吐量(high-throughput)的事件发布订阅框架,用于一个JVM中多个线程之间的消息队列,
作用与ArrayBlockingQueue有相似之处,但是disruptor从功能、性能都远好于ArrayBlockingQueue。

相关概念

RingBuffer

应用需要传递的消息在Disruptor中称为Event(事件)。
RingBuffer是Event的数组,实现了阻塞队列的语义:
如果RingBuffer满了,则生产者会阻塞等待。
如果RingBuffer空了,则消费者会阻塞等待。

Sequence

在上文中,我提到“每个消费者需要自己维护一个指针”。这里的指针就是一个单调递增长整数(及其基于CAS的加法、获取操作),称为Sequence。
除了每个消费者需要维护一个指针外,RingBuffer自身也要维护一个全局指针(如上一节第2点所提到的),记录最后一条可以被消费的消息。

高性能的体现

  • 无锁,无锁就没有锁竞争。当生产者、消费者线程数很高时,意义重大。所以,
    往大里说,每个消费者维护自己的Sequence,基本没有跨线程共享的状态。
    往小里说,Sequence的加法是CAS实现的。
    当生产者需要判断RingBuffer是否已满时,用CAS比较原先RingBuffer的Event个数,和假定放入新Event后Event的个数。
    如果CAS返回false,说明在判断期间,别的生产者加入了新Event;或者别的消费者拿走了Event。那么当前判断无效,需要重新判断。

  • 对象的复用,JVM运行时,一怕创建大对象,二怕创建很多小对象。这都会导致JVM堆碎片化、对象元数据存储的额外开销大。这是高性能Java应用的噩梦。
    为了解决第二点“很多小对象”,主流开源框架都会自己维护、复用对象池。LMAX Disruptor也不例外。
    生产者不是创建新的Event对象,放入到RingBuffer中。而是从RingBuffer中取出一个已有的Event对象,更新它所指向的业务数据,来代表一个逻辑上的新Event。

简单使用

pom

1
2
3
4
5
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>

LogEvent

自定义实体对象,充当“生产者-消费者”模型中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.Date;

/**
* 自定义实体对象,充当“生产者-消费者”模型中的数据<br>
*/
public class LogEvent {
/**
* The Log id.
*/
private long logId;
/**
* The Content.
*/
private String content;
/**
* The Date.
*/
private Date date;

/**
* Gets log id.
*
* @return the log id
*/
public long getLogId() {
return logId;
}

/**
* Sets log id.
*
* @param logId the log id
*/
public void setLogId(long logId) {
this.logId = logId;
}

/**
* Gets content.
*
* @return the content
*/
public String getContent() {
return content;
}

/**
* Sets content.
*
* @param content the content
*/
public void setContent(String content) {
this.content = content;
}

/**
* Gets date.
*
* @return the date
*/
public Date getDate() {
return date;
}

/**
* Sets date.
*
* @param date the date
*/
public void setDate(Date date) {
this.date = date;
}

@Override
public String toString() {
return "logId=" + logId +
", content='" + content + '\'' +
", date=" + date;
}
}

LogEventFactory

实现EventFactory的接口,用于生产数据。

1
2
3
4
5
6
7
8
9
10
11
12
import com.lmax.disruptor.EventFactory;

/**
* 事件生成工厂,用来初始化预分配事件对象,即根据RingBuffer大小创建的实体对象
*/
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}

LogEventProducer

自定义生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.Date;

import com.lmax.disruptor.RingBuffer;

/**
* 自定义生产者
*/
public class LogEventProducer {
private RingBuffer<LogEvent> ringBuffer;

public LogEventProducer(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void onData(long logId, String content, Date date) {
//递增并返回循环缓冲区的下一个序列。
long seq = ringBuffer.next();
//获取RingBuffer中给定序列的事件
LogEvent logEvent = ringBuffer.get(seq);
logEvent.setLogId(logId);
logEvent.setContent(content);
logEvent.setDate(date);
//发布指定的序列。此操作将此特定消息标记为可读取。
ringBuffer.publish(seq);
}
}

LogEventProducerWithTranslator

将数据存储到自定义对象中并发布,通过在自定义类中新建EventTranslator类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.Date;

import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;

/**
* 使用translator方式到事件生产者发布事件,通常使用该方法
*/
public class LogEventProducerWithTranslator {

/**
* 实现将另一个数据表示转换为从{@link RingBuffer}声明的事件。
*/
private EventTranslatorVararg eventTranslatorVararg = (EventTranslatorVararg<LogEvent>) (event, sequence, args) -> {
event.setLogId((Long) args[0]);
event.setContent((String) args[1]);
event.setDate((Date) args[2]);
};

private RingBuffer<LogEvent> ringBuffer;

public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}

public void onData(long logId, String content, Date date) {
//允许用户提供可变数量的参数
ringBuffer.publishEvent(eventTranslatorVararg, logId, content, date);
}
}

LogEventConsumer

自定义消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;

/**
* 自定义消费者
*/
public class LogEventConsumer implements EventHandler<LogEvent> {


/**
* The Name.
*/
private String name;

/**
* Instantiates a new Log event consumer.
*
* @param name the name
*/
public LogEventConsumer(String name) {
this.name = name;
}

/**
* 当发布者将事件发布到RingBuffer时调用。
*
* @param event published to the {@link RingBuffer}
* @param sequence 正在处理的事件的sequence
* @param endOfBatch 标志,指示这是否是来自{@link RingBuffer}的批处理中的最后一个事件
*/
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
System.out.println("LogEventConsumer name:" + name + ",sequence:" + sequence + ",endOfBatch:" + endOfBatch + ",logEvent:" + event.toString());
}
}

LogEventMain

启动项,通过单一生产者,多生产者,单一消费者,多消费者的组合,测试了disruptor的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201

import java.util.Date;
import java.util.concurrent.Executors;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

/**
* 启动
*/
public class LogEventMain {
/**
* The entry point of application.
*
* @param args the input arguments
* @throws InterruptedException the interrupted exception
*/
public static void main(String[] args) throws InterruptedException {
// 单个生产者和消费者的模式
producer();
// 使用EventTranslatorVararg的单个生产者和消费者模式
producerWithTranslator();
// 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费
multiConsumer();
// 一个生产者,多个消费者,有2条支线
multiConsumers();
// 多个生产者,多个消费者,有2条消费者支线
multiProcedureConsumers();

}

/**
* 多个生产者,多个消费者,有2条消费者支线,其中消费者1和消费者3在同一条支线上,
* 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
* 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
* 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
* 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
* 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
* 生产者只是多生产了数据
*/
public static void multiProcedureConsumers() throws InterruptedException {
LogEventFactory logEventFactory = new LogEventFactory();
//用于生成RingBuffer大小,其大小必须是2的n次方
int ringBufferSize = 2 << 3;
//定义Disruptor初始化信息
Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy());
LogEventConsumer consumer1 = new LogEventConsumer("1");
LogEventConsumer consumer2 = new LogEventConsumer("2");
LogEventConsumer consumer3 = new LogEventConsumer("3");
LogEventConsumer consumer4 = new LogEventConsumer("4");
LogEventConsumer consumer5 = new LogEventConsumer("5");
//同时执行消费者1和消费者2
disruptor.handleEventsWith(consumer1, consumer2);
//消费者1后面执行消费者3
disruptor.after(consumer1).handleEventsWith(consumer3);
//消费者后面执行消费者4
disruptor.after(consumer2).handleEventsWith(consumer4);
//消费者3和消费者3执行完后执行消费者5
disruptor.after(consumer3, consumer4).handleEventsWith(consumer5);
//定义事件的开始
disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
LogEventProducer logEventProducer2 = new LogEventProducer(ringBuffer);
LogEventProducer logEventProducer3 = new LogEventProducer(ringBuffer);
for (int i = 0; i < 10; i++) {
logEventProducer.onData(i, "1-logEventProducer" + i, new Date());
logEventProducer2.onData(i, "2-logEventProducer" + i, new Date());
logEventProducer3.onData(i, "3-logEventProducer" + i, new Date());
}
Thread.sleep(1000);
//关闭Disruptor
disruptor.shutdown();
}

/**
* 一个生产者,多个消费者,有2条支线,其中消费者1和消费者3在同一条支线上,
* 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者
* 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4
* 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5
* 消费者5消费完RingBuffer大小后又按照上面的顺序来消费
* 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来
*/
public static void multiConsumers() throws InterruptedException {
LogEventFactory logEventFactory = new LogEventFactory();
//用于生成RingBuffer大小,其大小必须是2的n次方
int ringBufferSize = 2 << 3;
//定义Disruptor初始化信息
Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
LogEventConsumer consumer1 = new LogEventConsumer("1");
LogEventConsumer consumer2 = new LogEventConsumer("2");
LogEventConsumer consumer3 = new LogEventConsumer("3");
LogEventConsumer consumer4 = new LogEventConsumer("4");
LogEventConsumer consumer5 = new LogEventConsumer("5");
//同时执行消费者1和消费者2
disruptor.handleEventsWith(consumer1, consumer2);
//消费者1后面执行消费者3
disruptor.after(consumer1).handleEventsWith(consumer3);
//消费者后面执行消费者4
disruptor.after(consumer2).handleEventsWith(consumer4);
//消费者3和消费者3执行完后执行消费者5
disruptor.after(consumer3, consumer4).handleEventsWith(consumer5);
//定义事件的开始
disruptor.start();

RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
for (int i = 0; i < 10; i++) {
logEventProducer.onData(i, "logEventProducer" + i, new Date());
}
Thread.sleep(1000);
//关闭Disruptor
disruptor.shutdown();
}

/**
* 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费
* 也即使说当前面2个消费者把所有的RingBuffer占领完成,同时都消费完成后才会有第3个消费者的消费
* 当发布的事件数量大于RingBuffer的大小的时候,在第3个消费者消费完RingBuffer大小的时候前面2个消费者才能继续消费,序号递增的
*/
public static void multiConsumer() throws InterruptedException {
LogEventFactory logEventFactory = new LogEventFactory();
//用于生成RingBuffer大小,其大小必须是2的n次方
int ringBufferSize = 2 << 3;
//定义Disruptor初始化信息
Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());

//设置多个消费者
EventHandlerGroup<LogEvent> eventEventHandlerGroup = disruptor.handleEventsWith(new LogEventConsumer("1"), new LogEventConsumer("2"));
eventEventHandlerGroup.then(new LogEventConsumer("3"));
//启动事件的开始
disruptor.start();
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for (int i = 0; i < 10; i++) {
producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date());
}
Thread.sleep(1000);
//关闭Disruptor
disruptor.shutdown();
}

/**
* 使用EventTranslatorVararg的单个生产者和消费者模式
*/
public static void producerWithTranslator() throws InterruptedException {
LogEventFactory logEventFactory = new LogEventFactory();
//用于生成RingBuffer大小,其大小必须是2的n次方
int ringBufferSize = 2 << 3;
//定义Disruptor初始化信息
Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
//定义处理事件的消费者
disruptor.handleEventsWith(new LogEventConsumer("1"));
//定义事件的开始
disruptor.start();

RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for (int i = 0; i < 10; i++) {
producerWithTranslator.onData(i, "producerWithTranslator" + i, new Date());
}
Thread.sleep(1000);
//关闭Disruptor
disruptor.shutdown();
}

/**
* 单个生产者和消费者的模式
*/
public static void producer() throws InterruptedException {
// 事件生成工厂
LogEventFactory logEventFactory = new LogEventFactory();
//用于生成RingBuffer大小,其大小必须是2的n次方
int ringBufferSize = 2 << 3;
//定义Disruptor初始化信息
Disruptor<LogEvent> disruptor = new Disruptor<>(logEventFactory, ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());
//定义处理事件的消费者
disruptor.handleEventsWith(new LogEventConsumer("1"));
//定义事件的开始
disruptor.start();

//获取RingBuffer
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
for (int i = 0; i < 10; i++) {
logEventProducer.onData(i, "logEventProducer" + i, new Date());
}
Thread.sleep(1000);
//关闭Disruptor
disruptor.shutdown();
}
}

参考:

disruptor的简单介绍及使用
高性能线程间队列 DISRUPTOR 简介
浅谈Disruptor
剖析Disruptor:为什么会这么快?(一)Ringbuffer的特别之处
剖析Disruptor:为什么会这么快?(一)锁的缺点
剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充
剖析Disruptor:为什么会这么快?(三)揭秘内存屏障
高性能队列——Disruptor
还在用BlockingQueue?读这篇文章,了解下Disruptor吧