Jinlong Yang
upstream nginx.ops.http.cdc {
server 0.0.0.0:80;
balancer_by_lua_file lualib/velar/dynamic.lua;
}
server {
listen 9000;
location ^~ /cdc {
proxy_pass http://nginx.ops.http.cdc;
}
}
http {
....
lua_shared_dict store 100m;
init_by_lua_block {
local init = require 'velar.init_check'
init.check()
}
init_worker_by_lua_block {
local init = require 'velar.init_timer'
init.timer()
local hc = require 'velar.healthcheck'
hc.positive_check()
}
....
}
lualib
└── velar
├── dynamic.lua
├── healthcheck.lua
├── init_check.lua
├── init_timer.lua
└── util
├── qconf.lua
└── util.lua
init_by_lua:
* nginx启动时, 从qconf读取所有配置, 进行配置检查
init_worker_by_lua:
* 启动定时器, 每秒从qconf获取变化的配置, 解析路由规则, 通过排列组合, 穷举每种规则, 匹配得到对应规则的实例信息,使其【事先】写入到nginx共享内存。
* 这样做的好处: 在定时器中, 已经将名字服务路由规则中的各种匹配规则都写入共享内存, 当upstream被访问的时候, 可直接匹配规则, 从共享内存获取实例. 以达到静态配置upstream时的访问速度, 提升性能。
balancer_by_lua_file:
* 获取当前访问的upstream、idc、pubenv、abclass, 从共享内存获取实例信息, 进行路由逐层匹配, 最后经过wrr返回最终的实例
healthcheck: 按需启动对配置的检查
set /nginx/ops/http/cdc '{"service_name": "com.ops.http.cdc", "service_default_port": "5000", "service_connect_retry": 1, "service_timestamp": 1681819575, "service_check_enabled": "yes", "service_check_method": "tcp", "service_check_url": "", "service_check_header": "", "service_check_interval": 1, "service_check_window": 3, "service_route_rule": {"route": ["pubenv", "idc", "abclass"], "route_rule": {"pubenv": {"1": "sandbox", "2": "smallflow", "default": "default"}, "idc": {"dx": "dx", "m6": "m6", "default": "dx"}, "abclass": {"0-10": "seta", "11-100": "setb", "default": "setb"}}}, "service_instances": [{"idc": "dx", "hostname": "dx-ops00.dx", "ip": "10.12.20.189", "port": "", "pubenv": "sandbox", "weight": 1, "abclass": "default"}, {"idc": "m6", "hostname": "dx-ops01.dx", "ip": "10.12.20.41", "port": "", "pubenv": "smallflow", "weight": 1, "abclass": "seta"}, {"idc": "m6", "hostname": "dx-ops02.dx", "ip": "10.12.16.250", "port": "", "pubenv": "default", "weight": 2, "abclass": "setb"}, {"idc": "m6", "hostname": "dx-ops03.dx", "ip": "10.12.25.0", "port": "", "pubenv": "default", "weight": 4, "abclass": "setb"}]}'
curl -H 'x-idc:m6' http://10.12.26.255:9000/cdc
curl -H 'x-idc:m6' -H 'x-pubenv:1' http://10.12.26.255:9000/cdc
curl -H 'x-idc:m6' -H 'x-pubenv:2' http://10.12.26.255:9000/cdc
curl -H 'x-idc:m6' --cookie 'abclass=1665295730_09' http://10.12.26.255:9000/cdc
curl -H 'x-idc:m6' --cookie 'abclass=1665295730_64' http://10.12.26.255:9000/cdc
# -*- coding:utf-8 -*-
from flask import Flask
from werkzeug.serving import run_simple
app = Flask(__name__)
@app.route("/cdc")
def cdc():
host = '10.12.20.189'
return "NOTE: cdc host: %s accessed." % host
if __name__ == '__main__':
run_simple('0.0.0.0', 5000, app)
-- 初始全局变量保存请求
upstream_wrr_dict = {}
function swrr()
for _, item in pairs(instances) do
local current_weight = upstream_wrr_dict[item.ip]
if not current_weight then
current_weight = 0
end
-- 填充current_weight字段
item.current_weight = current_weight
end
local index = 0 -- 请求到来选择服务器的索引
local sum_weight = 0 -- 累加所有服务器的权重
for i=1, #instances do
instances[i].current_weight = instances[i].current_weight + instances[i].weight
sum_weight = sum_weight + instances[i].weight
if index == 0 or instances[index].current_weight < instances[i].current_weight then
index = i
end
-- 记录当前服务器的current_weight
local ip = instances[i].ip
upstream_wrr_dict[ip] = instances[i].current_weight
end
local ip = instances[index].ip
print('当前请求机器: ' .. ip)
upstream_wrr_dict[ip] = upstream_wrr_dict[ip] - sum_weight
end
每个服务器都有两个权重变量:
weight: 配置文件中指定的该服务器的权重, 这个值是固定不变的;
current_weight: 服务器目前的权重, 一开始为0, 之后会动态调整;
每次当请求到来, 选取服务器时, 遍历数组中所有服务器.
对于每个服务器, 让它的current_weight增加它的weight;
同时累加所有服务器的weight, 并保存为sum_weight;
遍历完所有服务器之后, 如果该服务器的current_weight是最大的, 就选择这个服务器处理本次请求.
最后把选中的服务器的current_weight减去sum_weight.