Backend/JAVA

블록킹큐(Blocking Queue)

dddzr 2024. 6. 5. 17:53

Queue의 개념

Queue는 컬렉션 프레임워크의 일부로, 데이터를 선입선출(FIFO, First-In-First-Out) 방식으로 저장하고 관리하는 자료구조입니다. 큐는 주로 데이터의 순서를 유지하면서 처리해야 하는 상황에서 사용됩니다.

 

 

BlockingQueue의 개념

Queue를 상속 받아, 큐의 기본작업에 블로킹 연이 추가된 인터페이스입니다.

동기화된 방식으로 작동하여 여러 스레드가 안전하게 접근할 수 있습니다.

BlockingQueue는 큐가 비어 있을 때 요소를 가져오려고 하면 대기하고, 큐가 가득 차 있을 때 요소를 추가하려고 하면 대기합니다. (put, take 함수에서 블락됐을 때 다음 코드가 실행되지 않고 대기하는 걸 볼 수 있다!!)

 

*블로킹 연산

특정 조건이 충족될 때까지 스레드를 일시 중지시키는 연산으로, 연산이 완료될 때까지 스레드를 대기 상태로 만든다.

 

BlockingQueue의 특징, 일반 Queue와의 차이

1. 동기화된 작업

  • 동기화된 작업을 제공하 여러 스레드가 동시에 큐에 접근하더라도 안전하게 작업을 수행할 수 있습니다.
  • 일반 Queue는 동시성을 지원하지 않아 여러 스레드에서 동시에 접근하면 데이터 일관성이 깨질 수 있다.

2. 블로킹 연산

  • 큐가 가득 찼을 때의 put 연산과 큐가 비었을 때의 take 연산이 블로킹된다. 이러한 연산들은 특정 조건이 충족될 때까지 스레드를 대기(Block)시킨다.

3. 시간제한

  • offer(E e, long timeout, TimeUnit unit) 및 poll(long timeout, TimeUnit unit)로 대기 시간을 설정할 수 있다. 지정된 시간 내에 연산이 완료되지 않으면 타임아웃과 함께 실패한다.

 

BlockingQueue의 주요 메서드

  • put(E e): 요소를 큐에 추가합니다. 큐가 가득 차 있으면 요소를 추가할 때까지 블록됩니다.
  • take(): 큐에서 요소를 제거하고 반환합니다. 큐가 비어 있으면 요소가 추가될 때까지 블록됩니다.
  • offer(E e, long timeout, TimeUnit unit): 요소를 큐에 추가합니다. 큐가 가득 차 있으면 지정된 시간 동안 블록됩니다.
  • poll(long timeout, TimeUnit unit): 큐에서 요소를 제거하고 반환합니다. 큐가 비어 있으면 지정된 시간 동안 블록됩니다.
  • remainingCapacity(): 큐에 추가할 수 있는 요소의 최대 수를 반환합니다.
  • size(): 큐에 현재 저장된 요소의 수를 반환합니다.
  • isEmpty(): 큐가 비어 있는지 확인합니다.

 

BlockingQueue의 구현체

1. ArrayBlockingQueue

고정 크기의 배열을 기반으로 한 구현. 구현 시 크기를 지정, 변경 불가합니다.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

        queue.put("first");
        queue.put("second");
        queue.put("third");

        System.out.println(queue.take()); // 출력: first
        System.out.println(queue.peek()); // 출력: second
    }
}

 

2. LinkedBlockingQueue

연결 노드를 기반으로 한 구현. 선택적으로 최대 크기를 설정할 수 있다.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class LinkedBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        queue.put("first");
        queue.put("second");
        queue.put("third");

        System.out.println(queue.take()); // 출력: first
        System.out.println(queue.peek()); // 출력: second
    }
}

 

3. PriorityBlockingQueue

요소를 우선순위에 따라 저장하는 구현.

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PriorityBlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

        queue.put(3);
        queue.put(1);
        queue.put(2);

        System.out.println(queue.take()); // 출력: 1
        System.out.println(queue.peek()); // 출력: 2
    }
}

 

4. SynchronousQueue

두 스레드 간에 요소를 직접 전달하는 동기화된 큐입니다. 이 큐는 버퍼를 가지지 않고, 하나의 스레드가 put() 메서드로 요소를 추가하면 다른 스레드는 이 요소를 take() 메서드로 직접 받아야 합니다. 따라서 요소를 넣을 때는 반드시 다른 스레드가 해당 요소를 받을 때까지 대기합니다.

import java.util.concurrent.*;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        // SynchronousQueue 생성
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // Producer 스레드 생성
        Thread producer = new Thread(() -> {
            try {
                String message = "Hello from producer!";
                System.out.println("Producer is putting: " + message);
                queue.put(message); // 요소 추가 (대기상태가 됨)
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Consumer 스레드 생성
        Thread consumer = new Thread(() -> {
            try {
                String message = queue.take(); // 요소 가져오기 (대기상태가 됨)
                System.out.println("Consumer received: " + message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // Producer와 Consumer 스레드 시작
        producer.start();
        consumer.start();
    }
}

 

 

* 사용 예시) BlockingQueue을 이용한 생산자-소비자 패턴

생산자-소비자 패턴은 멀티스레드 환경에서 자원을 공유하는 프로그램을 구현하는 데 사용되는 일반적인 디자인 패턴 중 하나입니다.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {
    private static final int CAPACITY = 10;
    private static final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);

    public static void main(String[] args) {
        Thread producerThread = new Thread(new Producer());
        Thread consumerThread = new Thread(new Consumer());

        producerThread.start();
        consumerThread.start();
    }

    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                int value = 0;
                while (true) {
                    queue.put(value);
                    System.out.println("Produced: " + value);
                    value++;
                    Thread.sleep(1000); // 생산 속도 제어
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    int value = queue.take();
                    System.out.println("Consumed: " + value);
                    Thread.sleep(2000); // 소비 속도 제어
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

생산자 스레드는 1초마다 큐에 값을 넣고(put) 생성된 값을 출력합니다. 소비자 스레드는 2초마다 큐에서 값을 꺼내어(take) 소비하고 출력합니다.

이 과정에서 BlockingQueue는 생산자와 소비자 간의 데이터 흐름을 조율하고 동기화합니다. 만약 큐가 비어 있으면 소비자 스레드는 take() 메서드에서 블록되어 대기하고, 큐가 가득 차 있으면 생산자 스레드는 put() 메서드에서 블록되어 대기하게 됩니다.

'Backend > JAVA' 카테고리의 다른 글

HttpURLConnection  (0) 2024.06.07
Callable, Runnable, Future  (0) 2024.06.05
스레드 풀(Thread Pool), Executor  (0) 2024.06.05
[Java] java.lang 패키지의 System  (0) 2023.10.12
JAVA에서 html 다루기 (Jsoup)  (0) 2023.08.16