반응형

fs.createReadStream + csv-parser 사용 시 비동기 처리에 주의하세요!


📌 문제 상황: 비동기 스트림에서 예상치 못한 동작

CSV 파일을 읽고, 특정 수량(quantity)만큼 데이터를 처리하려고 했지만, 모든 데이터가 처리됨. 😲

목표:

  • CSV 데이터에서 특정 개수(quantity)만 읽고 중단
  • 비동기 함수(await)가 정확히 작동하도록 수정

💥 문제 발생 코드: 비동기 흐름 제어 실패

import fs from 'fs';
import csv from 'csv-parser';

let count = 0;
const quantity = 20;
const winDataList: any[] = [];

fs.createReadStream(filePath)
  .pipe(csv())
  .on("data", async (data: CsvRow) => {
    if (count < quantity) {
      const email = data.email;
      const point = data.point;
      const choice = data.choice;
      const choiceImage = { a: "imagea", b: "imageb" }[choice]!;

      winDataList.push({ email, point, choice, choiceImage });
      console.log("Processing:", email, point);

      await updateEmailFile(email, fileName); // 비동기 작업
      await updateEmail(email);              // 비동기 작업

      count++;
    } else {
      return; // ❌ 스트림은 계속 실행됨
    }
  });

⚠️ 문제 원인:

  1. csv-parser는 비동기 흐름을 기다리지 않음.
  2. await 사용했지만, 스트림은 비동기 작업과 무관하게 계속 실행.
  3. 결과: 모든 CSV 데이터가 처리됨. 😱

해결 방법: 스트림 흐름을 명확히 제어

### 🎯 방법 1: stream.destroy()로 스트림 종료

count가 quantity에 도달하면 스트림 자체를 종료하여 추가 데이터를 읽지 않도록 합니다.

import fs from 'fs';
import csv from 'csv-parser';

let count = 0;
const quantity = 20;
const winDataList: any[] = [];

const stream = fs.createReadStream(filePath)
  .pipe(csv());

stream.on("data", async (data: CsvRow) => {
  if (count < quantity) {
    const email = data.email;
    const point = data.point;
    const choice = data.choice;
    const choiceImage = { a: "imagea", b: "imageb" }[choice]!;

    winDataList.push({ email, point, choice, choiceImage });
    console.log("Processing:", email, point);

    await updateEmailFile(email, fileName);
    await updateEmail(email);

    count++;

    if (count >= quantity) {
      stream.destroy(); // ✅ 스트림 완전 종료
    }
  }
});

stream.on("close", () => {
  console.log("Stream closed after processing", count, "items.");
});

stream.on("error", (err) => {
  console.error("Stream error:", err);
});

💡 장점:

  • 간단하고 빠른 구현
  • quantity에 도달하면 즉시 스트림 종료

단점:

  • 스트림 완전 종료: 이후 데이터는 절대 다시 읽을 수 없음

### 🎯 방법 2: pause() & resume()으로 정교한 제어

각 데이터 처리 시 스트림을 일시정지하고, 비동기 작업 완료 후 다시 재개합니다.

import fs from 'fs';
import csv from 'csv-parser';

let count = 0;
const quantity = 20;
const winDataList: any[] = [];

const stream = fs.createReadStream(filePath)
  .pipe(csv());

stream.on("data", async (data: CsvRow) => {
  if (count < quantity) {
    stream.pause(); // ✅ 데이터 흐름 일시 정지

    const email = data.email;
    const point = data.point;
    const choice = data.choice;
    const choiceImage = { a: "imagea", b: "imageb" }[choice]!;

    winDataList.push({ email, point, choice, choiceImage });
    console.log("Processing:", email, point);

    await updateEmailFile(email, fileName);
    await updateEmail(email);

    count++;

    if (count >= quantity) {
      stream.destroy(); // ✅ 최대 수량 도달 시 스트림 종료
    } else {
      stream.resume(); // ✅ 다음 데이터 읽기 재개
    }
  }
});

stream.on("close", () => {
  console.log("Stream closed after processing", count, "items.");
});

stream.on("error", (err) => {
  console.error("Stream error:", err);
});

💡 장점:

  • 비동기 흐름 제어 가능 (데이터 순서 보장)
  • 중간에 작업 재개 가능

단점:

  • 코드 복잡성 증가

🎯 방법 3: pipeline()으로 스트림 관리 최적화

Node.js 10+에서는 **stream.pipeline()**을 사용해 에러 핸들링비동기 흐름 관리를 더 효율적으로 처리할 수 있습니다.

import { createReadStream } from 'fs';
import { pipeline } from 'stream';
import csv from 'csv-parser';
import { promisify } from 'util';

const asyncPipeline = promisify(pipeline);

async function processCSV(filePath: string, quantity: number) {
  let count = 0;
  const winDataList: any[] = [];

  await asyncPipeline(
    createReadStream(filePath),
    csv(),
    async function* (source) {
      for await (const data of source) {
        if (count >= quantity) break;

        const email = data.email;
        const point = data.point;
        const choice = data.choice;
        const choiceImage = { a: "imagea", b: "imageb" }[choice]!;

        winDataList.push({ email, point, choice, choiceImage });
        console.log("Processing:", email, point);

        await updateEmailFile(email, fileName);
        await updateEmail(email);

        count++;
      }
    }
  );

  console.log("Processed", count, "records");
}

processCSV(filePath, 20).catch(console.error);

💡 장점:

  • 에러 핸들링 포함
  • 비동기 흐름 자연스럽게 제어 가능
  • 중간에 멈추거나 재시작 가능

단점:

  • Node.js 10+ 필요
  • 복잡한 로직에는 적합하지 않을 수 있음

결론: 어떤 방법을 선택할까?

방법장점단점추천 상황

stream.destroy() 빠르고 단순 이후 데이터 다시 읽기 불가 간단한 데이터 처리
pause() & resume() 비동기 흐름 제어, 순서 보장 코드 복잡성 증가 정교한 흐름 제어 필요 시
pipeline() 에러 핸들링, 비동기 흐름 최적화 Node.js 10+ 필요 대규모 스트림 작업

📊 💡 최종 추천

  • 빠르게 끝내야 한다면 → destroy()
  • 비동기 흐름을 제어하고 싶다면 → pause() & resume()
  • 에러 핸들링과 확장성을 고려한다면 → pipeline()

💬 마무리

Node.js의 스트림은 효율적인 데이터 처리를 가능하게 하지만, 비동기 흐름 관리에 유의해야 합니다. 특히 대용량 CSV 처리에서는 pause()와 destroy()를 적절히 사용하여 메모리 사용을 줄이고, 필요 시 pipeline()으로 에러 핸들링까지 고려하세요. 🚀

 

#NodeJS #JavaScript #CSVParsing #StreamAPI #AsyncProgramming #DataProcessing #BackendDevelopment #NodeJSTips #FullStackDevelopment #WebDevelopment #Asynchronous #CodingTips #DeveloperLife #CodeOptimization #ErrorHandling #NodeJSTutorial #BackendEngineer #ProgrammingTips #SoftwareEngineering #TechBlog

💡 활용 팁:

  • 기술 태그 (#NodeJS, #JavaScript, #StreamAPI) → 개발자 타겟팅
  • 문제 해결 태그 (#ErrorHandling, #CodeOptimization) → 검색 최적화
  • 커뮤니티 태그 (#DeveloperLife, #TechBlog) → 더 넓은 도달범위 확보
728x90
반응형

+ Recent posts