Backend/JAVA

๋ธ”๋กํ‚นํ(Blocking Queue)

dddzr 2024. 6. 5. 17:53

๐Ÿ” 1. Queue๋ž€?

Queue๋Š” ์ปฌ๋ ‰์…˜ ํ”„๋ ˆ์ž„์›Œํฌ์˜ ์ผ๋ถ€๋กœ, ๋ฐ์ดํ„ฐ๋ฅผ ์„ ์ž…์„ ์ถœ(FIFO, First-In-First-Out) ๋ฐฉ์‹์œผ๋กœ ์ €์žฅํ•˜๊ณ  ๊ด€๋ฆฌํ•˜๋Š” ์ž๋ฃŒ๊ตฌ์กฐ.

 

๐Ÿ” 2. BlockingQueue๋ž€?

Queue๋ฅผ ์ƒ์† ๋ฐ›์•„, ํ์˜ ๊ธฐ๋ณธ์ž‘์—…์— ๋ธ”๋กœํ‚น ์—ฐ์ด ์ถ”๊ฐ€๋œ ์ธํ„ฐํŽ˜์ด์Šค.

๋™๊ธฐํ™”๋œ ๋ฐฉ์‹์œผ๋กœ ์ž‘๋™ํ•˜์—ฌ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๊ฐ€ ์•ˆ์ „ํ•˜๊ฒŒ ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ๋‹ค.

BlockingQueue๋Š” ํ๊ฐ€ ๋น„์–ด ์žˆ์„ ๋•Œ ์š”์†Œ๋ฅผ ๊ฐ€์ ธ์˜ค๋ ค๊ณ  ํ•˜๋ฉด ๋Œ€๊ธฐํ•˜๊ณ , ํ๊ฐ€ ๊ฐ€๋“ ์ฐจ ์žˆ์„ ๋•Œ ์š”์†Œ๋ฅผ ์ถ”๊ฐ€ํ•˜๋ ค๊ณ  ํ•˜๋ฉด ๋Œ€๊ธฐํ•œ๋‹ค. (put, take ํ•จ์ˆ˜์—์„œ ๋ธ”๋ฝ๋์„ ๋•Œ ๋‹ค์Œ ์ฝ”๋“œ๊ฐ€ ์‹คํ–‰๋˜์ง€ ์•Š๊ณ  ๋Œ€๊ธฐํ•˜๋Š” ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋‹ค!!)

 

*๋ธ”๋กœํ‚น ์—ฐ์‚ฐ

ํŠน์ • ์กฐ๊ฑด์ด ์ถฉ์กฑ๋  ๋•Œ๊นŒ์ง€ ์Šค๋ ˆ๋“œ๋ฅผ ์ผ์‹œ ์ค‘์ง€์‹œํ‚ค๋Š” ์—ฐ์‚ฐ์œผ๋กœ, ์—ฐ์‚ฐ์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ์Šค๋ ˆ๋“œ๋ฅผ ๋Œ€๊ธฐ ์ƒํƒœ๋กœ ๋งŒ๋“ ๋‹ค.

 

๐Ÿ“Œ 3. BlockingQueue์˜ ํŠน์ง•

  • ์Šค๋ ˆ๋“œ ์•ˆ์ „(Thread-safe): ๋‚ด๋ถ€์—์„œ lock์œผ๋กœ ๋™๊ธฐํ™”ํ•˜์—ฌ ์—ฌ๋Ÿฌ ์Šค๋ ˆ๋“œ๊ฐ€ ๋™์‹œ์— ํ์— ์ ‘๊ทผํ•˜๋”๋ผ๋„ ์•ˆ์ „ํ•˜๊ฒŒ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋ธ”๋กœํ‚น ์—ฐ์‚ฐ: ํ๊ฐ€ ๊ฐ€๋“ ์ฐผ์„ ๋•Œ์˜ put ์—ฐ์‚ฐ๊ณผ ํ๊ฐ€ ๋น„์—ˆ์„ ๋•Œ์˜ take ์—ฐ์‚ฐ์ด ๋ธ”๋กœํ‚น๋œ๋‹ค. ์ด๋Ÿฌํ•œ ์—ฐ์‚ฐ๋“ค์€ ํŠน์ • ์กฐ๊ฑด์ด ์ถฉ์กฑ๋  ๋•Œ๊นŒ์ง€ ์Šค๋ ˆ๋“œ๋ฅผ ๋Œ€๊ธฐ(Block)์‹œํ‚จ๋‹ค.
  • ์‹œ๊ฐ„์ œํ•œ: offer(E e, long timeout, TimeUnit unit) ๋ฐ poll(long timeout, TimeUnit unit)๋กœ ๋Œ€๊ธฐ ์‹œ๊ฐ„์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ง€์ •๋œ ์‹œ๊ฐ„ ๋‚ด์— ์—ฐ์‚ฐ์ด ์™„๋ฃŒ๋˜์ง€ ์•Š์œผ๋ฉด ํƒ€์ž„์•„์›ƒ๊ณผ ํ•จ๊ป˜ ์‹คํŒจํ•œ๋‹ค.

 

