I have a producer/consumer situation where the producer produces domains for the consumer to visit. The consumer sends an HTTPS request, grabs the links from the page, and submits them back to the producer. When the producer finishes, the consumer does not; it hangs on the final domain. I cannot for the life of me figure out why this is happening.
I have simplified my question.
Main:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        try
        {
            Broker broker = new Broker();
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
            threadPool.execute(new Consumer(broker));
            threadPool.execute(new Consumer(broker));
            Future<?> producerStatus = threadPool.submit(new Producer(broker));
            // this will wait for the producer to finish its execution.
            producerStatus.get();
            threadPool.shutdown();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
Broker:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Broker {
    private BlockingQueue<String> QUEUE = new LinkedBlockingQueue<String>();
    public Boolean continueProducing = Boolean.TRUE;

    public void put(String data) throws InterruptedException
    {
        this.QUEUE.put(data);
    }

    public String get() throws InterruptedException
    {
        //return this.queue.poll(1, TimeUnit.SECONDS);
        return this.QUEUE.take();
    }
}
Consumer:
public class Consumer implements Runnable {
    private Broker broker;

    public Consumer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try {
            String data = broker.get();
            while (broker.continueProducing || data != null)
            {
                Thread.sleep(1000);
                System.out.println("Consumer " + Thread.currentThread().getName() + " processed data from broker: " + data);
                data = broker.get();
            }
            System.out.println("Consumer " + Thread.currentThread().getName() + " finished its job; terminating.");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
Producer:
public class Producer implements Runnable {
    private Broker broker;

    public Producer(Broker broker) {
        this.broker = broker;
    }

    @Override
    public void run() {
        try
        {
            for (int i = 0; i < 2; ++i) {
                System.out.println("Producer produced: " + "https://example" + i + ".com");
                Thread.sleep(100);
                broker.put("https://example" + i + ".com");
            }
            //broker.put("https://example.com/2");
            this.broker.continueProducing = Boolean.FALSE;
            System.out.println("Producer finished its job; terminating.");
        } catch (Exception e)
        {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
Updated answer:
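When I run your code, the consumer gets stuck on the line data = broker.get(). The broker is calling the BlockingQueue.take method. Here's the Javadoc for this method:
"Retrieves and removes the head of this queue, waiting if necessary until an element becomes available."
That means that even if the producer isn't producing anything, the consumer will still wait for something to be produced. Since take never returns null, the data != null check in your loop cannot end the loop either.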
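One way to give the consumers an explicit end-of-data signal is a sentinel value placed on the queue. The following is only a rough sketch under assumptions, not the code from the question: PoisonPillBroker, POISON_PILL, finish() and PoisonPillDemo are names invented here, and it assumes a single producer that calls finish() exactly once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the Broker from the question.
class PoisonPillBroker {
    // A distinct String instance, compared by identity (==), so it cannot be
    // confused with real data that happens to contain the same text.
    static final String POISON_PILL = new String("POISON_PILL");

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void put(String data) throws InterruptedException {
        queue.put(data);
    }

    // Called once by the single producer when it has nothing more to add.
    void finish() throws InterruptedException {
        queue.put(POISON_PILL);
    }

    // Returns the next item, or null once the producer has finished.
    String get() throws InterruptedException {
        String data = queue.take();
        if (data == POISON_PILL) {
            // Put the pill back so that every other consumer sees it too.
            queue.put(POISON_PILL);
            return null;
        }
        return data;
    }
}

class PoisonPillDemo {
    // Runs one producer and the given number of consumers; returns true if
    // all of them terminated within the timeout.
    static boolean run(int consumers) throws InterruptedException {
        PoisonPillBroker broker = new PoisonPillBroker();
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 1);
        for (int c = 0; c < consumers; c++) {
            pool.execute(() -> {
                try {
                    String data;
                    // The loop ends when get() reports end-of-data with null.
                    while ((data = broker.get()) != null) {
                        System.out.println(Thread.currentThread().getName() + " processed " + data);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.execute(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    broker.put("https://example" + i + ".com");
                }
                broker.finish();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.shutdown();
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("All threads finished: " + run(2));
    }
}
```

With a broker like this, the consumer loop only needs to test data != null; the continueProducing flag is no longer required. Putting the pill back on the queue is a design choice of this sketch that lets one pill stop any number of consumers.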
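One possible solution for you would be to use a "poison pill" approach: when the producer has finished, it puts a special sentinel value on the queue, and a consumer that takes the sentinel stops instead of processing it. Assuming that you only ever have one producer, your Broker class could handle this itself.
Answer for previous code: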
It would be great if you could narrow the scope of this question so that it included only the minimum amount of code to get the deadlock. As it currently is, there's a lot of code that you're posting that is not relevant, and there's some code that is relevant that you're not posting.
Additionally, there are a lot of problems with your current code. Your toLinkedHashSet method does not compile. In your add method, you're calling the BlockingQueue.put method even though your BlockingQueue should never hit its limit. You claim to want O(1) time for contains, but your code has O(n) time. You also seem to be doing a lot of unnecessary copying in your addAll and contains methods.
There's not enough information here for me to know what the problem is, but one thing that could be causing your problem is in your get method. If the consumer thread is interrupted, then your get method will cause it to un-interrupt itself (which probably wouldn't lead to a deadlock, but could look like one). In Java, it's very rarely acceptable to ignore an exception. If your call to the take method throws an InterruptedException, it's for a reason: another thread wants the current thread to stop. Your get method should throw InterruptedException, that is, declare it and let the exception from take propagate to the caller.
If you really need the get method to not throw an InterruptedException, you could throw some other chained exception containing the InterruptedException. If it's really appropriate to return "" on interruption, you could catch the exception, interrupt the current thread again with Thread.currentThread().interrupt(), and then return "". By interrupting the current thread, you are making sure that at least the current thread is marked as interrupted, so something down the line could deal with it. But throwing InterruptedException is probably most appropriate if possible.
I still don't understand why you're creating your own wrapper for LinkedBlockingQueue, as opposed to just using a LinkedBlockingQueue on its own. It seems like everything you're adding on top of LinkedBlockingQueue is doing nothing but slowing it down.
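The three ways of handling interruption in a get method that are discussed in this answer can be sketched roughly as follows. This is an illustration under assumptions, not the original wrapper: InterruptAwareQueue and its method names are hypothetical, IllegalStateException is just one possible choice of chained exception, and restoring the interrupt flag before throwing it is a choice made in this sketch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper, used only to show the three options side by side.
class InterruptAwareQueue {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void add(String s) {
        queue.add(s);
    }

    // Option 1 (preferred): declare InterruptedException and let it propagate.
    String get() throws InterruptedException {
        return queue.take();
    }

    // Option 2: throw an unchecked exception that keeps the
    // InterruptedException as its cause.
    String getUnchecked() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            // Restore the flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Option 3: return "" on interruption, but mark the thread as
    // interrupted again so that code further down the line can react.
    String getOrEmpty() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }
}
```

In options 2 and 3 the caller can still detect the interruption through Thread.currentThread().isInterrupted(), which is what keeps the interruption from being silently lost.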