文章首发于:clawhub.club


项目中使用BDB作为本地持久化缓存,效果很稳定,最近事情比较少,所以在研究能不能将LMDB改成本地持久化队列,根据网上收集的一些性能报告,LMDB还是优于BDB的,参考:LMDB简介

既然是本地持久化队列,首先是本地持久化,LMDB是支持的;其次是队列,先进先出,那就需要指针操作,即key-val中,key为指针位。

开搞~
首先解决LMDB底层操作,在这引入lmdbjava的maven依赖:

1
2
3
4
5
<dependency>
<groupId>org.lmdbjava</groupId>
<artifactId>lmdbjava</artifactId>
<version>0.7.0</version>
</dependency>

操作LMDB的相关方法:因为是测试,也没有规范相关代码,配置项也没有抽出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import org.lmdbjava.Dbi;
import org.lmdbjava.DbiFlags;
import org.lmdbjava.Env;
import org.lmdbjava.EnvFlags;
import org.lmdbjava.Stat;
import org.lmdbjava.Txn;

import java.io.File;
import java.nio.ByteBuffer;

import static org.lmdbjava.Env.create;

/**
* @author clawhub
*/

public enum LMDB {
/**
* Instance lmdb.
*/
INSTANCE;
/**
* The Env.
*/
private Env<ByteBuffer> env;

/**
* The Db.
*/
private Dbi<ByteBuffer> db;

LMDB() {
env = create()
.setMapSize(1024 * 1024 * 1024)
.setMaxReaders(1)
.setMaxDbs(1)
.open(new File("D:\\lmdb"), EnvFlags.MDB_FIXEDMAP, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP);
db = env.openDbi("db-1", DbiFlags.MDB_CREATE);
noCheck();
}

/**
* No check.能适当增加性能
*/
public void noCheck() {
System.setProperty(Env.DISABLE_CHECKS_PROP, Boolean.TRUE.toString());
}


/**
* Get ByteBuffer版本
*
* @param key the key
* @return the byte buffer
*/
public ByteBuffer get(ByteBuffer key) {
try (Txn<ByteBuffer> txn = env.txnWrite()) {
return db.get(txn, key);
}
}

/**
* Put ByteBuffer版本
*
* @param key the key
* @param val the val
*/
public void put(ByteBuffer key, ByteBuffer val) {
db.put(key, val);
}

/**
* del ByteBuffer版本
*
* @param key the key
* @return boolean
*/
public boolean del(ByteBuffer key) {
return db.delete(key);
}

/**
* 获取库中状态
* @return Stat
*/
public Stat state() {
try (Txn<ByteBuffer> txn = env.txnWrite()) {
return db.stat(txn);
}
}

public void closeDb() {
if (db != null) {
db.close();
}
}

public void closeEnv() {
if (env != null && !env.isClosed()) {
env.close();
}
}
}

把ByteBuffer的工具类也贴出来记录一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

import java.nio.ByteBuffer;
import java.util.Objects;

import static java.nio.ByteBuffer.allocateDirect;
import static java.nio.charset.StandardCharsets.UTF_8;

public class ByteBufferUtil {
/**
* Byte buffer to byte array byte [ ].
*
* @param byteBuffer the byte buffer
* @return the byte [ ]
*/
public static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) {
final int len = byteBuffer.limit() - byteBuffer.position();
final byte[] bytes = new byte[len];
if (byteBuffer.isReadOnly()) {
return null;
}
byteBuffer.get(bytes);
return bytes;
}

public static String byteBufferToString(ByteBuffer byteBuffer) {
return new String(Objects.requireNonNull(byteBufferToByteArray(byteBuffer)));
}


/**
* String to byte buffer byte buffer.
*
* @param value the value
* @return the byte buffer
*/
public static ByteBuffer stringToByteBuffer(String value) {
final byte[] keyBytes = value.getBytes(UTF_8);
final ByteBuffer keyBuffer = allocateDirect(keyBytes.length);
keyBuffer.put(keyBytes).flip();
return keyBuffer;
}

