Nodejs Mysql 과 StreamsteemCreated with Sketch.

in #kr-dev6 years ago

Node.js Stream 을 이용하여 .pipe() 을 거는 코딩을 했다. 꿀잼이었다.

Node.js 의 Stream 문서에 나와있는 인터페이스를 잘 활용하면 어렵지 않게 할 수 있다.

Stream 에는 크게 Readable Stream 과 Writable Stream 이 있고, 이 두개를 잘 합친 Duplex 와 Transform 이 있다.

Node.js Mysql Stream

내가 하고 싶은 작업은 mysql DB 의 row 들을 csv 포맷으로 S3에 덤프를 뜨는 것이었다.

그러기 위해서 Readable Stream 2개를 연결해야 하는데, 커스텀 포매팅이 필요했다.

이를 Transform stream 을 이용하여 구현하였다.

const Transform = require('stream').Transform;

let s3 = new AWS.S3({...});

let transformStream = new Transform({
    writableObjectMode: true
});

transformStream._transform = function transfrom(chunk, encoding, callback) {
    let line = Object.keys(chunk).map((key) => {
        if(chunk[key] instanceof Date) return moment(chunk[key]).format('YYYY-MM-DD HH:mm:ss');
        return chunk[key];
    }).join(',') + '\n';
    this.push(line);
    callback(null);
}

function mysqlToS3(sql, cb) {
    conn = mysql.createConnection(...);

    conn.query(sql)
        .stream()
        .pipe(transformStream);

    s3.upload({
        Bucket: BUCKET_NAME,
        Key: KEY_NAME,
        Body: transformStream,
        ACL: '...'
    }, (err, data) => {
        console.log(data);
        cb(err);
    });
}

이런 식으로 pipe 을 이용하여 row 들을 S3로 stream 할 수 있었다.

Transform stream 을 쓰지 않을 땐 이걸 file 로 떨군 뒤 다시 읽었는데, 생각해보니 굳이 그럴 필요가 없어서 이 방법이 더 좋았던 것 같다.

objectMode and mysql

stream 의 옵션 중 하나인 objectMode는 stream 을 object 로 전달할 것인가? 아니면 Buffer/string 으로 전달할 것인가? 에 대한 옵션이다.

writableObjectMode: true

위의 코드에서 이 옵션을 제거하게 되면 동작을 하지 않는다.

잘 생각해보면 이유를 알 수 있다.

objectMode 를 끄게 되면 serialized 된 포맷으로 전송되게 된다. 이 data stream 을 보내는 주체는 Mysql Server 이며, 따라서 objectMode: false 인 경우 chunk 를 사용자가 직접 파싱해야 한다.

그 파서는 Mysql data Parser 와 똑같다. 즉, Mysql spec 을 잘 이해하고 있어야 해당 chunk 를 파싱할 수 있는 것이다.

따라서 nodejs 의 mysql 모듈의 경우는 강제적으로 objectMode: true 로 세팅해버린다.

Query.prototype.stream = function(options) {
    var self = this,
        stream;

    options = options || {};
    options.objectMode = true; //// 요기다
    stream = new Readable(options);

    stream._read = function() {
        self._connection && self._connection.resume();
    };

    this.on('result',function(row,i) {
        if (!stream.push(row)) self._connection.pause();
        stream.emit('result',row,i);  // replicate old emitter
    });

    /* 이하 생략 */

잘 생각해보면 맞는 구현인 것 같다. mysql chunk 를 직접 파싱하는 일은 절대 필요가 없다.

Further Study

Node.js Stream 역시 EventEmitter 의 wrapper 이다. 따라서 EventEmitter 를 더 보아야 더 깊은 이해가 가능할 듯 하다.

Coin Marketplace

STEEM 0.28
TRX 0.12
JST 0.033
BTC 70153.57
ETH 3783.62
USDT 1.00
SBD 3.72