博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Berkeley DB实现的持久化队列
阅读量:6818 次
发布时间:2019-06-26

本文共 9770 字,大约阅读时间需要 32 分钟。

转自:http://guoyunsky.iteye.com/blog/1169912

   队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.比如爬虫,将要抓取的URL放到内存,而URL过多,内存肯定要爆.在读Heritrix源码中,发现Heritrix是基于Bdb实现了一个持久化队列,于是我就将这块代码独立出来,平时使用也蛮爽的,现在拿出来共享.同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.

      大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.

      这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue,这里贴上代码.以下代码需引用bdb-je,common-io,junit.请在附件中下载

1.自定义的BDB环境类,可以缓存StoredClassCatalog并共享

package com.guoyun.util;import java.io.File;import com.sleepycat.bind.serial.StoredClassCatalog;import com.sleepycat.je.Database;import com.sleepycat.je.DatabaseConfig;import com.sleepycat.je.DatabaseException;import com.sleepycat.je.Environment;import com.sleepycat.je.EnvironmentConfig;/** * BDB数据库环境,可以缓存StoredClassCatalog并共享 *  * @contributor guoyun */public class BdbEnvironment extends Environment {    StoredClassCatalog classCatalog;     Database classCatalogDB;        /**     * Constructor     *      * @param envHome 数据库环境目录     * @param envConfig config options  数据库换纪念馆配置     * @throws DatabaseException     */    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {        super(envHome, envConfig);    }    /**     * 返回StoredClassCatalog     * @return the cached class catalog     */    public StoredClassCatalog getClassCatalog() {        if(classCatalog == null) {            DatabaseConfig dbConfig = new DatabaseConfig();            dbConfig.setAllowCreate(true);            try {                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);                classCatalog = new StoredClassCatalog(classCatalogDB);            } catch (DatabaseException e) {                // TODO Auto-generated catch block                throw new RuntimeException(e);            }        }        return classCatalog;    }    @Override    public synchronized void close() throws DatabaseException {        if(classCatalogDB!=null) {            classCatalogDB.close();        }        super.close();    }}

 2.  基于BDB实现的持久化队列

package com.guoyun.util;import java.io.File;import java.io.IOException;import java.io.Serializable;import java.util.AbstractQueue;import java.util.Iterator;import java.util.concurrent.atomic.AtomicLong;import org.apache.commons.io.FileUtils;import com.sleepycat.bind.EntryBinding;import com.sleepycat.bind.serial.SerialBinding;import com.sleepycat.bind.serial.StoredClassCatalog;import com.sleepycat.bind.tuple.TupleBinding;import com.sleepycat.collections.StoredMap;import com.sleepycat.collections.StoredSortedMap;import com.sleepycat.je.Database;import com.sleepycat.je.DatabaseConfig;import com.sleepycat.je.DatabaseException;import com.sleepycat.je.DatabaseExistsException;import com.sleepycat.je.DatabaseNotFoundException;import com.sleepycat.je.EnvironmentConfig;/** * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭 * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间 * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可, * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现 *  * @contributor guoyun * @param 
*/public class BdbPersistentQueue
extends AbstractQueue
implements Serializable { private static final long serialVersionUID = 3427799316155220967L; private transient BdbEnvironment dbEnv; // 数据库环境,无需序列化 private transient Database queueDb; // 数据库,用于保存值,使得支持队列持久化,无需序列化 private transient StoredMap
queueMap; // 持久化Map,Key为指针位置,Value为值,无需序列化 private transient String dbDir; // 数据库所在目录 private transient String dbName; // 数据库名字 private AtomicLong headIndex; // 头部指针 private AtomicLong tailIndex; // 尾部指针 private transient E peekItem=null; // 当前获取的值 /** * 构造函数,传入BDB数据库 * * @param db * @param valueClass * @param classCatalog */ public BdbPersistentQueue(Database db,Class
valueClass,StoredClassCatalog classCatalog){ this.queueDb=db; this.dbName=db.getDatabaseName(); headIndex=new AtomicLong(0); tailIndex=new AtomicLong(0); bindDatabase(queueDb,valueClass,classCatalog); } /** * 构造函数,传入BDB数据库位置和名字,自己创建数据库 * * @param dbDir * @param dbName * @param valueClass */ public BdbPersistentQueue(String dbDir,String dbName,Class
valueClass){ headIndex=new AtomicLong(0); tailIndex=new AtomicLong(0); this.dbDir=dbDir; this.dbName=dbName; createAndBindDatabase(dbDir,dbName,valueClass); } /** * 绑定数据库 * * @param db * @param valueClass * @param classCatalog */ public void bindDatabase(Database db, Class
valueClass, StoredClassCatalog classCatalog){ EntryBinding
valueBinding = TupleBinding.getPrimitiveBinding(valueClass); if(valueBinding == null) { valueBinding = new SerialBinding
(classCatalog, valueClass); // 序列化绑定 } queueDb = db; queueMap = new StoredSortedMap
( db, // db TupleBinding.getPrimitiveBinding(Long.class), //Key valueBinding, // Value true); // allow write } /** * 创建以及绑定数据库 * * @param dbDir * @param dbName * @param valueClass * @throws DatabaseNotFoundException * @throws DatabaseExistsException * @throws DatabaseException * @throws IllegalArgumentException */ private void createAndBindDatabase(String dbDir, String dbName,Class
valueClass) throws DatabaseNotFoundException, DatabaseExistsException,DatabaseException,IllegalArgumentException{ File envFile = null; EnvironmentConfig envConfig = null; DatabaseConfig dbConfig = null; Database db=null; try { // 数据库位置 envFile = new File(dbDir); // 数据库环境配置 envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(false); // 数据库配置 dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(false); dbConfig.setDeferredWrite(true); // 创建环境 dbEnv = new BdbEnvironment(envFile, envConfig); // 打开数据库 db = dbEnv.openDatabase(null, dbName, dbConfig); // 绑定数据库 bindDatabase(db,valueClass,dbEnv.getClassCatalog()); } catch (DatabaseNotFoundException e) { throw e; } catch (DatabaseExistsException e) { throw e; } catch (DatabaseException e) { throw e; } catch (IllegalArgumentException e) { throw e; } } /** * 值遍历器 */ @Override public Iterator
iterator() { return queueMap.values().iterator(); } /** * 大小 */ @Override public int size() { synchronized(tailIndex){ synchronized(headIndex){ return (int)(tailIndex.get()-headIndex.get()); } } } /** * 插入值 */ @Override public boolean offer(E e) { synchronized(tailIndex){ queueMap.put(tailIndex.getAndIncrement(), e); // 从尾部插入 } return true; } /** * 获取值,从头部获取 */ @Override public E peek() { synchronized(headIndex){ if(peekItem!=null){ return peekItem; } E headItem=null; while(headItem==null&&headIndex.get()

3. 测试类,测试数据准确性和性能

package com.guoyun.util;import java.io.File;import java.util.Queue;import java.util.concurrent.LinkedBlockingQueue;import junit.framework.TestCase;public class BdbPersistentQueueTest extends TestCase{    Queue
memoryQueue; Queue
persistentQueue; @Override protected void setUp() throws Exception { super.setUp(); memoryQueue=new LinkedBlockingQueue
(); String dbDir="E:/java/test/bdbDir"; File file=new File(dbDir); if(!file.exists()||!file.isDirectory()){ file.mkdirs(); } persistentQueue=new BdbPersistentQueue(dbDir,"pq",String.class); } @Override protected void tearDown() throws Exception { super.tearDown(); memoryQueue.clear(); memoryQueue=null; persistentQueue.clear(); persistentQueue=null; } /** * 排放值 * @param queue * @return 排放的数据个数 */ public int drain(Queue
queue){ int count=0; while(true){ try { queue.remove(); count++; } catch (Exception e) { return count; } } } /** * * @param queue * @param size */ public void fill(Queue
queue,int size){ for(int i=0;i
queue,String queueName,int size){ if(queue.size()!=size){ System.err.println("Error size of "+queueName); } String value=null; for(int i=0;i
"+value); } } } /** * 测试队列中数据的准确性,包括长度 */ public void testExact(){ int size=100; fill(memoryQueue,size); fill(persistentQueue,size); checkDataExact(memoryQueue,"MemoryQueue",100); checkDataExact(persistentQueue,"PersistentQueue",100); } }

4.测试性能

========测试1000000(十万)条数据=================

1.内存Queue插入和排空数据所耗时间
 填充 100000 条数据耗时: 53.550787 毫秒,单条耗时: 535.50787 纳秒
 排空 100000 条数据耗时: 27.09901 毫秒,单条耗时: 270.9901 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 100000 条数据耗时: 1399.644305 毫秒,单条耗时: 0.01399644305 豪秒
 排空 100000 条数据耗时: 2104.765179 毫秒,单条耗时: 21.04765179 豪秒

 持久化写入是内存写入的26倍,读取是77倍

========测试1000000(百万)条数据=================

1.内存Queue插入和排空数据所耗时间
 填充 1000000 条数据耗时: 699.105888 毫秒,单条耗时: 699.105888 纳秒
 排空 1000000 条数据耗时: 158.792281 毫秒,单条耗时: 158.792281 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 1000000 条数据耗时: 11978.132218 毫秒,单条耗时: 0.011978132218 豪秒
 排空 1000000 条数据耗时: 22355.617205 毫秒,单条耗时: 22.355617204999998 豪秒

 持久化写入是内存写入的17倍,读取是141倍

 

========测试10000000(千万)条数据=================

1.内存Queue插入和排空数据所耗时间
 填充 10000000 条数据耗时: 9678.377046 毫秒,单条耗时: 967.8377046 纳秒
 排空 10000000 条数据耗时: 1473.416825 毫秒,单条耗时: 147.3416825 纳秒
2.持久化Queue插入和排空数据所耗时间
 填充 10000000 条数据耗时: 151177.036391 毫秒,单条耗时: 0.0151177036391 豪秒
 排空 10000000 条数据耗时: 361642.655135 毫秒,单条耗时: 36.164265513500006 豪秒

 持久化写入是内存写入的15倍,读取是245倍

可以看出写入和遍历一条都是在毫秒级别,还有千万级的数据,BDB的性能着实牛逼.而且随着数据的增多,写的时间在缩短,读的时间在增长.

 

 

 

jar包:

转载地址:http://kwpzl.baihongyu.com/

你可能感兴趣的文章