├── run_kmeans.py
├── requirment.txt
├── README.md
├── output
│   ├── summaries
│   │   ├── kmeans2
│   │   │   ├── summery2.txt
│   │   │   └── input_data2.txt
│   │   └── kmeans1
│   │       ├── summery1.txt
│   │       └── input_data1.txt
│   └── plots
│       ├── Wkmeans2.png
│       └── Wkmeans1.png
├── Kmeans
│   ├── K_means.py
│   └── __init__.py
├── data
│   └── data_128000_8_25_2noshuffle
└── conf
    ├── spark_start.py
    └── __init__.py
Automated Testing
In order to test with Spark, we use the pyspark Python package, which is bundled with the Spark JARs required to programmatically start up and tear down a local Spark instance, on a per-test-suite basis (we recommend using the setUp and tearDown methods in unittest.TestCase to do this once per test suite). Note that using pyspark to run Spark is an alternative way of developing with Spark, as opposed to using the PySpark shell or spark-submit.
Given that we have chosen to structure our ETL jobs in such a way as to isolate the 'Transformation' step into its own function (see 'Structure of an ETL job' above), we are free to feed it a small slice of 'real-world' production data that has been persisted locally - e.g. in tests/test_data or some easily accessible network directory - and check it against known results (e.g. computed manually or interactively within a Python interactive console session).
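A minimal sketch of that setup (the module, class, and column names here are assumptions, not taken from this repository):
# tests/test_kmeans.py -- illustrative sketch only
import unittest

from pyspark.sql import SparkSession


class SparkKmeansTests(unittest.TestCase):
    def setUp(self):
        # start a local Spark instance for this test case
        self.spark = (SparkSession.builder
                      .master('local[2]')
                      .appName('kmeans-tests')
                      .getOrCreate())

    def tearDown(self):
        # tear the local instance down again
        self.spark.stop()

    def test_small_input_slice(self):
        # feed a tiny, locally persisted slice of data and check a known property
        df = self.spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ['f_0', 'f_1'])
        self.assertEqual(df.count(), 2)
        self.assertListEqual(df.columns, ['f_0', 'f_1'])


if __name__ == '__main__':
    unittest.main()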
To execute the example unit tests for this project, run:
pipenv run python -m unittest tests/test_*.py
Download Spark with:
wget https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
Extract the archive:
tar -xvzf spark-*
Move it to /opt/:
mv spark-3.0.1-bin-hadoop2.7/ /opt/spark
root@ubuntu1804:~# echo "export SPARK_HOME=/opt/spark" >> ~/.profile
root@ubuntu1804:~# echo "export PATH=$PATH:/opt/spark/bin:/opt/spark/sbin" >> ~/.profile
root@ubuntu1804:~# echo "export PYSPARK_PYTHON=/usr/bin/python3" >> ~/.profile
source ~/.profile
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_PYTHON=python3.8
Create a new project (with Python 3.8, for example) and a pipenv environment.
In the IDE, go to Run -> Edit Configurations -> Environment Variables and set SPARK_HOME and PYTHONPATH (pointing at the bundled py4j archive):
SPARK_HOME=/opt/spark;PYTHONPATH=/opt/spark/python/lib/py4j-0.10.9-src.zip
pip install -r requirment.txt
pip install pyspark
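To verify that pyspark can start a local session with this setup, a quick check along these lines can help (a sketch; the script name is illustrative):
# sanity_check.py -- illustrative; confirms that pyspark and the JVM wiring work
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').appName('sanity-check').getOrCreate()
print(spark.version)  # should print 3.0.1 for the download above
spark.stop()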
For cluster runs, start a master to manage the cluster and a slave (worker) to execute tasks; the web UI can then be used for monitoring:
./start-master.sh -h 192.168.80.128 -p 8080
./start-slave.sh spark://192.168.80.128:8080
SPARK_MASTER = 'spark://192.168.80.128:8080'
SPARK_APP_NAME = 'Kmeans'
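These constants can feed a SparkSession builder along the lines of conf/spark_start.py; a minimal sketch (the actual helper in this repository may differ):
from pyspark.sql import SparkSession

SPARK_MASTER = 'spark://192.168.80.128:8080'
SPARK_APP_NAME = 'Kmeans'


def start_spark(master=SPARK_MASTER, app_name=SPARK_APP_NAME):
    # connect to the standalone master started above and name the application
    return SparkSession.builder.master(master).appName(app_name).getOrCreate()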
The code reads items from the file in chunks of 1000 and applies a k-means step to each chunk, with the initial centers chosen by the k-means++ (k-means||) method.
The center of each resulting cluster is then treated as a point carrying a larger weight and merged with the next thousand points, each of which has weight 1 and a null prediction; the step is then repeated on the merged data.
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=50, maxIter=1, featuresCol='features', initSteps=3, weightCol='weight')
For reference, the KMeans.setParams signature in pyspark is:
def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
              initMode="k-means||", initSteps=2, tol=1e-4, maxIter=20, seed=None,
              distanceMeasure="euclidean", weightCol=None):
initMode="k-means|| => Kmmeans ++
weightCol='weight' sets the weight column.
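Assuming the raw points start out unweighted, the weight column named above can be attached as a constant 1.0 (a sketch; 'vector' is the assembled dataframe used in fit below):
from pyspark.sql.functions import lit

# fresh points enter with weight 1; merged-in centers carry larger weights
vector = vector.withColumn('weight', lit(1.0))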
kmm = kmeans.fit(vector)
centers = kmm.clusterCenters()
[192028.64430965524,121590.81737107041]
[190810.6852672185,119556.44188337866]
[190971.6772525595,122856.41394189643]
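The prediction column discussed next presumably comes from applying the fitted model back to the vectors:
# assign every point to its nearest center, adding a 'prediction' column
predicted = kmm.transform(vector)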
We now have a dataframe with a prediction column and a weight column. To look up the cluster center for each prediction, we convert the cluster-centers array into a dataframe:
dcenters = [e.tolist() for e in centers]
dfcenters = sc.parallelize(dcenters).toDF([])
Now we merge it with the prediction dataframe, so that for each cluster center we have its prediction and weight.
# clusterCenters array  ----->  dataframe columns: | features | id |
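One way to realise that merge is to give each center an id equal to its cluster index and join on the prediction (a sketch; K_means.py may do this differently, and 'predicted' is the transformed dataframe from above):
from pyspark.ml.linalg import Vectors

# index the centers in order (cluster 0, 1, 2, ...) and build | id | features |
dfcenters = (sc.parallelize(dcenters)
               .zipWithIndex()
               .map(lambda pair: (pair[1], Vectors.dense(pair[0])))
               .toDF(['id', 'features']))

# attach each point's cluster center via its prediction
merged = predicted.join(dfcenters, predicted.prediction == dfcenters.id, 'left')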
makedataframe: converts raw multidimensional input (one point per line) into a dataframe with one column per dimension, regardless of how many dimensions there are.
from pyspark.sql.functions import split

def makedataframe(dataframe):
    items = rowsplit(dataframe)
    for item in range(0, items):
        dataframe = dataframe.withColumn('f_{}'.format(item), split(dataframe.value, ' ').getItem(item).cast('double'))
    return dataframe
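The rowsplit helper is not shown in this excerpt; a minimal sketch of what it might do, assuming the raw dataframe has a single space-separated 'value' column:
def rowsplit(dataframe):
    # hypothetical: infer the number of dimensions from the first raw line
    return len(dataframe.first().value.split(' '))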
vectorModel: converts all of the created columns, each of which is one dimension of a point, into a single features vector column.
from pyspark.ml.feature import VectorAssembler

def vectorModel(items):
    mylist = []
    for item in range(0, items):
        col = "f_{}".format(item)
        mylist.append(col)
    # print('list = ' + mylist.__str__())
    vecAssembler = VectorAssembler(inputCols=mylist, outputCol="features")
    return vecAssembler
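Putting the two helpers together on one of the raw text files (a sketch; the path comes from the tree above, everything else is illustrative):
# read raw lines into a 'value' column, expand them to f_0..f_{n-1}, then assemble the vector
raw = spark.read.text('data/data_128000_8_25_2noshuffle')
df = makedataframe(raw)
vector = vectorModel(rowsplit(raw)).transform(df)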
customUnion: merges the previous k-means centers, which carry larger weights, with the next batch of vectors.
customUnion(vector, result)
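customUnion itself is not reproduced here; a sketch of the idea, assuming it aligns the two dataframes on the union of their columns and pads anything missing with nulls:
from pyspark.sql.functions import lit

def custom_union_sketch(df1, df2):
    # hypothetical stand-in for customUnion: union by column name,
    # padding columns that exist in only one side with nulls
    cols = sorted(set(df1.columns) | set(df2.columns))
    for c in cols:
        if c not in df1.columns:
            df1 = df1.withColumn(c, lit(None).cast('double'))
        if c not in df2.columns:
            df2 = df2.withColumn(c, lit(None).cast('double'))
    return df1.select(cols).unionByName(df2.select(cols))
In the loop described above, this union of the previous centers (with their larger weights) and the next chunk of weight-1 points is what feeds the next weighted k-means fit.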