my-rpc is a rpc framework based on netty
https://github.com/molamolaxxx/my-rpc
src/main/java/com/mola/rpc/core/spring
使用起来很简单:
@Configuration
public class RpcConsumerConfig {
@RpcConsumer(loadBalanceStrategy = LoadBalanceConstants.CONSISTENCY_HASHING_STRATEGY)
private OrderService orderService;
}
但是在consumer中,并没有OrderService的实现,注入的对象是jdk代理对象(Proxy)
因为需要根据注入bean的接口类型,动态创建代理类,所以使用FactoryBean实现
public class RpcConsumerFactoryBean implements FactoryBean {
@Resource
private RpcProxyInvokeHandler nettyProxyInvokeHandler;
private Class<?> consumerInterface;
public RpcConsumerFactoryBean(Class<?> consumerInterface) {
this.consumerInterface = consumerInterface;
}
@Override
public Object getObject() throws Exception {
Object proxyInstance = Proxy.newProxyInstance(
RpcConsumerFactoryBean.class.getClassLoader(),
new Class[]{consumerInterface},
nettyProxyInvokeHandler);
return proxyInstance;
}
@Override
public Class<?> getObjectType() {
return consumerInterface;
}
}
需要手动创建bean defination,通过ImportBeanDefinitionRegistrar导入到spring中
public class RpcConsumerImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
try {
for (String beanDefinitionName : beanDefinitionRegistry.getBeanDefinitionNames()) {
....
// 获取configure中的fieldName,作为beanName
Field[] fields = clazz.getDeclaredFields();
Set<String> alreadyAddedClazzNamesSet = Sets.newHashSet();
for (Field field : fields) {
RpcConsumer annotation = field.getAnnotation(RpcConsumer.class);
if (null == annotation) {
continue;
}
Class<?> type = field.getType();
if (alreadyAddedClazzNamesSet.contains(type.getName())) {
continue;
}
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition();
AbstractBeanDefinition consumerBeanDefinition = builder.getBeanDefinition();
// 设置bean的class类型为FactoryBean
consumerBeanDefinition.setBeanClass(RpcConsumerFactoryBean.class);
// 构造参数传入接口类型,用于代理
consumerBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(type);
RpcMetaData clientMeta = RpcMetaData.of(annotation.group(), annotation.version(), type);
clientMeta.setClientTimeout(annotation.timeout());
clientMeta.setLoadBalanceStrategy(annotation.loadBalanceStrategy());
BeanMetadataAttribute attribute = new BeanMetadataAttribute(CommonConstants.BEAN_DEF_CONSUMER_META,
clientMeta);
consumerBeanDefinition.addMetadataAttribute(attribute);
// 添加beanDefinition
beanDefinitionRegistry.registerBeanDefinition(field.getName(), consumerBeanDefinition);
alreadyAddedClazzNamesSet.add(type.getName());
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
在代理中,可以获取到调用的接口,调用的参数,来做后续的处理
@Override
public Object invoke(Object obj, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
// 获取服务对应的元数据唯一key
RpcMetaData consumerMeta = rpcContext.getConsumerMeta(method.getDeclaringClass().getName());
if (null == consumerMeta) {
throw new RuntimeException("consumer invoke failed, consumerMeta is null, clazz = " + method.getDeclaringClass().getName());
}
List<AddressInfo> addressInfoList = consumerMeta.getAddressList();
if (CollectionUtils.isEmpty(addressInfoList)) {
throw new RuntimeException("consumer invoke failed, addressList is empty, meta = " + JSONUtil.toJSONString(consumerMeta));
}
// 地址过滤器
List<String> addressList = addressInfoList.stream()
.map(AddressInfo::getAddress)
.collect(Collectors.toList());
// 负载均衡策略
String targetProviderAddress = loadBalance.getTargetProviderAddress(addressList, consumerMeta.getLoadBalanceStrategy(), args);
if (null == targetProviderAddress) {
if (addressList.size() == 0) {
throw new RuntimeException("no provider available");
}
targetProviderAddress = addressList.get(0);
}
// 构建request
InvokeMethod invokeMethod = assemblyInvokeMethod(method, args);
RemotingCommand request = buildRemotingCommand(method, invokeMethod, consumerMeta.getClientTimeout(), targetProviderAddress);
// 执行远程调用
RemotingCommand response = nettyRemoteClient.syncInvoke(targetProviderAddress,
request, invokeMethod, consumerMeta.getClientTimeout());
// 服务端执行异常
if (response.getCode() == RemotingCommandCode.SYSTEM_ERROR) {
throw new RuntimeException(response.getRemark());
}
// 读取服务端返回结果
if (null == response) {
return null;
}
// response转换成对象
String body = (String) BytesUtil.bytesToObject(response.getBody());
Object res = RemotingSerializableUtil.fromJson(body, method.getReturnType());
return res;
}
(1)动态地址路由(feature)
服务维度配置路由规则,使用groovy脚本实现,脚本存储在configserver
入参:args
出参:ip、environment、group
(2)负载均衡:src/main/java/com/mola/rpc/core/strategy/balance
- 随机负载均衡
- round robin
- 一致性hash算法
#后代表虚拟节点,默认为每个地址分配10个虚拟节点
对request进行json化,进行hash运算,取第一个hash值大于其的虚拟节点
具体实现使用treemap:
public class ConsistencyHashingBalance implements LoadBalanceStrategy, AddressChangeListener {
/**
* 总虚拟节点个数
*/
private static final int TOTAL_VIRTUAL_NODE_NUM = 1000;
@Override
public String getTargetProviderAddress(RpcMetaData consumerMeta, Object[] args) {
SortedMap<Integer, String> virtualAddressNodeMap = consumerMeta.getVirtualAddressNodeMap();
if (null == virtualAddressNodeMap || virtualAddressNodeMap.size() == 0) {
throw new RuntimeException("virtualAddressNodeMap is empty");
}
int hash = getHash(JSONUtil.toJSONString(args));
SortedMap<Integer, String> tailMap = virtualAddressNodeMap.tailMap(hash);
if (tailMap.size() == 0) {
return getAddressFromVirtualNode(virtualAddressNodeMap.get(virtualAddressNodeMap.firstKey()));
}
return getAddressFromVirtualNode(tailMap.get(tailMap.firstKey()));
}
/**
* 截取虚拟节点,获取真实地址
* @param virtualAddressNode
* @return
*/
private String getAddressFromVirtualNode(String virtualAddressNode) {
String[] split = StringUtils.split(virtualAddressNode, "#");
if (split.length == 2) {
return split[0];
}
throw new RuntimeException("getAddressFromVirtualNode failed");
}
/**
* 重建hash索引
* @param consumerMeta
*/
public void rebuildHash(RpcMetaData consumerMeta) {
synchronized (consumerMeta) {
List<String> addressList = consumerMeta.getAddressList().stream().map(AddressInfo::getAddress).collect(Collectors.toList());
SortedMap<Integer, String> virtualAddressNodeMap = new TreeMap<>();
int totalNodeNum = TOTAL_VIRTUAL_NODE_NUM / addressList.size();
for (String address : addressList) {
for (int i = 0; i < totalNodeNum; i++) {
String virtualAddress = String.format("%s#%s", address, UUID.randomUUID());
virtualAddressNodeMap.put(getHash(virtualAddress), virtualAddress);
}
}
consumerMeta.setVirtualAddressNodeMap(virtualAddressNodeMap);
}
}
/**
* FNV1_32_HASH算法
* @param str
* @return
*/
private static int getHash(String str) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < str.length(); i++) {
hash = (hash ^ str.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
//如果算出来的值为负数则取其绝对值
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
@Override
public void afterAddressChange(RpcMetaData consumerMetaData) {
if (LoadBalanceConstants.CONSISTENCY_HASHING_STRATEGY.equals(consumerMetaData.getLoadBalanceStrategy())) {
this.rebuildHash(consumerMetaData);
}
}
}
采用基于reactor模式的netty框架实现,客户端使用worker-pipeline线程模型,服务端使用boss-worker-pipeline-biz线程模型
tcp连接存在分包粘包的问题,所以需要通过报文头(header)和报文体(body)划分数据报
(1)netty frame length
四字节,32位整型,标记数据帧的长度,让netty基于LengthFieldBasedFrameDecoder进行数据帧的划分
(2)message header length
四字节,32位整型,标记header body长度,用于读取buffer中的header body
(3)header body(frame length - 8 - header length)
json序列化的header体,包括请求唯一序号,请求状态码,附加信息等
(4)header body
hessian序列化的InvokeMethod对象,InvokeMethod是jvm维度执行方法的抽象,具有可序列化传输,可单独在jvm中执行的功能
public class InvokeMethod {
/** 调用方法 */
private String methodName;
/** 参数类型 */
private String[] parameterTypes;
/** 参数 */
private String[] arguments;
/** 返回类型 */
private String returnType;
/**
* 调用接口名称
*/
private String interfaceClazz;
public Object invoke(Object providerBean) {
try {
AssertUtil.notNull(providerBean, "providerBean is null, name = " + providerBean);
// 1、反序列化类型
Class<?>[] paramTypes = new Class<?>[this.parameterTypes.length];
for (int i = 0; i < this.parameterTypes.length; i++) {
paramTypes[i] = Class.forName(this.parameterTypes[i]);
}
// 2、反序列化参数tongshi
Object[] args = new Object[this.arguments.length];
for (int i = 0; i < this.arguments.length; i++) {
args[i] = RemotingSerializableUtil.fromJson(this.arguments[i], paramTypes[i]);
}
Method method = providerBean.getClass().getMethod(this.methodName, paramTypes);
// 3、反射调用provider
return method.invoke(providerBean, args);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
(1)同步调用
consumer中看似同步的调用其实是异步的网络请求,但是需要给用户看上去是同步的调用(非用户使用异步的场景下),发送请求的同时,需要阻塞线程等待返回
在客户端发送请求的同时,生成一个ResponseFuture,并以opaqueId为key存储在map中
/**
* 缓存对外所有请求
* opaque -> ResponseFuture
*/
protected final Map<Integer, ResponseFuture> responseMap = new ConcurrentHashMap<>(256);
ResponseFuture内部实现为发令枪,当发送请求后,调用waitResponse方法,传入超时时间,阻塞当前线程
caiyong当服务端返回时,取到opaqueId,取出客户端缓存的ResponseFuture,调用putResponse,让线程继续执行
private final CountDownLatch countDownLatch = new CountDownLatch(1);
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}
(2)异步调用
实现方式,在@RpcConsumer上配置asyncMethods
@RpcConsumer(loadBalanceStrategy = LoadBalanceConstants.LOAD_BALANCE_ROUND_ROBIN_STRATEGY, group = "gray", asyncMethods = {"queryOrderList"})
private OrderService orderServiceGray;
// 异步回调
Async.from(orderServiceGray.queryOrderList(orderId))
.consume(list -> {
System.out.println(Thread.currentThread());
System.out.println(JSONUtil.toJSONString(list));
});
// 异步转同步
Async<List<Order>> async = Async.from(orderServiceGray.queryOrderList(orderId));
Thread.sleep(3000);
//do othder something
List<Order> orders = async.get();
异步调用做法是将ResponseFuture放入threadlocal上下文中,交给异步组件Async处理,交给async的好处是相比于hsf和dubbo可以实现较美观的范式
dubbo:
Result<ProductVO> result = productService.findProduct(dto);//dubbo异步调用,此时输出result是null
Future<Object> future = RpcContext.getContext().getFuture();//获取异步执行结果Future
//do othder something
try {
result = (Result<ProductVO>) future.get();//获取具体的异步执行结果
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return result;
my-rpc:
// 1、异步回调
Async.from(orderServiceGray.queryOrderList(orderId))
.consume(list -> {
System.out.println(Thread.currentThread());
System.out.println(JSONUtil.toJSONString(list));
});
// 2、异步转同步
Async<List<Order>> async = Async.from(orderServiceGray.queryOrderList(orderId));
Thread.sleep(3000);
//do othder something
List<Order> orders = async.get();
public class Async<T> {
private static final ThreadLocal<AsyncResponseFuture> asyncFutureThreadLocal = new ThreadLocal<>();
/**
* 生成异步器
* @param result
* @param <T>
* @return
*/
public static <T> Async<T> from(T result) {
if (null != result) {
throw new RuntimeException("please check if this method is an async method!");
}
return new Async();
}
public static void addFuture(AsyncResponseFuture responseFuture) {
asyncFutureThreadLocal.set(responseFuture);
}
/**
* 注册监听器
* @param consumer
*/
public void register(Consumer<T> consumer) {
try {
AsyncResponseFuture<T> responseFuture = asyncFutureThreadLocal.get();
AssertUtil.notNull(responseFuture, "responseFuture is null");
responseFuture.setConsumer(consumer);
} finally {
asyncFutureThreadLocal.remove();
}
}
public T sync(long timeout) {
try {
AsyncResponseFuture<T> responseFuture = asyncFutureThreadLocal.get();
AssertUtil.notNull(responseFuture, "responseFuture is null");
return responseFuture.get(timeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
asyncFutureThreadLocal.remove();
}
}
}
provider采用boss-worker-pipeline-biz线程模型,最终业务逻辑的执行放在biz线程池,针对单个服务流量特别大的情况,可以自己定制线程池,我们常说的线程池满也就是业务线程池溢出
协程不同于线程需要靠jvm进行系统调用进行切换,可以在用户态实现异步调用,避免线程切换带来的开销,并且能够调度更多的任务,JDK19已经在虚拟机层面支持协程。
@RpcProvider(interfaceClazz = OrderService.class, inFiber = true)
public class OrderServiceImpl implements OrderService {
@Resource
private RpcProperties rpcProperties;
@Override
public List<Order> queryOrderList(String code, List<String> idList) {
List<Order> orders = new ArrayList<>();
for (String s : idList) {
Order order = new Order();
order.setCode("UM1111111");
order.setId(s);
order.setDesc(NetUtils.getLocalAddress().getHostAddress() + ":" + rpcProperties.getServerPort());
orders.add(order);
}
return orders;
}
}
provider开启协程调用后,可以获得更高的吞吐量,缺点是线程相关的内容将无法支持
jdk8协程框架采用quasar支持,jdk19采用虚拟线程支持