1단계: MongoDB 커서
커서를 설정하는 방법은 다음과 같습니다(스니펫 재사용).
const cursor = userObject?.data?.serviceProviderName === 'ZYRO' ? zyroTransactionModel.find(query).cursor() : finoTransactionModel.find(query).cursor(); console.log("Cursor created successfully");
2단계: ZIP 파일 설정
yazl 라이브러리를 사용하여 CSV 데이터를 ZIP 파일로 스트리밍합니다.
const yazl = require('yazl'); const zipfile = new yazl.ZipFile(); reply.raw.writeHead(200, { "Content-Type": "application/zip", "Content-Disposition": "attachment; filename=transactions.zip", }); zipfile.outputStream.pipe(reply.raw); const cleanup = async () => { console.log("Cleaning up resources..."); zipfile.end(); // Finalize ZIP await cursor.close(); }; reply.raw.on("close", cleanup); reply.raw.on("error", cleanup);
3단계: 동적 CSV 스트림 생성
CSV 데이터를 동적으로 생성하고 ZIP 파일로 스트리밍합니다.
const createNewCSVStream = (headers) => { const csvStream = new Readable({ read() {} }); csvStream.push(headers.join(",") + "\n"); // Add headers return csvStream; }; const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role); const currentCSVStream = createNewCSVStream(filteredHeaders); zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");
4단계: MongoDB 데이터를 CSV로 스트리밍
MongoDB의 데이터를 CSV로 직접 스트리밍합니다.
cursor.on('data', (doc) => { const csvRow = filteredHeaders.map(header => doc[header.key] || '').join(','); currentCSVStream.push(csvRow + '\n'); // Write row }); cursor.on('end', () => { currentCSVStream.push(null); // End the stream zipfile.end(); // Finalize the ZIP });
5단계: MongoDB 커서에서 데이터 처리
MongoDB 커서에서 문서를 스트리밍하고, 필요에 따라 변환하고, CSV 스트림에 동적으로 행을 씁니다.
try { for await (const doc of cursor) { if (clientDisconnected) { console.log("Client disconnected. Stopping processing..."); break; } streamedCount++; rowCount++; let row = ""; const filteredHeaders = getHeaders( transactionDownloadFields, userObject?.state?.auth?.role ); for (let i = 0; i < filteredHeaders.length; i++) { const field = filteredHeaders[i]; // Fetch the corresponding field configuration from transactionDownloadFields const originalField = transactionDownloadFields.find((f) => f.value === field.value); // Get the value from the transaction document let value = getValueFromTransaction(doc, field.value); // Apply transformation if the field has a transform function if (originalField?.transform) { value = originalField.transform(value); } // Enclose the value in double quotes value = value !== undefined ? `"${value}"` : '"N/A"'; row += (i > 0 ? "," : "") + value; } row += "\n"; currentCSVStream.push(row); // Check if the row count has reached the threshold for the current CSV file if (rowCount >= MAX_ROWS_PER_FILE) { console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`); currentCSVStream.push(null); // End the current CSV stream currentCSVStream = createNewCSVStream(); // Start a new stream rowCount = 0; // Reset the row count } } // Finalize the current CSV stream if it has data if (currentCSVStream) { currentCSVStream.push(null); } // Finalize the ZIP file zipfile.end(); console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`); } catch (error) { console.error("Error during processing:", error); if (!headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" }); } finally { // Cleanup: Close the MongoDB cursor await cursor.close().catch((err) => console.error("Error closing cursor:", err)); }
요약
await...of를 사용하여 문서 반복:
MongoDB 커서에서 문서를 하나씩 효율적으로 스트리밍합니다.
모든 데이터를 메모리에 로드하지 않고 실시간 처리가 가능합니다.
filteredHeaders를 반복하여 각 행을 동적으로 구성합니다.
transactionDownloadFields에 정의된 경우 변환 함수를 사용하여 변환을 적용합니다.
행 임계값 및 파일 분할:
임계값(MAX_ROWS_PER_FILE)을 기준으로 행 수를 모니터링합니다.
현재 CSV 스트림을 종료하고 임계값에 도달하면 새 CSV 스트림을 시작합니다.
처리 중 문제가 발생하면 오류 응답을 기록하고 보냅니다.
finally 블록에서 MongoDB 커서를 닫아 적절한 정리를 보장합니다.
현재 CSV 스트림을 종료하려면 null을 푸시합니다.
모든 행이 처리되면 ZIP 파일을 완성합니다.
위 내용은 스토리지에서 스트림으로: MongoDB 데이터를 사용자에게 직접 전달의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!