Framework for Apache Flink unit tests

Flinkspector

This project provides a framework to define unit tests for Apache Flink data flows. The framework executes the data flows locally and verifies the output using predefined expectations.

Features include:

  • Concise DSL to define test scenarios.
  • Powerful matchers to express expectations.
  • Test base for JUnit.
  • Test stream windowing with timestamped input.
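Windowed results depend on the timestamps attached to each input record, which is why timestamped test input is useful. The plain-Java sketch below is only an illustration and is not part of Flinkspector. It assigns records with hypothetical timestamps to tumbling event-time windows. It assumes a window offset of 0 and non-negative timestamps, in which case Flink computes a window's start as `timestamp - (timestamp % size)`. The class `TumblingWindowSketch` and its methods are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only, not Flinkspector code: group timestamped records into
// tumbling event-time windows of a fixed size (window offset assumed to be 0).
public class TumblingWindowSketch {

    static final class Event {
        final long timestampMillis;
        final String name;

        Event(long timestampMillis, String name) {
            this.timestampMillis = timestampMillis;
            this.name = name;
        }
    }

    // Start of the tumbling window that contains a non-negative timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Map each window start (sorted) to the names of the events in that window.
    static Map<Long, List<String>> assign(List<Event> events, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Event e : events) {
            windows.computeIfAbsent(windowStart(e.timestampMillis, sizeMillis), k -> new ArrayList<>())
                   .add(e.name);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps in milliseconds, 10-second windows.
        List<Event> events = List.of(
                new Event(0, "fritz"),
                new Event(15_000, "hans"),
                new Event(10_000, "heidi"),
                new Event(35_000, "peter"));
        System.out.println(assign(events, 10_000));
        // prints {0=[fritz], 10000=[hans, heidi], 30000=[peter]}
    }
}
```

Shifting a single timestamp across a window boundary moves that record into a different window. This is the kind of behavior a timed test input lets you pin down.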

Check out the wiki to learn how Flinkspector can assist you in developing Flink jobs.

Examples

Minimal:

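import static java.util.Arrays.asList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
// DataSetTestBase and ExpectedRecords are provided by Flinkspector.

public class Test extends DataSetTestBase {

    @org.junit.Test
    public void myTest() {
        // Build a small input DataSet and add 1 to every element.
        DataSet<Integer> dataSet = createTestDataSet(asList(1, 2, 3))
            .map((MapFunction<Integer, Integer>) value -> value + 1);

        // The output is expected to contain 2, 3 and 4.
        ExpectedRecords<Integer> expected =
            new ExpectedRecords<Integer>().expectAll(asList(2, 3, 4));

        assertDataSet(dataSet, expected);
    }
}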
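The test above runs the `map` locally and checks the result against the expected values 2, 3 and 4. The plain-Java sketch below mimics that check with in-memory lists. It is not Flinkspector's implementation, and `ExpectAllSketch` and its methods are hypothetical. It assumes that order must be ignored, because parallel Flink tasks can emit records in any order, so it compares element counts rather than sequences.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, NOT Flinkspector's API: mimics what the minimal test
// verifies, using plain Java collections instead of a Flink DataSet.
public class ExpectAllSketch {

    // Apply the transformation to every input element, like DataSet.map.
    static <I, O> List<O> runLocally(List<I> input, Function<I, O> fn) {
        List<O> output = new ArrayList<>();
        for (I value : input) {
            output.add(fn.apply(value));
        }
        return output;
    }

    // Count occurrences so that order is ignored but duplicates still matter.
    static <T> Map<T, Integer> counts(List<T> values) {
        Map<T, Integer> result = new HashMap<>();
        for (T v : values) {
            result.merge(v, 1, Integer::sum);
        }
        return result;
    }

    // Assumption: every expected record (with its multiplicity) must appear in
    // the output; the order in which records were emitted is ignored.
    static <T> boolean containsAll(List<T> actual, List<T> expected) {
        Map<T, Integer> actualCounts = counts(actual);
        for (Map.Entry<T, Integer> e : counts(expected).entrySet()) {
            if (actualCounts.getOrDefault(e.getKey(), 0) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> output = runLocally(List.of(1, 2, 3), v -> v + 1);
        Collections.shuffle(output); // simulate nondeterministic output order
        System.out.println(containsAll(output, List.of(2, 3, 4))); // prints true
        System.out.println(containsAll(output, List.of(2, 5)));    // prints false
    }
}
```

This sketch tolerates extra output records. A stricter variant would require the two count maps to be equal.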

Streaming:

@org.junit.Test
public void testWindowing() {

    // Define the input DataStream:
    DataStream<Tuple2<Integer, String>> testStream =
            createTimedTestStreamWith(Tuple2.of(1, "fritz"))
                    .emit(Tuple2.of(1, "hans"), after(15, seconds))
                    .emit(Tuple2.of(1, "heidi"), before(5, seconds))
                    .emit(Tuple2.of(3, "peter"), after(20, seconds), times(10))
                    .repeatAll(after(10, seconds), times(1))
                    .close();

    // Query the output tuples like a table
    // (is() and either() are Hamcrest matchers):
    OutputMatcher<Tuple2<Integer, String>> matcher =
            // Define keys for the values in your tuple:
            new MatchTuples<Tuple2<Integer, String>>("value", "name")
                    .assertThat("value", is(3))
                    .assertThat("name", either(is("fritz")).or(is("peter")))
                    .onEachRecord();

    // someWindowAggregation stands for the windowed job under test.
    assertStream(someWindowAggregation(testStream), matcher);
}
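`MatchTuples` gives each tuple field a name so that assertions read like conditions on table columns. The following plain-Java sketch illustrates that idea, assuming that `onEachRecord()` means every output record must satisfy every assertion. `TableMatcherSketch` is hypothetical. It uses `java.util.function.Predicate` instead of Hamcrest matchers and does not reflect how Flinkspector implements `MatchTuples`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch, NOT Flinkspector's MatchTuples: tuple fields get names
// ("columns"), conditions are attached to names, and every record must pass.
public class TableMatcherSketch {

    private final List<String> keys;
    private final Map<String, List<Predicate<Object>>> assertions = new LinkedHashMap<>();

    TableMatcherSketch(String... keys) {
        this.keys = List.of(keys);
    }

    // Register a condition for the column with the given name.
    TableMatcherSketch assertThat(String key, Predicate<Object> condition) {
        if (!keys.contains(key)) {
            throw new IllegalArgumentException("unknown key: " + key);
        }
        assertions.computeIfAbsent(key, k -> new ArrayList<>()).add(condition);
        return this;
    }

    // Assumed "on each record" semantics: all conditions hold for all rows.
    boolean matchesEachRecord(List<List<Object>> records) {
        for (List<Object> record : records) {
            for (Map.Entry<String, List<Predicate<Object>>> e : assertions.entrySet()) {
                Object value = record.get(keys.indexOf(e.getKey()));
                for (Predicate<Object> condition : e.getValue()) {
                    if (!condition.test(value)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TableMatcherSketch matcher = new TableMatcherSketch("value", "name")
                .assertThat("value", v -> v.equals(3))
                .assertThat("name", n -> n.equals("fritz") || n.equals("peter"));

        List<List<Object>> output = List.of(List.of(3, "peter"), List.of(3, "fritz"));
        System.out.println(matcher.matchesEachRecord(output)); // prints true
        System.out.println(matcher.matchesEachRecord(
                List.of(List.of(3, "peter"), List.of(1, "hans")))); // prints false
    }
}
```

Returning on the first violating record keeps the sketch short. A practical matcher would also report which record and which column failed.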

You can find more extensive examples here:

Getting started

Manual Build:

  1. Clone this repo: git clone https://github.com/ottogroup/flink-spector

Note: The current build works with Flink version 1.0.0. If you're using an older version, clone the matching branch.

  2. Build with Maven: mvn install
  3. Include in your project's pom.xml (for the Flink DataSet API):
<dependency>
    <groupId>org.flinkspector</groupId>
    <artifactId>flinkspector-dataset</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

or for the Flink DataStream API:

<dependency>
    <groupId>org.flinkspector</groupId>
    <artifactId>flinkspector-datastream</artifactId>
    <version>0.1-SNAPSHOT</version>
</dependency>

Origins

The project was conceived at the Business Intelligence department of Otto Group.

License

Licensed under the Apache License 2.0