๐Ÿ“Œ 4. BlockingQueue์˜ ์ฃผ์š” ๋ฉ”์„œ๋“œ

  • put(E e): ์š”์†Œ๋ฅผ ํ์— ์ถ”๊ฐ€. ํ๊ฐ€ ๊ฐ€๋“ ์ฐจ ์žˆ์œผ๋ฉด ์š”์†Œ๋ฅผ ์ถ”๊ฐ€ํ•  ๋•Œ๊นŒ์ง€ ๋ธ”๋ก. (๋ฌดํ•œ์ • ๋Œ€๊ธฐ)
  • offer(E e, long timeout, TimeUnit unit): ์š”์†Œ๋ฅผ ํ์— ์ถ”๊ฐ€. ํ๊ฐ€ ๊ฐ€๋“ ์ฐจ ์žˆ์œผ๋ฉด ์ง€์ •๋œ ์‹œ๊ฐ„ ๋™์•ˆ ๋ธ”๋ก. (timeOut ์ƒ๋žต ์‹œ ์ฆ‰์‹œ ์‹คํŒจ)
  • take(): ํ์—์„œ ์š”์†Œ๋ฅผ ์ œ๊ฑฐํ•˜๊ณ  ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ํ๊ฐ€ ๋น„์–ด ์žˆ์œผ๋ฉด ์š”์†Œ๊ฐ€ ์ถ”๊ฐ€๋  ๋•Œ๊นŒ์ง€ ๋ธ”๋ก.
  • poll(long timeout, TimeUnit unit): ํ์—์„œ ์š”์†Œ๋ฅผ ์ œ๊ฑฐํ•˜๊ณ  ๋ฐ˜ํ™˜. ํ๊ฐ€ ๋น„์–ด ์žˆ์œผ๋ฉด ์ง€์ •๋œ ์‹œ๊ฐ„ ๋™์•ˆ ๋ธ”๋ก.
  • remainingCapacity(): ํ์— ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋Š” ์š”์†Œ์˜ ์ตœ๋Œ€ ์ˆ˜๋ฅผ ๋ฐ˜ํ™˜.
  • size(): ํ์— ํ˜„์žฌ ์ €์žฅ๋œ ์š”์†Œ์˜ ์ˆ˜๋ฅผ ๋ฐ˜ํ™˜.
  • isEmpty(): ํ๊ฐ€ ๋น„์–ด ์žˆ๋Š”์ง€ ํ™•์ธ.

 

๐Ÿ“Œ 5. BlockingQueue์˜ ๊ตฌํ˜„์ฒด

โœ… 5-1. ArrayBlockingQueue

๊ณ ์ • ํฌ๊ธฐ์˜ ๋ฐฐ์—ด์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ ๊ตฌํ˜„. ๊ตฌํ˜„ ์‹œ ํฌ๊ธฐ๋ฅผ ์ง€์ •, ๋ณ€๊ฒฝ ๋ถˆ๊ฐ€.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        queue.put("first");
        queue.put("second");
        queue.put("third");

        System.out.println(queue.take()); // ์ถœ๋ ฅ: first
        System.out.println(queue.peek()); // ์ถœ๋ ฅ: second
    }
}

 

โœ… 5-2. LinkedBlockingQueue

์—ฐ๊ฒฐ ๋…ธ๋“œ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ ๊ตฌํ˜„. ์„ ํƒ์ ์œผ๋กœ ์ตœ๋Œ€ ํฌ๊ธฐ๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

โญ ๊ฐ€์žฅ ์‰ฝ๊ณ  ๋งŽ์ด ์“ฐ์ด๋Š” ํ˜•ํƒœ๋ผ๊ณ  ํ•œ๋‹ค!!

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        queue.put("first");
        queue.put("second");
        queue.put("third");

        System.out.println(queue.take()); // ์ถœ๋ ฅ: first
        System.out.println(queue.peek()); // ์ถœ๋ ฅ: second
    }
}

 

โœ… 5-3. PriorityBlockingQueue

์š”์†Œ๋ฅผ ์šฐ์„ ์ˆœ์œ„์— ๋”ฐ๋ผ ์ €์žฅํ•˜๋Š” ๊ตฌํ˜„.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

        queue.put(3);
        queue.put(1);
        queue.put(2);

        System.out.println(queue.take()); // ์ถœ๋ ฅ: 1
        System.out.println(queue.peek()); // ์ถœ๋ ฅ: 2
    }
}

 

โœ… 5-4. SynchronousQueue

