文章首发于:clawhub.club
项目中使用BDB作为本地持久化缓存,效果很稳定,最近事情比较少,所以在研究能不能将LMDB改成本地持久化队列,根据网上收集的一些性能报告,LMDB还是优于BDB的,参考:LMDB简介 。
既然是本地持久化队列,首先是本地持久化,LMDB是支持的;其次是队列,先进先出,那就需要指针操作,即key-val中,key为指针位。
开搞~ 首先解决LMDB底层操作,在这引入lmdbjava的maven依赖:
1 2 3 4 5 <dependency > <groupId > org.lmdbjava</groupId > <artifactId > lmdbjava</artifactId > <version > 0.7.0</version > </dependency >
操作LMDB的相关方法:因为是测试,也没有规范相关代码,配置项也没有抽出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 import org.lmdbjava.Dbi;import org.lmdbjava.DbiFlags;import org.lmdbjava.Env;import org.lmdbjava.EnvFlags;import org.lmdbjava.Stat;import org.lmdbjava.Txn;import java.io.File;import java.nio.ByteBuffer;import static org.lmdbjava.Env.create;public enum LMDB { INSTANCE; private Env<ByteBuffer> env; private Dbi<ByteBuffer> db; LMDB() { env = create() .setMapSize(1024 * 1024 * 1024 ) .setMaxReaders(1 ) .setMaxDbs(1 ) .open(new File ("D:\\lmdb" ), EnvFlags.MDB_FIXEDMAP, EnvFlags.MDB_NOSYNC, EnvFlags.MDB_WRITEMAP); db = env.openDbi("db-1" , DbiFlags.MDB_CREATE); noCheck(); } public void noCheck () { System.setProperty(Env.DISABLE_CHECKS_PROP, Boolean.TRUE.toString()); } public ByteBuffer get (ByteBuffer key) { try (Txn<ByteBuffer> txn = env.txnWrite()) { return db.get(txn, key); } } public void put (ByteBuffer key, ByteBuffer val) { db.put(key, val); } public boolean del (ByteBuffer key) { return db.delete(key); } public Stat state () { try (Txn<ByteBuffer> txn = env.txnWrite()) { return db.stat(txn); } } public void closeDb () { if (db != null ) { db.close(); } } public void closeEnv () { if (env != null && !env.isClosed()) { env.close(); } } }
把ByteBuffer的工具类也贴出来记录一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import java.nio.ByteBuffer; import java.util.Objects; import static java.nio.ByteBuffer.allocateDirect; import static java.nio.charset.StandardCharsets.UTF_8; public class ByteBufferUtil { /** * Byte buffer to byte array byte [ ]. * * @param byteBuffer the byte buffer * @return the byte [ ] */ public static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) { final int len = byteBuffer.limit() - byteBuffer.position(); final byte[] bytes = new byte[len]; if (byteBuffer.isReadOnly()) { return null; } byteBuffer.get(bytes); return bytes; } public static String byteBufferToString(ByteBuffer byteBuffer) { return new String(Objects.requireNonNull(byteBufferToByteArray(byteBuffer))); } /** * String to byte buffer byte buffer. * * @param value the value * @return the byte buffer */ public static ByteBuffer stringToByteBuffer(String value) { final byte[] keyBytes = value.getBytes(UTF_8); final ByteBuffer keyBuffer = allocateDirect(keyBytes.length); keyBuffer.put(keyBytes).flip(); return keyBuffer; } public static ByteBuffer longToByteBuffer(Long value) { byte[] longArray = ByteBuffer.allocate(8).putLong(value).array(); return getByteBufferAndFlip(longArray); } private static ByteBuffer getByteBufferAndFlip(byte[] bytes) { ByteBuffer buffer = allocateDirect(bytes.length); //一定要先执行flip再get,否则会报错 buffer.put(bytes).flip(); return buffer; } public static byte[] longToBytes(long x) { ByteBuffer buffer = ByteBuffer.allocate(8); buffer.putLong(0, x); return buffer.array(); } public static long bytesToLong(byte[] bytes) { ByteBuffer buffer = ByteBuffer.allocate(8); buffer.put(bytes, 0, bytes.length); buffer.flip();//need flip return buffer.getLong(); } }
接下来就是比较核心的地方了,首先得实现Queue的一些方法,比如所poll,peek,offer之类。 这里要分析一下了。 首先:
poll获取第一个元素并删除 得维护一个首指针,执行这个方法得获取同步锁,在获取元素之后,删除元素,并把首指针+1(即向后移动一位),并将首指针交由LMDB管理。这几步操作必须保证原子性。
peek 获取第一个元素,不删除 直接根据首指针获取即可,也需要同步锁,因为当操作peek的时候,可能有线程操作了poll,以至于peek拿到的首指针已经后移,获取数据失败。
offer 插入一个元素 肯定要插入到队尾,即要维护一个队尾指针,并且要使用AtomicLong原子类,这个AtomicLong的初始值为系统启动时,获取的LMDB库中元素的数量+首指针的值(也在LMDB中,最初为0)。当增加一个元素时,AtomicLong+1并获取这个值作为Key,所以这个方法不需要同步锁。 简单的代码描述:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 import java.nio.ByteBuffer;import java.util.concurrent.atomic.AtomicLong;public class IndexQueue { private long capacity; private long firstIndex; private AtomicLong currentIndex; public IndexQueue () { capacity = LMDB.INSTANCE.state().entries; ByteBuffer byteBuffer = LMDB.INSTANCE.get(ByteBufferUtil.stringToByteBuffer("index_queue_first_index" )); if (byteBuffer != null ) { firstIndex = byteBuffer.getLong(); } currentIndex = new AtomicLong (capacity + firstIndex); } public void offer (String val) { long i = currentIndex.getAndIncrement(); System.out.println(i); LMDB.INSTANCE.put(ByteBufferUtil.longToByteBuffer(i), ByteBufferUtil.stringToByteBuffer(val)); } public synchronized String peek () { return ByteBufferUtil.byteBufferToString(LMDB.INSTANCE.get(ByteBufferUtil.longToByteBuffer(firstIndex))); } public synchronized String poll () { String item = ByteBufferUtil.byteBufferToString(LMDB.INSTANCE.get(ByteBufferUtil.longToByteBuffer(firstIndex))); LMDB.INSTANCE.del(ByteBufferUtil.longToByteBuffer(firstIndex)); firstIndex += 1 ; System.out.println("firstIndex:" + firstIndex); LMDB.INSTANCE.put(ByteBufferUtil.stringToByteBuffer("index_queue_first_index" ), ByteBufferUtil.longToByteBuffer(firstIndex)); return item; } public static void main (String[] args) { IndexQueue indexQueue = new IndexQueue (); long start = System.nanoTime(); String item = indexQueue.peek(); System.out.println(item); System.out.println(System.nanoTime() - start); System.out.println(LMDB.INSTANCE.state()); LMDB.INSTANCE.closeDb(); LMDB.INSTANCE.closeEnv(); } public void close () { } }
初步测试效果比较理想,持续在研究,如果有网友也在做这块工作,欢迎留言交流。