関係者 TaggedInputSplit /** * An {@link InputSplit} that tags another InputSplit with extra data for use * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s. */ DelegatingMapper /** * An {@link Mapper} that delegates behavior of paths to multiple other * mappers. */ DelegatingInputFormat /** * An {@link InputFormat} that delegates behavior of paths to multiple other * InputFormats. */ Driver 1. job.setMapperClass(DelegatingMapper.class) 2. job.setInputFormatClass(DelegatingInputFormat.class) DelegatingInputFormat 1. List<InputSplit> getSplits 1. splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(), mapperClass)) Mapper起動 1. MapTask 2. DelegatingMapper 3. DelegatingMapper#setup 4. TaggedInputSplit inputSplit = context.getInputSplit() 5. mapper = inputSplit.getMapperClass() の Reflection 6. mapper.run() MongoDBに適用してみたよ! こんな風にして試してみる .addInputPath(job, uriA, inputformat, mapperA) .addInputPath(job, uriB, inputformat, mapperB) .addInputPath(job, uriC, inputformat, mapperC) DelegatingInputFormatがいけてない・・・ どのPath(URI)に対して、どのMapperをマップするかがかけない仕様・・・ splitに載せる情報がおかしいので、当然、処理もおかしい DelegatingInputFormatごと修正 URIとMapperの情報を載せてsplitを生成するように変更 うまく動いた! その他のMongoDB使うときの変更点 1. Shard環境が変だったので修正→取り込まれ済み 2. Queryを指定した場合に、無いことが明白なChunkにもリクエストを投げて、応答を待ち、map処理が走る(3秒程度)→修正中 3. BSONDecoderをもうちょっとシンプルにしたい→検討中 hadongo MongoConfigUtil.setOutputURI( conf, "mongodb://localhost:27017/multiple.myout"); Job job = new Job(conf, "multipleJob"); MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in1", MongoInputFormat.class, MultipleMapper.class, "{name: {$lte: 10}}"); MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in2", MongoInputFormat.class, MultipleMapper2.class, "{name: {$gt:10, $lte: 20}}"); MongoMultipleInputs.addInputPath(job, "mongodb://localhost:27017/multiple.in3", MongoInputFormat.class, MultipleMapper3.class); log.info(job.getConfiguration().get("mongo.input.request")); job.setJarByClass(MultipleDriver.class); job.setReducerClass( MultipleReducer.class ); job.setOutputFormatClass( MongoOutputFormat.class ); job.setMapOutputKeyClass( BSONWritable.class ); job.setMapOutputValueClass( BSONWritable.class ); job.setOutputKeyClass( BSONWritable.class ); job.setOutputValueClass( BSONWritable.class );