Streams API — ReadableStream, WritableStream으로 데이터 스트리밍
Streams API는 대용량 데이터를 청크(chunk) 단위로 처리하는 API입니다. 전체 데이터를 메모리에 올리지 않고 조금씩 읽고 쓸 수 있어서, 대용량 파일 처리나 실시간 데이터 전송에 적합합니다.
ReadableStream — 읽기 스트림
// 커스텀 ReadableStream 생성
const stream = new ReadableStream({
start(controller) {
// 초기화
controller.enqueue("첫 번째 청크");
controller.enqueue("두 번째 청크");
controller.close(); // 스트림 종료
},
});
// 읽기
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
비동기 데이터 생성
function createCounterStream(limit) {
let count = 0;
return new ReadableStream({
pull(controller) {
if (count >= limit) {
controller.close();
return;
}
// pull은 소비자가 데이터를 요청할 때 호출됨
controller.enqueue(count++);
},
});
}
const stream = createCounterStream(5);
const reader = stream.getReader();
let result;
while (!(result = await reader.read()).done) {
console.log(result.value); // 0, 1, 2, 3, 4
}
WritableStream — 쓰기 스트림
const writableStream = new WritableStream({
write(chunk) {
console.log("쓰기:", chunk);
},
close() {
console.log("스트림 닫힘");
},
abort(reason) {
console.error("중단:", reason);
},
});
const writer = writableStream.getWriter();
await writer.write("데이터 1");
await writer.write("데이터 2");
await writer.close();
TransformStream — 변환 스트림
// 대문자 변환 스트림
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
// 파이프라인: 읽기 → 변환 → 쓰기
const readable = new ReadableStream({
start(controller) {
controller.enqueue("hello");
controller.enqueue("world");
controller.close();
},
});
const results = [];
await readable
.pipeThrough(upperCaseTransform)
.pipeTo(new WritableStream({
write(chunk) { results.push(chunk); },
}));
console.log(results); // ["HELLO", "WORLD"]
fetch 응답 스트리밍
// 대용량 파일 다운로드 + 진행률
async function downloadWithProgress(url) {
const response = await fetch(url);
const total = parseInt(response.headers.get("Content-Length") || "0");
let loaded = 0;
const reader = response.body.getReader();
const chunks = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
loaded += value.length;
if (total > 0) {
console.log(`진행률: ${((loaded / total) * 100).toFixed(1)}%`);
}
}
return new Blob(chunks);
}
NDJSON 스트리밍 파싱
// 서버에서 줄 단위 JSON을 스트리밍으로 받기
async function* parseNDJSON(response) {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop(); // 미완성 줄은 버퍼에 유지
for (const line of lines) {
if (line.trim()) {
yield JSON.parse(line);
}
}
}
if (buffer.trim()) {
yield JSON.parse(buffer);
}
}
// 사용
const response = await fetch("/api/stream");
for await (const item of parseNDJSON(response)) {
console.log(item); // 각 JSON 객체가 도착할 때마다 처리
}
스트림 파이프라인
// 여러 변환을 체이닝
const compressionStream = new CompressionStream("gzip");
const decompressionStream = new DecompressionStream("gzip");
// 텍스트 → 압축 → 전송
const blob = new Blob(["대용량 텍스트 데이터..."]);
const compressed = blob.stream()
.pipeThrough(new CompressionStream("gzip"));
// 서버로 압축 전송
await fetch("/api/upload", {
method: "POST",
body: compressed,
headers: { "Content-Encoding": "gzip" },
});
tee — 스트림 분기
// 하나의 스트림을 두 갈래로 분기
const response = await fetch("/api/data");
const [stream1, stream2] = response.body.tee();
// stream1은 화면에 표시
displayStream(stream1);
// stream2는 캐시에 저장
cacheStream(stream2);
스트림과 일반 처리 비교
| 항목 | 일반 처리 | 스트림 |
|---|---|---|
| 메모리 | 전체 데이터 로드 | 청크 단위 |
| 첫 데이터까지 시간 | 전체 로드 후 | 첫 청크 도착 즉시 |
| 대용량 데이터 | 메모리 부족 위험 | 안전 |
| 코드 복잡도 | 낮음 | 높음 |
**기억하기 **: Streams API는 "물이 흐르듯" 데이터를 처리합니다. ReadableStream에서 읽고, TransformStream으로 변환하고, WritableStream에 쓰는 파이프라인을 구성할 수 있습니다.
fetch응답의body가 이미 ReadableStream이므로, 대용량 다운로드의 진행률 표시나 실시간 데이터 파싱에 바로 활용할 수 있습니다.
댓글 로딩 중...