๋‘ ์Šค๋ ˆ๋“œ ๊ฐ„์— ์š”์†Œ๋ฅผ ์ง์ ‘ ์ „๋‹ฌํ•˜๋Š” ๋™๊ธฐํ™”๋œ ํ. ์ด ํ๋Š” ๋ฒ„ํผ๋ฅผ ๊ฐ€์ง€์ง€ ์•Š๊ณ , ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๊ฐ€ put() ๋ฉ”์„œ๋“œ๋กœ ์š”์†Œ๋ฅผ ์ถ”๊ฐ€ํ•˜๋ฉด ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋Š” ์ด ์š”์†Œ๋ฅผ take() ๋ฉ”์„œ๋“œ๋กœ ์ง์ ‘ ๋ฐ›์•„์•ผ ํ•œ๋‹ค. ๋”ฐ๋ผ์„œ ์š”์†Œ๋ฅผ ๋„ฃ์„ ๋•Œ๋Š” ๋ฐ˜๋“œ์‹œ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๊ฐ€ ํ•ด๋‹น ์š”์†Œ๋ฅผ ๋ฐ›์„ ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•œ๋‹ค.

import java.util.concurrent.*;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        // SynchronousQueue ์ƒ์„ฑ
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Producer ์Šค๋ ˆ๋“œ ์ƒ์„ฑ
        Thread producer = new Thread(() -> {
            try {
                String message = "Hello from producer!";
                System.out.println("Producer is putting: " + message);
                queue.put(message); // ์š”์†Œ ์ถ”๊ฐ€ (๋Œ€๊ธฐ์ƒํƒœ๊ฐ€ ๋จ)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Consumer ์Šค๋ ˆ๋“œ ์ƒ์„ฑ
        Thread consumer = new Thread(() -> {
            try {
                String message = queue.take(); // ์š”์†Œ ๊ฐ€์ ธ์˜ค๊ธฐ (๋Œ€๊ธฐ์ƒํƒœ๊ฐ€ ๋จ)
                System.out.println("Consumer received: " + message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Producer์™€ Consumer ์Šค๋ ˆ๋“œ ์‹œ์ž‘
        producer.start();
        consumer.start();
    }
}

 

 

๐Ÿ“Œ BlockingQueue์„ ์ด์šฉํ•œ ์ƒ์‚ฐ์ž-์†Œ๋น„์ž ํŒจํ„ด

๐Ÿ“– ๋ธ”๋ฝํ‚นํ์˜ ์‚ฌ์šฉ ์˜ˆ์‹œ

์ƒ์‚ฐ์ž-์†Œ๋น„์ž ํŒจํ„ด์€ ๋ฉ€ํ‹ฐ์Šค๋ ˆ๋“œ ํ™˜๊ฒฝ์—์„œ ์ž์›์„ ๊ณต์œ ํ•˜๋Š” ํ”„๋กœ๊ทธ๋žจ์„ ๊ตฌํ˜„ํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” ์ผ๋ฐ˜์ ์ธ ๋””์ž์ธ ํŒจํ„ด ์ค‘ ํ•˜๋‚˜.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {
    private static final int CAPACITY = 10;
    private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);

    public static void main(String[] args) {
        Thread producerThread = new Thread(new Producer());
        Thread consumerThread = new Thread(new Consumer());

        producerThread.start();
        consumerThread.start();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                int value = 0;
                while (true) {
                    queue.put(value);
                    System.out.println("Produced: " + value);
                    value++;
                    Thread.sleep(1000); // ์ƒ์‚ฐ ์†๋„ ์ œ์–ด
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int value = queue.take();
                    System.out.println("Consumed: " + value);
                    Thread.sleep(2000); // ์†Œ๋น„ ์†๋„ ์ œ์–ด
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • ์ƒ์‚ฐ์ž ์Šค๋ ˆ๋“œ๋Š” 1์ดˆ๋งˆ๋‹ค ํ์— ๊ฐ’์„ ๋„ฃ๊ณ (put) ์ƒ์„ฑ๋œ ๊ฐ’์„ ์ถœ๋ ฅ.
  • ์†Œ๋น„์ž ์Šค๋ ˆ๋“œ๋Š” 2์ดˆ๋งˆ๋‹ค ํ์—์„œ ๊ฐ’์„ ๊บผ๋‚ด์–ด(take) ์†Œ๋น„ํ•˜๊ณ  ์ถœ๋ ฅ.
  • ์ด ๊ณผ์ •์—์„œ BlockingQueue๋Š” ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž ๊ฐ„์˜ ๋ฐ์ดํ„ฐ ํ๋ฆ„์„ ์กฐ์œจํ•˜๊ณ  ๋™๊ธฐํ™”ํ•œ๋‹ค. ๋งŒ์•ฝ ํ๊ฐ€ ๋น„์–ด ์žˆ์œผ๋ฉด ์†Œ๋น„์ž ์Šค๋ ˆ๋“œ๋Š” take() ๋ฉ”์„œ๋“œ์—์„œ ๋ธ”๋ก๋˜์–ด ๋Œ€๊ธฐํ•˜๊ณ , ํ๊ฐ€ ๊ฐ€๋“ ์ฐจ ์žˆ์œผ๋ฉด ์ƒ์‚ฐ์ž ์Šค๋ ˆ๋“œ๋Š” put() ๋ฉ”์„œ๋“œ์—์„œ ๋ธ”๋ก๋˜์–ด ๋Œ€๊ธฐํ•˜๊ฒŒ ๋œ๋‹ค.