/eland_es_analytics

Example using Eland (Python Client and Toolkit for DataFrames, Big Data, Machine Learning and ETL in Elasticsearch)

Primary LanguageJupyter Notebook

Overview

eland is Python Elasticsearch client for exploring and analyzing data residing in Elasticsearch with a familiar Pandas-compatible API.

Where possible the package uses existing Python APIs and data structures to make it easy to switch between numpy, pandas, scikit-learn to their Elasticsearch powered equivalents. In general, the data resides in Elasticsearch and not in memory, which allows Eland to access large datasets stored in Elasticsearch.Elasticsearch.

Installation

Eland can be installed from PyPI via pip:

pip install eland

JupyterLab is a web-based interactive development environment for Jupyter notebooks, code, and data. JupyterLab is flexible: configure and arrange the user interface to support a wide range of workflows in data science, scientific computing, and machine learning. JupyterLab is extensible and modular: write plugins that add new components and integrate with existing ones.

JupyterLab can be installed using pip

pip install jupyterlab

The Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations and narrative text. Uses include: data cleaning and transformation, numerical simulation, statistical modeling, data visualization, machine learning, and much more.

Jupyter Notebook can be installed using pip

pip install notebook

To run the notebook, run the following command at the Terminal (Mac/Linux) or Command Prompt (Windows):

jupyter notebook

Online Retail Analysis

The data used in this article is derived from a dataset referenced in kaggle. This dataset was randomized. items and other data are fabricated and any resemblance to real data is coincidental.

Download invoices.csv and let's get started.

import eland as ed
import pandas as pd
import matplotlib.pyplot as plt

# import elasticsearch-py client
from elasticsearch import Elasticsearch

# Function for pretty-printing JSON
def json(raw):
    import json
    print(json.dumps(raw, indent=2, sort_keys=True))

Let’s quickly go over the libraries I’ve imported:

  • Eland — to load the data from file or elasticsearch as an eland data frame and analyze the data.

  • Pandas — to load the data file as a Pandas data frame and analyze the data.

  • From Matplotlib I’ve imported pyplot in order to plot graphs of the data

Let's create an elasticsearch client using python offcial client

# Connect to an Elasticsearch instance
# here we use the official Elastic Python client
# check it on https://github.com/elastic/elasticsearch-py
es = Elasticsearch(
  ['http://localhost:9200'],
  http_auth=("es_kbn", "changeme")
)
# print the connection object info (same as visiting http://localhost:9200)
# make sure your elasticsearch node/cluster respond to requests
json(es.info())

Here we will load our dataset from a csv file into a pandas dataframe

# Load the dataset from the local csv file of call logs
pd_df = pd.read_csv("/home/telcos-ecs/eland_es_analytics/invoices.csv", sep=';', encoding = 'unicode_escape')
pd_df.info()

We can see tha type of the DataFrame returned is pandas.core.frame.DataFrame

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 13 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   invoice_id         541909 non-null  object 
 1   item_id            541909 non-null  int64  
 2   item_model         541909 non-null  object 
 3   item_name          541909 non-null  object 
 4   item_brand         541909 non-null  object 
 5   item_vendor        541909 non-null  object 
 6   order_qty          541909 non-null  int64  
 7   invoice_date       541909 non-null  object 
 8   unit_price         541909 non-null  float64
 9   customer_id        541909 non-null  int64  
 10  country_name       541909 non-null  object 
 11  country_latitude   541909 non-null  float64
 12  country_longitude  541909 non-null  float64
dtypes: float64(3), int64(3), object(7)
memory usage: 53.7+ MB

Let's apply some tranformations to our dataset before indexing into elasticsearch

#converting the type of Invoice Date Field from string to datetime.
pd_df['invoice_date'] = pd.to_datetime(pd_df['invoice_date'])

# Arrange prices for phones
pd_df['unit_price'] = pd_df['unit_price'] * 10.00

# Rename the columns to be snake_case
pd_df.columns = [x.lower().replace(" ", "_") for x in pd_df.columns]

# Combine the 'latitude' and 'longitude' columns into one column 'location' for 'geo_point'
pd_df["country_location"] = pd_df[["country_latitude", "country_longitude"]].apply(lambda x: ",".join(str(item) for item in x), axis=1)

# Drop the old columns in favor of 'location'
pd_df.drop(["country_latitude", "country_longitude"], axis=1, inplace=True)

pd_df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 541909 entries, 0 to 541908
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   invoice_id        541909 non-null  object        
 1   item_id           541909 non-null  int64         
 2   item_model        541909 non-null  object        
 3   item_name         541909 non-null  object        
 4   item_brand        541909 non-null  object        
 5   item_vendor       541909 non-null  object        
 6   order_qty         541909 non-null  int64         
 7   invoice_date      541909 non-null  datetime64[ns]
 8   unit_price        541909 non-null  float64       
 9   customer_id       541909 non-null  int64         
 10  country_name      541909 non-null  object        
 11  country_location  541909 non-null  object        
dtypes: datetime64[ns](1), float64(1), int64(3), object(7)
memory usage: 49.6+ MB

Let's load the dataframe into elasticsearch using eland

