Port Spark-BAM features
tomwhite opened this issue · 6 comments
I have spent a couple of weeks looking at Spark-BAM and how we might incorporate the improvements in Hadoop-BAM itself so that ADAM and GATK can take advantage of them without relying on two libraries.
First, thank you @ryan-williams for the work you have done in Spark-BAM! Testing all the BAM files you could get your hands on is one of those great ideas that makes so much sense in hindsight.
The issues with Hadoop-BAM that Ryan identified are not all easily remedied with bug fixes. For example, supporting long reads would be very difficult with the current approach, since the BAM record guesser assumes that a read start will be found in a fixed number of BGZF blocks (3). While this number could be increased, it’s not clear how to make it work scalably, particularly since the logic has a very tight coupling with BGZF block boundary searching. Ryan’s approach is to separate the two problems: finding BGZF block boundaries and finding BAM record start positions, so they can be composed safely.
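To make the first of those two problems concrete, here is a minimal sketch of scanning a byte buffer for a candidate BGZF block header. It is not the code in BgzfBlockGuesser or Spark-BAM; the class and method names (`BgzfHeaderScan`, `findBlockStart`, `blockSize`) are hypothetical, and it assumes the `BC` extra subfield is the first and only gzip extra subfield (XLEN = 6), which is how BGZF blocks are commonly written. A real guesser would validate further, for example by checking that the next block also starts where BSIZE says it should.

```java
final class BgzfHeaderScan {
    // Fixed-size prefix of a BGZF block: 12-byte gzip header + 6-byte BC subfield.
    private static final int HEADER_LEN = 18;

    private BgzfHeaderScan() {}

    // Reads an unsigned little-endian 16-bit value.
    private static int u16(byte[] buf, int i) {
        return (buf[i] & 0xff) | ((buf[i + 1] & 0xff) << 8);
    }

    // True if the bytes at 'off' match the BGZF header layout (assumption: XLEN == 6).
    static boolean looksLikeBlockStart(byte[] buf, int off) {
        if (off < 0 || off + HEADER_LEN > buf.length) {
            return false;
        }
        return (buf[off] & 0xff) == 0x1f        // gzip ID1
            && (buf[off + 1] & 0xff) == 0x8b    // gzip ID2
            && buf[off + 2] == 8                // CM = deflate
            && buf[off + 3] == 4                // FLG = FEXTRA
            && u16(buf, off + 10) == 6          // XLEN
            && buf[off + 12] == 'B'             // SI1
            && buf[off + 13] == 'C'             // SI2
            && u16(buf, off + 14) == 2;         // SLEN
    }

    // Total compressed block size in bytes (the BSIZE field stores size - 1).
    static int blockSize(byte[] buf, int off) {
        return u16(buf, off + 16) + 1;
    }

    // Returns the first offset >= from that looks like a block start, or -1.
    static int findBlockStart(byte[] buf, int from) {
        for (int off = Math.max(from, 0); off + HEADER_LEN <= buf.length; off++) {
            if (looksLikeBlockStart(buf, off)) {
                return off;
            }
        }
        return -1;
    }
}
```

Because this step only looks at compressed bytes, it can run independently of any BAM record logic, which is the point of separating the two problems.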
I have created a candidate that implements Spark-BAM's features, written in ~1000 lines of Java.
- BgzfBlockSource finds BGZF block boundaries using BgzfBlockGuesser (an existing Hadoop-BAM class that works well) and returns a JavaRDD.
- BamSource uses BgzfBlockSource to find the first BGZF block boundary in each partition, then looks for the first read starting in each partition (using BamRecordGuesser), before opening a BAM reader to iterate over the reads. Unlike the existing Hadoop-BAM code, this can search an arbitrary number of bytes into the partition.
- BamRecordGuesser has the fixes detailed in Spark-BAM.
- Intervals are supported.
- The choice of a Hadoop or NIO filesystem for all filesystem access is strictly enforced using a FileSystemWrapper abstraction. While I realise the irony of creating yet another FS abstraction, in my defence this is an internal implementation detail.
- An enhanced version of htsjdk’s SeekableBufferedStream provides buffering for backward seeks, which is critical for good performance in the cloud.
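For readers unfamiliar with record guessing, the kind of check involved can be sketched as follows. This is an illustrative subset of plausibility tests based on the BAM alignment record layout in the SAM specification, not the actual checks in BamRecordGuesser or Spark-BAM; the class name `BamRecordCheck`, the method `looksLikeRecord`, and the choice of tests are assumptions made for the example. It operates on already-decompressed bytes, and `numRefs` is the number of reference sequences from the BAM header.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class BamRecordCheck {
    // block_size (4 bytes) plus the 32 fixed bytes that precede read_name.
    private static final int FIXED_LEN = 36;

    private BamRecordCheck() {}

    // True if the bytes at 'off' are plausibly the start of a BAM alignment record.
    static boolean looksLikeRecord(byte[] data, int off, int numRefs) {
        if (off < 0 || off + FIXED_LEN > data.length) {
            return false;
        }
        ByteBuffer b = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        int blockSize = b.getInt(off);
        int refId = b.getInt(off + 4);
        int pos = b.getInt(off + 8);
        int lReadName = b.get(off + 12) & 0xff;
        int nCigarOp = b.getShort(off + 16) & 0xffff;
        int lSeq = b.getInt(off + 20);
        int nextRefId = b.getInt(off + 24);
        int nextPos = b.getInt(off + 28);

        // Reference IDs are -1 (unmapped) or an index into the header's reference list.
        if (refId < -1 || refId >= numRefs || nextRefId < -1 || nextRefId >= numRefs) {
            return false;
        }
        // Positions are 0-based, with -1 meaning "unset".
        if (pos < -1 || nextPos < -1 || lSeq < 0) {
            return false;
        }
        // The name holds at least one character plus the NUL terminator.
        if (lReadName < 2) {
            return false;
        }
        // block_size must cover the fixed fields and the variable-length sections.
        long minSize = 32L + lReadName + 4L * nCigarOp + (lSeq + 1L) / 2 + lSeq;
        if (blockSize < minSize) {
            return false;
        }
        // The read name must be printable ASCII and NUL-terminated.
        int nameEnd = off + FIXED_LEN + lReadName - 1;
        if (nameEnd >= data.length || data[nameEnd] != 0) {
            return false;
        }
        for (int i = off + FIXED_LEN; i < nameEnd; i++) {
            if (data[i] < '!' || data[i] > '~') {
                return false;
            }
        }
        return true;
    }
}
```

A check like this can produce false positives on its own, which is why guessers typically require several consecutive plausible records before accepting a position.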
This is still just a prototype, but I have tested it on HDFS and GCS with promising results that mirror Spark-BAM. Also, as an experiment I ran GATK’s ReadsSparkSource unit test against this code and all tests passed.
My proposed home for this would be a new (optional) package in Hadoop BAM. I could be convinced of other approaches if anyone has ideas.
@tomwhite There also seems to be an issue with CRAMs, not just BAMs, that needs some attention.
More details on the issue can be found here: broadinstitute/gatk#4506
There is a big delay (30-40 minutes) when processing CRAMs compared to BAMs using the GATK spark tools. These are 30x WGS samples. No such delay occurs for BAMs.
So I am curious how Spark-BAM's timing compares for processing CRAMs and BAMs. Would this help with the GATK performance issue? Or is the slow CRAM processing on the GATK end of things?
@jjfarrell thanks for bringing this to my attention. The slow processing is in Hadoop-BAM. My candidate code is only for BAM files at the moment, but adding an efficient CRAM reader would be useful (and a lot easier than BAM since no guessing is required).
I've now added VCF support, and support for reading CRAM using .crai indexes (which works a lot faster). See https://github.com/tomwhite/Hadoop-BAM/tree/new-bam-other-formats.
Hi @tomwhite! I see in your branch that you've integrated a file merger; we have all of this code in ADAM already (both sequential and parallel). In the spirit of not reinventing the wheel, is there a good way for us to avoid re-adding this code in Hadoop-BAM? We'd generally be glad to move the code upstream from ADAM into bdg-utils if that would make it more accessible. This code has been thoroughly tested in real world usage with ADAM, is more functionally complete than the code you have (e.g., supports merging CRAM as well), and from a cursory overview, I'm fairly certain that your implementation contains errors (e.g., the concat method will throw an error if any block other than the last block is not precisely the size of an HDFS block, and you don't appear to have any code that shuffles bytes around to meet that invariant).
I see that you linked to this from the project governance ticket. As you note in your README, this code is fairly logically distinct from Hadoop-BAM. I know you're in an initial prototyping stage, but I think that any additional work to flesh this architecture out would benefit from a team approach. To be direct about it, I see a few architectural issues in this codebase (around how you're managing header file metadata, how you're writing the data sinks) that would hinder us from adopting it inside of both ADAM/BDG and Databricks (wearing both my OSS and corporate hats).
Anyways, I'd love to see how I and others could contribute to make this effort a win for all of us in the genomics-on-Spark community, but as much as we've been pushing from BDG towards a sustained commitment to a more collaborative project governance model (#180), there frankly hasn't been much progress. I think the ball is in your court to propose a route forward.
Hi @fnothaft , I hope we can collaborate on this common code. As you've seen in the governance ticket, I've scheduled a call to discuss what to do next. I think there are a variety of views and requirements, but I hope we can find a compromise somewhere.
> the concat method will throw an error if any block other than the last block is not precisely the size of an HDFS block
I think that restriction was lifted in Hadoop 2.7.0: https://issues.apache.org/jira/browse/HDFS-3689
I would like to hear more about the architectural issues that you've uncovered.
The way I see it, everyone has gone off and done their own thing to explore the problem space. That is fine but now we have a lot of different approaches that miss out on lessons learned in the other ones. There are only 25 or so folks in the world doing genomics on Spark, and maybe only 10 of them are any good at it (I exclude myself from this group); I don't see any reason to have 25 different codebases.