tuplejump/calliope

UUID transform - java.nio.charset.MalformedInputException: Input length = 1

tsindot opened this issue · 2 comments

First, great project, and I'm enjoying working with it. I'm about 99.99999% sure this is an error on my part, not Calliope's.

I have a fairly simple column family in Cassandra 2.0.6 that looks like this:

CREATE TABLE IF NOT EXISTS report (
  owner_id uuid, 
  report_id text,
  ts timeuuid,
  device_id text,
  tstat_data map<text, text>,
  PRIMARY KEY ((owner_id), report_id, ts)
) WITH CLUSTERING ORDER BY (report_id DESC, ts DESC);

I have a spec that looks like this, and it fails with the stack trace below. I hit the same issue when using CqlPagingInputFormat directly and fixed it there with the UUIDSerializer, but that does not seem to work here. I'm sure this has to be doable with Calliope; I'm just not sure what I am missing.


  "calliope" should {
    "should be able to build and process RDD[U]" in { 

      import Cql3CRDDTransformers._

      val cas = CasBuilder.cql3.withColumnFamily(TEST_KEYSPACE, TEST_INPUT_COLUMN_FAMILY)

      val casrdd = sc.cql3Cassandra[Report](cas)

      val result = casrdd.collect().toList

      result must have length (3)
      result should contain(Report(UUID.fromString("62c36092-82a1-3a00-93d1-46196ee77204"), 
          "reset_mode", "3769f4e0-f969-11e3-bab7-d9c59d15e0ae", "deviceid12345"))
    }

……

object Cql3CRDDTransformers {

  import com.tuplejump.calliope.utils.RichByteBuffer._

  implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = {
    row.keys.toList
  }

  implicit def ReportToKeys(r: Report): CQLRowKeyMap = {
    println("here")
    Map("owner_id" -> r.ownerId)
  }

  implicit def cql3Row2Report(keys: CQLRowKeyMap, values: CQLRowMap): Report = {
    println(s"cql3Row2Report, keys : $keys, values : $values")
    Report(UUIDSerializer.instance.deserialize(keys.get("owner_id").get), keys.get("report_id").get, keys.get("ts").get, values.get("device_id").get)
  }

  implicit def cql3Row2ReportFields(keys: CQLRowKeyMap, values: CQLRowMap): (UUID, String, String) = {
    println(s"cql3RowReportFields, keys : $keys, values : $values")
    (UUIDSerializer.instance.deserialize(keys.get("owner_id").get), keys.get("report_id").get, values.get("device_id").get)
  }
}

case class Report(ownerId: UUID, reportType: String, ts: String, deviceId: String)

Stacktrace:

[spark-akka.actor.default-dispatcher-4] [INFO] [2014-06-23 12:46:36,367] a.e.s.Slf4jLogger: Slf4jLogger started
[spark-akka.actor.default-dispatcher-3] [INFO] [2014-06-23 12:46:36,408] Remoting: Starting remoting
[spark-akka.actor.default-dispatcher-4] [INFO] [2014-06-23 12:46:36,563] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.141.0.218:60675]
[spark-akka.actor.default-dispatcher-5] [INFO] [2014-06-23 12:46:36,566] Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.141.0.218:60675]
[main] [INFO] [2014-06-23 12:46:36,744] o.e.j.s.Server: jetty-8.1.14.v20131031
[main] [INFO] [2014-06-23 12:46:36,761] o.e.j.s.AbstractConnector: Started SocketConnector@0.0.0.0:60677
[main] [INFO] [2014-06-23 12:46:36,769] o.e.j.s.Server: jetty-8.1.14.v20131031
[main] [INFO] [2014-06-23 12:46:36,770] o.e.j.s.AbstractConnector: Started SocketConnector@0.0.0.0:60678
[main] [INFO] [2014-06-23 12:46:37,053] o.e.j.s.Server: jetty-8.1.14.v20131031
[main] [INFO] [2014-06-23 12:46:37,064] o.e.j.s.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
2014-06-23 12:46:37.160 java[12263:1703] Unable to load realm info from SCDynamicStore
cql3Row2Report, keys : Map(owner_id -> java.nio.HeapByteBuffer[pos=94 lim=110 cap=1217], report_id -> java.nio.HeapByteBuffer[pos=134 lim=144 cap=1217], ts -> java.nio.HeapByteBuffer[pos=161 lim=177 cap=1217]), values : Map(device_id -> java.nio.HeapByteBuffer[pos=201 lim=214 cap=1217], tstat_data -> java.nio.HeapByteBuffer[pos=239 lim=263 cap=1217])
[Executor task launch worker-0] [ERROR] [2014-06-23 12:46:40,076] o.a.s.e.Executor: Exception in task ID 209
java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:277) ~[na:1.7.0_21]
    at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:798) ~[na:1.7.0_21]
    at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:167) ~[cassandra-all-2.0.6.jar:2.0.6]
    at org.apache.cassandra.utils.ByteBufferUtil.string(ByteBufferUtil.java:124) ~[cassandra-all-2.0.6.jar:2.0.6]
    at com.tuplejump.calliope.utils.RichByteBuffer$.ByteBuffer2String(RichByteBuffer.scala:45) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at ies.fortcampbell.batch.calliope.Cql3CRDDTransformers$.cql3Row2Report(SparkCalliopeSpec.scala:99) ~[test-classes/:na]
    at ies.fortcampbell.batch.calliope.SparkCalliopeSpec$$anonfun$1$$anonfun$apply$2$$anonfun$2.apply(SparkCalliopeSpec.scala:45) ~[test-classes/:na]
    at ies.fortcampbell.batch.calliope.SparkCalliopeSpec$$anonfun$1$$anonfun$apply$2$$anonfun$2.apply(SparkCalliopeSpec.scala:45) ~[test-classes/:na]
    at com.tuplejump.calliope.cql3.KVUnmarshaller.unmarshall(Cql3CassandraRDD.scala:41) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.next(Cql3CassandraRDD.scala:97) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[na:na]
    at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.foreach(Cql3CassandraRDD.scala:66) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[na:na]
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[na:na]
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[na:na]
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[na:na]
    at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.to(Cql3CassandraRDD.scala:66) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[na:na]
    at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.toBuffer(Cql3CassandraRDD.scala:66) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[na:na]
    at com.tuplejump.calliope.cql3.Cql3CassandraRDD$$anon$1.toArray(Cql3CassandraRDD.scala:66) ~[calliope_2.10.jar:0.9.4-EA-SNAPSHOT]
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.scheduler.Task.run(Task.scala:51) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) ~[spark-core_2.10-1.0.0.jar:1.0.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_21]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_21]
    at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
[Result resolver thread-1] [WARN] [2014-06-23 12:46:40,082] o.a.s.s.TaskSetManager: Lost TID 209 (task 0.0:209)
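
If it helps narrow things down, the decode failure reproduces outside Spark entirely: the trace shows RichByteBuffer's ByteBuffer2String handing the buffer to ByteBufferUtil.string, and pushing a uuid/timeuuid column's raw 16 bytes through that UTF-8 decoder fails the same way. A minimal sketch (my assumption is that one of the uuid-typed columns is reaching the text decoder; DecodeRepro is just an illustrative name):

import java.nio.ByteBuffer
import java.util.UUID
import org.apache.cassandra.utils.ByteBufferUtil

object DecodeRepro extends App {
  val ts: UUID = UUID.fromString("3769f4e0-f969-11e3-bab7-d9c59d15e0ae")
  val raw: ByteBuffer = ByteBufferUtil.bytes(ts) // 16 raw bytes, not UTF-8 text

  // Same call the stack trace shows under RichByteBuffer.ByteBuffer2String:
  ByteBufferUtil.string(raw) // throws java.nio.charset.MalformedInputException
}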

TIA for the assistance. I'm sure I'm just missing something basic here, but it is not jumping out at me right now.

Regards,
Todd

@tsindot
Can you give these UUID serdes a try?

implicit def ByteBuffer2UUID(buffer: ByteBuffer): UUID = new UUID(buffer.getLong(buffer.position()), buffer.getLong(buffer.position() + 8))

implicit def UUID2ByteBuffer(uuid: UUID): ByteBuffer = ByteBufferUtil.bytes(uuid)
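
For reference, a quick self-contained round trip of those two (assuming cassandra-all on the classpath for ByteBufferUtil; UuidSerdeCheck is just an illustrative name):

import java.nio.ByteBuffer
import java.util.UUID
import org.apache.cassandra.utils.ByteBufferUtil

object UuidSerdeCheck extends App {
  implicit def ByteBuffer2UUID(buffer: ByteBuffer): UUID =
    new UUID(buffer.getLong(buffer.position()), buffer.getLong(buffer.position() + 8))

  implicit def UUID2ByteBuffer(uuid: UUID): ByteBuffer = ByteBufferUtil.bytes(uuid)

  val original = UUID.fromString("62c36092-82a1-3a00-93d1-46196ee77204")
  // UUID2ByteBuffer serializes; the expected type UUID pulls ByteBuffer2UUID back in:
  val roundTripped: UUID = UUID2ByteBuffer(original)
  assert(roundTripped == original)
}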

@milliondreams

The implicits provided do work, as does the UUIDSerializer. The actual field causing the problem was "ts", the timeuuid; I found it after adding a second test that referenced only three fields (which skips ts).
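
In case anyone else trips on this, a sketch of roughly what I changed: keep the timeuuid out of the String path by retyping Report.ts to UUID and deserializing it the same way as owner_id (the rest is as in my transformer above):

import java.util.UUID
import org.apache.cassandra.serializers.UUIDSerializer

case class Report(ownerId: UUID, reportType: String, ts: UUID, deviceId: String)

implicit def cql3Row2Report(keys: CQLRowKeyMap, values: CQLRowMap): Report =
  Report(
    UUIDSerializer.instance.deserialize(keys("owner_id")), // uuid partition key
    keys("report_id"),                                     // text, via RichByteBuffer's ByteBuffer2String
    UUIDSerializer.instance.deserialize(keys("ts")),       // timeuuid: must not hit the UTF-8 decoder
    values("device_id")                                    // text
  )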

Thanks for the quick response.

Regards,
Todd