Hazelcast Jet Cheatsheet
Useful after reading the Hazelcast Jet "Get Started" guide
Dependency
Maven:
<dependency>
<groupId>com.hazelcast.jet</groupId>
<artifactId>hazelcast-jet</artifactId>
<version>4.3</version>
</dependency>
Gradle:
dependencies {
implementation 'com.hazelcast.jet:hazelcast-jet:4.3'
}
Launching an app in a cluster
- Download the Hazelcast Jet distribution
- Start a cluster node
$ bin/jet-start
- Submit a job
$ bin/jet submit path/to/job.jar
You can start several nodes on the network; they discover each other via multicast and form a cluster. If a node dies, the cluster heals itself and restarts the running jobs (from the last snapshot, if a processing guarantee is configured).
To list and cancel jobs:
$ bin/jet list-jobs
$ bin/jet cancel <job> # By job name or ID
Hello World
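import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
JetInstance jet = Jet.bootstrappedInstance(); // Cluster of "jet submit", otherwise a local member
Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(10)) // 10 events per second
.withoutTimestamps()
.filter(event -> event.sequence() % 2 == 0)
.setName("filter out odd numbers")
.writeTo(Sinks.logger());
jet.newJob(p).join(); // Submit job; join() blocks because the stream never ends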
Pipeline
Pipeline p = Pipeline.create();
p.readFrom(source)
/* Apply some stages */
.writeTo(sink);
You can fork a stage to process the same data in several ways:
Pipeline p = Pipeline.create();
BatchStage<String> lines = p.readFrom(Sources.files("dir")); // readFrom returns a stage, not a Pipeline
lines.map(String::toUpperCase).writeTo(sinkA);
lines.filter(line -> !line.isEmpty()).writeTo(sinkB);
You can also merge (union) two stages of the same type; their items are interleaved in arbitrary order.
BatchStage<String> left = p.readFrom(leftSource);
BatchStage<String> right = p.readFrom(rightSource);
left.merge(right)
.writeTo(sink);
Sources
Where the data comes from.
Batch sources (finite data, e.g. an existing list)
They return a BatchSource<T>
Sources.files(dir); // Lines of each file in dir
Sources.jdbc(connectionUrl, query, resultSetFn); // Rows returned by an SQL query
Sources.map(imap); // From Hazelcast IMap, using name string or actual IMap object
Streaming sources (unbounded)
They return a StreamSource<T>
Sources.mapJournal(imap,initPosition); // Changes to IMap as they come
Sources.fileWatcher(dir); // Lines added to files in dir
Sources.socket(host,port,charset); // Text received from socket
Sources.jmsQueue(); Sources.jmsTopic(); // JMS queue or topic
KafkaSources.kafka(kafkaConnectionProperties,"topic"); // Kafka topic (hazelcast-jet-kafka module)
TestSources.items(arrayOrIterable); // Given data for testing (note: this one is a batch source)
TestSources.itemStream(perSecond, generatorLambda); // Stream of items returned by the lambda
Sinks
Sinks.noop(); // Discard
Sinks.logger(); // To cluster log
Sinks.logger(toStringFn); // With a custom formatting function
Sinks.map(name); // To Hazelcast IMap (by name or java object)
Sinks.map(name,keyFn,valueFn); // Extract key and value from each item
Sinks.mapWithMerging(name,keyFn,valueFn,mergeFn); Sinks.mapWithUpdating(name,keyFn,updateFn); // Combine with existing data
Sinks.observable(observable); // To notify an observable (Useful to get notifications in your app)
Sinks.list(list); // To IList
Sinks.socket(host,port,stringLambda); // Write to socket
Sinks.jmsQueue(queue, connectionFactory);
Sinks.jmsTopic(topic, connectionFactory);
Sinks.jdbc(updateQuery,connectionUrl,bindFn); // Run an update query against an SQL database
Sinks.json(dir); // Each item appended as a JSON line
Sinks.cache(cache); // To Hazelcast ICache
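Sinks.mapWithMerging combines each incoming value with the one already stored under the same key. The plain-Java sketch below models that behaviour with java.util.Map.merge; it assumes, as Map.merge does, that the merge function receives the old and the new value and that a null result removes the entry. MergingSinkSketch and writeWithMerging are hypothetical names, and this is not how Jet implements the sink.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of a "write with merging" sink on top of java.util.Map.
// Not Jet code: it only mirrors the idea of combining new values with existing ones.
final class MergingSinkSketch {
    static <T, K, V> void writeWithMerging(Map<K, V> target, List<T> items,
                                           Function<T, K> toKeyFn,
                                           Function<T, V> toValueFn,
                                           BinaryOperator<V> mergeFn) {
        for (T item : items) {
            // Absent key: store the new value.
            // Present key: store mergeFn(oldValue, newValue); a null result removes the entry.
            target.merge(toKeyFn.apply(item), toValueFn.apply(item), mergeFn);
        }
    }
}
```

With toValueFn w -> 1 and Integer::sum as the merge function, the sketch turns into a running word count, a typical use of merging into an existing map.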
Timestamps
Choose how a stream stage gets its event timestamps by calling one of these on the stage returned by p.readFrom(streamSource):
.withNativeTimestamps(allowedLag) // Timestamps come from the source (e.g. Kafka)
.withTimestamps(timestampFn, allowedLag) // Extract the timestamp from the event object using a lambda
.withIngestionTimestamps() // When the item was received for processing, no matter when it was generated
.withoutTimestamps() // Don't care: no windowed aggregation, or timestamps are assigned at a later stage with .addTimestamps(timestampFn, allowedLag)
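withNativeTimestamps and withTimestamps take an allowedLag argument: how far behind the highest timestamp seen so far an item may arrive and still count as on time. Below is a simplified plain-Java model of that idea, assuming the watermark is just the highest timestamp seen minus allowedLag and that anything older than the watermark is late. LagWatermarkSketch is a hypothetical class, not the Jet API; the real implementation handles more cases, such as multiple partitions.

```java
// Simplified, hypothetical model of an "allowed lag" watermark policy:
// watermark = highest timestamp seen so far - allowedLag.
// Items whose timestamp falls behind the current watermark are treated as late.
final class LagWatermarkSketch {
    private final long allowedLag;
    private long topTimestamp = Long.MIN_VALUE; // nothing seen yet

    LagWatermarkSketch(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    /** Returns true if the item is on time, false if it is late. */
    boolean accept(long timestamp) {
        boolean onTime = topTimestamp == Long.MIN_VALUE || timestamp >= watermark();
        topTimestamp = Math.max(topTimestamp, timestamp);
        return onTime;
    }

    long watermark() {
        return topTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : topTimestamp - allowedLag;
    }
}
```

With allowedLag = 2, after seeing timestamp 10 the watermark is 8: an item at 9 is still accepted, while one at 7 is late.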
Stateless Stages
Functional: the same input always gives the same output and no internal state is kept, so these stages are easily parallelizable.
.map(lambda); // Convert one object to another
.filter(predicate); // Keep only elements where predicate returns true
.flatMap(lambda); // Return several items for each input using a Traverser (similar to an iterator)
.mapUsingIMap(imap,keyLambda,outLambda); // Look up each item in IMap
.mapUsingReplicatedMap(rmap,keyFn,outFn); // Look up each item in ReplicatedMap (fully replicated on each node)
.mapUsingService(serviceFactory,outFn); // Look up each item in a service; outFn receives the service object and the item and produces the stage output
.mapUsingServiceAsync(serviceFactory,asyncFn); // If the service is async (asyncFn returns a CompletableFuture)
.mapUsingServiceBatched(); // Service receives a list of inputs and returns a list of results
.apply(PythonTransforms.mapUsingPython(pythonConfig)); // Call Python code (hazelcast-jet-python module)
.merge(other); // Union with the items of another stage, interleaved
.hashJoin(batchStage,JoinClause.onKeys(leftKeyFn,rightKeyFn), outFn); // Build a hash table from the other (batch) stage, then join each item of this stage by key; outFn receives both sides (null for the other side if there is no match)
.innerHashJoin(...); // Same, with inner-join semantics: emit only when the key appears on both sides
.hashJoin2(...); // Join to two other stages at once
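The hash-join stages follow the classic build/probe pattern: the other stage is loaded into a hash table keyed by rightKeyFn, then each item of this stage is looked up with leftKeyFn. A plain-Java sketch of those semantics, with a hypothetical HashJoinSketch helper; it assumes keys on the hashed side are unique, and the inner flag stands in for innerHashJoin.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of hash-join semantics (not Jet's implementation).
// Assumes keys on the build (right) side are unique.
final class HashJoinSketch {
    static <L, R, K, O> List<O> hashJoin(List<L> left, List<R> right,
                                         Function<L, K> leftKeyFn, Function<R, K> rightKeyFn,
                                         BiFunction<L, R, O> outFn, boolean inner) {
        // 1. Build phase: index the whole right side by key.
        Map<K, R> table = new HashMap<>();
        for (R r : right) {
            table.put(rightKeyFn.apply(r), r);
        }
        // 2. Probe phase: look up each left item; the outer variant passes null when unmatched.
        List<O> out = new ArrayList<>();
        for (L l : left) {
            R match = table.get(leftKeyFn.apply(l));
            if (match == null && inner) {
                continue; // inner join drops unmatched left items
            }
            out.add(outFn.apply(l, match));
        }
        return out;
    }
}
```

Only the hashed side has to fit in memory; the probing side can be arbitrarily long, which is why the hashed side is the batch stage.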
Stateful Stages
They accumulate/transform data or depend on what came before.
Aggregations
stage
.aggregate(aggOp); // Aggregate whole input (one output)
.rollingAggregate(aggOp); // One output per item received (incremental)
.groupingKey(User::getId).aggregate(aggOp); // Group items by key, one output per group
.window(windowDefinition).aggregate(aggOp); // One output per window
.window(w).groupingKey(keyFn).aggregate(aggOp); // One output per window and group
.window(w.setEarlyResultsPeriod(period)).aggregate(aggOp); // Emit partial results periodically, before a long window closes
.aggregate2(aggOp, secondStage, secondAggOp); // Co-aggregate with another stage in the same step
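These stages differ mainly in how many results come out. The plain-Java sketch below counts items three ways to show the shape of each output; it is a hypothetical illustration (AggregationShapes is not Jet code) standing in for aggregate(counting()), rollingAggregate(counting()) and groupingKey(keyFn).aggregate(counting()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical illustration of the output shapes of three aggregation styles.
final class AggregationShapes {
    // Like aggregate(counting()): the whole input produces exactly one result.
    static long count(List<String> items) {
        return items.size();
    }

    // Like rollingAggregate(counting()): one running result per input item.
    static List<Long> rollingCount(List<String> items) {
        List<Long> out = new ArrayList<>();
        long acc = 0;
        for (String ignored : items) {
            out.add(++acc);
        }
        return out;
    }

    // Like groupingKey(keyFn).aggregate(counting()): one result per distinct key.
    static Map<String, Long> countByKey(List<String> items, Function<String, String> keyFn) {
        Map<String, Long> out = new TreeMap<>();
        for (String item : items) {
            out.merge(keyFn.apply(item), 1L, Long::sum);
        }
        return out;
    }
}
```

For "apple", "avocado", "banana" grouped by first letter, the three shapes are 3, [1, 2, 3] and {a=2, b=1}.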
WindowDefinition
Note: Only applies to stream stages with timestamps
WindowDefinition
.tumbling(windowSize) // Non-overlapping windows: [1,2,3] [4,5,6]
.sliding(windowSize,slideBy) // E.g. 30-second window sliding by 1 second:
// [0,30), [1,31), [2,32) ...
.session(sessionTimeout) // Window closes once nothing has been received for sessionTimeout
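To make the three window types concrete, here is a plain-Java sketch (hypothetical WindowSketch class, not the Jet API) that computes which windows a timestamp falls into. It assumes windows are aligned to multiples of the slide step starting at 0, that the sliding window size is a multiple of the slide, and that a session continues while the gap between consecutive events is shorter than the timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers illustrating window membership; not Jet code.
final class WindowSketch {
    // Tumbling: each timestamp belongs to exactly one window [start, start + size).
    static long tumblingStart(long ts, long size) {
        return Math.floorDiv(ts, size) * size;
    }

    // Sliding: windows of length `size` start every `slide`;
    // returns the start of every window containing ts, in ascending order.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long last = Math.floorDiv(ts, slide) * slide; // latest window start <= ts
        for (long s = last; s > ts - size; s -= slide) {
            starts.add(0, s); // prepend to keep ascending order
        }
        return starts;
    }

    // Session: sorted timestamps are split wherever the gap reaches `timeout`.
    static List<List<Long>> sessions(List<Long> sortedTs, long timeout) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) >= timeout) {
                result.add(current); // gap too long: close the current session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

A timestamp of 7 with 10-unit windows sliding by 5 falls into the windows starting at 0 and 5, and with a 30-unit window sliding by 1 any timestamp belongs to 30 windows.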
AggregateOperations
AggregateOperations
.counting() // Count elements on each group
.averagingLong(getValueFn) // Average
.averagingDouble(getValueFn)
.summingLong(getValueFn) // Sum
.summingDouble(getValueFn)
.minBy(comparator) // Minimum of group using comparator
.maxBy(comparator)
.bottomN(n, comparator) // Keep the n smallest
.topN(n, comparator) // Keep the n largest
.linearTrend(xFn,yFn) // Slope of change (linear regression of items within window)
.concatenating(delim) // Build a string with an optional delimiter
.toList() .toSet() .toMap(keyFn,valFn) // Gather to collection
.allOf(aggOpA,aggOpB) // Calculate several AggregateOperations in one stage
.pickAny() // Pick an arbitrary item
.reducing(empty, toAccFn, combineFn, /*may be null*/ deductFn) // Custom reduction
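The optional deduct function of reducing pays off in sliding windows: when the window slides, the part that left the window can be subtracted instead of recombining everything. The sketch below models that with a window counted in items rather than time (a simplification); ReducingSketch and its constructor are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of reducing(empty, toAccFn, combineFn, deductFn) over a
// sliding window of the last `windowLength` items (Jet windows are time-based).
final class ReducingSketch<T, A> {
    private final Function<T, A> toAccFn;
    private final BinaryOperator<A> combineFn;
    private final BinaryOperator<A> deductFn;
    private final int windowLength;
    private final Deque<A> inWindow = new ArrayDeque<>();
    private A acc;

    ReducingSketch(A empty, Function<T, A> toAccFn, BinaryOperator<A> combineFn,
                   BinaryOperator<A> deductFn, int windowLength) {
        this.acc = empty;
        this.toAccFn = toAccFn;
        this.combineFn = combineFn;
        this.deductFn = deductFn;
        this.windowLength = windowLength;
    }

    /** Adds one item and returns the aggregate over the current window. */
    A push(T item) {
        A value = toAccFn.apply(item);
        inWindow.addLast(value);
        acc = combineFn.apply(acc, value);
        if (inWindow.size() > windowLength) {
            // Subtract the value that slid out instead of recombining the whole window.
            acc = deductFn.apply(acc, inWindow.removeFirst());
        }
        return acc;
    }
}
```

For a sum over the last 3 items (empty 0, identity, Integer::sum, subtraction), pushing 1 to 5 yields 1, 3, 6, 9, 12. Without a deduct function the window would have to be recombined from scratch on every slide.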
Other Stateful Operations
.distinct() // Remove duplicates, can be applied by window
.sort(/*optional*/comparator) // Sort input using natural order or explicit comparator
.mapStateful(createFn,modifyFn); // Like map, but keep mutable state between calls. Good for finding patterns.
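mapStateful keeps one mutable state object, created by createFn, and hands it to the mapping function together with each item; a null result means nothing is emitted for that item. A plain-Java sketch of that contract, with a hypothetical MapStatefulSketch helper (single-threaded and not keyed, unlike a distributed Jet stage):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical model of mapStateful(createFn, mapFn); not Jet code.
final class MapStatefulSketch {
    static <S, T, R> List<R> mapStateful(List<T> items, Supplier<S> createFn,
                                         BiFunction<S, T, R> mapFn) {
        S state = createFn.get(); // one state object, kept across items
        List<R> out = new ArrayList<>();
        for (T item : items) {
            R result = mapFn.apply(state, item); // mapFn may mutate state
            if (result != null) {
                out.add(result); // null means "emit nothing for this item"
            }
        }
        return out;
    }
}
```

For example, with state long[]{Long.MAX_VALUE} and a function that emits "prev->v" whenever v exceeds the stored previous value (and then stores v), the input 1, 3, 2, 5 yields "1->3" and "2->5": a small instance of finding a pattern across items.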
Service
ServiceFactories
.sharedService(ctx->obj); // One thread-safe instance per node, shared by all its processors
.nonSharedService(ctx->obj); // One instance per parallel Jet processor (several per node); need not be thread-safe
ServiceFactory
.toNonCooperative(); // Declare that the service makes blocking calls; Jet runs it on a dedicated thread
.withAttachedFile(id,file); // Attach a file to the context so the service knows where to read from
.withAttachedDirectory(id,dir); // Attach a directory to the context so the service knows where to read from
.withDestroyContextFn(destroyFn); // Attach cleanup lambda
Helper classes
Traverser
Like an iterator with a single next() method that returns null when there are no more items, so it is easy to implement with a lambda.
Traversers
.empty()
.singleton(item)
.traverseItems(a,b ...)
.traverseIterator(it)
.traverseStream(stream)
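Because a Traverser only needs a next() method, it can be written as a lambda. The sketch below is a hypothetical stand-in, named MiniTraverser so it is not mistaken for Jet's own interface, showing the null-terminated contract and how factory methods like those above can be built. A consequence of the contract is that a traverser cannot carry null items.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Jet's Traverser: one next() method, null = exhausted.
@FunctionalInterface
interface MiniTraverser<T> {
    T next();

    static <T> MiniTraverser<T> empty() {
        return () -> null;
    }

    @SafeVarargs
    static <T> MiniTraverser<T> items(T... items) {
        int[] index = {0}; // mutable cursor captured by the lambda
        return () -> index[0] < items.length ? items[index[0]++] : null;
    }

    static <T> MiniTraverser<T> iterator(Iterator<T> it) {
        return () -> it.hasNext() ? it.next() : null;
    }

    // Drain into a list, roughly the way a flatMap stage consumes the returned traverser.
    default List<T> drain() {
        List<T> out = new ArrayList<>();
        for (T t; (t = next()) != null; ) {
            out.add(t);
        }
        return out;
    }
}
```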
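Many functional interfaces ending in Ex (FunctionEx, SupplierEx, BiFunctionEx, ...) are serializable equivalents of the java.util.function ones that may also throw checked exceptions.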
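The Ex variants matter because Jet ships lambdas to cluster members, so they must be Serializable, and the java.util.function interfaces cannot throw checked exceptions. The hypothetical MyFunctionEx below recreates the idea; it simply wraps checked exceptions in a RuntimeException, and Jet's own FunctionEx may handle them differently.

```java
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical re-creation of the idea behind Jet's FunctionEx:
// Serializable (so lambdas can be sent to other members) and
// allowed to throw checked exceptions from applyEx.
@FunctionalInterface
interface MyFunctionEx<T, R> extends Function<T, R>, Serializable {
    R applyEx(T t) throws Exception;

    @Override
    default R apply(T t) {
        try {
            return applyEx(t);
        } catch (RuntimeException e) {
            throw e; // unchecked exceptions pass through unchanged
        } catch (Exception e) {
            throw new RuntimeException(e); // checked exceptions are wrapped
        }
    }
}
```

With it, MyFunctionEx<String, Class<?>> load = Class::forName; compiles even though Class.forName throws the checked ClassNotFoundException, and load can still be passed wherever a plain Function is expected.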