The HDFS file upload utility class may silently lose data.
Minnull opened this issue · 5 comments
After analyzing the code path for HDFS file upload, I believe the code carries a risk of data loss.
  def upload(localPath: String, remotePath: String, namenode: String = null): Unit = {
    try {
      val localFile = new File(localPath)
      if (!localFile.exists() || localFile.length() <= 0) {
        return
      }
    } catch {
      case e: Throwable =>
        LOG.warn("check for empty local file error, but you can ignore this check error. " +
                   "If there is empty sst file in your hdfs, please delete it manually",
                 e)
    }
    val system = getFileSystem(namenode)
    system.copyFromLocalFile(new Path(localPath), new Path(remotePath))
  }
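(1) The missing-file check: judging from the call path, a file is uploaded only after it has been generated. If the file no longer exists at upload time, no exception is thrown and nothing is logged. This code effectively swallows the error, which looks like a data-loss risk.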
    if (!localFile.exists() || localFile.length() <= 0) {
      return
    }
(2) The catch block appears to have the same problem: the exception is logged as a warning but never handled or rethrown.
    } catch {
      case e: Throwable =>
        LOG.warn("check for empty local file error, but you can ignore this check error. " +
                   "If there is empty sst file in your hdfs, please delete it manually",
                 e)
    }
(3) In testing, I found that when the local file is deleted concurrently by another task, a file of size 0 ends up uploaded to HDFS, which affects ingest.
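The silent skip described in (1) can be reproduced without a cluster. Below is a minimal sketch, not the project's code: it mirrors the control flow of `upload`, and the `copy` callback is a hypothetical stand-in for `system.copyFromLocalFile` so that the example runs on a plain JVM.

```scala
import java.io.File

object SilentSkipDemo {
  // Same control flow as upload(); `copy` is a hypothetical stand-in for
  // system.copyFromLocalFile so the sketch needs no HDFS.
  def upload(localPath: String, remotePath: String)(copy: (String, String) => Unit): Unit = {
    try {
      val localFile = new File(localPath)
      if (!localFile.exists() || localFile.length() <= 0) {
        return // the caller cannot tell this apart from a successful upload
      }
    } catch {
      case e: Throwable => println(s"check failed, ignored: $e")
    }
    copy(localPath, remotePath)
  }

  def main(args: Array[String]): Unit = {
    var copied = false
    // The local path is assumed not to exist on the machine running the demo.
    upload("/path/that/does/not/exist.sst", "/remote/part.sst")((_, _) => copied = true)
    println(s"upload returned normally, copied = $copied")
  }
}
```

The call returns without an exception even though nothing was copied, so the calling task would report success.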
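Proposed fix
Should all of these exceptions be thrown to the caller instead? When an executor receives the exception, the container is killed and the task is retried, which keeps the data complete.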
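A rough sketch of what a fail-fast version could look like, offered for discussion rather than as the final patch. The object and method names (`StrictUpload`, `uploadOrFail`) are hypothetical, and the `FileSystem` is passed in instead of being obtained through `getFileSystem(namenode)`. It relies only on the Hadoop calls `copyFromLocalFile`, `getFileStatus` and `delete`. The length comparison after the copy is my own assumption about how to catch case (3).

```scala
import java.io.{File, IOException}
import org.apache.hadoop.fs.{FileSystem, Path}

object StrictUpload {
  // Hypothetical fail-fast variant: every problem becomes an exception,
  // so the task fails and can be retried instead of silently losing a file.
  def uploadOrFail(fs: FileSystem, localPath: String, remotePath: String): Unit = {
    // File.length() returns 0 when the file does not exist,
    // so one check covers both "missing" and "empty".
    val expectedLen = new File(localPath).length()
    if (expectedLen <= 0) {
      throw new IOException(s"local file is missing or empty: $localPath")
    }

    val dst = new Path(remotePath)
    fs.copyFromLocalFile(new Path(localPath), dst)

    // Assumed guard for case (3): compare the remote length with the
    // length seen before the copy, and remove the bad remote file.
    val actualLen = fs.getFileStatus(dst).getLen
    if (actualLen != expectedLen) {
      fs.delete(dst, false)
      throw new IOException(
        s"size mismatch after upload: expected $expectedLen bytes, got $actualLen at $remotePath")
    }
  }
}
```

There is no try/catch around the check, so any I/O error reaches the caller. Whether a retry can regenerate the local file depends on the calling job, which this sketch does not cover.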
Looking forward to a reply
Thanks for the reply. I will do my best to take part in fixing this issue.
No rush, no rush~~~ take your time :)