A Node/Typescript SDK that pushes data to and pulls data from an instance of an RStreams bus.
Full documentation and getting started guide here: rstreams-site-url/rstreams-node-sdk/
For this SDK API documentation, get started with the RStreamsSdk.
Send events one at a time to an instance of the RStreams bus.
import { ConfigurationResources, RStreamsSdk } from "leo-sdk";
import { PersonRaw, PersonRawResults } from "../lib/types";
import axios from "axios";
async function main() {
const rsdk: RStreamsSdk = new RStreamsSdk();
const person = await getRandomPerson();
await rsdk.putEvent('rstreams-example.load-people', 'rstreams-example.people', person);
}
async function getRandomPerson(): Promise<PersonRaw> {
const NUM_EVENTS = 1;
const url = `https://randomuser.me/api/?results=${NUM_EVENTS}&exc=login,registered,phone,cell,picture,id&noinfo`;
const {data, status} = await axios.get<PersonRawResults>(url);
if (status !== 200) {
throw new Error('Unable to get randomPeople from https://randomuser.me API: ' + status);
}
console.log('Person: ' + data.results[0].name.first + ' ' + data.results[0].name.last);
return data.results[0];
}
(async () => {
await main();
})()
Read events from one queue, modify them and send them on to another queue of the RStreams bus.
Enrich Operation documentation
import { EnrichOptions, RStreamsSdk } from "leo-sdk";
import { Person, PersonRaw } from "../lib/types";
import axios from "axios";
async function main() {
const rsdk: RStreamsSdk = new RStreamsSdk();
const opts: EnrichOptions<PersonRaw, Person> = {
id: 'rstreams-example.people-to-peopleplus',
inQueue: 'rstreams-example.people',
outQueue: 'rstreams-example.peopleplus',
start: 'z/2022/04/20',
config: {
limit: 2
},
transform: async (person: PersonRaw) => {
const p: Person = translate(person);
await addCountryCode(p);
return p;
}
};
await rsdk.enrichEvents<PersonRaw, Person>(opts);
}
interface CountryCode {cca2: string;}
/**
* @param person The person to add addr.countryCode to by calling a public API to
* turn a country name in a 2 digit country code (iso cca2)
*/
async function addCountryCode(person: Person): Promise<void> {
const url = `https://restcountries.com/v3.1/name/${person.addr.country}?fullText=true&fields=cca2`;
const cc: CountryCode = await axios.get(url);
person.addr.countryCode = cc.cca2;
}
/**
* @param p The type from the public API we want to modify
* @returns The new type that is flatter and gets rid of some attributes don't need
*/
function translate(p: PersonRaw): Person {
return {
gender: p.gender,
firstName: p.name.first,
lastName: p.name.last,
email: p.email,
birthDate: p.dob.date,
nationality: p.nat,
addr: {
addr1: p.location.street.number + ' ' + p.location.street.name,
city: p.location.city,
state: p.location.state,
country: p.location.country,
postcode: p.location.postcode,
longitude: p.location.coordinates.longitude,
latitude: p.location.coordinates.latitude,
tzOffset: p.location.timezone.offset,
tzDesc: p.location.timezone.description
}
}
}
(async () => {
await main();
})()
Stream data down from an RStreams bus queue at scale, optionally modify it and send it to another source such as a database or file.
Offload Operation documentation
import { OffloadOptions, RStreamsSdk } from "leo-sdk";
import { Person } from "../lib/types";
import axios, { AxiosResponse } from "axios";
async function main() {
const rsdk: RStreamsSdk = new RStreamsSdk();
const opts: OffloadOptions<Person> = {
id: 'rstreams-example.offload-one-peopleplus',
inQueue: 'rstreams-example.people',
start: 'z/2022/04/20',
limit: 2,
transform: async (person: Person) => {
await savePerson(person);
return true;
}
};
await rsdk.offloadEvents<Person>(opts);
}
interface PostResponse {
success: boolean;
}
/**
* @param person Save the person to another system.
*/
async function savePerson(person: Person): Promise<void> {
const url = `https://run.mocky.io/v3/83997150-ab13-43da-9fb9-66051ba06c10?mocky-delay=500ms`;
const {data, status}: AxiosResponse<PostResponse, any> = await axios.post<PostResponse>(url, person);
if (status !== 200 || !data || data.success !== true) {
throw new Error('Saving person to external system failed');
}
}
(async () => {
await main();
})()