twitter-producer

Sample Apache Kafka producer which consumes tweets and publishes them to a Kafka cluster.

The following are some sample KSQL queries to run against the tweet feed:

show topics;

create stream feed_raw (
  text varchar,
  created_at varchar,
  user struct<
    id bigint, 
    name varchar, 
    screen_name varchar, 
    followers_count integer, 
    created_at varchar
  >,
  entities struct<
    hashtags array<
        struct<
            text varchar, 
            indices array<integer>
        >
    >, 
    user_mentions array<
        struct<
            name varchar, 
            id bigint
        >
    >
  >,
  lang varchar
) with (
  kafka_topic='feed_raw', value_format='json'
);

show streams;

select text from feed_raw emit changes;

select * from feed_raw where lang='en' emit changes;

select text, lang from feed_raw where lang='de' emit changes;

create stream feed_restructured as select
  text,
  user,
  created_at,
  entities->hashtags as hashtags,
  entities->user_mentions as user_mentions,
  lang as language
  from feed_raw emit changes;

# show that a new backing topic was created for the derived stream:
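
By default, the backing topic takes the name of the stream, upper-cased (FEED_RESTRUCTURED here):

show topics;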

# query new stream:
select text, hashtags from feed_restructured emit changes;

#select text, hashtags, ARRAY_LENGTH(hashtags) as l  from feed_restructured emit changes;



create stream tags as select user->name, explode(hashtags)->text as tag, language from feed_restructured emit changes;



create table language_counts as select language, count(*) as count from feed_restructured group by language emit changes;

describe language_counts;

select * from language_counts emit changes;

select lcase(tag), count(*) from tags window tumbling (size 20 seconds) group by lcase(tag) emit changes;

create table language_descriptions (rowkey varchar primary key, name varchar) with (kafka_topic='lang_table', value_format='json', partitions=1);


insert into language_descriptions values ('de', 'Deutsch');
insert into language_descriptions values ('en', 'Englisch');
insert into language_descriptions values ('ru', 'Russisch');


create table language_count_enriched with (kafka_topic='lce', value_format='json') as select c.language, d.name, c.count from language_counts c inner join language_descriptions d on c.language = d.rowkey emit changes;

A different way of building up the schema is to register the nested structs as named types first (note that a stream named feed_raw already exists from the steps above, so it would have to be dropped or the new one created under a different name):

create type user_mention as struct<name varchar, id bigint>;

create type hash_tag as struct<text varchar, indices array<integer>>;

create type entity as struct<hashtags array<hash_tag>, user_mentions array<user_mention>>;

create type user as struct<id bigint, 
    name varchar, 
    screen_name varchar, 
    followers_count integer, 
    created_at varchar
>;
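
The registered custom types can then be listed with ksqlDB's SHOW TYPES statement:

show types;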

create stream feed_raw (
  text varchar,
  created_at varchar,
  user user,
  entities entity,
  lang varchar
) with (
  kafka_topic='feed_raw', value_format='json'
);

select user->name, user->followers_count, entities->hashtags, text from feed_raw emit changes;

When we declare a field that does not exist in the underlying data, it will simply be filled with null values:

create stream feed_raw_non_existing_fields (
  text varchar,
  created_at varchar,
  user user,
  entities entity,
  lang varchar,
  unknown_text varchar,
  unknown_int integer,
  unknown_struct struct<a varchar, b integer>
) with (
  kafka_topic='feed_raw', value_format='json'
);

select user->name, unknown_text, unknown_int, unknown_struct, text from feed_raw_non_existing_fields emit changes;

Now that we have seen how we can de-construct a struct using the -> syntax, how can we build up a struct?

select 
struct(language := lang, user := user->name)  as meta,
text as content
from feed_raw emit changes;

create stream summarized_json with (format='JSON') as select struct(language := lang, user := user->name) as meta, text as content from feed_raw emit changes;

Observe that a topic named after the stream (upper-cased) was created, and that the records have a null key.
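
One way to check this is to print the backing topic (assuming the default naming, it is called SUMMARIZED_JSON); ksqlDB's PRINT shows both the record key and the value:

print 'SUMMARIZED_JSON' from beginning limit 3;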

To add a key, one can repartition the stream with a PARTITION BY clause:

create stream summarized_avro with (format='Avro', kafka_topic='summarized', partitions=6) as
select
user->name key,
struct(language := lang, user := user->name)  as meta,
text as content
from feed_raw 
partition by user->name
emit changes;

Observe that the output does not contain the key field name in the value: // insert screenshot
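
This can be verified by printing the target topic (its name was set to summarized in the WITH clause above):

print 'summarized' from beginning limit 3;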

In order to control the case of the key names, one can use backticks:

create stream summarized_json_lowercase with (format='JSON') as
select
struct(`language` := lang, `user` := user->name)  as `meta`,
text as `content`
from feed_raw emit changes;

create stream summarized_json_lowercase_sr with (format='JSON_SR') as select struct(language := lang, user := user->name) as meta, text as content from feed_raw emit changes;