Blocking Queue and several Consumers



  • The problem of the implementation of Consumer/Producer with several Consumers was a matter of concern; it would be interesting to see the possibility of sending a message to one Producer and the possibility of reading from the queue of many Workers. I'd say the only problem is that I don't know how to make the right mix of classes.

    In the main line, I decided to leave only two challenges:

    public static void main(String[] args) {
            CloudQueue sqs = new CloudQueue(5);
            for(int i=0; i<10; ++i){
                sqs.send("Data");
            }
        }
    

    Accordingly, the object CloudQueue implementing some setup operations, as well as operations, are creating a high priority for the subprogrammes to communicate. I understand everything is going to be created. god-objectI'll try to eliminate it.

    class CloudQueue{
        CloudQueueService queue = new CloudQueueService();
        public CloudQueue(int queues){
        }
        public void send(String s){
            queue.push(s);
        }
    }
    

    CloudQueue in turn creates a copy CloudQueueServicethe aim of which is to attempt to make itself known producerqueueconsumers;

    class CloudQueueService{
        private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        private WorkerPool pool;
        public CloudQueueService(){
            WorkerPool pool = new WorkerPool(this.queue);
        }
        public void push(String s){
            queue.add(s);
        }
    }
    

    CloudQueueService It is already directly setting up the line, as well as the worker pool, which should directly extract data from and work with them.

    Examples of the WorkersPool and Worker code; I can't say much about them.

    class WorkerPool{
        public WorkerPool(BlockingQueue<String> q){
            ExecutorService executor = Executors.newFixedThreadPool(5);
            for(int i=0; i<5; i++){
                Runnable worker = new Worker(q);
                executor.execute(worker);
            }
            executor.shutdown();
            while (!executor.isTerminated()){
    
        }
        System.out.println("Done");
    }
    

    }

    class Worker implements Runnable{
    private BlockingQueue<String> queue;
    public Worker(BlockingQueue<String> queue){
    this.queue = queue;
    }
    @Override
    public void run(){
    try{
    System.out.println(Thread.currentThread().getName() + "->" + this.queue.take());
    }catch (Exception e){
    e.printStackTrace();
    }

    }
    

    }

    So, what kind of trouble have I encountered?

    • It's a weak performance. structure of this decision. In the main line, I want to get rid of the system details as much as possible, as in the future. CloudQueue it would be difficult to meet, so the user would like to leave only the methods. sendget and initialization.

    • The most likely misconception of the principles of flow management is that Producer should receive some kind of data. I can't think of a point of entry.



  • As I understand, you want to create a patharina where consumers are to be created immediately (the classes of consumers can and will be transferred to consumer-producer pathterne) and their number is determined in advance. I'm suggesting this NABOSOC, it needs to be refined for your purpose.

    public class Test {
        private static boolean runningFlag = true;
        private static final int CONSUMERS_NUMBER = 5;
        private static ExecutorService executor = Executors.newFixedThreadPool(CONSUMERS_NUMBER);
        private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        public final static Test INSTANCE = new Test();
        private Test(){
            startConsumers();
        }
    
    public static void startConsumers(){
        for(int worker = 0; worker &lt; CONSUMERS_NUMBER; worker++){
            executor.submit(()-&gt;{
                try {
                    while(runningFlag) {
                        String job = queue.take();
                        Thread.sleep((int)(Math.random()*1000));// do some job
                        System.out.println("Job is done:"+job);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    public static void stopConsumers(){
        runningFlag = false;
        executor.shutdown();
    }
    
    public static void sendJob(String someJob) throws InterruptedException {
        queue.put(someJob); // или add и т.п.
    }
    
    public static void main(String[] args) throws InterruptedException {
        for(int i = 100; i&gt; 0; i--){
            Test.INSTANCE.sendJob("sf"+i);
        }
        Test.stopConsumers();
    }
    

    }




Suggested Topics

  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2