๐ 1. Streaming์ด๋?
Streaming์ ๋ฐ์ดํฐ๋ฅผ ํ ๋ฒ์ ๋ชจ๋ ๋ณด๋ด๋ ๊ฒ ์๋๋ผ, ์กฐ๊ฐ(chunk) ๋จ์๋ก ๋๋ ์ ์์ฐจ์ ์ผ๋ก ์ ์กํ๋ ๋ฐฉ์์ด๋ค.
๐ ์๋ฅผ ๋ค์ด:
- ๋์ฉ๋ ํ์ผ ์ ์ก ์ ์ ์ฒด๋ฅผ ๋ค ๋ค๊ณ ์์ง ์๊ณ ์กฐ๊ธ์ฉ ์ฝ๊ณ ๋ณด๋ด๋ ๋ฐฉ์
- ์ฑํ , ๋ก๊ทธ, ์ค์๊ฐ ์๋ฆผ ๋ฑ ๊ณ์ ์ด์ด์ง๋ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ ๋ฌํ๊ณ ์ถ์ ๋ ์ฌ์ฉ
- ๋๋ ai API ์๋ต์ ์ฌ์ฉ์์๊ฒ ๋นจ๋ฆฌ ๋ณด์ฌ์ฃผ๊ธฐ ์ํด ์ฌ์ฉํ๋ค!!
๐ 2. Reactive Stream ๊ธฐ๋ณธ ๊ฐ๋
Reactive Stream์ ๋น๋๊ธฐ + ๋ ผ๋ธ๋กํน ๋ฐฉ์์ผ๋ก ๋ฐ์ดํฐ ํ๋ฆ(์คํธ๋ฆผ)์ ์ฒ๋ฆฌํ๋ ๊ธฐ์ ์ด๋ค.
Java์์๋ ์ด๋ฅผ Flux, Mono ๊ฐ์ ํด๋์ค๋ก ํํํ๋ฉฐ, Spring WebFlux๋ ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์๋ํ๋ ๋น๋๊ธฐ ์น ํ๋ ์์ํฌ๋ค.
๐ 3. Mono vs Flux
Reactive Streams์์ ์ฌ์ฉ๋๋ ์๋ต ํํ(๋ฆฌํด ํ์ )์ด๋ค.
| ๊ตฌ๋ถ | Mono | Flux |
| ์๋ฏธ | 0๊ฐ ๋๋ 1๊ฐ์ ๋ฐ์ดํฐ | 0๊ฐ ์ด์ (๋ฌด์ ํ ๊ฐ๋ฅ) |
| ๋น์ | ๋จ์ผ ์๋ต (๋จ๊ฑด) | ์คํธ๋ฆฌ๋ฐ ์๋ต (๋ค๊ฑด / ์ค์๊ฐ) |
| ์ฌ์ฉ ์ | ๋ก๊ทธ์ธ ๊ฒฐ๊ณผ, ์ฌ์ฉ์ ์ ๋ณด ์กฐํ | ๊ฒ์ ๊ฒฐ๊ณผ ๋ฆฌ์คํธ, ์ค์๊ฐ ๋ฐ์ดํฐ ์ ์ก |
| ์๋ต ๋ฐฉ์ | ๋จ์ผ JSON | text/event-stream, chunked ๋ฑ ๊ฐ๋ฅ |
| ๋ํ ๋ฉ์๋ | bodyToMono(...) | bodyToFlux(...) |
๐ 4. ์ฃผ์ ์ฐ์ฐ์/ํจ์ ์ค๋ช
| ํจ์ | ์ญํ | ์ฃผ์ ์ฌํญ |
| subscribe() | ์ค์ ์คํธ๋ฆผ ์คํ (์์ผ๋ฉด ์๋ฌด๊ฒ๋ ์ ์ผ์ด๋จ) | ๋น๋๊ธฐ ์๋น |
| block() / blockLast() | Mono/Flux ์คํ ๋ฐ ๊ฒฐ๊ณผ ๋๊ธฐ | ๋๊ธฐ ๋ฐฉ์ |
| doOnNext() | ๋ฐ์ดํฐ emit๋ ๋๋ง๋ค ๋ถ๊ฐ ์์ | ์ค๊ฐ ๋ก๊ทธ ์ฐ๊ธฐ ๋ฑ |
| doOnSubscribe() | ๊ตฌ๋ ์์ ์ ์คํ | ํ๋ฆ ํธ๋ฆฌ๊ฑฐ |
| doOnComplete() | ์คํธ๋ฆผ ์ ์ ์ข ๋ฃ ์ ์คํ | onError ์์๋ ํธ์ถ ์ ๋จ |
| doFinally() | ์คํธ๋ฆผ ์ข ๋ฃ ์ ํญ์ ์คํ (์ฑ๊ณต/์คํจ ๋ชจ๋) | ๋ฆฌ์์ค ์ ๋ฆฌ ๋ฑ์ ์ ์ฉ |
| bodyToMono(...) | ์๋ต์ Mono(๋จ์ผ ๋ฐ์ดํฐ)๋ก ๋ณํ | POST ์์ฒญ ๊ฒฐ๊ณผ ๋ฑ์ ์ฌ์ฉ |
| bodyToFlux(...) | ์๋ต์ Flux(๋ค๊ฑด ์คํธ๋ฆผ)์ผ๋ก ๋ณํ | ์คํธ๋ฆฌ๋ฐ ์๋ต ๋ฐ์ ๋ ์ฌ์ฉ |
| DataBufferUtils.read(...) | ํ์ผ์ DataBuffer ์คํธ๋ฆผ์ผ๋ก ์ฝ์ | ์๋ฒ ํ์ผ ์๋ต์ ์ ์ฉ |
| DataBufferUtils.write(...) | DataBuffer ์คํธ๋ฆผ์ ํ์ผ๋ก ์ ์ฅ | ํด๋ผ์ด์ธํธ ํ์ผ ๋ค์ด๋ก๋ ์ ํ์ฉ |
โ bodyToMono(T), .bodyToFlux(T) ์ธ์ ํ์
| ํ์ ์์ | ์ค๋ช |
| DataBuffer.class | ๋ฐ์ดํธ ์กฐ๊ฐ์ ์ง์ ๋ฐ์ (๋ก์ฐ ์คํธ๋ฆผ) ->์ง์ CharsetDecoder๋ก ๋์ฝ๋ฉํด์ผ ํจ |
| String.class | ๋ฐ์ดํธ๋ฅผ ๋ฌธ์๋ก ๋์ฝ๋ฉํ ๋ฌธ์์ด ๋ฐ์ |
| ์ปค์คํ DTO ํด๋์ค (์: MyDto.class) | JSON, XML ๋ฑ์ ์๋์ผ๋ก ์ญ์ง๋ ฌํ(deserialize) ํ์ฌ ๊ฐ์ฒด๋ก ๋ฐ์ |
.bodyToFlux(String.class)๋ ๋ณดํต ์์ ํ
์คํธ ์กฐ๊ฐ์ด๋ JSON ๋ผ์ธ ๋จ์ ๋ฑ์์ ์ฐ๊ณ ,
๋์ฉ๋ ๋ฐ์ด๋๋ฆฌ๋ ์ปค์คํ
์ธ์ฝ๋ฉ ์ฒ๋ฆฌํ ๋ .bodyToFlux(DataBuffer.class)๋ก ๋ฐ๋ ๊ฒ ์์ !
๐ 5. ์์ฃผ ์ฌ์ฉ๋๋ ์คํธ๋ฆฌ๋ฐ ์ผ์ด์ค
| ์ผ์ด์ค | ์ฌ์ฉ ๋ฐฉ์ | ์ค๋ช |
| ์ค์๊ฐ ๋ก๊ทธ, ์๋ฆผ | Flux + SSE(text/event-stream) | 1์ด๋ง๋ค ๋ฐ์ดํฐ ์ ์ก |
| ๋์ฉ๋ ํ์ผ ๋ค์ด๋ก๋ | Flux<DataBuffer> → Mono<Void> | ์กฐ๊ฐ ๋จ์ ์ ์ก |
| ํ์ผ ์ ๋ก๋ | Flux<DataBuffer> | ์คํธ๋ฆฌ๋ฐ์ผ๋ก ์๋ฒ์ ์ ๋ฌ |
| ๋จ์ผ ์์ฒญ ๊ฒฐ๊ณผ | Mono | ๋น๋๊ธฐ ๋จ๊ฑด ์๋ต (์: ๋ก๊ทธ์ธ) |
โ Flux ์คํธ๋ฆฌ๋ฐ ์๋ต
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> stream() {
return Flux.interval(Duration.ofSeconds(1))
.map(i -> "๋ฐ์ดํฐ " + i)
.take(5);
}
โ WebClient๋ก Flux ๋ฐ๊ธฐ
WebClient.create("http://localhost:8080")
.get().uri("/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class)
.subscribe(System.out::println);
โ ํ์ผ ๋ค์ด๋ก๋ (DataBufferUtils ํ์ฉ)
Flux<DataBuffer> fileStream = WebClient.create()
.get().uri("/files/sample.pdf")
.retrieve()
.bodyToFlux(DataBuffer.class);
DataBufferUtils.write(fileStream, path, StandardOpenOption.CREATE)
.block(); // ์๋ฃ ๋๊ธฐ
โ ํ์ผ ์๋ต ์๋ฒ ์ฝ๋
@GetMapping("/files/sample.pdf")
public Mono<Void> downloadFile(ServerHttpResponse response) {
Flux<DataBuffer> fileStream = DataBufferUtils.read(filePath, response.bufferFactory(), 4096);
response.getHeaders().setContentType(MediaType.APPLICATION_PDF);
return response.writeWith(fileStream);
}
โ ํด๋ผ์ด์ธํธ - WebClient๋ก ํ์ผ ์ ๋ก๋ํ๊ธฐ
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
public class FileUploadClient {
public static void main(String[] args) {
WebClient webClient = WebClient.create("http://localhost:8080");
Mono<String> result = webClient.post()
.uri("/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.bodyValue(
MultipartBodyBuilderHelper.build("file", "C:/temp/sample.pdf")
)
.retrieve()
.bodyToMono(String.class);
result.subscribe(System.out::println);
}
}
๐ MultipartBodyBuilderHelper๋ ์๋์ฒ๋ผ ๋ฐ๋ก ์ ์
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.core.io.FileSystemResource;
public class MultipartBodyBuilderHelper {
public static org.springframework.util.MultiValueMap<String, Object> build(String field, String filePath) {
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part(field, new FileSystemResource(filePath));
return builder.build();
}
}
โ ์๋ฒ - WebFlux ์ปจํธ๋กค๋ฌ์์ ํ์ผ ์์ ๋ฐ ์ ์ฅ
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
import java.nio.file.Path;
import java.nio.file.Paths;
@RestController
public class FileUploadController {
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<String> handleUpload(@RequestPart("file") FilePart filePart) {
Path path = Paths.get("uploaded_" + filePart.filename());
// DataBuffer๋ก ์คํธ๋ฆฌ๋ฐ ์ ์ฅ
return DataBufferUtils.write(filePart.content(), path)
.then(Mono.just("ํ์ผ ์
๋ก๋ ์ฑ๊ณต: " + filePart.filename()));
}
}
๐ 6. ๋์ฝ๋ฉ ์ฒ๋ฆฌ
โ 6-1. ๋์ฝ๋ฉ ์ฒ๋ฆฌ ํ๋ฆ
- DataBuffer → byte[] → ByteBuffer ๋์ → CharsetDecoder.decode() → CharBuffer → String
CharsetDecoder๋ ByteBuffer๋ง ์
๋ ฅ์ผ๋ก ๋ฐ์ผ๋ฏ๋ก, DataBuffer๋ฅผ ์ง์ ์ฒ๋ฆฌํ ์ ์๋ค.
โ 6-2. WebFlux DataBuffer ๋ฐ์ดํฐ์ฒ๋ฆฌ(๋์ฝ๋ฉ) ์์
- ์๋ฒ๋ UTF-8 ๋ฑ ๋ฌธ์ ์ธ์ฝ๋ฉ๋ ๋ฐ์ดํธ๋ฅผ ๋ณด๋ด๊ณ , ํด๋ผ์ด์ธํธ/์๋ฒ๋ ์ด ๋ฐ์ดํธ๋ฅผ ๋ฌธ์์ด๋ก ๋์ฝ๋ฉํด์ผ ์ต์ข ๋ฐ์ดํฐ๋ก ํ์ฉํ ์ ์๋ค.
- bodyToFlux(DataBuffer.class)๋ก ๋ฐ์ผ๋ฉด, doOnNext(buffer -> {//์ฌ๊ธฐ์ ์ฒ๋ฆฌ!!})
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.publisher.Flux;
public class DataBufferDecodingExample {
// UTF-8 ๋ฌธ์ ๋์ฝ๋ ์์ฑ
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
// ๋์ ๋ฐ์ดํธ ๋ฒํผ (์
๋ ฅ ๋ฐ์ดํฐ ์ ์ฅ)
private final ByteBuffer cumulativeBuffer = ByteBuffer.allocate(8192);
// ๋ฌธ์ ๋ฒํผ (๋์ฝ๋ฉ ๊ฒฐ๊ณผ ์์ ์ ์ฅ)
private final CharBuffer charBuffer = CharBuffer.allocate(8192);
// ๋ฌธ์์ด ๋์ ์ ์ฅ์ฉ
private final StringBuilder decodedStr = new StringBuilder();
public void decodeDataBufferFlux(Flux<DataBuffer> dataBufferFlux) {
dataBufferFlux
.doOnNext(buffer -> {
// 1๏ธโฃ DataBuffer๋ฅผ byte[]๋ก ์์ ํ๊ฒ ๋ณต์ฌ
byte[] newBytes = new byte[buffer.readableByteCount()];
buffer.read(newBytes);
DataBufferUtils.release(buffer); // ์ฌ์ฉ ํ ๋ฐ๋์ ํด์ !
// 2๏ธโฃ ๋์ ๋ฐ์ดํธ ๋ฒํผ์ ์ ๋ฐ์ดํธ ์ถ๊ฐ
cumulativeBuffer.put(newBytes);
// 3๏ธโฃ ์ฝ๊ธฐ ๋ชจ๋๋ก ์ ํํ์ฌ ๋์ฝ๋ฉ ์ค๋น
cumulativeBuffer.flip();
// 4๏ธโฃ ์ฌ๋ฌ ์ํ์ ๋์ํด ๋์ฝ๋ฉ ๋ฐ๋ณต ์ํ
while (true) {
CoderResult result = decoder.decode(cumulativeBuffer, charBuffer, false);
charBuffer.flip(); // ์ฐ๊ธฐ๋ชจ๋ → ์ฝ๊ธฐ๋ชจ๋ ์ ํ
// 5๏ธโฃ ๋์ฝ๋ฉ๋ ๋ฌธ์๋ค ๋์ ์ ์ฅ
decodedStr.append(charBuffer);
charBuffer.clear(); // ๋ฌธ์ ๋ฒํผ ์ด๊ธฐํ
if (result.isUnderflow()) {
// ์
๋ ฅ ๋ฐ์ดํธ ๋ถ์กฑ → ๋ค์ ์กฐ๊ฐ ๊ธฐ๋ค๋ฆผ
break;
} else if (result.isOverflow()) {
// ๋ฌธ์ ๋ฒํผ ๊ฝ ์ฐธ → ๊ณ์ ๋์ฝ๋ฉ
continue;
} else if (result.isError()) {
System.err.println("๋์ฝ๋ฉ ์๋ฌ ๋ฐ์");
break;
}
}
// 6๏ธโฃ ์ฝ์ง ์์ ๋ฐ์ดํธ๋ฅผ ์์ผ๋ก ์ด๋ (๋ค์ ์กฐ๊ฐ๊ณผ ํฉ์น๊ธฐ ์ํด)
cumulativeBuffer.compact();
// 7๏ธโฃ(ํ์ ์ ์ฌ๊ธฐ์ decodedStr.toString()์ผ๋ก ๋ฌธ์์ด ์ฌ์ฉ ๊ฐ๋ฅ)
try {
PrintWriter writer = response.getWriter();
writer.write("data: " + decodedStr.toString() + "\n\n"); // SSE ๋ฐ์ดํฐ ํฌ๋งท
writer.flush(); // ์ฆ์ ์ ์ก
logger.debug("ํด๋ผ์ด์ธํธ ์ ์ก: {} ({} bytes)", decodedStr.toString(), decodedStr.length());
} catch (IOException e) {
logger.error("ํด๋ผ์ด์ธํธ ์ ์ก ์ค๋ฅ", e);
asyncContext.complete(); // ์๋ฌ ์ ๋น๋๊ธฐ ์ปจํ
์คํธ ์ข
๋ฃ
}
})
.subscribe();
}
}
- ์ ์ฝ๋๋ WebFlux ์๋ฒ ํน์ ํด๋ผ์ด์ธํธ๊ฐ ์คํธ๋ฆผ์ผ๋ก ์กฐ๊ฐ์กฐ๊ฐ ์ค๋ ๋ฐ์ดํธ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์์ ํ๊ฒ UTF-8 ๋ฌธ์์ด๋ก ๋ณํํ๋ ํจํด์ด๋ค.
- ์ค์๊ฐ AI ํ
์คํธ ์๋ต ์ฒ๋ฆฌ, ๋์ฉ๋ JSON ์คํธ๋ฆผ ํ์ฑ, SSE(Server-Sent Events) ์ฒ๋ฆฌ ๋ฑ์ ํ์ฉ๋๋ค.
1๏ธโฃ์ DataBuffer๋ฅผ ๋ฐ๋ก ๋์ฝ๋ฉํ์ง ์๊ณ ๋ณต์ฌํ๋๊ฐ?
DataBuffer๋ Netty ๊ธฐ๋ฐ์ ์์ ๋ฉ๋ชจ๋ฆฌ ๋ฒํผ์ด๋ฏ๋ก
- ์ฌ์ฉ ํ ๋ฐ๋์ DataBufferUtils.release(buffer)๋ก ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํด์ ํด์ผ ํ๋ค.
- ๋ฒํผ๋ฅผ ๋ฐ๋ก ์ฌํ์ฉํ์ง ์๊ณ , byte[]์ ๋ณต์ฌํด ์์ ํ๊ฒ ์ฒ๋ฆฌํด์ผ ํ๋ค.
- DataBuffer๋ ์ฌ์ฌ์ฉ๋๋ ํ(pool) ๋ฉ๋ชจ๋ฆฌ๋ผ, ํด์ (release()) ํ ์ง์ ์ฐธ์กฐํ๋ฉด ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค.
2๏ธโฃ ๋คํธ์ํฌ ๋ฐ์ดํฐ๋ ์กฐ๊ฐ์ผ๋ก ๋ค์ด์์, ๋จ์ผ DataBuffer๋ง ๋ณด๊ณ ๋์ฝ๋ฉํ๋ฉด ์ค๊ฐ ์กฐ๊ฐ์ด ๊นจ์ ธ์ ๋ฌธ์์ด ์์(๏ฟฝ)์ด ์๊ธด๋ค.
- "hello " → DataBuffer 1
- "wor" → DataBuffer 2 (๋จ๋ ๋์ฝ๋ฉํ๋ฉด ๊นจ์ง)
- "ld\n" → DataBuffer 3
- ๋ฐ๋ผ์ ์์ ํ๊ฒ byte[]์ ๋ณต์ฌ ํ, ByteBuffer ๋์ ๋ฒํผ์ ์ถ๊ฐํ์ฌ ๋ฌธ์ ๋์ฝ๋ฉ์ ํด์ผ ํ๋ค.
3๏ธโฃdecoder.decode()๋ ์ถ๋ ฅ์ฉ ๋ฌธ์ ๋ฒํผ๊ฐ ๊ฝ ์ฐจ๋ฉด Overflow ์ํ๋ฅผ ๋ฐํ
- ์ด๋ด ๋ ๊ณ์ ํธ์ถํด์ ๋จ์ ๋ฌธ์๋ฅผ ๋ ๋์ฝ๋ฉํด์ผ ํจ
๊ทธ๋์ while๋ฌธ์ผ๋ก ๋ฐ๋ณตํ๋ฉฐ ์ถ๋ ฅ ๋ฒํผ๊ฐ ๋น์์ง ๋๊น์ง ๋์ฝ๋ฉํด์ผ
ํ์ฌ ๋ฒํผ ๋ด ๋ชจ๋ ๋ฐ์ดํธ๋ฅผ ์ฒ๋ฆฌ ๊ฐ๋ฅ
๐ 7. Streaming ์์ ์์ฝ ๐ฏ
- Flux๋ ์คํธ๋ฆผ, Mono๋ ๋จ๊ฑด ์๋ต
- subscribe()๋ฅผ ํธ์ถํด์ผ ๋ฐ์ดํฐ๊ฐ ํ๋ฅด๊ธฐ ์์ํจ
- DataBuffer / DataBufferUtils๋ ๋ฐ์ด๋๋ฆฌ ๋ฐ์ดํฐ ๋ค๋ฃฐ ๋ ํต์ฌ
- WebFlux๋ ๋น๋๊ธฐ ์คํธ๋ฆฌ๋ฐ ์๋ต์ ์ํ ํ๋ ์์ํฌ
- ์คํธ๋ฆฌ๋ฐ ์๋ต์ text/event-stream ๋๋ chunked๋ก ์ฌ์ฉ๋จ
'Backend > JAVA' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| jsonํ์ผ ์ฝ๊ธฐ (0) | 2025.12.18 |
|---|---|
| [JAVA] Client IP, Device Type ์ถ์ถ (IP ๊ธฐ๋ฐ ๋ก๊ทธ์ธ) (0) | 2025.12.18 |
| ๋๋ DB INSERT ์ต์ ํ (0) | 2025.11.29 |
| ํ์ผ ์ ๋ก๋, ๋ค์ด๋ก๋ (MultipartFile) (0) | 2025.11.08 |
| JUnit ๋จ์ ํ ์คํธ ์ ์ฉ, JaCoCo ์ปค๋ฒ๋ฆฌ์ง ์ธก์ (0) | 2025.08.27 |