diff --git a/scm-core/src/main/java/sonia/scm/collect/IterableQueue.java b/scm-core/src/main/java/sonia/scm/collect/IterableQueue.java
index 353ccf919b..9d3c85bb90 100644
--- a/scm-core/src/main/java/sonia/scm/collect/IterableQueue.java
+++ b/scm-core/src/main/java/sonia/scm/collect/IterableQueue.java
@@ -47,11 +47,12 @@ import java.util.Iterator;
import java.util.List;
/**
- * A iterable queue. The queue can have multiple consumer {@link Iterator},
- * which can iterate over all items of the queue until the end of the queue is
- * reached. The end of the queue if reached, if a producer call the method
- * {@link #endReached()} and the iterator has consumed all items of the backend
- * list.
+ * A iterable queue. The queue can have multiple parallel consumer
+ * {@link Iterator}s, which can iterate over all items of the queue until the
+ * end of the queue is reached. The end of the queue if reached, if a producer
+ * call the method {@link #endReached()} and the iterator has consumed all items
+ * of the backend list. Warning: The queue iterator blocks
+ * forever if the producer never call {@link #endReached()}.
*
* @author Sebastian Sdorra
*
diff --git a/scm-core/src/main/java/sonia/scm/collect/QueueIterator.java b/scm-core/src/main/java/sonia/scm/collect/QueueIterator.java
index 10b3bf8a78..d4f5f4954a 100644
--- a/scm-core/src/main/java/sonia/scm/collect/QueueIterator.java
+++ b/scm-core/src/main/java/sonia/scm/collect/QueueIterator.java
@@ -33,15 +33,18 @@
package sonia.scm.collect;
-//~--- JDK imports ------------------------------------------------------------
+//~--- non-JDK imports --------------------------------------------------------
import com.google.common.collect.UnmodifiableIterator;
+
+//~--- JDK imports ------------------------------------------------------------
+
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
/**
- * Iterator for the {@link IterableQueue}. The {@link QueueIterator} should
- * only be created from the {@link IterableQueue} by calling the
+ * Iterator for the {@link IterableQueue}. The {@link QueueIterator} should
+ * only be created from the {@link IterableQueue} by calling the
* {@link IterableQueue#iterator()}.
*
* @author Sebastian Sdorra
@@ -67,28 +70,30 @@ public final class QueueIterator extends UnmodifiableIterator
/**
* Returns the next item in the queue. This method will block until the next
- * item is pushed to the queue, if the queue is empty and the end is not
+ * item is pushed to the queue, if the queue is empty and the end is not
* reached.
*
* @throws NoSuchElementException if the iteration has no more elements
- *
+ *
* @return the next item in the queue
*/
@Override
public T next()
{
- if ( ! hasNext() ){
+ if (!hasNext())
+ {
throw new NoSuchElementException("no more items in the queue");
}
+
return queue.get(index++);
}
//~--- get methods ----------------------------------------------------------
/**
- * Returns {@code true} {@code true} if the queue has more items.
- * This method will block until the next item is pushed to the queue, if the
- * queue is empty and the end is not
+ * Returns {@code true} {@code true} if the queue has more items.
+ * This method will block until the next item is pushed to the queue, if the
+ * queue is empty and the end is not
* reached.
*
*
@@ -122,7 +127,6 @@ public final class QueueIterator extends UnmodifiableIterator
return result;
}
-
//~--- fields ---------------------------------------------------------------
/** queue for the iterator */
diff --git a/scm-core/src/test/java/sonia/scm/collect/IterableQueueTest.java b/scm-core/src/test/java/sonia/scm/collect/IterableQueueTest.java
new file mode 100644
index 0000000000..ceadfe1aea
--- /dev/null
+++ b/scm-core/src/test/java/sonia/scm/collect/IterableQueueTest.java
@@ -0,0 +1,266 @@
+/**
+ * Copyright (c) 2010, Sebastian Sdorra All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer. 2. Redistributions in
+ * binary form must reproduce the above copyright notice, this list of
+ * conditions and the following disclaimer in the documentation and/or other
+ * materials provided with the distribution. 3. Neither the name of SCM-Manager;
+ * nor the names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
+ * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * http://bitbucket.org/sdorra/scm-manager
+ *
+ */
+
+
+
+package sonia.scm.collect;
+
+//~--- non-JDK imports --------------------------------------------------------
+
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+//~--- JDK imports ------------------------------------------------------------
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ *
+ * @author Sebastian Sdorra
+ */
+public class IterableQueueTest
+{
+
+ /**
+ * Method description
+ *
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testDuplicatedEndReached()
+ {
+ IterableQueue queue = new IterableQueue();
+
+ queue.endReached();
+ queue.endReached();
+ }
+
+ /**
+ * Method description
+ *
+ */
+ @Test
+ public void testIterator()
+ {
+ IterableQueue queue = new IterableQueue();
+
+ assertEquals(QueueIterator.class, queue.iterator().getClass());
+ queue.endReached();
+ assertNotEquals(QueueIterator.class, queue.iterator().getClass());
+ }
+
+ /**
+ * Method description
+ *
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultiThreaded() throws Exception
+ {
+ testMultiThreaded(5, 10, false, 1000);
+ }
+
+ /**
+ * Method description
+ *
+ *
+ * @throws ExecutionException
+ * @throws InterruptedException
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMultiThreadedWithRandomSleep() throws Exception
+ {
+ testMultiThreaded(5, 10, true, 50);
+ }
+
+ /**
+ * Method description
+ *
+ */
+ @Test(expected = IllegalStateException.class)
+ public void testPushEndReached()
+ {
+ IterableQueue queue = new IterableQueue();
+
+ queue.push("a");
+ queue.endReached();
+ queue.push("b");
+ }
+
+ /**
+ * Method description
+ *
+ */
+ @Test
+ public void testSingleConsumer()
+ {
+ final IterableQueue queue = new IterableQueue();
+
+ new Thread(new IntegerProducer(queue, false, 100)).start();
+ assertResult(Lists.newArrayList(queue), 100);
+ }
+
+ /**
+ * Method description
+ *
+ *
+ * @param result
+ * @param itemCount
+ */
+ private void assertResult(List result, int itemCount)
+ {
+ assertNotNull(result);
+ assertEquals(itemCount, result.size());
+
+ for (int c = 0; c < itemCount; c++)
+ {
+ assertEquals(Integer.valueOf(c), result.get(c));
+ }
+ }
+
+ /**
+ * Method description
+ *
+ *
+ * @param threads
+ * @param consumer
+ * @param randomSleep
+ * @param itemCount
+ *
+ * @throws Exception
+ */
+ private void testMultiThreaded(int threads, int consumer,
+ boolean randomSleep, int itemCount)
+ throws Exception
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ List>> futures = Lists.newArrayList();
+
+ final IterableQueue queue = new IterableQueue();
+
+ for (int i = 0; i < consumer; i++)
+ {
+ Future> future =
+ executor.submit(new CallableQueueCollector(queue));
+
+ futures.add(future);
+ }
+
+ new Thread(new IntegerProducer(queue, randomSleep, itemCount)).start();
+
+ for (Future> f : futures)
+ {
+ assertResult(f.get(), itemCount);
+ }
+ }
+
+ //~--- inner classes --------------------------------------------------------
+
+ /**
+ * Class description
+ *
+ *
+ * @version Enter version here..., 13/03/01
+ * @author Enter your name here...
+ */
+ private static class IntegerProducer implements Runnable
+ {
+
+ /**
+ * Constructs ...
+ *
+ *
+ * @param queue
+ * @param randomSleep
+ * @param itemCount
+ */
+ public IntegerProducer(IterableQueue queue, boolean randomSleep,
+ int itemCount)
+ {
+ this.queue = queue;
+ this.randomSleep = randomSleep;
+ this.itemCount = itemCount;
+ }
+
+ //~--- methods ------------------------------------------------------------
+
+ /**
+ * Method description
+ *
+ */
+ @Override
+ public void run()
+ {
+ Random r = new Random();
+
+ for (int c = 0; c < itemCount; c++)
+ {
+ if (randomSleep)
+ {
+ try
+ {
+ Thread.sleep(r.nextInt(5));
+ }
+ catch (InterruptedException ex)
+ {
+ throw new RuntimeException("thread interrupted", ex);
+ }
+ }
+
+ queue.push(c);
+ }
+
+ queue.endReached();
+ }
+
+ //~--- fields -------------------------------------------------------------
+
+ /** Field description */
+ IterableQueue queue;
+
+ /** Field description */
+ private int itemCount;
+
+ /** Field description */
+ private boolean randomSleep;
+ }
+}