spring-boot and Akka integration
- Brings the ActorSystem under Spring's management, so it can be injected automatically in your code
- Supports automatic creation of remote actors
## How to use
- Add the following dependency to the pom.xml of your Spring Boot project:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-akka</artifactId>
            <version>1.3.1.RELEASE</version>
        </dependency>

- Add the Akka actor configuration to application.properties:

        spring.akka.systemName=ClientActorSystem
        spring.akka.config=client.conf
        spring.akka.actorBeanClass=com.alibaba.akka.TestActor
        spring.akka.actorName=ClientHandler

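- Start Spring Boot and write the remote actor:

        // Application.java
        import org.springframework.boot.SpringApplication;
        import org.springframework.boot.autoconfigure.SpringBootApplication;

        @SpringBootApplication
        public class Application {
            public static void main(String[] args) {
                SpringApplication application = new SpringApplication(Application.class);
                application.run(args);
            }
        }

        // TestActor.java
        package com.alibaba.akka;

        import com.alibaba.boot.akka.ActorBean;
        import akka.actor.UntypedActor;

        @ActorBean
        public class TestActor extends UntypedActor {
            @Override
            public void onReceive(Object message) throws Exception {
                // Print every message this actor receives.
                System.out.println(message);
            }
        }
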
Once the container has started, spring-boot's auto-configuration creates the ActorSystem automatically and exposes the remote actor (TestActor).
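The file named by `spring.akka.config` (client.conf) is not shown in this README. A minimal sketch of what it might contain, assuming Akka classic remoting over TCP (the Akka 2.3/2.4 style that matches the `akka.tcp://` address used elsewhere in this README) and assuming the actor system listens on 127.0.0.1:2552:

```
# client.conf (assumed contents, adjust hostname and port to your environment)
akka {
  actor {
    # Enables remote actor references for this ActorSystem.
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}
```

Whether the starter expects additional keys in this file is not stated here, so treat this only as a starting point.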
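- How to connect to the remote actor:

        import akka.actor.ActorRef;
        import akka.actor.ActorSystem;
        import akka.actor.Props;
        import com.typesafe.config.ConfigFactory;
        import java.util.UUID;
        import java.util.concurrent.TimeUnit;

        // TaskProtos is not shown in this README (it looks like a protobuf-generated class); import it from your own package.
        public class PingClientSystemMain {
            public static void main(String[] args) throws InterruptedException {
                final ActorSystem system = ActorSystem.create("PingLookupSystem", ConfigFactory.load("pingRemoteLookup"));
                final ActorRef actor = system.actorOf(Props.create(PingLookupActor.class,
                        "akka.tcp://ClientActorSystem@127.0.0.1:2552/user/ClientHandler"),
                        "PingLookupActor");
                // Give the lookup actor time to resolve the remote actor before sending.
                TimeUnit.SECONDS.sleep(5);
                for (int i = 0; i < 1000000; i++) {
                    actor.tell(TaskProtos.Ping.newBuilder().setId(UUID.randomUUID().toString())
                            .setNow(System.currentTimeMillis()).build(),
                            ActorRef.noSender());
                }
            }
        }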

        import akka.actor.ActorIdentity;
        import akka.actor.ActorRef;
        import akka.actor.Identify;
        import akka.actor.ReceiveTimeout;
        import akka.actor.Terminated;
        import akka.actor.UntypedActor;
        import akka.japi.Procedure;
        import java.util.concurrent.TimeUnit;
        import scala.concurrent.duration.Duration;

        public class PingLookupActor extends UntypedActor {
            private final String path;
            private ActorRef calculator = null;

            public PingLookupActor(String path) {
                this.path = path;
                sendIdentifyRequest();
            }

            // Ask the remote actor to identify itself, and schedule a retry trigger in 3 seconds.
            private void sendIdentifyRequest() {
                getContext().actorSelection(path).tell(new Identify(path), getSelf());
                getContext().system().scheduler().scheduleOnce(Duration.create(3, TimeUnit.SECONDS), getSelf(),
                        ReceiveTimeout.getInstance(), getContext().dispatcher(),
                        getSelf());
            }

            // Initial behavior: wait until the remote actor has been resolved.
            @Override
            public void onReceive(Object message) throws Exception {
                if (message instanceof ActorIdentity) {
                    calculator = ((ActorIdentity) message).getRef();
                    if (calculator == null) {
                        System.out.println("Remote actor not available: " + path);
                    } else {
                        getContext().watch(calculator);
                        getContext().become(active, true);
                    }
                } else if (message instanceof ReceiveTimeout) {
                    sendIdentifyRequest();
                } else {
                    System.out.println("Not ready yet");
                }
            }

            // Active behavior: forward pings to the remote actor and print its responses.
            private final Procedure<Object> active = message -> {
                if (message instanceof TaskProtos.Ping) {
                    calculator.tell(message, getSelf());
                } else if (message instanceof TaskProtos.PingResponse) {
                    System.out.println(message);
                } else if (message instanceof Terminated) {
                    System.out.println("Calculator terminated");
                    sendIdentifyRequest();
                    getContext().unbecome();
                } else if (message instanceof ReceiveTimeout) {
                    // Already connected; ignore the retry trigger.
                } else {
                    unhandled(message);
                }
            };
        }

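The remote path passed to PingLookupActor lines up with the values in application.properties: `ClientActorSystem` is `spring.akka.systemName` and `ClientHandler` is `spring.akka.actorName`. A minimal sketch that makes the relationship explicit follows. `RemoteActorPath` is a hypothetical helper, not part of the starter or of Akka, and the host and port are assumed to come from the Akka remoting configuration:

```java
public class RemoteActorPath {

    /**
     * Builds a path of the form akka.tcp://<system>@<host>:<port>/user/<actor>,
     * the same shape as the address used by PingClientSystemMain.
     */
    public static String of(String systemName, String host, int port, String actorName) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return "akka.tcp://" + systemName + "@" + host + ":" + port + "/user/" + actorName;
    }

    public static void main(String[] args) {
        // Values taken from application.properties and the lookup example in this README.
        System.out.println(of("ClientActorSystem", "127.0.0.1", 2552, "ClientHandler"));
        // prints akka.tcp://ClientActorSystem@127.0.0.1:2552/user/ClientHandler
    }
}
```

Building the path from the same values the server uses helps avoid a mismatch between the configured system or actor name and the address the client looks up.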
## About spring-boot development
spring-boot study notes