Streaming tableA to tableB and updating streamed tableA records
iamvanja opened this issue · 0 comments
iamvanja commented
Hi,
I am wondering about the following scenario in MySQL.
```js
// `db` is an existing mysql connection, `pool` a connection pool,
// and processRow() applies our per-record transformations
const stream = require('stream')
const etl = require('etl')

const query = db.query(`SELECT * FROM table_a WHERE is_migrated=?`, [0])

query
  .stream()
  .pipe(
    new stream.Transform({
      objectMode: true,
      transform: (row, encoding, callback) => {
        callback(null, processRow(row))
      }
    })
      .on('error', err => console.log('transform pipe error', err))
      .on('end', () => console.log('transform pipe end'))
  )
  .pipe(etl.mysql.upsert(pool, 'db_name', 'table_b'))
  // ... for all migrated records from table_a set is_migrated=1
  .promise()
  .then(data => console.log('done', data))
  .catch(err => console.error('err:', err))
```
In short, what I want to do is:

1. select from `table_a` where `is_migrated=0`
2. run the transformations on each row
3. insert the transformed rows into `table_b`
4. update `table_a` and set `is_migrated=1` for every record that was inserted into `table_b`
I have an idea of building an array of IDs and then doing a separate UPDATE, but it seems hacky, especially since the array could grow very quickly with a big data set. Is there a better way of doing it?
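For concreteness, this is roughly the workaround I have in mind (a minimal sketch; it assumes `table_a` has an `id` primary key, a `mysql`-style `pool.query` where an array value expands inside `IN (?)`, and the same hypothetical `processRow` helper as above):

```js
const stream = require('stream')
const etl = require('etl')

const migratedIds = [] // has to hold every migrated ID until the end of the run

// mark a chunk of table_a rows as migrated (the array value expands inside IN (?))
const markMigrated = ids =>
  new Promise((resolve, reject) =>
    pool.query(
      'UPDATE table_a SET is_migrated=1 WHERE id IN (?)',
      [ids],
      err => (err ? reject(err) : resolve())
    )
  )

db.query(`SELECT * FROM table_a WHERE is_migrated=?`, [0])
  .stream()
  .pipe(
    new stream.Transform({
      objectMode: true,
      transform: (row, encoding, callback) => {
        migratedIds.push(row.id) // remember which source row produced this record
        callback(null, processRow(row))
      }
    })
  )
  .pipe(etl.mysql.upsert(pool, 'db_name', 'table_b'))
  .promise()
  .then(async () => {
    // chunk the UPDATE so no single IN (...) list gets too large
    for (let i = 0; i < migratedIds.length; i += 1000) {
      await markMigrated(migratedIds.slice(i, i + 1000))
    }
  })
  .then(() => console.log('done'))
  .catch(err => console.error('err:', err))
```

The UPDATEs only run after `.promise()` resolves and in chunks of 1000 IDs, but the `migratedIds` array itself still has to hold every ID for the whole run, which is what I would like to avoid.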
Also, is it possible to use `INSERT ... ON DUPLICATE KEY UPDATE` for the upsert? `REPLACE INTO` becomes an issue in our use case, where it would eat up auto_increment IDs very quickly.
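For illustration, this is the kind of statement I mean for a single row, hand-rolled against the pool (the `table_b` column names here are just placeholders):

```js
// hand-rolled ON DUPLICATE KEY UPDATE upsert for one transformed row;
// the columns (id, name, value) are placeholders for the real table_b schema
const upsertRow = row =>
  new Promise((resolve, reject) =>
    pool.query(
      `INSERT INTO table_b (id, name, value)
       VALUES (?, ?, ?)
       ON DUPLICATE KEY UPDATE name = VALUES(name), value = VALUES(value)`,
      [row.id, row.name, row.value],
      err => (err ? reject(err) : resolve())
    )
  )
```

If nothing like this is built in, I could presumably drop it into the pipeline with something like `etl.map(row => upsertRow(row).then(() => row))` in place of `etl.mysql.upsert`, at the cost of one query per row, but it would be nice to keep the batched upsert.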