# Load the data into elasticsearch
ed_df = ed.pandas_to_eland(
    pd_df=pd_df,
    es_client=es,

    # Where the data will live in Elasticsearch
    es_dest_index="es-invoices",

    # Type overrides for certain columns, this can be used to customize index mapping before ingest
    es_type_overrides={
        "invoice_id": "keyword",
        "item_id": "keyword",
        "item_model": "keyword",
        "item_name": "keyword",     
        "item_brand": "keyword",
        "item_vendor": "keyword",   
        "order_qty": "integer",
        "invoice_date": "date",
        "unit_price": "float",  
        "customer_id": "keyword",
        "country_name": "keyword",
        "country_location": "geo_point"  
    },

    # If the index already exists what should we do?
    es_if_exists="replace",

    # Wait for data to be indexed before returning
    es_refresh=True,
)
ed_df.info()

We can see tha type of the DataFrame returned is eland.dataframe.DataFrame

<class 'eland.dataframe.DataFrame'>
Index: 541909 entries, 1500 to 541908
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   country_location  541909 non-null  object        
 1   country_name      541909 non-null  object        
 2   customer_id       541909 non-null  object        
 3   invoice_date      541909 non-null  datetime64[ns]
 4   invoice_id        541909 non-null  object        
 5   item_brand        541909 non-null  object        
 6   item_id           541909 non-null  object        
 7   item_model        541909 non-null  object        
 8   item_name         541909 non-null  object        
 9   item_vendor       541909 non-null  object        
 10  order_qty         541909 non-null  int64         
 11  unit_price        541909 non-null  float64       
dtypes: datetime64[ns](1), float64(1), int64(1), object(9)
memory usage: 64.0 bytes

Note that the data resides in Elasticsearch and not in memory, which allows Eland to access large datasets stored in Elasticsearch

We can get an eland data frame by reading directly the csv file and load to elasticsearch using eland

To get started, let’s create an eland.DataFrame by reading a csv file. This creates and populates the es-customers index in the local Elasticsearch cluster.

ed_df = ed.read_csv("/home/telcos-ecs/eland_es_analytics/invoices.csv",
                es_client=es,

    # Where the data will live in Elasticsearch
    es_dest_index="es-invoices",

    # Type overrides for certain columns, this can be used to customize index mapping before ingest
    es_type_overrides={
        "invoice_id": "keyword",
        "item_id": "keyword",
        "item_model": "keyword",
        "item_name": "keyword",     
        "item_brand": "keyword",
        "item_vendor": "keyword",   
        "order_qty": "integer",
        "invoice_date": "date",
        "unit_price": "float",  
        "customer_id": "keyword",
        "country_name": "keyword",
        "country_location": "geo_point"  
    },

    # If the index already exists what should we do?
    es_if_exists="replace",

    # Wait for data to be indexed before returning
    es_refresh=True,
)
ed_df.info()
<class 'eland.dataframe.DataFrame'>
Index: 541909 entries, 1500 to 541908
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   country_location  541909 non-null  object        
 1   country_name      541909 non-null  object        
 2   customer_id       541909 non-null  object        
 3   invoice_date      541909 non-null  datetime64[ns]
 4   invoice_id        541909 non-null  object        
 5   item_brand        541909 non-null  object        
 6   item_id           541909 non-null  object        
 7   item_model        541909 non-null  object        
 8   item_name         541909 non-null  object        
 9   item_vendor       541909 non-null  object        
 10  order_qty         541909 non-null  int64         
 11  unit_price        541909 non-null  float64       
dtypes: datetime64[ns](1), float64(1), int64(1), object(9)
memory usage: 64.0 bytes

Here we see that the "_id" field was used to index our data frame.

ed_df.index.es_index_field

Next, we can check which field from elasticsearch are available to our eland data frame. columns is available as a parameter when instantiating the data frame which allows one to choose only a subset of fields from your index to be included in the data frame. Since we didn’t set this parameter, we have access to all fields.

ed_df.columns

Index(['country_location', 'country_name', 'customer_id', 'invoice_date',
       'invoice_id', 'item_brand', 'item_id', 'item_model', 'item_name',
       'item_vendor', 'order_qty', 'unit_price'],
      dtype='object')

Go to Kibana Dev Console and check our index es-invoices

GET es-invoices/_search?track_total_hits=true&size=1

The response should be like

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 541909,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "es-invoices",
        "_id" : "138000",
        "_score" : 1.0,
        "_source" : {
          "invoice_id" : "553464",
          "item_id" : 35310308,
          "item_model" : "SM-J110H/DS",
          "item_name" : "Samsung SM-J110H/DS",
          "item_brand" : "Samsung",
          "item_vendor" : "Samsung Korea",
          "order_qty" : 12,
          "invoice_date" : "2019-05-17T11:07:00",
          "unit_price" : 16.5,
          "customer_id" : 16218,
          "country_name" : "Morocco",
          "country_location" : "31.791702,-7.09262"
        }
      }
    ]
  }
}

Boolean Indexing

we also allow you to filter the data frame using boolean indexing. Under the hood, a boolean index maps to a terms query that is then passed to elasticsearch to filter the index.

# the construction of a boolean vector maps directly to an elasticsearch query
print(ed_df['country_name'] == 'Morocco')
ed_df[(ed_df['country_name'] == 'Morocco')].head(5)
{'term': {'country_name': 'Morocco'}}