xiaofateng/BinlogUpdatetoHive

UPDATE operations do not take effect, and the delta file already exists

Opened this issue · 5 comments

Hello,

I came across your solution for real-time incremental import of MySQL data into Hive on your blog and on GitHub. After studying it and running some experiments, I hit the two problems below. I'd be very grateful if you could spare some time from your busy schedule to take a look!

  1. An UPDATE operation did not modify the existing record; it ended up as an extra insert instead. Am I missing some configuration that would make the rows merge? (My client-side ACID settings are sketched after the DDL below.)

Input events:
====INSERT,1,uuid1,link_uuid1
====INSERT,2,uuid2,link_uuid2
====INSERT,3,uuid3,link_uuid3
====INSERT,4,uuid4,link_uuid4
====UPDATE,1,x_uuid1,x_link_uuid1
====UPDATE,2,x_uuid2,x_link_uuid2
====UPDATE,3,x_uuid3,x_link_uuid3

After the events were applied, the Hive table contains:

> select * from debt_loan_order;

OK
4 uuid4 link_uuid4 20170803
1 uuid1 link_uuid1 20170803
1 x_uuid1 x_link_uuid1 20170803
2 uuid2 link_uuid2 20170803
2 x_uuid2 x_link_uuid2 20170803
3 uuid3 link_uuid3 20170803
3 x_uuid3 x_link_uuid3 20170803
Time taken: 0.43 seconds, Fetched: 7 row(s)

Hive table DDL:
create table test.debt_loan_order
(id bigint,uuid string,link_uuid string)
partitioned by(day int)
clustered by (id) into 4 buckets
row format delimited
fields terminated by ','
stored as orc TBLPROPERTIES ('transactional'='true');
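
For completeness, the client-side settings I use are the standard Hive ACID prerequisites sketched below (a sketch only; whether problem 1 comes down to one of these is exactly what I'm unsure about):

import org.apache.hadoop.hive.conf.HiveConf

val conf = new HiveConf()
// Standard prerequisites for writing to Hive ACID tables:
conf.set("hive.support.concurrency", "true")
conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")
conf.set("hive.enforce.bucketing", "true") // needed on Hive 1.x; implied on 2.x
conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Compaction must also be enabled on the metastore side, otherwise the
// delta files written by each transaction are never merged into a base file:
conf.set("hive.compactor.initiator.on", "true")
conf.set("hive.compactor.worker.threads", "1")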

  2. The transactional table is clustered by the first column, id, into 4 buckets, but when a record with id 5 is inserted, it fails with FileAlreadyExistsException: delta_0000027_0000027/bucket_00001 already exists. My guess is that the id 5 record hashes to bucket_00001 (the bucketing sketch after the stack trace bears this out), and the writer then tries to create delta_0000027_0000027/bucket_00001 again and finds it already there. My question is: since the file already exists, why not append the record to bucket_00001 instead of creating it a second time and failing?
    The exact exception:

rdd_data INSERT===5
17/08/06 10:42:38 INFO impl.PhysicalFsWriter: ORC writer created for path: hdfs://hnode7:8020/user/hive2/warehouse/test.db/debt_loan_order/day=20170803/delta_0000027_0000027/bucket_00001 with stripeSize: 8388608 blockSize: 268435456 compression: ZLIB bufferSize: 32768
org.apache.hive.hcatalog.streaming.mutate.worker.WorkerException: Failed to insert record 'MutableRecord [id=5, rowId={originalTxn: -1, bucket: 1, row: -1}] using mutator 'ObjectInspectorMutator [transactionId=27, partitionPath=hdfs://hnode7:8020/user/hive2/warehouse/test.db/debt_loan_order/day=20170803, bucketId=1]'.
at org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator.insert(MutatorCoordinator.java:117)
at com.batch.BinlogtoHiveInBatchNoSpark$$anonfun$main$1.apply(BinlogtoHiveInBatchNoSpark.scala:124)
at com.batch.BinlogtoHiveInBatchNoSpark$$anonfun$main$1.apply(BinlogtoHiveInBatchNoSpark.scala:107)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at com.batch.BinlogtoHiveInBatchNoSpark$.main(BinlogtoHiveInBatchNoSpark.scala:107)
at com.batch.BinlogtoHiveInBatchNoSpark.main(BinlogtoHiveInBatchNoSpark.scala)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive2/warehouse/test.db/debt_loan_order/day=20170803/delta_0000027_0000027/bucket_00001 for client 10.10.106.23 already exists
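
To sanity-check the bucket mapping, here is Hive's default bucketing rule as I understand it (an assumption on my part: (hash & Int.MaxValue) % numBuckets, with a bigint hashing like Java's Long.hashCode):

object BucketCheck extends App {
  // Assumed default Hive bucketing: (hash & Int.MaxValue) % numBuckets,
  // where a bigint's hash is (v ^ (v >>> 32)).toInt, i.e. Long.hashCode.
  def bucketIdFor(id: Long, numBuckets: Int): Int =
    (id.## & Int.MaxValue) % numBuckets

  // id = 5 with 4 buckets lands in bucket 1, matching the failing
  // delta_0000027_0000027/bucket_00001 path in the trace above.
  println(bucketIdFor(5L, 4)) // prints 1
}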

I stepped through the code below and found that OrcRecordUpdater.insert checks this.acidOperationalProperties.isSplitUpdate(), which decides whether the file gets created this way, but I haven't figured out which configuration enables that logic.

at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:344)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:920)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:901)
at org.apache.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:90)
at org.apache.orc.impl.WriterImpl.<init>(WriterImpl.java:186)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:320)
at org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:372)
at org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:436)
at org.apache.hive.hcatalog.streaming.mutate.worker.MutatorImpl.insert(MutatorImpl.java:54)
at org.apache.hive.hcatalog.streaming.mutate.worker.MutatorCoordinator.insert(MutatorCoordinator.java:114)
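
The closest lead I have found (an assumption, not verified end to end) is that AcidOperationalProperties is derived from the table property transactional_properties, so on Hive 2.x a table created with TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') should take the isSplitUpdate() branch; whether that actually avoids the FileAlreadyExistsException here, I don't know.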

Thanks again for any pointers! And if you're willing to add me on QQ [344589799], even better!

I hit the same problem: when a single transaction inserts enough records that a bucket is written more than once, it fails with "bucket file already exists". I don't know which setting controls this either.

cadl commented

@wind-org @ghzj0615

The transactional table is clustered by id into 4 buckets, but when a record with id 5 is inserted, it fails with FileAlreadyExistsException: delta_0000027_0000027/bucket_00001 already exists. [...] Why not append the record to bucket_00001 instead of creating it a second time and failing?

You probably need to group the inserts by bucketId.

https://cwiki.apache.org/confluence/display/Hive/HCatalog+Streaming+Mutation+API

The documentation says:

Yes, all mutated records have existing RecordIdentifiers and must be grouped by [partitionValues, bucketId] and sorted by lastTxnId. These record coordinates initially arrive in an order that is effectively random.
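
Roughly like the sketch below (made-up Mutation/RecordId types and a write callback standing in for the real MutatorCoordinator calls; it only illustrates the required ordering):

case class RecordId(lastTxnId: Long, bucketId: Int, rowId: Long)
case class Mutation(partitionValues: Seq[String], recordId: RecordId, row: String)

// Sort so that all events for one [partitionValues, bucketId] group are
// contiguous, ordered by lastTxnId inside the group, then replay them.
def applyOrdered(mutations: Seq[Mutation])(write: Mutation => Unit): Unit =
  mutations
    .sortBy(m => (m.partitionValues.mkString("/"), m.recordId.bucketId, m.recordId.lastTxnId))
    .foreach(write) // e.g. coordinator.insert / update / delete per event type

If I read the error right, this way each bucket file of the delta is opened once per transaction, instead of being created a second time when an out-of-order record comes back to the same bucket.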

lijif commented

Hello, I'd like to ask: can this approach also sync deletes to Hive?


Hi, judging from your experiment results, how does this approach perform for UPDATE and DELETE operations? Also, could the failure to update be related to the table's files not being ORC when the table was created?