
Developing a Real-Time Financial Analysis & Trading System

In this project, we process financial data in real time. The setup uses:

  • Ubuntu 22.04 (Jammy)
  • Docker
  • Kubernetes
  • MetalLB
  • Flask
  • Python libraries

The project is implemented in the following steps:

1. Data Generation:

  • generates simulated financial data
  • uses load balancing
  • sends the data to Data Ingestion

2. Data Ingestion & Stream Processing:

  • receives the simulated data
  • validates it
  • uses Kafka for real-time stream processing
  • sends the data to Processing

3. Data Processing:

  • calculates the mandatory trading indicators
  • generates real-time trading signals


Step 1: Generate Data

In this part, generator.py creates a stream of synthetic data and publishes it to an API endpoint, from which the later steps read it for processing.


The generator.py script simulates financial data and sends it to an API endpoint. It generates stock prices, order book data, news sentiment, market data, and economic indicators. The script runs on a single core.

  1. Define Sample Stock Symbols: The script defines a list of sample stock symbols (AAPL, GOOGL, AMZN, MSFT, TSLA) that will be used to generate data.

  2. Define API Endpoint: The script defines the API endpoint ("http://localhost:5000/ingest") where the generated data will be sent.

  3. Generate Stock Price Data: The generate_data() function generates stock price data for a randomly selected stock symbol. It uses a random walk model to simulate price movements.

  4. Generate Additional Data Types: The generate_additional_data() function generates additional data types, such as order book data, news sentiment, market data, and economic indicators.

  5. Send Data to API: The send_data() function sends the generated data to the API endpoint in JSON format.
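
The steps above can be sketched roughly as follows. This is a minimal illustration and not the actual generator.py: the record field names (symbol, price, timestamp), the starting price of 100.0, the Gaussian step size, and the use of the standard-library urllib instead of an HTTP client library are all assumptions made for the example.

```python
import json
import random
import time
import urllib.request

# Sample symbols and endpoint as described in steps 1 and 2
SYMBOLS = ["AAPL", "GOOGL", "AMZN", "MSFT", "TSLA"]
API_ENDPOINT = "http://localhost:5000/ingest"

# Last simulated price per symbol; the starting value 100.0 is an assumption
_last_price = {symbol: 100.0 for symbol in SYMBOLS}


def generate_data() -> dict:
    """Advance one randomly chosen symbol by a single random-walk step."""
    symbol = random.choice(SYMBOLS)
    step = random.gauss(0, 0.5)  # assumed step distribution
    price = max(0.01, _last_price[symbol] + step)  # keep the price positive
    _last_price[symbol] = price
    # Field names below are hypothetical
    return {"symbol": symbol, "price": round(price, 2), "timestamp": time.time()}


def send_data(record: dict, url: str = API_ENDPOINT) -> int:
    """POST one record as JSON and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=json.dumps(record).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

Calling send_data(generate_data()) in a loop with a short sleep would produce a continuous stream, provided something is listening at the endpoint.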

Preparing the Python Application for Deployment 🪡

Instead of pushing the data to an API endpoint, we can serve it from a Flask application:

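from flask import Flask, jsonify

app = Flask(__name__)

# generate_additional_data() is the function defined in generator.py
@app.route("/")
def get_data():
    data = generate_additional_data()
    return jsonify(data)

# Assumption: listen on all interfaces so the port is reachable from a container
app.run(host="0.0.0.0", port=5000)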

The Flask application exposes the data for direct consumption. The @app.route("/") decorator defines a GET endpoint that returns the generated data in JSON format. The endpoint can be reached by sending a GET request to http://localhost:5000.


To run and manage generator.py, we use the manager.sh script.

manager.sh is a Bash script that manages the producer: it initializes, starts, and stops it.

  • Define Virtual Environment Directory: The VENV_DIR variable defines the directory where the virtual environment will be created. In this case, it's named venv.

  • Define Python Script Name: The SCRIPT_NAME variable defines the name of the Python script to run, which is generator.py in this case.

  • init() Function: This function initializes the project by creating a virtual environment (venv) if it doesn't exist, activating it, installing the necessary dependencies from requirements.txt, and displaying a completion message.

python3 -m venv "$VENV_DIR"
source "$VENV_DIR/bin/activate"
pip install -r requirements.txt

  • start() Function: This function starts the Python script, generator.py, by activating the virtual environment if it's not already activated, running the script in the background with the '&' operator, and displaying a start message.

echo "Starting $SCRIPT_NAME..."
python "$SCRIPT_NAME" &

  • stop() Function: This function stops the Python script, generator.py, by using pkill -f to kill any running instances of the script.

pkill -f "$SCRIPT_NAME"


Then run these commands in the terminal:

cd ./home/***/project

# Create the virtual environment and install the dependencies
bash manager.sh init

# Run generator.py
bash manager.sh start

# Stop generator.py
bash manager.sh stop


Docker 🪄

1. Packaging the Application as a Docker Image🎁

A Dockerfile contains instructions for building a Docker image, which facilitates a consistent and reproducible environment for Kubernetes deployment.

FROM python:3.8.10
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY generator.py .

CMD ["python", "generator.py"]

It sets up a Python environment, copies requirements.txt and generator.py to /app, installs the required packages, and specifies the command to run the Flask app on port 5000.

2. Pushing the Image to a Container Registry 🚌

Build the Docker image with the following command:

sudo docker build -t generatedate .


Tag the Docker image and push it to the local registry:

sudo docker tag generatedate localhost:5100/generatedate
sudo docker push localhost:5100/generatedate

Run the container:

sudo docker run -p 5000:5000 localhost:5100/generatedate

Verify that the image was built:

docker image ls


3. Install minikube and MetalLB 🐋

To install the latest minikube stable release on x86-64 Linux using binary download:

curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo install minikube-linux-amd64 /usr/local/bin/minikube

Install kubectl binary with curl on Linux: Download the latest release with the command:

curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"

Validate the binary (optional). Download the kubectl checksum file:

curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl.sha256"

Validate the kubectl binary against the checksum file:

echo "$(cat kubectl.sha256)  kubectl" | sha256sum --check

If valid, the output is:

kubectl: OK

Install kubectl:

sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

Test to ensure the version you installed is up-to-date:

kubectl version --client

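We use MetalLB for load balancing.

MetalLB is a load-balancer implementation for Kubernetes clusters that do not run on a cloud provider. It assigns an external IP address from a configured pool to each Service of type LoadBalancer, so that traffic sent to that single IP reaches the pods behind the Service. It announces the address on the network in either layer 2 mode (ARP/NDP) or BGP mode.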

Install MetalLB

Go to the MetalLB project page and download metallb-native.yaml, then run:

minikube start --driver=docker
kubectl apply -f metallb-native.yaml


4. Deploying the Python Application in Kubernetes🕹️

To run your Flask application in Kubernetes, you need to create a deployment. A deployment is defined in a YAML file and specifies details like the Docker image for the application, the number of replicas, and other settings. In Kubernetes, a deployment manages a set of identical pods, where each pod represents a single instance of a running process in a cluster.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: generatordate-deployment
spec:
  replicas: 3 # Set the desired number of replicas (pods) for your application
  selector:
    matchLabels:
      app: generatedate
  template:
    metadata:
      labels:
        app: generatedate
    spec:
      containers:
      - name: generatedate
        image: localhost:5100/generatedate # Replace with your actual Docker image name
        ports:
        - containerPort: 5000

Run the following kubectl command to create the deployment directly from the image:

kubectl create deployment generatordate-server --image=localhost:5100/generatedate:latest



Check that the pods are running:

kubectl get pods

Activating the MetalLB addon

After starting minikube, we must activate the MetalLB addon. To view all the available addons, run this command:

minikube addons list

Addons are activated with minikube addons enable:

minikube addons enable metallb

Stop and remove the deployment:

kubectl delete pod <pod_name>
kubectl delete deployment generatordate-server


5. Exposing the Deployment as a Service🧮

To make the Flask app accessible from outside the Kubernetes cluster, create a service to expose the deployment.

In Kubernetes, a service is an abstraction layer that enables communication between a set of pods and external clients. It provides a stable IP address and DNS name for a set of pods, so that other pods or external clients can reliably access the application running in the pod. A service can have different types, such as ClusterIP, NodePort, and LoadBalancer.

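apiVersion: v1
kind: Service
metadata:
  name: generatedate-service
spec:
  type: LoadBalancer
  selector:
    app: generatedate
  sessionAffinity: None
  # The affinity timeout below only takes effect with sessionAffinity: ClientIP,
  # so it is left commented out here.
  # sessionAffinityConfig:
  #   clientIP:
  #     timeoutSeconds: 10800
  ports:
  - name: generatedate
    protocol: TCP
    port: 5000
    targetPort: 5000
    # If you set the `spec.type` field to `NodePort` and you want a specific port number,
    # you can specify a value in the `spec.ports[*].nodePort` field.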

6. Create the MetalLB Config File Ⓜ️

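Next, create a ConfigMap that includes an IP address range for the load balancer. The pool of IPs must be dedicated to MetalLB's use; you can't reuse, for example, the Kubernetes node IPs or IPs controlled by other services.

Note that ConfigMap-based configuration is read by the minikube addon and by MetalLB releases before 0.13; newer releases are configured with IPAddressPool and L2Advertisement resources instead.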

apiVersion: v1
kind: ConfigMap
metadata:
  namespace: metallb-system
  name: config
data:
  config: |
    address-pools:
    - name: default
      protocol: layer2
      addresses:
      - <ip-address-range-start>-<ip-address-range-stop>

7. Running All Manifests and Checking the Result⛓️

kubectl apply -f mykube/


kubectl get service generatedate-service

This command returns the external IP address of the LoadBalancer service. You can use it to access the Flask REST API from a web browser or HTTP client outside the Kubernetes cluster.

8. Result🎥


To check the result, note the minikube IP and the Docker IP, then open a shell with minikube ssh and run curl against the service. The generated dataset is returned in the response.


Step 2: Ingestion and Stream Processing 📼

To process data in real time, we need a stream processing service such as Apache Kafka or Apache Spark. We use Apache Kafka: we pull a Kafka image and integrate it into this setup for real-time data processing.

Deploying Kafka in Minikube:

1. Pull the Kafka Docker Image:

docker pull confluentinc/cp-kafka:latest

2. Create a Kafka Deployment:

Create a kafka-deployment.yaml file with the following content:

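apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:latest
        ports:
        - containerPort: 9092
          name: kafka
        env:
        - name: <env-var-name>
          value: zookeeper:2181
        - name: <env-var-name>
          value: "true"
        - name: <env-var-name>
          value: "30000"
        livenessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 15
          periodSeconds: 20
          successThreshold: 1
          timeoutSeconds: 5
        readinessProbe:
          tcpSocket:
            port: 9092
          initialDelaySeconds: 15
          periodSeconds: 20
          successThreshold: 1
          timeoutSeconds: 5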

3. Create a Kafka Service:

Create a kafka-service.yaml file with the following content:

apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  selector:
    app: kafka
  ports:
  - port: 9092
    targetPort: 9092
  type: NodePort

4. Apply the Deployments:

kubectl apply -f kafka/



5. Deploying Kafka Connect:

Make sure the BOOTSTRAP_SERVERS environment variable in the deployment points to the Kafka service. The value is a plain host:port pair without a URL scheme:

  value: kafka:9092

Because of this setting, generator.py must be updated to publish to Kafka; the updated version is saved as generator-update.py.

This file uses the confluent_kafka library, which is installed with:

pip install confluent-kafka

In the Dockerfile, generator.py is included in the Docker image via a COPY instruction:

COPY generator.py .

After changing the application file, you need to rebuild your Docker image.

⚠️ Don't forget to update the Dockerfile with the new app file (the CMD line must name it as well):

COPY generator-update.py .
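
As a rough idea of what the Kafka part of generator-update.py could look like, here is a minimal sketch built on the confluent_kafka Producer. It is not the actual file: the helper names (encode_record, publish, main), the sample record, and the topic name my_topic (borrowed from the consumer in Step 3) are assumptions for illustration.

```python
import json
from typing import Any, Dict

BOOTSTRAP_SERVERS = "kafka:9092"  # Kafka service name and port from the manifests
TOPIC = "my_topic"                # assumed topic name


def encode_record(record: Dict[str, Any]) -> bytes:
    """Serialize one record as UTF-8 encoded JSON."""
    return json.dumps(record).encode("utf-8")


def publish(producer: Any, topic: str, record: Dict[str, Any]) -> None:
    """Queue one record; `producer` is expected to behave like confluent_kafka.Producer."""
    producer.produce(topic, value=encode_record(record))
    producer.poll(0)  # serve delivery callbacks without blocking


def main() -> None:
    # Imported here so the helpers above can be used without the library installed
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": BOOTSTRAP_SERVERS})
    publish(producer, TOPIC, {"symbol": "AAPL", "price": 100.0})  # hypothetical record
    producer.flush()  # wait until queued messages are delivered
```

Calling main() from inside the cluster sends a single test record; in the generator, publish() would take the place of the HTTP call.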

Step 3: Process ⚒️

Now we can process the data by calculating financial indicators. The indicators are:

  • Moving Average (MA)
  • Exponential Moving Average (EMA)
  • Relative Strength Index (RSI)
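
For orientation, the three indicators can be computed along these lines. This is a plain-Python sketch under stated assumptions, not the project's own code: the EMA uses the common smoothing factor 2 / (window + 1) seeded with the first price, and the RSI uses plain averages of gains and losses (not Wilder's smoothing) with an assumed default period of 14.

```python
from typing import List, Optional


def moving_average(prices: List[float], window: int) -> Optional[float]:
    """Simple moving average of the last `window` prices, or None if too few."""
    if window <= 0 or len(prices) < window:
        return None
    return sum(prices[-window:]) / window


def exponential_moving_average(prices: List[float], window: int) -> Optional[float]:
    """EMA with smoothing factor 2 / (window + 1), seeded with the first price."""
    if window <= 0 or not prices:
        return None
    alpha = 2.0 / (window + 1)
    ema = prices[0]
    for price in prices[1:]:
        ema = alpha * price + (1 - alpha) * ema
    return ema


def relative_strength_index(prices: List[float], period: int = 14) -> Optional[float]:
    """RSI from plain averages of gains and losses over the last `period` changes."""
    if period <= 0 or len(prices) < period + 1:
        return None
    recent = prices[-(period + 1):]
    changes = [b - a for a, b in zip(recent, recent[1:])]
    avg_gain = sum(c for c in changes if c > 0) / period
    avg_loss = sum(-c for c in changes if c < 0) / period
    if avg_loss == 0:
        return 100.0  # no losses in the window (a flat series also lands here)
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)
```

Each function takes the price history of a single symbol, oldest first, and returns None until enough prices have arrived.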

To do this, we read the data from Kafka, calculate the financial indicators, and then publish the results:

1. Write a Kafka consumer to get the data:

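import json

import pandas as pd
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='kafka:9092',  # assumed: the Kafka service created above
    # Decode each message from UTF-8 bytes into a Python object
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    # message.value is already deserialized, so no second decode is needed
    df = pd.json_normalize(message.value)
    print("Received data: ", df.head())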

2. Write the calculation functions.

3. Write a Kafka producer that publishes the results back to the broker:

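from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='kafka:9092')  # assumed broker address

producer.send("topic_name", value="Hello, World!".encode("utf-8"))
producer.flush()  # block until the message has actually been sent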
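
To connect the consumer and the producer, each incoming record has to be turned into a result payload. The sketch below shows one possible shape for that step, keeping a bounded price history per symbol. The field names (symbol, price), the window of 5, and the buy/sell/hold rule are hypothetical and only illustrate the flow; the real signals would come from the indicator functions.

```python
import json
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

WINDOW = 5  # assumed window length for the moving average

# One bounded price history per stock symbol
_history: Dict[str, Deque[float]] = defaultdict(lambda: deque(maxlen=WINDOW))


def build_result(record: dict) -> Optional[bytes]:
    """Update the symbol's history and return a JSON payload, or None while warming up."""
    symbol = record["symbol"]       # assumed field name
    price = float(record["price"])  # assumed field name
    prices = _history[symbol]
    prices.append(price)
    if len(prices) < WINDOW:
        return None
    average = sum(prices) / WINDOW
    # Hypothetical rule: price above its moving average -> "buy", below -> "sell"
    if price > average:
        signal = "buy"
    elif price < average:
        signal = "sell"
    else:
        signal = "hold"
    result = {"symbol": symbol, "price": price, "ma": average, "signal": signal}
    return json.dumps(result).encode("utf-8")
```

Inside the consumer loop this could be used as payload = build_result(message.value), followed by producer.send("topic_name", value=payload) whenever payload is not None.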

