sds-processors


Connector Architecture TypeScript processors for handling operations over SDS streams. The package currently exposes 5 functions, described in turn below.

This processor takes as input a stream of (batched) RDF data entities and wraps them as individual SDS records to be further processed downstream. By default, it will extract individual entities by taking every single named node subject and extracting a Concise Bounded Description (CBD) of that entity with respect to the input RDF graph.
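The default extraction step can be pictured with the following minimal sketch. It is not the processor's actual code: it assumes a hypothetical in-memory `Triple` model instead of RDF/JS quads, and it implements a simplified CBD (all triples of the subject plus, recursively, the triples of any blank node reached as an object), leaving out reification handling.

```typescript
// Hypothetical minimal triple model, for illustration only.
type Term = { kind: "named" | "blank" | "literal"; value: string };
type Triple = { s: Term; p: Term; o: Term };

// Unique key per term, so a blank node and a literal with the same value differ.
const key = (t: Term): string => `${t.kind}:${t.value}`;

// Simplified CBD: triples of the subject, plus triples of nested blank nodes.
function cbd(subject: Term, graph: Triple[]): Triple[] {
  const result: Triple[] = [];
  const visited = new Set<string>();
  const queue: Term[] = [subject];
  while (queue.length > 0) {
    const current = queue.shift() as Term;
    if (visited.has(key(current))) continue;
    visited.add(key(current));
    for (const t of graph) {
      if (key(t.s) !== key(current)) continue;
      result.push(t);
      // Follow blank node objects; named nodes are separate entities.
      if (t.o.kind === "blank") queue.push(t.o);
    }
  }
  return result;
}

// One entity per distinct named node subject found in the input graph.
function extractEntities(graph: Triple[]): Map<string, Triple[]> {
  const entities = new Map<string, Triple[]>();
  for (const t of graph) {
    if (t.s.kind === "named" && !entities.has(t.s.value)) {
      entities.set(t.s.value, cbd(t.s, graph));
    }
  }
  return entities;
}
```

Each map entry would then correspond to one SDS record emitted downstream.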

Alternatively, a set of SHACL shapes can be given to concretely define and filter the types of entities, and their properties, that should be extracted and packaged as SDS records. This processor relies on the member extraction algorithm implemented by the W3C TREE Hypermedia community group.

If js:timestampPath is specified, the set of SDS records will be streamed out in temporal order to avoid out-of-order writes downstream.
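A minimal sketch of the temporal ordering idea follows. It assumes each record already carries the timestamp value found at the configured path as an ISO 8601 string; the `TimedRecord` type and `sortByTimestamp` function are hypothetical names, not part of the processor's API.

```typescript
// Hypothetical record shape: the timestamp is assumed to be pre-extracted.
type TimedRecord = { id: string; timestamp: string };

// Returns a new array ordered from oldest to newest; the input is not mutated.
function sortByTimestamp(records: TimedRecord[]): TimedRecord[] {
  return [...records].sort(
    (a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp),
  );
}
```

Sorting a batch before emitting it means a downstream writer that expects non-decreasing timestamps never sees an older record after a newer one within that batch.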

An example of how to use this processor within a Connector Architecture pipeline definition is shown next:

@prefix : <https://w3id.org/conn#>.
@prefix js: <https://w3id.org/conn/js#>.
@prefix sh: <http://www.w3.org/ns/shacl#>.

[ ] a js:Sdsify;
    js:input <inputChannelReader>;
    js:output <outputChannelWriter>;
    js:stream <http://ex.org/myStream>;
    js:timestampPath <http://ex.org/timestamp>;
    js:shapeFilter """
        @prefix sh: <http://www.w3.org/ns/shacl#>.
        @prefix ex: <http://ex.org/>.

        [ ] a sh:NodeShape;
            sh:targetClass ex:SomeClass;
            sh:property [ sh:path ex:someProperty ].
    """,
    """
        @prefix sh: <http://www.w3.org/ns/shacl#>.
        @prefix ex: <http://ex.org/>.

        [ ] a sh:NodeShape;
            sh:targetClass ex:SomeOtherClass;
            sh:property [ sh:path ex:someOtherProperty ].
    """.

This processor takes as input a stream of SDS records and SDS metadata and bucketizes the records according to a predefined strategy (see example). The SDS metadata is also updated to reflect this transformation. Multiple SDS streams can be present on the incoming data channel.
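To make the idea of bucketizing concrete, here is a minimal sketch of one possible strategy: fixed-size pages, tracked separately per SDS stream. The `SdsRecord` shape, the `PageBucketizer` class, and the `page-N` bucket names are assumptions made for illustration; they do not reflect the actual bucketizer implementations or their configuration.

```typescript
// Hypothetical record shape; real SDS records are RDF, not plain objects.
type SdsRecord = { stream: string; payload: string; bucket?: string };

class PageBucketizer {
  private readonly pageSize: number;
  // Number of records seen so far, per stream IRI.
  private readonly counts = new Map<string, number>();

  constructor(pageSize: number) {
    this.pageSize = pageSize;
  }

  // Returns a copy of the record annotated with its bucket.
  assign(record: SdsRecord): SdsRecord {
    const seen = this.counts.get(record.stream) ?? 0;
    this.counts.set(record.stream, seen + 1);
    const page = Math.floor(seen / this.pageSize);
    return { ...record, bucket: `page-${page}` };
  }
}
```

Keeping a counter per stream lets several SDS streams share one incoming channel without their bucket assignments interfering.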

This processor relies on the bucketizer implementations available in the TREEcg/bucketizers repository.

This processor takes a stream of raw entities (e.g., the output of an RML transformation process) and creates versioned entities by appending the current timestamp to the entity IRI to make it unique. It can keep state so that unmodified entities are filtered out.
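The versioning and filtering behavior can be sketched as follows. This is an illustration under stated assumptions, not the processor's implementation: the `Versioner` class is hypothetical, the `#` separator between IRI and timestamp is a guess, and change detection here is a plain comparison of a serialized form of the entity.

```typescript
class Versioner {
  // Last serialized form seen for each entity IRI (the kept state).
  private readonly lastSeen = new Map<string, string>();

  // Returns a versioned IRI, or undefined when the entity is unchanged.
  version(iri: string, serialized: string, now: Date = new Date()): string | undefined {
    if (this.lastSeen.get(iri) === serialized) {
      return undefined; // unmodified entity: filtered out
    }
    this.lastSeen.set(iri, serialized);
    // Assumed separator; the source only says the timestamp is appended.
    return `${iri}#${now.toISOString()}`;
  }
}
```

Passing `now` as a parameter keeps the sketch deterministic in tests; in a running pipeline the default of the current time would be used.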

This processor can be used to join multiple input streams or Reader Channels (js:input) and pipe their data flow into a single output stream or Writer Channel (js:output). The processor guarantees that all data elements are delivered downstream and closes the output once all inputs are closed.
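The joining and closing logic can be sketched with a small in-memory channel. The `Channel` class and `joinStreams` function below are hypothetical stand-ins for the Connector Architecture reader and writer channels, whose real interfaces are not shown in this document.

```typescript
// Hypothetical synchronous channel used as both reader and writer.
class Channel<T> {
  private readonly dataHandlers: Array<(item: T) => void> = [];
  private readonly endHandlers: Array<() => void> = [];
  closed = false;

  onData(handler: (item: T) => void): void {
    this.dataHandlers.push(handler);
  }

  onEnd(handler: () => void): void {
    this.endHandlers.push(handler);
  }

  push(item: T): void {
    if (this.closed) throw new Error("cannot push to a closed channel");
    for (const handler of this.dataHandlers) handler(item);
  }

  end(): void {
    if (this.closed) return;
    this.closed = true;
    for (const handler of this.endHandlers) handler();
  }
}

// Forwards every item from every input and closes the output
// only after the last open input has closed.
function joinStreams<T>(inputs: Channel<T>[], output: Channel<T>): void {
  let open = inputs.length;
  if (open === 0) {
    output.end();
    return;
  }
  for (const input of inputs) {
    input.onData((item) => output.push(item));
    input.onEnd(() => {
      open -= 1;
      if (open === 0) output.end();
    });
  }
}
```

Counting open inputs, rather than closing on the first end event, is what keeps the output available until every source has finished.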

This is a simple RDF data generator function used for testing. This processor will periodically generate RDF objects with 3 to 4 predicates.
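A minimal sketch of such a generator is shown below. The predicate IRIs, the N-Triples string output, and the function names `generateObject` and `startGenerator` are all assumptions chosen for illustration; the actual generator's vocabulary and output format may differ.

```typescript
// Builds one object as N-Triples lines: three fixed predicates,
// plus a fourth one roughly half of the time.
function generateObject(index: number, random: () => number = Math.random): string[] {
  const subject = `<http://ex.org/generated/${index}>`;
  const triples = [
    `${subject} <http://ex.org/index> "${index}" .`,
    `${subject} <http://ex.org/label> "Object ${index}" .`,
    `${subject} <http://ex.org/created> "${new Date().toISOString()}" .`,
  ];
  if (random() < 0.5) {
    triples.push(`${subject} <http://ex.org/optional> "extra" .`);
  }
  return triples;
}

// Emits a new object every intervalMs milliseconds.
// Returns a function that stops the generator.
function startGenerator(
  intervalMs: number,
  emit: (triples: string[]) => void,
): () => void {
  let index = 0;
  const timer = setInterval(() => emit(generateObject(index++)), intervalMs);
  return () => clearInterval(timer);
}
```

Injecting the `random` function makes the 3-versus-4 predicate choice reproducible when the generator itself is under test.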