DataFrame API written in Nim, enabling fast out-of-core data processing.
NimData is inspired by frameworks like Pandas/Spark/Flink/Thrill and sits between the Pandas side and the Spark/Flink/Thrill side: similar to Pandas, NimData is currently non-distributed, but it shares the type-safe, lazy API of Spark/Flink/Thrill. Thanks to Nim, it enables elegant out-of-core processing at native speed.
NimData's core data type is a generic `DataFrame[T]`. The methods of a data frame can be categorized into generalizations of the Map/Reduce concept:

- Transformations: Operations like `map` or `filter` transform one data frame into another. Transformations are lazy and can be chained. They will only be executed once an action is called.
- Actions: Operations like `count`, `min`, `max`, `sum`, `reduce`, `fold`, `collect`, or `show` perform an aggregation of a data frame, and trigger the processing pipeline (see the example below).
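For example, a minimal sketch of this distinction (assuming `DF.fromRange`, which also appears in the installation example further below, produces a `DataFrame[int]`):

```nim
import nimdata
import sugar   # provides the `=>` closure syntax (called `future` in older Nim versions)

# Transformation: lazy, nothing is read or computed yet
let evens = DF.fromRange(0, 10).filter(x => x mod 2 == 0)

# Action: triggers execution of the whole pipeline
echo evens.count()
```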
For a complete reference of the supported operations in NimData, refer to the module docs.

The following tutorial gives a brief introduction to the main functionality, based on this German soccer data set.
To create a data frame which simply iterates over the raw text content of a file, we can use `DF.fromFile`:
let dfRawText = DF.fromFile("examples/Bundesliga.csv")
The operation is lazy, so nothing happens so far. The type of `dfRawText` is a plain `DataFrame[string]`. We can still perform some initial checks on it:
echo dfRawText.count()
# => 14018
The `count()` method is an action, which triggers the line-by-line reading of the file and returns the number of rows. We can re-use `dfRawText` with different transformations/actions. The following takes the first 5 rows of the file and performs a `show` action to print the records.
dfRawText.take(5).show()
# =>
# "1","Werder Bremen","Borussia Dortmund",3,2,1,1963,1963-08-24 09:30:00
# "2","Hertha BSC Berlin","1. FC Nuernberg",1,1,1,1963,1963-08-24 09:30:00
# "3","Preussen Muenster","Hamburger SV",1,1,1,1963,1963-08-24 09:30:00
# "4","Eintracht Frankfurt","1. FC Kaiserslautern",1,1,1,1963,1963-08-24 09:30:00
# "5","Karlsruher SC","Meidericher SV",1,4,1,1963,1963-08-24 09:30:00
Each action call results in the file being read from scratch.
Now let's parse the CSV into type-safe tuple objects using `map`. The price for achieving compile-time safety is that the schema has to be specified once for the compiler. Fortunately, Nim's metaprogramming capabilities make this very straightforward. The following example uses the `schemaParser` macro. This macro automatically generates a parsing function, which takes a `string` as input and returns a type-safe tuple with fields corresponding to the schema definition.
Since our data set is small and we want to perform multiple operations on it, it makes sense to persist the parsing result in memory. This can be done by using the `cache()` method. As a result, all operations performed on `df` will not have to re-read the file, but will read the already parsed data from memory.

Spark users note: In contrast to Spark, `cache()` is currently implemented as an action.
const schema = [
strCol("index"),
strCol("homeTeam"),
strCol("awayTeam"),
intCol("homeGoals"),
intCol("awayGoals"),
intCol("round"),
intCol("year"),
dateCol("date", format="yyyy-MM-dd hh:mm:ss")
]
let df = dfRawText.map(schemaParser(schema, ','))
.map(record => record.projectAway(index))
.cache()
What happens here is that the `schemaParser` macro constructs a specialized parsing function, which takes a string as input and returns a named tuple with fields corresponding to the schema definition. The resulting tuple would, for instance, have a field `record.homeGoals` of type `int64`.
A first benefit of having a compile-time schema is that the parser can produce highly optimized machine code, resulting in very fast parsing performance.
Note that our file contains a date column, which uses a non-standard date format. This can be handled by specifying an appropriate date format string. For some reason, the file also contains an `index` column, which is rather boring (technically it's a quoted string of integer indices). We can get rid of this column by using the `projectAway` macro. In general, this macro can be used to transform a tuple/schema into a reduced version with certain columns removed. Other useful type-safe schema transformation macros are `projectTo`, which instead keeps certain fields, and `addFields`, which extends the tuple/schema by new fields.
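For illustration, a brief sketch of how these macros compose in a pipeline (the exact `addFields` invocation below is an assumption based on its description; see the module docs for the precise syntax):

```nim
# Keep only the two team columns
let teamPairs = df.map(record => record.projectTo(homeTeam, awayTeam))

# Assumed syntax: extend each record by a computed goal-difference field
let withDiff = df.map(record => record.addFields(
  goalDiff: record.homeGoals - record.awayGoals
))
```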
We can perform the same checks as before, but this time the data frame contains the parsed tuples:
echo df.count()
# => 14018
df.take(5).show()
# =>
# +------------+------------+------------+------------+------------+------------+------------+
# | homeTeam | awayTeam | homeGoals | awayGoals | round | year | date |
# +------------+------------+------------+------------+------------+------------+------------+
# | "Werder B… | "Borussia… | 3 | 2 | 1 | 1963 | 1963-08-2… |
# | "Hertha B… | "1. FC Nu… | 1 | 1 | 1 | 1963 | 1963-08-2… |
# | "Preussen… | "Hamburge… | 1 | 1 | 1 | 1963 | 1963-08-2… |
# | "Eintrach… | "1. FC Ka… | 1 | 1 | 1 | 1963 | 1963-08-2… |
# | "Karlsruh… | "Meideric… | 1 | 4 | 1 | 1963 | 1963-08-2… |
# +------------+------------+------------+------------+------------+------------+------------+
Note that instead of starting the pipeline from `dfRawText` and using caching, we could always write the pipeline from scratch:
DF.fromFile("examples/Bundesliga.csv")
.map(schemaParser(schema, ','))
.map(record => record.projectAway(index))
.take(5)
.show()
Data can be filtered by using `filter`. For instance, we can filter the data to get games of a certain team only:
df.filter(record =>
record.homeTeam.contains("Freiburg") or
record.awayTeam.contains("Freiburg")
)
.take(5)
.show()
# =>
# +------------+------------+------------+------------+------------+------------+------------+
# | homeTeam | awayTeam | homeGoals | awayGoals | round | year | date |
# +------------+------------+------------+------------+------------+------------+------------+
# | "Bayern M… | "SC Freib… | 3 | 1 | 1 | 1993 | 1993-08-0… |
# | "SC Freib… | "Wattensc… | 4 | 1 | 2 | 1993 | 1993-08-1… |
# | "Borussia… | "SC Freib… | 3 | 2 | 3 | 1993 | 1993-08-2… |
# | "SC Freib… | "Hamburge… | 0 | 1 | 4 | 1993 | 1993-08-2… |
# | "1. FC Ko… | "SC Freib… | 2 | 0 | 5 | 1993 | 1993-09-0… |
# +------------+------------+------------+------------+------------+------------+------------+
Or search for games with many home goals:
df.filter(record => record.homeGoals >= 10)
.show()
# =>
# +------------+------------+------------+------------+------------+------------+------------+
# | homeTeam | awayTeam | homeGoals | awayGoals | round | year | date |
# +------------+------------+------------+------------+------------+------------+------------+
# | "Borussia… | "Schalke … | 11 | 0 | 18 | 1966 | 1967-01-0… |
# | "Borussia… | "Borussia… | 10 | 0 | 12 | 1967 | 1967-11-0… |
# | "Bayern M… | "Borussia… | 11 | 1 | 16 | 1971 | 1971-11-2… |
# | "Borussia… | "Borussia… | 12 | 0 | 34 | 1977 | 1978-04-2… |
# | "Borussia… | "Arminia … | 11 | 1 | 12 | 1982 | 1982-11-0… |
# | "Borussia… | "Eintrach… | 10 | 0 | 8 | 1984 | 1984-10-1… |
# +------------+------------+------------+------------+------------+------------+------------+
Note that we can now fully benefit from type-safety: The compiler knows the exact fields and types of a record. No dynamic field lookup and/or type casting is required. Assumptions about the data structure are moved to the earliest possible step in the pipeline, allowing it to fail early if they are wrong. After transitioning into the type-safe domain, the compiler helps to verify the correctness of even long processing pipelines, reducing the risk of runtime errors.
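A small sketch of what this buys us: the goal columns can be combined arithmetically without any casting, and a misspelled field name (e.g. `record.homeGols`) would already be rejected at compile time:

```nim
# The compiler knows that homeGoals and awayGoals are int64 fields,
# so no dynamic lookup or casting is needed here.
echo df.map(record => record.homeGoals + record.awayGoals).max()
```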
Other filter-like transformations are:

- `take`, which takes the first N records, as already seen.
- `drop`, which discards the first N records.
- `filterWithIndex`, which allows defining a filter function that takes both the index and the element as input (see the sketch below).
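A combined sketch of these (assuming `filterWithIndex` passes the index as the first argument of the closure):

```nim
# Skip the first 1000 games, then keep only every second remaining record
df.drop(1000)
  .filterWithIndex((i, record) => i mod 2 == 0)
  .take(5)
  .show()
```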
A `DataFrame[T]` can easily be converted into a `seq[T]` (Nim's native dynamic arrays) by using `collect`:
echo df.map(record => record.homeGoals)
.filter(goals => goals >= 10)
.collect()
# => @[11, 10, 11, 12, 11, 10]
A `DataFrame` of a numerical type allows using functions like `min`/`max`/`mean`. This allows us to get things like:
echo "Min date: ", df.map(record => record.year).min()
echo "Max date: ", df.map(record => record.year).max()
echo "Average home goals: ", df.map(record => record.homeGoals).mean()
echo "Average away goals: ", df.map(record => record.awayGoals).mean()
# =>
# Min date: 1963
# Max date: 2008
# Average home goals: 1.898130974461407
# Average away goals: 1.190754743900699
# Let's find the highest defeat
let maxDiff = df.map(record => (record.homeGoals - record.awayGoals).abs).max()
df.filter(record => (record.homeGoals - record.awayGoals) == maxDiff)
.show()
# =>
# +------------+------------+------------+------------+------------+------------+------------+
# | homeTeam | awayTeam | homeGoals | awayGoals | round | year | date |
# +------------+------------+------------+------------+------------+------------+------------+
# | "Borussia… | "Borussia… | 12 | 0 | 34 | 1977 | 1978-04-2… |
# +------------+------------+------------+------------+------------+------------+------------+
A data frame can be transformed into a sorted data frame by the `sort()` method. Without specifying any arguments, the operation would sort using default comparison over all columns. By specifying a key function and the sort order, we can for instance rank the games by the number of away goals:
df.sort(record => record.awayGoals, SortOrder.Descending)
.take(5)
.show()
# =>
# +------------+------------+------------+------------+------------+------------+------------+
# | homeTeam | awayTeam | homeGoals | awayGoals | round | year | date |
# +------------+------------+------------+------------+------------+------------+------------+
# | "Tasmania… | "Meideric… | 0 | 9 | 27 | 1965 | 1966-03-2… |
# | "Borussia… | "TSV 1860… | 1 | 9 | 29 | 1965 | 1966-04-1… |
# | "SSV Ulm" | "Bayer Le… | 1 | 9 | 25 | 1999 | 2000-03-1… |
# | "Rot-Weis… | "Eintrach… | 1 | 8 | 32 | 1976 | 1977-05-0… |
# | "Borussia… | "Bayer Le… | 2 | 8 | 10 | 1998 | 1998-10-3… |
# +------------+------------+------------+------------+------------+------------+------------+
The `DataFrame[T].unique()` transformation filters a data frame to unique elements. This can be used, for instance, to find the number of teams that appear in the data:
echo df.map(record => record.homeTeam).unique().count()
# => 52
Pandas user note: In contrast to Pandas, there is no differentiation between a one-dimensional series and a multi-dimensional data frame (`unique` vs `drop_duplicates`). `unique` works the same for any hashable type `T`; e.g., we might as well get a data frame of unique pairs:
df.map(record => record.projectTo(homeTeam, awayTeam))
.unique()
.take(5)
.show()
# =>
# +------------+------------+
# | homeTeam | awayTeam |
# +------------+------------+
# | "Werder B… | "Borussia… |
# | "Hertha B… | "1. FC Nu… |
# | "Preussen… | "Hamburge… |
# | "Eintrach… | "1. FC Ka… |
# | "Karlsruh… | "Meideric… |
# +------------+------------+
The `DataFrame[T].valueCounts()` transformation extends the functionality of `unique()` by returning the unique values and their respective counts. The type of the transformed data frame is a tuple of `(key: T, count: int)`, where `T` is the original type. In our example, we can use `valueCounts()` for instance to find the most frequent results in German soccer:
df.map(record => record.projectTo(homeGoals, awayGoals))
.valueCounts()
.sort(x => x.count, SortOrder.Descending)
.map(x => (
homeGoals: x.key.homeGoals,
awayGoals: x.key.awayGoals,
count: x.count
))
.take(5)
.show()
# =>
# +------------+------------+------------+
# | homeGoals | awayGoals | count |
# +------------+------------+------------+
# | 1 | 1 | 1632 |
# | 2 | 1 | 1203 |
# | 1 | 0 | 1109 |
# | 2 | 0 | 1092 |
# | 0 | 0 | 914 |
# +------------+------------+------------+
This transformation first projects the data onto a named tuple of `(homeGoals, awayGoals)`. After applying `valueCounts()`, the data frame is sorted according to the counts. The final `map()` is purely for cosmetics of the resulting table, projecting the nested `(key: (homeGoals: int, awayGoals: int), count: int)` tuple back to a flat result.
Data frames can be opened and inspected in the browser by using `df.openInBrowser()`, which offers a simple JavaScript-based data browser. Note that the viewer uses static HTML, so it should only be applied to small or heavily filtered data frames.
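For example (a sketch), restricting the frame to a single season before opening it:

```nim
# Filter down to one season so that the static HTML view stays small
df.filter(record => record.year == 2008)
  .openInBrowser()
```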
NimData requires Nim to be installed. On systems where a C compiler and git are available, the best method is to compile Nim from the GitHub sources. Modern versions of Nim include Nimble (Nim's package manager), and building both would look like:
# clone Nim
git clone https://github.com/nim-lang/Nim.git
cd Nim
# build the C sources
git clone --depth 1 https://github.com/nim-lang/csources.git
cd csources
sh build.sh
cd ../
# build Nim & Nimble
bin/nim c koch
./koch boot -d:release
./koch nimble
# add ./bin to path
export PATH=$PATH:`readlink -f ./bin`
With Nim and Nimble installed, installing NimData becomes:
$ nimble install NimData
This will download the NimData source from GitHub and put it in `~/.nimble/pkgs`.
A minimal NimData program would look like:
import nimdata
echo DF.fromRange(0, 10).collect()
To compile and run the program, use `nim -r c test.nim` (`c` for compile, and `-r` to run directly after compilation).
More meaningful benchmarks are still on the todo list. This just shows a few first results. The benchmarks will be split into small (data which fits into memory so we can compare against Pandas or R easily) and big (where we can only compare against out-of-core frameworks).
All implementations are available in the benchmarks folder.
The test data set is a 1-million-row CSV with two int and two float columns. The test tasks are:

- Parse/Count: Just the most basic operations -- iterating the file, applying parsing, and returning a count.
- Column Averages: Same steps, plus an additional computation of all 4 column means (a rough sketch of both tasks follows below).
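For orientation, a rough sketch of what the NimData side of these tasks might look like (the file name, the column names, and the `floatCol` constructor are assumptions; the actual implementations are in the benchmarks folder):

```nim
const benchSchema = [
  intCol("i1"), intCol("i2"),      # hypothetical column names
  floatCol("f1"), floatCol("f2")   # floatCol assumed analogous to intCol/strCol
]

# Parse/Count: iterate the file, parse every row, and count
let data = DF.fromFile("benchmark.csv").map(schemaParser(benchSchema, ','))
echo data.count()

# Column Averages: same pipeline, plus the four column means
# (the real benchmark presumably avoids re-reading the file for each mean)
echo data.map(r => r.i1).mean()
echo data.map(r => r.i2).mean()
echo data.map(r => r.f1).mean()
echo data.map(r => r.f2).mean()
```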
The results are average runtime in seconds of three runs:
| Task            | NimData | Pandas | Spark (4 cores) | Dask (4 cores) |
|-----------------|---------|--------|-----------------|----------------|
| Parse/Count     | 0.165   | 0.321  | 1.606           | 0.182          |
| Column Averages | 0.259   | 0.340  | 1.179           | 0.622          |
Note that Spark internally caches the file over the three runs, so the first iteration is much slower (> 3 sec), while it reaches run times of about 0.6 sec in the last iterations (obviously the data is too small to justify the overhead anyway).
- More transformations:
  - map
  - filter
  - flatMap
  - sort
  - unique
  - valueCounts
  - groupBy (reduce)
  - groupBy (transform)
  - join (inner)
  - join (outer)
  - concat/union
  - window
- More actions:
  - numerical aggregations (count, min, max, sum, mean)
  - collect
  - show
  - openInBrowser
- More data formats/sources:
  - csv
  - gzipped csv
  - parquet
  - S3
- REPL or Jupyter kernel?
- Plotting (maybe in the form of Bokeh bindings)?
This project is licensed under the terms of the MIT license.