BoundedBuffer.java
package ch.hslu.exercises.sw06.ex4;
import java.util.ArrayDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Puffer nach dem First In First Out Prinzip mit einer begrenzten Kapazität.
* Der Puffer ist thread sicher.
*
* @param <T> Elememente des Buffers
*/
public final class BoundedBuffer<T> implements Buffer<T> {
private final ArrayDeque<T> queue;
private final Semaphore putSema;
private final Semaphore takeSema;
/**
* Erzeugt einen Puffer mit bestimmter Kapazität.
*
* @param n Kapazität des Puffers
*/
public BoundedBuffer(final int n) {
queue = new ArrayDeque<>(n);
putSema = new Semaphore(n);
takeSema = new Semaphore(0);
}
@Override
public void add(final T elem) throws InterruptedException {
putSema.acquire();
addSynchronized(elem);
}
@Override
public T remove() throws InterruptedException {
takeSema.acquire();
return removeSynchronized();
}
@Override
public boolean add(T elem, long millis) throws InterruptedException {
if (putSema.tryAcquire(millis, TimeUnit.MILLISECONDS)) {
addSynchronized(elem);
return true;
} else {
return false;
}
}
@Override
public T remove(long millis) throws InterruptedException {
if (takeSema.tryAcquire(millis, TimeUnit.MILLISECONDS)) {
return removeSynchronized();
} else {
return null;
}
}
public void addSynchronized(T elem) {
synchronized (queue) {
queue.addFirst(elem);
}
takeSema.release();
}
private T removeSynchronized() {
T elem;
synchronized (queue) {
elem = queue.removeLast();
}
putSema.release();
return elem;
}
@Override
public boolean empty() {
return takeSema.availablePermits() == 0;
}
@Override
public boolean full() {
return putSema.availablePermits() == 0;
}
@Override
public int size() {
return takeSema.availablePermits();
}
}