fs.createReadStream + csv-parser 사용 시 비동기 처리에 주의하세요!
📌 문제 상황: 비동기 스트림에서 예상치 못한 동작
CSV 파일을 읽고, 특정 수량(quantity)만큼 데이터를 처리하려고 했지만, 모든 데이터가 처리됨. 😲
✅ 목표:
- CSV 데이터에서 특정 개수(quantity)만 읽고 중단
- 비동기 함수(await)가 정확히 작동하도록 수정
💥 문제 발생 코드: 비동기 흐름 제어 실패
import fs from 'fs';
import csv from 'csv-parser';
let count = 0;
const quantity = 20;
const winDataList: any[] = [];
fs.createReadStream(filePath)
.pipe(csv())
.on("data", async (data: CsvRow) => {
if (count < quantity) {
const email = data.email;
const point = data.point;
const choice = data.choice;
const choiceImage = { a: "imagea", b: "imageb" }[choice]!;
winDataList.push({ email, point, choice, choiceImage });
console.log("Processing:", email, point);
await updateEmailFile(email, fileName); // 비동기 작업
await updateEmail(email); // 비동기 작업
count++;
} else {
return; // ❌ 스트림은 계속 실행됨
}
});
⚠️ 문제 원인:
- csv-parser는 비동기 흐름을 기다리지 않음.
- await 사용했지만, 스트림은 비동기 작업과 무관하게 계속 실행.
- 결과: 모든 CSV 데이터가 처리됨. 😱
✅ 해결 방법: 스트림 흐름을 명확히 제어
### 🎯 방법 1: stream.destroy()로 스트림 종료
count가 quantity에 도달하면 스트림 자체를 종료하여 추가 데이터를 읽지 않도록 합니다.
import fs from 'fs';
import csv from 'csv-parser';
let count = 0;
const quantity = 20;
const winDataList: any[] = [];
const stream = fs.createReadStream(filePath)
.pipe(csv());
stream.on("data", async (data: CsvRow) => {
if (count < quantity) {
const email = data.email;
const point = data.point;
const choice = data.choice;
const choiceImage = { a: "imagea", b: "imageb" }[choice]!;
winDataList.push({ email, point, choice, choiceImage });
console.log("Processing:", email, point);
await updateEmailFile(email, fileName);
await updateEmail(email);
count++;
if (count >= quantity) {
stream.destroy(); // ✅ 스트림 완전 종료
}
}
});
stream.on("close", () => {
console.log("Stream closed after processing", count, "items.");
});
stream.on("error", (err) => {
console.error("Stream error:", err);
});
💡 장점:
- 간단하고 빠른 구현
- quantity에 도달하면 즉시 스트림 종료
❗ 단점:
- 스트림 완전 종료: 이후 데이터는 절대 다시 읽을 수 없음
### 🎯 방법 2: pause() & resume()으로 정교한 제어
각 데이터 처리 시 스트림을 일시정지하고, 비동기 작업 완료 후 다시 재개합니다.
import fs from 'fs';
import csv from 'csv-parser';
let count = 0;
const quantity = 20;
const winDataList: any[] = [];
const stream = fs.createReadStream(filePath)
.pipe(csv());
stream.on("data", async (data: CsvRow) => {
if (count < quantity) {
stream.pause(); // ✅ 데이터 흐름 일시 정지
const email = data.email;
const point = data.point;
const choice = data.choice;
const choiceImage = { a: "imagea", b: "imageb" }[choice]!;
winDataList.push({ email, point, choice, choiceImage });
console.log("Processing:", email, point);
await updateEmailFile(email, fileName);
await updateEmail(email);
count++;
if (count >= quantity) {
stream.destroy(); // ✅ 최대 수량 도달 시 스트림 종료
} else {
stream.resume(); // ✅ 다음 데이터 읽기 재개
}
}
});
stream.on("close", () => {
console.log("Stream closed after processing", count, "items.");
});
stream.on("error", (err) => {
console.error("Stream error:", err);
});
💡 장점:
- 비동기 흐름 제어 가능 (데이터 순서 보장)
- 중간에 작업 재개 가능
❗ 단점:
- 코드 복잡성 증가
🎯 방법 3: pipeline()으로 스트림 관리 최적화
Node.js 10+에서는 **stream.pipeline()**을 사용해 에러 핸들링과 비동기 흐름 관리를 더 효율적으로 처리할 수 있습니다.
import { createReadStream } from 'fs';
import { pipeline } from 'stream';
import csv from 'csv-parser';
import { promisify } from 'util';
const asyncPipeline = promisify(pipeline);
async function processCSV(filePath: string, quantity: number) {
let count = 0;
const winDataList: any[] = [];
await asyncPipeline(
createReadStream(filePath),
csv(),
async function* (source) {
for await (const data of source) {
if (count >= quantity) break;
const email = data.email;
const point = data.point;
const choice = data.choice;
const choiceImage = { a: "imagea", b: "imageb" }[choice]!;
winDataList.push({ email, point, choice, choiceImage });
console.log("Processing:", email, point);
await updateEmailFile(email, fileName);
await updateEmail(email);
count++;
}
}
);
console.log("Processed", count, "records");
}
processCSV(filePath, 20).catch(console.error);
💡 장점:
- 에러 핸들링 포함
- 비동기 흐름 자연스럽게 제어 가능
- 중간에 멈추거나 재시작 가능
❗ 단점:
- Node.js 10+ 필요
- 복잡한 로직에는 적합하지 않을 수 있음
✅ 결론: 어떤 방법을 선택할까?
방법장점단점추천 상황
stream.destroy() | 빠르고 단순 | 이후 데이터 다시 읽기 불가 | 간단한 데이터 처리 |
pause() & resume() | 비동기 흐름 제어, 순서 보장 | 코드 복잡성 증가 | 정교한 흐름 제어 필요 시 |
pipeline() | 에러 핸들링, 비동기 흐름 최적화 | Node.js 10+ 필요 | 대규모 스트림 작업 |
📊 💡 최종 추천
- 빠르게 끝내야 한다면 → destroy()
- 비동기 흐름을 제어하고 싶다면 → pause() & resume()
- 에러 핸들링과 확장성을 고려한다면 → pipeline()
💬 마무리
Node.js의 스트림은 효율적인 데이터 처리를 가능하게 하지만, 비동기 흐름 관리에 유의해야 합니다. 특히 대용량 CSV 처리에서는 pause()와 destroy()를 적절히 사용하여 메모리 사용을 줄이고, 필요 시 pipeline()으로 에러 핸들링까지 고려하세요. 🚀
💡 활용 팁:
- 기술 태그 (#NodeJS, #JavaScript, #StreamAPI) → 개발자 타겟팅
- 문제 해결 태그 (#ErrorHandling, #CodeOptimization) → 검색 최적화
- 커뮤니티 태그 (#DeveloperLife, #TechBlog) → 더 넓은 도달범위 확보
'개발라이프' 카테고리의 다른 글
JavaScript에서 forEach에서 break 사용하기: 완벽 가이드 (0) | 2025.01.31 |
---|---|
SQLAlchemy: bulk_insert_mappings와 connection.execute 비교 및 성능 분석 (0) | 2025.01.16 |
Jupyter Notebook에서 ModuleNotFoundError 해결 방법 (1) | 2024.12.20 |
bfloat16 (Brain Floating Point 16-bit) (1) | 2024.12.20 |
머신러닝에서 데이터 라벨링이란? (0) | 2024.12.18 |