This product is still in its infancy
The CSPS Digital Innovation Services team, under the Digital Academy, is experimenting with Apache Kafka as part of the CORTEX project. The project needed a stream processor to which ML services could be added in a generic way; Asgard was created to fill this need.
Asgard is a stream processor for Apache Kafka which allows machine learning services and natural language processors for textual data to be added in a generic way to any Kafka cluster. It listens to a configured topic and consumes messages containing the textual data. Asgard processes this textual data using the implemented "ML Providers" and then produces a message on a separate configured topic to be consumed by downstream applications (consumers, in Kafka speak).
You will need a Kafka instance to use Asgard, along with two topics: one containing the data to be consumed and processed by Asgard, and another into which Asgard puts the processed data. We recommend that these topics be used for nothing else, as Asgard is currently not smart enough to discern what it should and should not consume.
Asgard has a development environment and a production environment. To switch between them, set the environment variable ASGARD_ENV to either production or development. If the environment variable is not set, the environment defaults to production. If it is set to any value other than production or development, the environment defaults to development.
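These rules can be sketched in Python (a minimal illustration; resolve_environment is a hypothetical helper, not part of Asgard's actual code):

```python
import os

def resolve_environment() -> str:
    """Mirror the documented ASGARD_ENV rules: unset defaults to
    production, an unrecognized value falls back to development."""
    value = os.environ.get("ASGARD_ENV")
    if value is None:
        return "production"
    if value in ("production", "development"):
        return value
    return "development"
```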
Asgard has three main JSON configuration files, found in the src/configs folder. default.json is optional and is loaded regardless of the environment. development.json or production.json is then loaded depending on whether the environment is development or production, respectively. Keys that appear in default.json as well as in development.json or production.json are overwritten by the values in the environment-specific file. For example, if BROKER_HOST is set in default.json and also in development.json, and the environment is development, then the value for BROKER_HOST in development.json is used to configure the application.
Most configuration values in the JSON config files can be substituted with environment variables by adding the prefix ASGARD_ to the configuration key name. There are some exceptions, which are noted in the configuration values list below.
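Taken together, the merge and override rules can be sketched as follows (a minimal illustration; resolve_config and the swappable list are hypothetical names, and the assumption that environment variables take precedence over file values is mine, not stated explicitly above):

```python
import os

def resolve_config(default_cfg, env_cfg,
                   swappable=("BROKER_HOST", "SCHEMA_REGISTRY")):
    """Merge default.json values with the environment-specific file,
    then apply ASGARD_-prefixed environment variables for swappable keys."""
    merged = dict(default_cfg)
    merged.update(env_cfg)  # environment-specific file wins over default.json
    for key in swappable:
        override = os.environ.get("ASGARD_" + key)
        if override is not None:
            merged[key] = override
    return merged
```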
BROKER_HOST
Description: The endpoint for the Apache Kafka broker, in the form <url>:<port>
Environment Variable Swappable: Yes
Development Environment Behaviour: If not set in development.json and the environment variable is not set, it will default to localhost:9092
Production Environment Behaviour: If not set in production.json and the environment variable is not set, the application will fail to start
SCHEMA_REGISTRY
Description: The endpoint for the schema registry, in the form http://<url>:<port> or https://<url>:<port>
Environment Variable Swappable: Yes
Development Environment Behaviour: If not set in development.json and the environment variable is not set, it will default to http://localhost:8081
Production Environment Behaviour: If not set in production.json and the environment variable is not set, the application will fail to start
CONSUMER_TOPIC
Description: The topic Asgard will listen to, which contains the data to be processed
Environment Variable Swappable: Yes
Development Environment Behaviour: If not set in development.json and the environment variable is not set, it will default to text_data_to_be_processed
Production Environment Behaviour: If not set in production.json and the environment variable is not set, the application will fail to start
PRODUCER_TOPIC
Description: The topic to which Asgard will produce messages containing the processed data
Environment Variable Swappable: Yes
Development Environment Behaviour: If not set in development.json and the environment variable is not set, it will default to text_data_processed
Production Environment Behaviour: If not set in production.json and the environment variable is not set, the application will fail to start
PRODUCER_SCHEMA
Description: The name of the file in the src/schemas
folder which contains the schema used by the Apache Avro producer to serialize the message and by downstream consumers to deserialize it.
Environment Variable Swappable: Yes
Development Environment Behaviour: If not set in development.json and the environment variable is not set, it will default to data_input.avsc
Production Environment Behaviour: If not set in production.json and the environment variable is not set, the application will fail to start
LOGGING_CONFIG
Description: The configuration for the Python logging module. All the default handlers, filters and formatters from the Python logging module can be used. A Slack handler for posting logs to Slack has also been implemented (see the logging section). Here is an example logging configuration:
{
"version": 1,
"formatters":{
"default": {
"class": "logging.Formatter",
"format": "LEVEL: %(levelname)s TIME: %(asctime)s FILENAME: %(filename)s MODULE: %(module)s MESSAGE: %(message)s \n"
},
"slackFormatter": {
"class": "src.utils.logger.SlackFormatter"
}
},
"handlers" : {
"console": {
"class": "logging.StreamHandler",
"level": "NOTSET",
"formatter": "default"
},
"slack": {
"class": "src.utils.logger.SlackHandler",
"level": "ERROR",
"formatter": "slackFormatter"
},
"file": {
"class": "logging.FileHandler",
"filename": "./src/development-logs.log",
"level": "DEBUG",
"formatter": "default"
}
},
"loggers": {
"": {
"handlers": [
"console", "slack", "file"
],
"level": "NOTSET"
}
}
}
Environment Variable Swappable: No
Development Environment Behaviour: If not specified, logging will be shut off and messages will be printed to the console instead
Production Environment Behaviour: If not specified, logging will be shut off and messages will be printed to the console instead. It is highly recommended that you configure logging for the production environment (see the logging section)
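A configuration like the example above follows the standard Python logging dictionary format and can be applied with logging.config.dictConfig. The simplified sketch below uses only the built-in console handler (the Slack handler and log-file path from the example are specific to Asgard and omitted here):

```python
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "formatters": {
        "default": {
            "class": "logging.Formatter",
            "format": "LEVEL: %(levelname)s MODULE: %(module)s MESSAGE: %(message)s",
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "NOTSET",
            "formatter": "default",
        }
    },
    # "" configures the root logger, as in the Asgard example
    "loggers": {"": {"handlers": ["console"], "level": "NOTSET"}},
}

logging.config.dictConfig(LOGGING_CONFIG)
logging.getLogger(__name__).info("logging configured")
```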
ML_PROVIDERS
Description: A list of strings naming the ML Providers to be used to process the text. The items must be the lower-case class names of the implemented MLProviders (see the relevant section)
Environment Variable Swappable: No
Development Environment Behaviour: The Google Natural Language service is enabled by default if this configuration value isn't set. In this case you will need to set up a project on GCP, enable the service, create a service account and download the credentials, which can be referenced by the GOOGLE_APPLICATION_CREDENTIALS environment variable. See the Google docs on this.
Production Environment Behaviour: The application will not start if this configuration value is not set
ASGARD_SLACK_URL
(Environment Variable Only)
Description: The Slack webhook URL used by the logging Slack handler to post logs to Slack (see the relevant section)
Behaviour: There is no environment-specific behaviour, but if the Slack handler is configured in the LOGGING_CONFIG then this variable must be set or the application will not start
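Putting the values above together, a minimal production.json might look like the following (host and topic values are illustrative; LOGGING_CONFIG is omitted here but recommended in production):

```json
{
    "BROKER_HOST": "localhost:9092",
    "SCHEMA_REGISTRY": "http://localhost:8081",
    "CONSUMER_TOPIC": "text_data_to_be_processed",
    "PRODUCER_TOPIC": "text_data_processed",
    "PRODUCER_SCHEMA": "data_input.avsc",
    "ML_PROVIDERS": ["google"]
}
```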
Once you have provided the configurations listed above, navigate to the asgard directory and run
$ python application.py
Asgard is now up and running: listening to the CONSUMER_TOPIC topic, consuming messages, processing them using the ML_PROVIDERS, and producing the output using the PRODUCER_SCHEMA to the PRODUCER_TOPIC. If you are in development mode, the time taken to process each message will be printed in the terminal.
You can check whether Asgard is working correctly by running the producer simulator with this command:
$ python tests/simulate_producer.py <optional: num_messages> <broker> <schema_registry> <schema_path> <topic>
For example, for an infinite producer:
$ python tests/simulate_producer.py "localhost:9092" "http://localhost:8081" "./src/schemas/data_input.avsc" "text_data_to_be_processed"
Or for a producer which only produces 10 messages:
$ python tests/simulate_producer.py 10 "localhost:9092" "http://localhost:8081" "./src/schemas/data_input.avsc" "text_data_to_be_processed"
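Based on the two invocations above, the simulator treats a leading integer argument as num_messages. A sketch of that argument handling (hypothetical; the actual script may parse its arguments differently):

```python
def parse_simulator_args(argv):
    """If the first argument parses as an int, treat it as num_messages;
    None means produce messages indefinitely."""
    try:
        num_messages = int(argv[0])
        rest = argv[1:]
    except ValueError:
        num_messages, rest = None, argv
    broker, schema_registry, schema_path, topic = rest
    return num_messages, broker, schema_registry, schema_path, topic
```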
This producer will produce messages containing the poem Do Not Go Gentle into That Good Night by Dylan Thomas:
Do not go gentle into that good night,
Old age should burn and rave at close of day;
Rage, rage against the dying of the light.
Though wise men at their end know dark is right,
Because their words had forked no lightning they
Do not go gentle into that good night.
Good men, the last wave by, crying how bright
Their frail deeds might have danced in a green bay,
Rage, rage against the dying of the light.
Wild men who caught and sang the sun in flight,
And learn, too late, they grieved it on its way,
Do not go gentle into that good night.
Grave men, near death, who see with blinding sight
Blind eyes could blaze like meteors and be gay,
Rage, rage against the dying of the light.
And you, my father, there on the sad height,
Curse, bless, me now with your fierce tears, I pray.
Do not go gentle into that good night.
Rage, rage against the dying of the light.
Asgard currently expects data conforming to the following Avro schema:
{
"namespace": "cortex.streamprocessors.asgard",
"type": "record",
"name": "InputData",
"doc": "This schema is used to deserialize asgard data processing requests",
"fields": [
{"name": "uid", "type": ["string", "long"]},
{"name": "data", "type": {
"name": "items", "type": "array", "items": {
"name": "item",
"type": "record",
"fields": [
{"name": "uid", "type": ["null", "string", "long"], "default": null},
{"name": "text", "type": "string"}
]
}
}
}
]
}
This is an example of a valid message to Asgard:
{
"uid": "Arbitrary unique identifier which upstream/downstream producers/consumers can use as a reference",
"data": [
{
"uid": "Arbitrary unique identifier which the producer can use as a reference",
"text": "Text to be processed"
}
]
}
The uid field is not used by Asgard at all. It is there so that upstream and downstream applications can use it as a foreign key to relate the processed data to whatever arbitrary relationships they maintain.
Asgard will produce a message containing the processed data according to the following schema. This can be changed to reflect your needs; however, this is the default provided:
{
"namespace": "cortex.streamprocessors.asgard",
"type": "record",
"name": "OutputData",
"doc": "This schema is used to serialize processed data by asgard",
"fields": [
{"name": "uid", "type": ["string", "long"]},
{"name": "data", "type": {
"name": "processedText",
"type": "array",
"items": {
"name": "providersProcessedText",
"type": "array",
"items": {
"name": "processedTextProvider",
"type": "record",
"fields": [
{ "name": "uid", "type": ["string", "null"]},
{ "name": "text", "type": "string"},
{ "name": "language", "type": "string"},
{ "name": "sentimentScore", "type": "double"},
{ "name": "magnitudeScore", "type": "double" },
{ "name": "provider", "type": "string"},
{ "name": "sentences", "type": {
"name": "processedSentences",
"type": "array",
"items": {
"name": "processedSentence",
"type": "record",
"fields": [
{"name": "text", "type": "string"},
{"name": "sentimentScore", "type": "double"},
{"name": "magnitudeScore", "type": "double"}
]
}
}
}
]
}
}
}
}
]
}
An example of the structure of a message produced by Asgard according to this schema:
{
"uid": "Arbitrary unique identifier which upstream/downstream producers/consumers can use as a reference",
"data":[
[
{
"uid":"Arbitrary unique identifier which upstream/downstream producers/consumers can use as a reference",
"text": "The original text sent to Asgard to process",
"language": "Detected language of the text",
"sentimentScore": "Analyzed sentiment score",
"magnitudeScore": "Analyzed magnitude score",
"provider": "The name of the ML provider who processed this data",
"sentences":[
{
"text": "Text of an extracted sentence in the text",
"sentimentScore": "The sentiment score analyzed for this sentence",
"magnitudeScore": "The magnitude score analyzed for this sentence"
}
]
}
]
]
}
Asgard produces a two-dimensional array of the processed text. You can think of this as a matrix where the rows are the texts provided in the array of the input message and the columns are the outputs of the different ML Providers which have been enabled to process the data.
Text Index | ML Provider 1 | ML Provider 2 |
---|---|---|
1 | { ..., "provider": "ML Provider 1", ... } | { ..., "provider": "ML Provider 2", ... } |
Translated to a JSON structure:
[
[
{
"uid":"Arbitrary unique identifier which upstream/downstream producers/consumers can use as a reference",
"text": "The original text sent to Asgard to process",
"language": "Detected language of the text",
"sentimentScore": "Analyzed sentiment score",
"magnitudeScore": "Analyzed magnitude score",
"provider": "ML Provider 1",
"sentences":[
{
"text": "Text of an extracted sentence in the text",
"sentimentScore": "The sentiment score analyzed for this sentence",
"magnitudeScore": "The magnitude score analyzed for this sentence"
}
]
},
{
"uid":"Arbitrary unique identifier which upstream/downstream producers/consumers can use as a reference",
"text": "The original text sent to Asgard to process",
"language": "Detected language of the text",
"sentimentScore": "Analyzed sentiment score",
"magnitudeScore": "Analyzed magnitude score",
"provider": "ML Provider 2",
"sentences":[
{
"text": "Text of an extracted sentence in the text",
"sentimentScore": "The sentiment score analyzed for this sentence",
"magnitudeScore": "The magnitude score analyzed for this sentence"
}
]
}
]
]
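A downstream consumer can therefore index the data field as data[text_index][provider_index]. A minimal Python illustration with placeholder results (the score values are made up for the example):

```python
# "data" from a hypothetical Asgard output message:
# rows correspond to input texts, columns to enabled ML Providers.
data = [
    [  # results for the first input text
        {"provider": "ML Provider 1", "sentimentScore": 0.8},
        {"provider": "ML Provider 2", "sentimentScore": 0.7},
    ],
    [  # results for the second input text
        {"provider": "ML Provider 1", "sentimentScore": -0.2},
        {"provider": "ML Provider 2", "sentimentScore": -0.1},
    ],
]

# The second text as processed by the first provider:
result = data[1][0]
```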
Adding a new ML Provider can be done in a few simple steps.
Step 1: Implement a class in the providers folder.
Step 2: Follow the template of the provided google.py, which configures an MLProvider for the Google Natural Language API:
import traceback
from .base_provider import MLProvider
from google.cloud import language #pylint: disable=no-name-in-module
from google.cloud.language import enums #pylint: disable=no-name-in-module
from google.cloud.language import types #pylint: disable=no-name-in-module
from google.api_core.exceptions import GoogleAPICallError, RetryError #pylint: disable=no-name-in-module
class Google(MLProvider):
def __init__(self, **kwargs):
"""
Instantiate the Google Natural Language API ML Provider
"""
super().__init__(**kwargs)
if "logging_enabled" in kwargs:
del kwargs["logging_enabled"]
self.__client = language.LanguageServiceClient(
**kwargs
)
def process(self, text):
"""
Process textual data
Parameters
----------
text: str
The textual data to be processed
Returns
-------
dict
The processed data
"""
document = types.Document( #pylint: disable=no-member
content = text,
type = enums.Document.Type.PLAIN_TEXT
)
try:
sentiment = self.__client.analyze_sentiment(document=document)
except GoogleAPICallError as e:
super().__log_msg(
"An error occurred with the Google Natural Language API call {}".format(e),
"Text which was to be processed {}".format(text),
"Stack trace {}".format(traceback.format_exc()),
delimeter="\n",
level="ERROR"
)
return e
except ValueError as e:
super().__log_msg(
"Bad arguments provided {}".format(e),
"Text which was to be processed {}".format(text),
"Stack trace {}".format(traceback.format_exc()),
delimeter="\n",
level="ERROR"
)
return e
except Exception as e:
super().__log_msg(
"An unknown error has occurred {}".format(e),
"Text which was to be processed {}".format(text),
"Stack trace {}".format(traceback.format_exc()),
delimeter="\n",
level="ERROR"
)
return e
data = {}
data["text"] = text
data["language"] = sentiment.language
data["sentimentScore"] = sentiment.document_sentiment.score
data["magnitudeScore"] = sentiment.document_sentiment.magnitude
data["provider"] = "google"
data["sentences"] = [
{
"text": sentence.text.content,
"sentimentScore": sentence.sentiment.score,
"magnitudeScore": sentence.sentiment.magnitude
} for sentence in sentiment.sentences ]
return data
All MLProvider classes must:
- Inherit the MLProvider base class
- Implement a process method which takes a single argument, text, and produces a structure that is compliant with your producer schema
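A minimal sketch of those requirements (the MLProvider stand-in and the Echo class below are hypothetical, included only to keep the example self-contained; in Asgard the real base class lives in base_provider.py):

```python
class MLProvider:
    """Hypothetical stand-in for the real base class in base_provider.py."""
    def __init__(self, **kwargs):
        pass

class Echo(MLProvider):
    """A trivial provider returning neutral scores, with field names
    matching the default producer schema shown earlier."""
    def process(self, text):
        return {
            "text": text,
            "language": "und",       # undetermined language
            "sentimentScore": 0.0,
            "magnitudeScore": 0.0,
            "provider": "echo",
            "sentences": [],
        }
```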
Step 3: Import it in src/machine_learning_providers/providers/__init__.py:
from .base_provider import MLRegistry
from .base_provider import MLProvider
from .google import Google
Step 4: Enable it by adding its lower-case name to the ML_PROVIDERS list configuration:
{
"ML_PROVIDERS": [
"google"
]
}
That's it! All processors, which we call ML Providers, can be implemented in this way. There is nothing else to import and no extra code to write. Asgard will automatically call the class to process the data. Providers can be swapped, added or removed simply by editing the ML_PROVIDERS list.
More information on this soon!