A SynchronousQueue is unlike the other implementations of BlockingQueue in that it has no internal capacity. It isn't a queue of tasks waiting to be carried out; in effect it is a queue of consumer threads waiting to process work. A producer that puts an item on a SynchronousQueue blocks until a consumer is ready to take that item. As with a regular queue, consumers block while there is nothing to take.
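The lack of capacity is easiest to see with the non-blocking offer method. The following is a minimal sketch rather than part of the example further down; the class and method names (NoCapacityDemo, offerWithNoConsumer, offerWithWaitingConsumer) are made up for illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class NoCapacityDemo {

    // With no consumer waiting there is nowhere to put the item,
    // so the non-blocking offer is rejected immediately.
    static boolean offerWithNoConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    // With a consumer blocked in take(), a timed offer can hand the item over.
    static boolean offerWithWaitingConsumer() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Wait up to five seconds for the consumer to reach take().
        boolean accepted = queue.offer("task", 5, TimeUnit.SECONDS);
        if (!accepted) {
            consumer.interrupt(); // do not leave the consumer blocked forever
        }
        consumer.join();
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("offer with no consumer:   " + offerWithNoConsumer());
        System.out.println("offer with a consumer:    " + offerWithWaitingConsumer());
        System.out.println("size of an idle queue:    " + new SynchronousQueue<String>().size());
    }
}
```

The size of a SynchronousQueue is always reported as zero, which is another way of saying that items are never stored, only handed over.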

SynchronousQueue is useful when the producer wants or needs to know when a consumer starts processing the task (or at least accepts it). The producer can be sure that the task has been taken by a consumer as soon as its call to put returns.
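That guarantee can be checked with a small sketch. It assumes a single consumer that turns up late and records its arrival in a flag just before calling take; the names PutBlocksDemo and consumerReadyWhenPutReturned are hypothetical.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class PutBlocksDemo {

    // Returns whether the consumer had already arrived at the moment put() returned.
    static boolean consumerReadyWhenPutReturned() throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        AtomicBoolean consumerArrived = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200);          // the consumer turns up late
                consumerArrived.set(true);  // recorded just before asking for work
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put("task");                  // blocks until the take() above accepts the item
        boolean arrived = consumerArrived.get();
        consumer.join();
        return arrived;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer had arrived when put returned: "
                + consumerReadyWhenPutReturned());
    }
}
```

Because put cannot return before a take has accepted the item, the flag is always set by the time the producer reads it.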

For best performance with a SynchronousQueue there should (nearly) always be a consumer waiting to take the task proffered by a producer. If there generally isn't a consumer waiting, the production speed is throttled by the arrival rate of consumers. This situation can easily arise when the task production rate is intrinsically bursty: the producer might be able to produce 100 tasks in a second and then none for an hour. If a SynchronousQueue is used, the producer has to wait for a consumer after every task it produces. With one of the buffering BlockingQueue implementations, such as LinkedBlockingQueue or ArrayBlockingQueue, the producer could instead throw the tasks into the buffer and carry on.
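The difference under a burst can be sketched by counting how many tasks each kind of queue accepts straight away when no consumer is running. This is an illustration only; BurstDemo and acceptedInBurst are hypothetical names, and the non-blocking offer stands in for a producer that does not want to wait.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class BurstDemo {

    // Offers burstSize tasks without blocking and counts how many were accepted.
    static int acceptedInBurst(BlockingQueue<Integer> queue, int burstSize) {
        int accepted = 0;
        for (int i = 0; i < burstSize; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No consumer is waiting, so the SynchronousQueue accepts nothing.
        System.out.println("SynchronousQueue accepted:    "
                + acceptedInBurst(new SynchronousQueue<Integer>(), 100));
        // The unbounded LinkedBlockingQueue buffers the whole burst.
        System.out.println("LinkedBlockingQueue accepted: "
                + acceptedInBurst(new LinkedBlockingQueue<Integer>(), 100));
    }
}
```

With put in place of offer the SynchronousQueue producer would not lose tasks, but it would block on each one until a consumer arrived, which is the throttling described above.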

If you need a task hand-off design where one thread needs to sync with another, SynchronousQueue is the obvious choice. The code below exercises SynchronousQueue by simply handing messages from a set of producer threads to a set of consumer threads.

package com.crazysquirrel;

import java.util.Random;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {

  private final SynchronousQueue<WorkItem> queue = new SynchronousQueue<WorkItem>();
  private final int consumerThreads = 5;
  private final int producerThreads = 5;

  public SynchronousQueueExample() {
    System.out.println( "Starting..." );

    // Start the consumers.
    for( int i = 0; i < consumerThreads; i++ ) {
      Consumer c = new Consumer( queue );
      c.setName( "Consumer " + i );
      c.start();
    }

    // Start the producers.
    for( int i = 0; i < producerThreads; i++ ) {
      Producer p = new Producer( queue );
      p.setName( "Producer " + i );
      p.start();
    }
  }

  /**
   * @param args
   */
  public static void main( String[] args ) {
    new SynchronousQueueExample();
  }

 
  private static class Consumer extends Thread {

    private final SynchronousQueue<WorkItem> queue;

    public Consumer( SynchronousQueue<WorkItem> queue ) {
      this.queue = queue;
    }

    public void run() {
      Random r = new Random();
      while( !Thread.currentThread().isInterrupted() ) {
        WorkItem item = null;
        try {
          item = queue.take();
          System.out.println( Thread.currentThread().getName() + " consuming: " + item );
          Thread.sleep( r.nextInt( 1000 ) );
        } catch( InterruptedException inte ) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

 
  private static class Producer extends Thread {

    private static final String[] DATA = { "a", "b", "c", "d", "e", "f", "g", "h",
      "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
      "x", "y", "z" };

    private final SynchronousQueue<WorkItem> queue;

    public Producer( SynchronousQueue<WorkItem> queue ) {
      this.queue = queue;
    }

    public void run() {
      Random r = new Random();
      while( !Thread.currentThread().isInterrupted() ) {
        String data = DATA[ r.nextInt( DATA.length ) ];
        WorkItem item = new WorkItem( data + "(" + Thread.currentThread().getName() + ")" );
        try {
          Thread.sleep( r.nextInt( 5000 ) );
          System.out.println( Thread.currentThread().getName() + " queuing: " + item );
          queue.put( item );
        } catch( InterruptedException inte ) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }

 
  private static class WorkItem {

    private final String message;

    public WorkItem( String message ) {
      this.message = message;
    }

    public String toString() {
      return message;
    }
  }

}
