ZJONSSON/node-etl

Streaming tableA to tableB and updating streamed tableA records

iamvanja opened this issue · 0 comments

Hi,

I am wondering about the following scenario in MySQL.

```js
const query = db.query(`SELECT * FROM table_a WHERE is_migrated=?`, [0])

query
  .stream()
  .pipe(
    stream.Transform({
      objectMode: true,
      transform: (row, encoding, callback) => {
        callback(null, processRow(row))
      }
    })
    .on('error', err => console.log('transform pipe error', err))
    .on('end', () => console.log('transform pipe end'))
  )
  .pipe(etl.mysql.upsert(pool, 'db_name', 'table_b'))
  // ... for all migrated records from table_a set is_migrated=1
  .promise()
  .then(data => console.log('done', data))
  .catch(err => console.error('err:', err))
```

In other words:

1. select from `table_a` where `is_migrated=0`
2. do transformations for each row
3. insert them into `table_b`
4. update `table_a` for all records inserted into `table_b`, setting `is_migrated=1`

I have an idea of building an array of IDs and then doing a separate `UPDATE`, but it seems hacky, especially since the array could grow very quickly on a big data set. Is there a better way of doing it?

Also, is it possible to use `INSERT ... ON DUPLICATE KEY UPDATE` for the upsert? `REPLACE INTO` is a problem in our use case because it eats up auto_increment IDs very quickly.
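If the built-in upsert can't be configured that way, the statement can also be assembled by hand and run with `pool.query`. A hedged sketch of building the SQL for a batch of rows (table and column names are illustrative; this uses MySQL's `VALUES()` syntax):

```js
// Build an `INSERT ... ON DUPLICATE KEY UPDATE` statement and its
// parameter list for a batch of row objects, all sharing the same keys.
// Unlike REPLACE INTO, this updates existing rows in place instead of
// delete-and-reinsert, so it does not burn auto_increment IDs.
function buildUpsertSql(table, rows) {
  const cols = Object.keys(rows[0]);
  // One "(?, ?, ...)" group per row.
  const placeholders = rows
    .map(() => `(${cols.map(() => '?').join(', ')})`)
    .join(', ');
  // On conflict, overwrite each column with the incoming value.
  const updates = cols.map(c => `\`${c}\`=VALUES(\`${c}\`)`).join(', ');
  const sql =
    `INSERT INTO \`${table}\` (${cols.map(c => `\`${c}\``).join(', ')}) ` +
    `VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${updates}`;
  // Flatten row values in column order to match the placeholders.
  const values = rows.flatMap(r => cols.map(c => r[c]));
  return { sql, values };
}
```

The resulting `{ sql, values }` pair could then be passed to `pool.query(sql, values)` for each batch.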