- Introduction - MAT.OCS.Streaming library
- C# Read/Write Samples
- C# Model sample
- Python Read/Write Samples
- Python Model sample
This API provides infrastructure for streaming data around the ATLAS technology platform.
Using this API, you can:
- Subscribe to streams of engineering values - no ATLAS recorder required
- Inject parameters and aggregates back into the ATLAS ecosystem
- Build up complex processing pipelines that automatically process new sessions as they are generated
With support for Apache Kafka, the streaming API also offers:
- Late-join capability - clients can stream from the start of a live session
- Replay of historic streams
- Fault-tolerant, scalable broker architecture
Be sure to look at our support knowledgebase on Zendesk: https://mclarenappliedtechnologies.zendesk.com/
This pre-release version of the API demonstrates the event-based messaging approach, for sessions and simple telemetry data.
Future versions will model all ATLAS entities, and likely offer better support for aggregates and predictive models.
You need to install the following NuGet packages from the MAT source:
- MAT.OCS.Streaming
- MAT.OCS.Streaming.Kafka
- MAT.OCS.Streaming.Mqtt
- MAT.OCS.Streaming.Codecs.Protobuf
The basic samples demonstrate simple usage of Advanced Streams. They cover the bare-minimum steps needed to read and write Telemetry Data and Telemetry Samples to and from Kafka or MQTT streams.
First, configure the dependencies:
const string brokerList = "localhost:9092"; // The host and port where the Kafka broker is running
const string groupName = "dev"; // The group name
const string topicName = "data_in"; // The name of an existing topic in the Kafka broker. The matching *_announce topic must exist too, in this case data_in_announce
var dependencyServiceUri = new Uri("http://localhost:8180/api/dependencies/"); // The URI where the dependency services are running
var client = new KafkaStreamClient(brokerList); // Create a new KafkaStreamClient for connecting to Kafka broker
var dataFormatClient = new DataFormatClient(new HttpDependencyClient(dependencyServiceUri, groupName)); // Create a new DataFormatClient
The DependencyService handles requests for AtlasConfigurations and DataFormats, so you must provide a URI for this service. The DataFormatClient resolves data formats through the DependencyService for the given group name.
To connect to MQTT, create an MqttStreamClient instead of a KafkaStreamClient:
var client = new MqttStreamClient(new MqttConnectionConfig(brokerList, "userName", "password"));
Create a stream pipeline using the KafkaStreamClient and the topicName, then stream the messages .Into your handler method:
var pipeline = client.StreamTopic(topicName).Into(streamId => // Stream Kafka topic into the handler method
Create a SessionTelemetryDataInput with the actual stream id and the dataFormatClient
var input = new SessionTelemetryDataInput(streamId, dataFormatClient);
In this example we bind the DataInput to the handler method using the default feed and simply print out some details about the incoming data.
input.DataInput.BindDefaultFeed(ParameterId).DataBuffered += (sender, e) => // Bind the incoming feed and take the data
{
    var data = e.Buffer.GetData();
    // In this sample we consume the incoming data and print it
    var time = data.TimestampsNanos;
    for (var i = 0; i < data.Parameters.Length; i++)
    {
        Trace.WriteLine($"Parameter[{i}]:");
        var vCar = data.Parameters[i].AvgValues;
        for (var j = 0; j < time.Length; j++)
        {
            var fromMilliseconds = TimeSpan.FromMilliseconds(time[j].NanosToMillis());
            Trace.WriteLine($"{fromMilliseconds:hh\\:mm\\:ss\\.fff}, { new string('.', (int)(50 * vCar[j])) }");
        }
    }
};
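The formatting done inside the handler above can be tried without a broker. The following is a minimal sketch, not the library's code: the local `NanosToMillis` method is an assumed stand-in for the library extension of the same name (taken here to be a plain division by 1,000,000), and `TraceFormatting`, `FormatTimestamp` and `Bar` are hypothetical names. It also assumes sample values lie roughly in the range 0 to 1, as the scaling by 50 suggests, and clamps them so that a negative value cannot make `new string(char, int)` throw.

```csharp
using System;

public static class TraceFormatting
{
    // Assumption: stands in for the library's NanosToMillis() extension,
    // whose implementation is not shown in this document.
    public static double NanosToMillis(long nanos) => nanos / 1_000_000.0;

    // Formats a nanosecond timestamp with the same pattern the handler uses.
    public static string FormatTimestamp(long nanos) =>
        TimeSpan.FromMilliseconds(NanosToMillis(nanos)).ToString(@"hh\:mm\:ss\.fff");

    // Draws a value as a bar of dots. The value is clamped to 0..1 first,
    // because new string(char, int) throws for a negative count.
    public static string Bar(double value, int width = 50)
    {
        var clamped = Math.Max(0.0, Math.Min(1.0, value));
        return new string('.', (int)(width * clamped));
    }

    public static void Main()
    {
        long nanos = 3_723_004_000_000; // 1 h 2 min 3.004 s expressed in nanoseconds
        Console.WriteLine($"{FormatTimestamp(nanos)}, {Bar(0.5)}");
    }
}
```

Whether clamping is appropriate depends on the parameter being plotted; for values outside 0 to 1 a different scaling would be needed.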
In this example we bind the SamplesInput to the handler method and simply print out some details
input.SamplesInput.AutoBindFeeds((s, e) => // Take the input and bind feed to an event handler
{
    var data = e.Data; // The event handler here only takes the samples data
    Trace.WriteLine(data.Parameters.First().Key); // and prints some information to the debug console
    Trace.WriteLine(data.Parameters.Count);
});
You can optionally handle the StreamFinished event.
input.StreamFinished += (sender, e) => Trace.WriteLine("Finished"); // Handle the stream finished event
To read and consume the stream successfully, wait until the pipeline is connected and then wait for the first stream. Optionally, tell the pipeline how long it may remain idle before the process exits.
if (!pipeline.WaitUntilConnected(TimeSpan.FromSeconds(30), CancellationToken.None)) // Wait until the connection is established
    throw new Exception("Couldn't connect");
pipeline.WaitUntilFirstStream(TimeSpan.FromMinutes(1), CancellationToken.None); // Wait until the first stream is ready to read.
pipeline.WaitUntilIdle(TimeSpan.FromMinutes(5), CancellationToken.None); // Wait for 5 minutes of the pipeline being idle before exit.
First, configure the dependencies:
const string brokerList = "localhost:9092"; // The host and port where the Kafka broker is running
const string groupName = "dev"; // The group name
const string topicName = "data_in"; // The name of an existing topic in the Kafka broker. The matching *_announce topic must exist too, in this case data_in_announce
var dependencyServiceUri = new Uri("http://localhost:8180/api/dependencies/"); // The URI where the dependency services are running
var client = new KafkaStreamClient(brokerList); // Create a new KafkaStreamClient for connecting to Kafka broker
var dataFormatClient = new DataFormatClient(new HttpDependencyClient(dependencyServiceUri, groupName)); // Create a new DataFormatClient
var httpDependencyClient = new HttpDependencyClient(dependencyServiceUri, groupName); // The DependencyClient stores the DataFormat and the AtlasConfiguration
var atlasConfigurationId = new AtlasConfigurationClient(httpDependencyClient).PutAndIdentifyAtlasConfiguration(AtlasConfiguration); // Unique ID created for the AtlasConfiguration
var dataFormat = DataFormat.DefineFeed().Parameter(ParameterId).BuildFormat(); // Create a data format based on the parameters, using the parameter id
var dataFormatId = dataFormatClient.PutAndIdentifyDataFormat(dataFormat); // Unique ID created for the DataFormat
The DependencyService handles requests for AtlasConfigurations and DataFormats, so you must provide a URI for this service. The DataFormatClient resolves data formats through the DependencyService for the given group name. A DataFormat is required when writing to a stream because it defines the structure of the data; the dataFormatId is used to retrieve that DataFormat from the dataFormatClient.
AtlasConfigurationId is needed only if you want to display your data in Atlas10.
To connect to MQTT, create an MqttStreamClient instead of a KafkaStreamClient:
var client = new MqttStreamClient(new MqttConnectionConfig(brokerList, "userName", "password"));
Open the output topic using the preferred client (KafkaStreamClient or MqttStreamClient) and the topicName.
using (var outputTopic = client.OpenOutputTopic(topicName)) // Open a KafkaOutputTopic
{
}
Create a SessionTelemetryDataOutput and configure session output properties.
var output = new SessionTelemetryDataOutput(outputTopic, dataFormatId, dataFormatClient);
output.SessionOutput.AddSessionDependency(DependencyTypes.DataFormat, dataFormatId); // Add session dependencies to the output
output.SessionOutput.AddSessionDependency(DependencyTypes.AtlasConfiguration, atlasConfigurationId);
output.SessionOutput.SessionState = StreamSessionState.Open; // set the sessions state to open
output.SessionOutput.SessionStart = DateTime.Now; // set the session start to current time
output.SessionOutput.SessionIdentifier = "data_" + DateTime.Now; // set a custom session identifier
You must add the dataFormatId and the atlasConfigurationId to the session dependencies to be able to use them during the streaming session.
Once that is done, send the session details to the output.
output.SessionOutput.SendSession();
You will need TelemetryData to write to the output. In this example we generate some random telemetryData for the purpose of demonstration.
var telemetryData = GenerateData(10, (DateTime)output.SessionOutput.SessionStart); // Generate some telemetry data
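The body of GenerateData is not shown here. As a rough illustration of what such a generator might compute, the sketch below produces evenly spaced nanosecond timestamps and random values as plain arrays. Everything in it is hypothetical: the `SampleSignal.Generate` name, the 10 ms default interval, the fixed seed, and the assumption that timestamps count nanoseconds from midnight of the session start day. It deliberately does not construct the library's telemetry data type.

```csharp
using System;

public static class SampleSignal
{
    // Hypothetical stand-in for the sample's GenerateData helper. It returns
    // plain arrays rather than the library's telemetry data type.
    public static (long[] TimestampsNanos, double[] Values) Generate(
        int sampleCount, DateTime sessionStart, long intervalNanos = 10_000_000, int seed = 0)
    {
        var random = new Random(seed);
        var timestamps = new long[sampleCount];
        var values = new double[sampleCount];

        // Assumption: timestamps are nanoseconds since midnight of the session
        // start day. One .NET tick is 100 ns.
        long startNanos = sessionStart.TimeOfDay.Ticks * 100;

        for (var i = 0; i < sampleCount; i++)
        {
            timestamps[i] = startNanos + i * intervalNanos;
            values[i] = random.NextDouble(); // uniform in [0, 1)
        }

        return (timestamps, values);
    }

    public static void Main()
    {
        var (timestamps, values) = Generate(10, new DateTime(2020, 1, 1, 12, 0, 0));
        for (var i = 0; i < timestamps.Length; i++)
            Console.WriteLine($"{timestamps[i]}: {values[i]:F3}");
    }
}
```

Values in the range 0 to 1 keep the dot-bar trace output of the read sample within its 50-character width.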
Bind the feed to DataOutput by its name. You can use the default feed name or a custom one.
const string feedName = ""; // As sample DataFormat uses default feed, we will leave this empty.
var outputFeed = output.DataOutput.BindFeed(feedName); // bind your feed by its name to the Data Output
Enqueue and send the telemetry data.
Task.WaitAll(outputFeed.EnqueueAndSendData(telemetryData)); // enqueue and send the data to the output through the outputFeed
You will need TelemetrySamples to write to the output. In this example we generate some random telemetrySamples for the purpose of demonstration.
var telemetrySamples = GenerateSamples(10, (DateTime)output.SessionOutput.SessionStart); // Generate some telemetry samples
Bind the feed to SamplesOutput by its name. You can use the default feed name or a custom one.
const string feedName = ""; // As sample DataFormat uses default feed, we will leave this empty.
var outputFeed = output.SamplesOutput.BindFeed(feedName); // bind your feed by its name to the SamplesOutput
Task.WaitAll(outputFeed.SendSamples(telemetrySamples)); // send the samples to the output through the outputFeed
Once you have sent all your data, set the session state to closed and send the session again.
output.SessionOutput.SessionState = StreamSessionState.Closed; // set session state to closed. In case of any unintended session close, set state to Truncated
output.SessionOutput.SendSession(); // send session
The advanced samples cover the usual use cases (reading, writing, and reading and linking telemetry data) in structured, organized code. Each sample .cs file has Write(), Read() and ReadAndLink() methods, and all of the sample files share the same structure, so you can copy them into your application code as working samples. The .cs files in the Samples folder carry descriptive comments, but let's take an in-depth look at a simple sample and a more complex one.
First, create or reuse an AtlasConfiguration. You need to specify which AppGroupId, ParameterGroupId and ParameterId you want to use.
Once you have designed your AtlasConfiguration, set the DependencyService URI, the stream broker address, the group name and the name of the output topic you want to write to. The DependencyService handles requests for AtlasConfigurations and DataFormats, so you must provide a URI for this service.
A KafkaStreamAdapter is used to manage Kafka streams.
Using the KafkaStreamAdapter, you must open an output topic in Kafka.
To write to your broker you need to persist your configuration, set up your DataFormat and set up a streaming session. The Writer.cs code sample covers all of these; you only need to create a Writer object with the required parameters.
In this example we are using the default feed name, which is the empty string. You must take care of opening and closing the session yourself; otherwise the session is marked as truncated, just as if an error had occurred while the session was in use.
The samples work with randomly generated data, whereas in a real scenario the data would come from external systems.
To write to the output topic, invoke the Write method on your writer object, passing in the feed name and the telemetry data. The Writer object already "knows" your AtlasConfiguration, the DataFormat and the output topic name.
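One way to make sure a final session state is always published is to wrap the write in try/catch/finally. The sketch below shows only that control flow. `SessionState` and `FakeSessionOutput` are hypothetical stand-ins so that it runs without the streaming packages; in real code the equivalent calls would be made on output.SessionOutput with the StreamSessionState values shown in the basic write sample.

```csharp
using System;

// Hypothetical stand-ins so the sketch runs without the streaming packages.
public enum SessionState { Open, Closed, Truncated }

public sealed class FakeSessionOutput
{
    public SessionState State { get; set; }
    public int SendCount { get; private set; }

    // The real call publishes the session details; here we only count calls.
    public void SendSession() => SendCount++;
}

public static class SessionLifecycle
{
    // Opens the session, runs the write action, and always publishes a final
    // state: Closed on success, Truncated if the action throws.
    public static void Run(FakeSessionOutput session, Action write)
    {
        session.State = SessionState.Open;
        session.SendSession();
        try
        {
            write();
            session.State = SessionState.Closed;
        }
        catch
        {
            session.State = SessionState.Truncated;
            throw; // let the caller see the original error
        }
        finally
        {
            session.SendSession();
        }
    }

    public static void Main()
    {
        var session = new FakeSessionOutput();
        Run(session, () => Console.WriteLine("writing data..."));
        Console.WriteLine(session.State); // prints "Closed"
    }
}
```

The exception is rethrown so that the caller can still log or handle the failure after the truncated state has been sent.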
First, create or reuse an AtlasConfiguration. You need to specify which AppGroupId, ParameterGroupId and ParameterId you want to use.
Once you have designed your AtlasConfiguration, set the DependencyService URI, the stream broker address, the group name and the name of the output topic you want to write to. The DependencyService handles requests for AtlasConfigurations and DataFormats, so you must provide a URI for this service.
A KafkaStreamAdapter is used to manage Kafka streams.
Using the KafkaStreamAdapter, you must open an output stream in Kafka.
You need to connect to the DependencyClient and the DataFormatClient and persist the StreamPipelineBuilder that was created by the KafkaStreamAdapter. A Reader object takes care of this for you.
You can start reading a stream with the Read method on your reader object. It takes two arguments. The first is simply the parameter id. The second must be a user-specified method that matches the TelemetryDataHandler delegate, which lets you handle the streamed Telemetry Data however you like. Another example shows how you can link it directly to another output topic.
Lastly, our sample code invokes the Write() method while the streaming session is live, so that there is some input to confirm that streaming works. Our sample delegates, called Models, are in Models.cs; this example uses the TraceData method to trace the details of the streamed telemetry data.