Netflix/aegisthus

Any non-pig decoder/reader for reading aegisthus output json?

ggjoshi opened this issue · 9 comments

I am evaluating Aegisthus for converting SSTable files to JSON. I got it to run and I see some output (pasted below), but it seems to be encoded in some way, i.e. it is not human readable. I saw that there is a Pig loader for the output JSON, but in our data pipeline we use MapReduce, and it would be useful if we could read and process the data there. Is there any reader/decoder for the Aegisthus output JSON?

fffaa747-5d2d-d775-9be1-3b0866ddbbe4 {"fffaa747-5d2d-d775-9be1-3b0866ddbbe4":
{"deletedAt":-9223372036854775808,"columns":[["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t0008:004g\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t0009:003n\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t000a:004f\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808],["\u0000\u0003DUA\u0000\u0000\b\u0000\u0000\u0001QZָ\u0000\u0000\u0000\nIMPRESSION\u0000\u0000\u0006MOBILE\u0000\u0000\tP_21_MORE\u0000\u0000\t000e:004d\u0000\u0000\u0007UNKNOWN\u0000\u0000\u0005count\u0000","00018000b44de3a6dc034ebb8e9721a9839e97a4000000000000000100000000000000c8",1445301293089000,"c",-9223372036854775808]]}

For the encoded output you can set the Hadoop property "aegisthus.columntype" to whatever type your column is defined as; Aegisthus defaults to BytesType. I think yours might be "UTF8Type", but I am not sure.

Most of the types are in this package: https://github.com/apache/cassandra/tree/trunk/src/java/org/apache/cassandra/db/marshal. They can also be composite types like "CompositeType(UTF8Type,TimeUUIDType)".

As far as generic Hadoop readers/decoders go, I would like to make a Hadoop InputFormat that can read the Aegisthus JSON-per-row format or the Aegisthus compacted SSTable format, but there are currently no plans to do this.

The Aegisthus JSON-per-row format is pretty simple; I can describe it if you are interested in writing a parser yourself.

Thanks @danielbwatson

  1. I see in https://github.com/apache/cassandra/tree/trunk/src/java/org/apache/cassandra/db/marshal that there is a type called CounterColumnType, which I can try for my scenario. For the following table definition, do you think the options "aegisthus.columntype=CounterColumnType -D aegisthus.keytype=UTF8Type -D aegisthus.column_value_type=CounterColumnType" are appropriate?

CREATE TABLE analytics_counters_weekly (
access_attribute varchar,
matrix_type varchar,
time_bucket timestamp,
event_type varchar,
platform varchar,
position varchar,
attribute_key varchar,
medium varchar,
count counter,
PRIMARY KEY ( (access_attribute, matrix_type), time_bucket, event_type, platform, position, attribute_key, medium)
) WITH CLUSTERING ORDER BY (time_bucket DESC);

  2. Yes, I am interested in writing a Hadoop InputFormat to read the JSON data output from Aegisthus for our offline processing use case. Can you describe how it works?

On issue #30, I see a similar question was asked about the column names and output values not showing up correctly, and you mentioned that it would be fixed. Is that fix available, and if so, on which branch?

For the table I mentioned in my last comment, can you tell me the right aegisthus.columntype, aegisthus.column_value_type, and aegisthus.keytype to specify?

@danielbwatson do you have any comments on my questions in the previous posts on this issue?

@ggjoshi Sorry, but I am not sure which types you should use for this table. What I would do is create an instance of the table, insert just one row into it, compact it, and grab the compacted sstable. Then I would create a simple test using the example setup here and try the different converter options until I had the output I was looking for.
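If it helps, the flush/compact step can be done with nodetool against the test keyspace and table (the names below are placeholders); the resulting SSTable files end up under Cassandra's data directory:

nodetool flush <keyspace> <table>
nodetool compact <keyspace> <table>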

This code is what outputs the JSON-per-row format, if you would like to work on an input reader for it.

The output is rowkey\tJSON, where the JSON format is:

{
  "keyName": {
   "deletedAt": 111, // Number here
   "columns": [ // outputs all of the columns in one of these 4 formats
      ["columnName", "columnValue", 111 ], // Where 111 is the timestamp
      ["columnName", "columnValue", 111, "d"], // if the column is deleted
      ["columnName", "columnValue", 111, "e", 222, 333], // if the column is expiring (222 is TTL and 333 is local delete time)
      ["columnName", "columnValue", 111, "c", 444] // if the column is a counter column (444 is timestamp of last delete)
    ]
  }
}
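
To make that concrete, here is a minimal sketch of a parser for one such output line in Java (this is not part of Aegisthus; it assumes Jackson is on the classpath, and the class name is made up):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative only: parse one Aegisthus output line of the form "rowkey\tJSON".
public class AegisthusRowParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void parseLine(String line) throws Exception {
        int tab = line.indexOf('\t');
        String rowKey = line.substring(0, tab);
        JsonNode root = MAPPER.readTree(line.substring(tab + 1));
        // The JSON object has a single field whose name is the row key.
        JsonNode row = root.elements().next();
        System.out.printf("%s deletedAt=%d%n", rowKey, row.get("deletedAt").asLong());
        for (JsonNode column : row.get("columns")) {
            String name = column.get(0).asText();
            String value = column.get(1).asText();
            long timestamp = column.get(2).asLong();
            // Optional 4th element: "d" = deleted, "e" = expiring (followed by TTL
            // and local delete time), "c" = counter (followed by last delete timestamp).
            String flag = column.size() > 3 ? column.get(3).asText() : "";
            System.out.printf("  %s = %s ts=%d %s%n", name, value, timestamp, flag);
        }
    }
}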

There was once an AegisthusInputFormat that read the JSON-per-row format. You could look at it in the git history and see if it is helpful for you.

OK, so per your suggestion, I created a table with a couple of columns like below:

CREATE TABLE dev.gjoshi_test_table (
name text PRIMARY KEY,
city text
)

And added a couple of rows to it.

cqlsh> select * FROM dev.gjoshi_test_table
... ;

name | city
--------+-----------
foo | bar

Now I decode the sstable data using the Aegisthus tool:

hadoop-mr1 jar ./aegisthus-hadoop-0.3.4.jar com.netflix.Aegisthus -D "aegisthus.columntype=CompositeType(UTF8Type,UTF8Type)" -D aegisthus.keytype=UTF8Type -D "aegisthus.column_value_type=UTF8Type" -D mapred.reduce.tasks=1 -inputDir /data/gjoshi/cassandra-data/gjoshi_test_table -output /data/gjoshi/cassandra-data/gjoshi_test_table_decoded_2

I see the following:

hdfs dfs -cat /data/gjoshi/cassandra-data/gjoshi_test_table_decoded_2/aeg-00000

foo {"foo":{"deletedAt":-9223372036854775808,"columns":[["","",1446587641221438],["city","bar",1446587641221438]]}}

Is this expected behavior? The name column is not in the output; the city column has the proper value in the output.

@ggjoshi Sorry for the delay. I tested and got the same results. Here are my results with a slightly more complex table:

DROP KEYSPACE testdata;
CREATE KEYSPACE testdata WITH replication = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 2};
DROP TABLE testdata.population_table;
CREATE TABLE testdata.population_table (
    city varchar,
    state varchar,
    year int,
    metro_population bigint,
    metro_rank int,
    PRIMARY KEY ( (city, state), year)
) WITH CLUSTERING ORDER BY (year DESC);
INSERT INTO testdata.population_table (city,state,year,metro_population,metro_rank) VALUES ('San Francisco','CA',2014,4594060,11);
INSERT INTO testdata.population_table (city,state,year,metro_population,metro_rank) VALUES ('Atlanta','GA',2014,5614323,9);
INSERT INTO testdata.population_table (city,state,year,metro_population,metro_rank) VALUES ('Miami','FL',2014,5929819,8);

And these converters:

aegisthus.keytype=CompositeType(UTF8Type,UTF8Type)
aegisthus.columntype=CompositeType(ReversedType(Int32Type),UTF8Type)
aegisthus.column_value_type=LongType
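
For reference, plugging these converters into an invocation like the one above would look something like this (the jar name and HDFS paths here are just placeholders):

hadoop jar ./aegisthus-hadoop-0.3.4.jar com.netflix.Aegisthus -D "aegisthus.keytype=CompositeType(UTF8Type,UTF8Type)" -D "aegisthus.columntype=CompositeType(ReversedType(Int32Type),UTF8Type)" -D aegisthus.column_value_type=LongType -D mapred.reduce.tasks=1 -inputDir /path/to/population_table-sstables -output /path/to/population_table_decoded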

I get these results:

Miami:FL    {"Miami:FL":{"deletedAt":-9223372036854775808,"columns":[["2014:","",1446765673311000],["2014:metro_population","5929819",1446765673311000],["2014:metro_rank","00000008",1446765673311000]]}}
Atlanta:GA  {"Atlanta:GA":{"deletedAt":-9223372036854775808,"columns":[["2014:","",1446765673307000],["2014:metro_population","5614323",1446765673307000],["2014:metro_rank","00000009",1446765673307000]]}}
San Francisco:CA    {"San Francisco:CA":{"deletedAt":-9223372036854775808,"columns":[["2014:","",1446765673279000],["2014:metro_population","4594060",1446765673279000],["2014:metro_rank","0000000b",1446765673279000]]}}

You can see from the column values that only the long was converted; the int was still output in BytesType format. This could definitely be improved; I just don't have the time to do it at the moment.

Thanks @danielbwatson for your examples. Those helped me understand the column type, key type, and column value type better. So the key type is the type of the partition key, the column type is a composite of all the primary key fields other than the partition key fields, and the column value type is the type of the column's value, which in my case is the counter.

So I created the table below and added a couple of rows to it with count values of 16 and 5000020.

CREATE TABLE dev.analytics_counters_hourly (
access_attribute text,
matrix_type text,
time_bucket timestamp,
event_type text,
platform text,
position text,
attribute_key text,
medium text,
"count" counter,
PRIMARY KEY ((access_attribute, matrix_type), time_bucket, event_type, platform, position, attribute_key, medium)
)

access_attribute | matrix_type | time_bucket | event_type | platform | position | attribute_key | medium | count
------------------+-------------+--------------------------+------------+----------+----------+---------------+--------+---------
foo | foo | 1970-01-01 00:00:00+0000 | EventType | Platform | Position | AttKey | mobile | 16
AccessAttribute | MatrixType | 1970-01-01 00:00:00+0000 | EventType | Platform | Position | AttKey | mobile | 5000020

Then I ran the following command per my understanding of the key type and column type. It gave me the right column names, but it still gives me binary output for the counter column value.

After looking at the value across various examples, I figured out that the value is still being output as BytesType, and the last 8 bytes printed in the value are the actual counter value (a 64-bit counter). Thus the foo:foo column has value 0x10, or 16, whereas the AccessAttribute:MatrixType column has value 0x4c4b54, or 5000020.

Do you think my understanding above of the counter value is correct?

hadoop-mr1 jar ./aegisthus-hadoop-0.3.4.jar com.netflix.Aegisthus -D "aegisthus.columntype=CompositeType(TimestampType, UTF8Type, UTF8Type, UTF8Type, UTF8Type, UTF8Type, UTF8Type)" -D aegisthus.keytype="CompositeType(UTF8Type, UTF8Type)" -D "aegisthus.column_value_type=BytesType" -D mapred.reduce.tasks=1 -inputDir /data/gjoshi/cassandra-data/gjoshi_hourly_table -output /data/gjoshi/cassandra-data/gjoshi_hourly_table_decoded_9

Output:

[gjoshi@watson-batch-hbase1-prod mr]$ hdfs dfs -cat /data/gjoshi/cassandra-data/gjoshi_hourly_table_decoded_9/aeg-00000

foo:foo {"foo:foo":{"deletedAt":-9223372036854775808,"columns":[["1970-01-01 00:00Z:EventType:Platform:Position:AttKey:mobile:count","00018000fe13bfa7e4ad4e608a88f80d1f6f46ce00000000000000100000000000000010",1446840948701000,"c",-9223372036854775808]]}}

AccessAttribute:MatrixType {"AccessAttribute:MatrixType":{"deletedAt":-9223372036854775808,"columns":[["1970-01-01 00:00Z:EventType:Platform:Position:AttKey:mobile:count","0001800095aa0b85ee3c4f15960425f0a07e63e1000000000000001500000000004c4b54",1446844419916000,"c",-9223372036854775808]]}}

From the Javadoc and the code in total() from CounterContext, I think there can be multiple counts, and the one you are seeing is the last count.
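
For what it's worth, under the assumption discussed above (that the last 8 bytes of the BytesType-encoded counter value hold the count you care about), a tiny Java helper to pull that count out of the hex string could look like this (the class and method names are made up):

// Sketch only: extract the trailing 64-bit count from the hex string that
// Aegisthus emits when the counter value is left as BytesType.
public class CounterValueSketch {
    static long lastCount(String hexValue) {
        // 8 bytes = 16 hex characters at the end of the value.
        String last8Bytes = hexValue.substring(hexValue.length() - 16);
        return Long.parseUnsignedLong(last8Bytes, 16);
    }

    public static void main(String[] args) {
        // Values taken from the output above.
        System.out.println(lastCount("00018000fe13bfa7e4ad4e608a88f80d1f6f46ce00000000000000100000000000000010")); // 16
        System.out.println(lastCount("0001800095aa0b85ee3c4f15960425f0a07e63e1000000000000001500000000004c4b54")); // 5000020
    }
}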