同时开10个线程存入和取出100万的数据,结论如下:
DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue
执行结果如下:100万 DoubleBufferedQueue入队时间:9510 出队时间:10771
100万 DoubleBufferedQueue入队时间:8169 出队时间:97891000万 DoubleBufferedQueue入队时间:98285 出队时间:1010881000万 DoubleBufferedQueue入队时间:101859 出队时间:105964100万 ConcurrentLinkedQueue入队时间:10557 出队时间:13716
100万 ConcurrentLinkedQueue入队时间:25298 出队时间:253321000万 ConcurrentLinkedQueue队列时间:121868 出队时间:1361161000万 ConcurrentLinkedQueue队列时间:134306 出队时间:147893100万 ArrayBlockingQueue入队时间:21080 出队时间:22025
100万 ArrayBlockingQueue入队时间:17689 出队时间:196541000万 ArrayBlockingQueue入队时间:194400 出队时间:2059681000万 ArrayBlockingQueue入队时间:192268 出队时间:197982100万 LinkedBlockingQueue入队时间:38236 出队时间:52555
100万 LinkedBlockingQueue入队时间:30646 出队时间:385731000万 LinkedBlockingQueue入队时间:375669 出队时间:3919761000万 LinkedBlockingQueue入队时间:701363 出队时间:711217
doubleBufferedQueue:
package test.MoreThread.d;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import test.MoreThread.l.linkedBlockingQueue;import comrt.util.DoubleBufferedQueue;//DoubleBufferedQueue入队时间:9510 出队时间:10771//DoubleBufferedQueue入队时间:8169 出队时间:9789public class doubleBufferedQueue { private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); public final static int size1 = 1000000; public static DoubleBufferedQueue
concurrentLinkedQueue:
package test.MoreThread.c;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ConcurrentLinkedQueue入队时间:10557 出队时间:13716//ConcurrentLinkedQueue入队时间:25298 出队时间:25332public class concurrentLinkedQueue { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); public static ConcurrentLinkedQueuequeue = new ConcurrentLinkedQueue (); public final static int size1 = 1000000; public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { // long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList > results = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService.submit(new Exec()); results.add(future); } long allTime = 0; for (Future fs : results) { try { allTime += fs.get();// log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } concurrentLinkedQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList > results_out = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService2 .submit(new Exec_Out()); results_out.add(future); } long allTime_out = 0; for (Future fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); }}class Exec implements Callable { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < concurrentLinkedQueue.size1; i++) { concurrentLinkedQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time;// log.info("执行时间:" + time2); return time2; }}class Exec_Out implements Callable { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!concurrentLinkedQueue.isOver) { concurrentLinkedQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}
arrayBlockingQueue:
package test.MoreThread.a;import java.util.ArrayList;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ArrayBlockingQueue入队时间:21080 出队时间:22025//ArrayBlockingQueue入队时间:17689 出队时间:19654public class arrayBlockingQueue { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); public final static int size1 = 1000000; public static ArrayBlockingQueuequeue = new ArrayBlockingQueue ( size1); public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { // long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList > results = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService .submit(new ExecArrayBlockingQueue()); results.add(future); } long allTime = 0; for (Future fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } arrayBlockingQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList > results_out = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService2 .submit(new ExecArrayBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); }}class ExecArrayBlockingQueue implements Callable { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < arrayBlockingQueue.size1; i++) { arrayBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}class ExecArrayBlockingQueue_Out implements Callable { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!arrayBlockingQueue.isOver) { arrayBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}
linkedBlockingQueue:
package test.MoreThread.l;import java.util.ArrayList;import java.util.Vector;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//LinkedBlockingQueue入队时间:38236 出队时间:52555//LinkedBlockingQueue入队时间:30646 出队时间:38573public class linkedBlockingQueue { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); public final static int size1 = 1000000; public static LinkedBlockingQueuequeue = new LinkedBlockingQueue ( size1); public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList > results = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService .submit(new ExecLinkedBlockingQueue()); results.add(future); } long allTime = 0; for (Future fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } linkedBlockingQueue.isOver = true; log.info("入队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));// System.out.println(linkedBlockingQueue.queue.size()); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList > results_out = new ArrayList >(); for (int i = 0; i < threadNumber; i++) { Future future = executorService2 .submit(new ExecLinkedBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); }}class ExecLinkedBlockingQueue implements Callable { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < linkedBlockingQueue.size1; i++) { linkedBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}class ExecLinkedBlockingQueue_Out implements Callable { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!linkedBlockingQueue.isOver) { linkedBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; }}
DoubleBufferedQueue双缓冲队列
package comrt.util;import java.util.AbstractQueue;import java.util.Collection;import java.util.Iterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//双缓冲队列,线程安全public class DoubleBufferedQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private static final long serialVersionUID = 1011398447523020L; public static final int DEFAULT_QUEUE_CAPACITY = 5000000; public static final long DEFAULT_MAX_TIMEOUT = 0; public static final long DEFAULT_MAX_COUNT = 10; private Logger logger = LoggerFactory.getLogger(DoubleBufferedQueue.class.getName()); /** The queued items */ private ReentrantLock readLock; // 写锁 private ReentrantLock writeLock; // 是否满 private Condition notFull; private Condition awake; // 读写数组 private transient E[] writeArray; private transient E[] readArray; // 读写计数 private volatile int writeCount; private volatile int readCount; // 写数组下标指针 private int writeArrayTP; private int writeArrayHP; // 读数组下标指针 private int readArrayTP; private int readArrayHP; private int capacity; public DoubleBufferedQueue(int capacity) { // 默认 this.capacity = DEFAULT_QUEUE_CAPACITY; if (capacity > 0) { this.capacity = capacity; } readArray = (E[]) new Object[capacity]; writeArray = (E[]) new Object[capacity]; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); } private void insert(E e) { writeArray[writeArrayTP] = e; ++writeArrayTP; ++writeCount; } private E extract() { E e = readArray[readArrayHP]; readArray[readArrayHP] = null; ++readArrayHP; --readCount; return e; } /** * switch condition: read queue is empty && write queue is not empty * * Notice:This function can only be invoked after readLock is grabbed,or may * cause dead lock * * @param timeout * @param isInfinite * : whether need to wait forever until some other thread awake * it * @return * @throws InterruptedException */ private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException { writeLock.lock(); try { if (writeCount <= 0) { // logger.debug("Write Count:" + writeCount // + ", Write Queue is empty, do not switch!"); try { // logger.debug("Queue is empty, need wait...."); if (isInfinite && timeout <= 0) { awake.await(); return -1; } else if (timeout > 0) { return awake.awaitNanos(timeout); } else { return 0; } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { E[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; readArrayTP = writeArrayTP; writeCount = 0; writeArrayHP = readArrayHP; writeArrayTP = 0; notFull.signal(); // logger.debug("Queue switch successfully!"); return 0; } } finally { writeLock.unlock(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) { throw new NullPointerException(); } long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } writeLock.lockInterruptibly(); try { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (writeCount < writeArray.length) { insert(e); if (writeCount == 1) { awake.signal(); } return true; } // Time out if (nanoTime <= 0) { // logger.debug("offer wait time out!"); return false; } // keep waiting try { // logger.debug("Queue is full, need wait...."); nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } return false; } // 取 @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } readLock.lockInterruptibly(); try { if (nanoTime > 0) { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (readCount > 0) { return extract(); } if (nanoTime <= 0) { // logger.debug("poll time out!"); return null; } nanoTime = queueSwap(nanoTime, false); } } else { if (readCount > 0) { return extract(); } queueSwap(nanoTime, false); if (readCount > 0) { return extract(); } } } finally { readLock.unlock(); } return null; } // 等待500毫秒 @Override public E poll() { E ret = null; try { ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e) { ret = null; } return ret; } // 查看 @Override public E peek() { E e = null; readLock.lock(); try { if (readCount > 0) { e = readArray[readArrayHP]; } } finally { readLock.unlock(); } return e; } // 默认500毫秒 @Override public boolean offer(E e) { boolean ret = false; try { ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e2) { ret = false; } return ret; } @Override public void put(E e) throws InterruptedException { // never need to // block offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public E take() throws InterruptedException { return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public int remainingCapacity() { return this.capacity; } @Override public int drainTo(Collection c) { return 0; } @Override public int drainTo(Collection c, int maxElements) { return 0; } @Override public Iterator iterator() { return null; } // 当前读队列中还有多少个 @Override public int size() { int size = 0; readLock.lock(); try { size = readCount; } finally { readLock.unlock(); } return size; } /** * 当前已写入的队列大小 * */ public int WriteSize() { int size = 0; writeLock.lock(); try { size = writeCount; } finally { writeLock.unlock(); } return size; } public int unsafeReadSize() { return readCount; } public int unsafeWriteSize() { return writeCount; } public int capacity() { return capacity; } public String toMemString() { return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity; } // 清理 /* * public void clear() { readLock.lock(); writeLock.lock(); try { readCount * = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0; * //logger.debug("Queue clear successfully!"); } finally { * writeLock.unlock(); readLock.unlock(); } } */}