public static ByteBuffer longToByteBuffer(Long value) {
byte[] longArray = ByteBuffer.allocate(8).putLong(value).array();
return getByteBufferAndFlip(longArray);
}

private static ByteBuffer getByteBufferAndFlip(byte[] bytes) {
ByteBuffer buffer = allocateDirect(bytes.length);
//一定要先执行flip再get,否则会报错
buffer.put(bytes).flip();
return buffer;
}

public static byte[] longToBytes(long x) {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.putLong(0, x);
return buffer.array();
}

public static long bytesToLong(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put(bytes, 0, bytes.length);
buffer.flip();//need flip
return buffer.getLong();
}

}

接下来就是比较核心的地方了,首先得实现Queue的一些方法,比如所poll,peek,offer之类。
这里要分析一下了。
首先:

poll获取第一个元素并删除

得维护一个首指针,执行这个方法得获取同步锁,在获取元素之后,删除元素,并把首指针+1(即向后移动一位),并将首指针交由LMDB管理。这几步操作必须保证原子性。

peek 获取第一个元素,不删除

直接根据首指针获取即可,也需要同步锁,因为当操作peek的时候,可能有线程操作了poll,以至于peek拿到的首指针已经后移,获取数据失败。

offer 插入一个元素

肯定要插入到队尾,即要维护一个队尾指针,并且要使用AtomicLong原子类,这个AtomicLong的初始值为系统启动时,获取的LMDB库中元素的数量+首指针的值(也在LMDB中,最初为0)。当增加一个元素时,AtomicLong+1并获取这个值作为Key,所以这个方法不需要同步锁。
简单的代码描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

public class IndexQueue {
private long capacity;
private long firstIndex;
private AtomicLong currentIndex;

public IndexQueue() {
//系统启动后,获取lmdb中数据量多少
capacity = LMDB.INSTANCE.state().entries;
//获取 firstIndex
ByteBuffer byteBuffer = LMDB.INSTANCE.get(ByteBufferUtil.stringToByteBuffer("index_queue_first_index"));
if (byteBuffer != null) {
firstIndex = byteBuffer.getLong();
}
//当前指针
currentIndex = new AtomicLong(capacity + firstIndex);
}

public void offer(String val) {
long i = currentIndex.getAndIncrement();
System.out.println(i);
LMDB.INSTANCE.put(ByteBufferUtil.longToByteBuffer(i), ByteBufferUtil.stringToByteBuffer(val));
}

public synchronized String peek() {
return ByteBufferUtil.byteBufferToString(LMDB.INSTANCE.get(ByteBufferUtil.longToByteBuffer(firstIndex)));

}

public synchronized String poll() {
String item = ByteBufferUtil.byteBufferToString(LMDB.INSTANCE.get(ByteBufferUtil.longToByteBuffer(firstIndex)));
LMDB.INSTANCE.del(ByteBufferUtil.longToByteBuffer(firstIndex));
firstIndex += 1;
//更新firstIndex
System.out.println("firstIndex:" + firstIndex);
LMDB.INSTANCE.put(ByteBufferUtil.stringToByteBuffer("index_queue_first_index"), ByteBufferUtil.longToByteBuffer(firstIndex));
return item;
}

public static void main(String[] args) {
IndexQueue indexQueue = new IndexQueue();
long start = System.nanoTime();
// for (int i = 0; i < 100000; i++) {
// indexQueue.offer("offer test:" + i);
// }
String item = indexQueue.peek();
System.out.println(item);
System.out.println(System.nanoTime() - start);
System.out.println(LMDB.INSTANCE.state());
LMDB.INSTANCE.closeDb();
LMDB.INSTANCE.closeEnv();
}

public void close() {
//系统关闭前时,将数据写入lmdb
}
}

初步测试效果比较理想,持续在研究,如果有网友也在做这块工作,欢迎留言交流。