1. 服务提供者
/**
 * Service contract exposed by the provider in this RPC example.
 */
public interface EchoService {

    /**
     * Produces a reply for the given message.
     *
     * @param ping the message sent by the caller
     * @return the reply chosen by the implementation
     */
    String echo(String ping);
}
服务提供者实现
/**
 * Provider-side implementation of {@link EchoService} that answers every
 * message with the same text prefixed by {@code "pong : "}.
 */
public class EchoServiceImpl implements EchoService {

    /**
     * Builds the reply for a message.
     *
     * @param ping the incoming message; a {@code null} value is rendered as the text "null"
     * @return {@code "pong : "} followed by the message
     */
    @Override
    public String echo(String ping) {
        StringBuilder reply = new StringBuilder("pong : ");
        reply.append(ping);
        return reply.toString();
    }
}
使用BIO创建Socket服务端,服务消费者的每个请求都交由固定大小线程池中的线程处理,序列化反序列化使用Java的默认序列化方案
/**
 * Blocking-I/O RPC server: accepts connections on a {@link ServerSocket} and
 * hands each accepted socket to an {@link ExporterTask} running on a shared
 * fixed-size thread pool (one thread per available processor).
 */
public class RpcExporter {

    /** Worker pool shared by all exported endpoints. */
    private static final Executor executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Binds to {@code host:port} and serves requests until an I/O error occurs.
     * This method never returns normally; it only exits by throwing.
     *
     * @param host the host name or address to bind to
     * @param port the TCP port to listen on
     * @throws IOException if the server socket cannot be opened or bound, or if
     *                     accepting a connection fails
     */
    public static void export(String host, int port) throws IOException {
        // try-with-resources releases the listening socket when bind() or
        // accept() fails, instead of leaking it.
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(host, port));
            while (true) {
                executor.execute(new ExporterTask(serverSocket.accept()));
            }
        }
    }
}
/**
 * Handles a single RPC request on an accepted socket.
 *
 * <p>The request is read from the socket in this order: interface name (UTF),
 * method name (UTF), parameter types ({@code Class<?>[]}), arguments
 * ({@code Object[]}). The method is invoked reflectively on a provider obtained
 * through {@link ServiceLoader}, and the result is written back as one object.
 *
 * <p>NOTE(security): this class loads a class whose name comes from the peer
 * and deserializes peer-supplied objects with Java native serialization. Do not
 * expose it to untrusted clients without an {@code ObjectInputFilter} or a
 * safer wire format.
 */
public class ExporterTask implements Runnable {

    private final Socket socket;

    /**
     * @param socket the accepted client connection; it is closed when {@link #run()} finishes
     */
    public ExporterTask(Socket socket) {
        this.socket = socket;
    }

    /**
     * Looks up the first provider registered for a service type via {@link ServiceLoader}.
     *
     * @param serviceClass the service interface or class to look up
     * @return the first provider found
     * @throws java.util.NoSuchElementException if no provider is registered for {@code serviceClass}
     */
    public Object load(Class<?> serviceClass) {
        for (Object provider : ServiceLoader.load(serviceClass)) {
            return provider;
        }
        // Same exception type as Iterator.next() on an empty loader, but with a
        // message that says which service is missing.
        throw new java.util.NoSuchElementException(
                "No service provider registered for " + serviceClass.getName());
    }

    /**
     * Reads one request, invokes the target method and writes the result back.
     * Any failure is printed to standard error; in that case no response is
     * written and the connection is simply closed.
     */
    @Override
    public void run() {
        // Closing the socket also closes its input and output streams, so the
        // socket is the only resource that needs managing.
        try (Socket client = socket) {
            ObjectInputStream is = new ObjectInputStream(client.getInputStream());
            String interfaceName = is.readUTF();
            Class<?> service = Class.forName(interfaceName);
            String methodName = is.readUTF();
            Class<?>[] parameterTypes = (Class<?>[]) is.readObject();
            Object[] args = (Object[]) is.readObject();

            Method method = service.getMethod(methodName, parameterTypes);
            Object result = method.invoke(load(service), args);

            ObjectOutputStream os = new ObjectOutputStream(client.getOutputStream());
            os.writeObject(result);
            // Push the response out before the socket is closed.
            os.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
启动程序
/**
 * Entry point of the provider side: starts the RPC exporter on a separate thread.
 */
public class Main {

    /** Runs the exporter on localhost:8080 and prints any I/O failure. */
    private static final class ExportJob implements Runnable {
        @Override
        public void run() {
            try {
                RpcExporter.export("localhost", 8080);
            } catch (IOException ioFailure) {
                ioFailure.printStackTrace();
            }
        }
    }

    /**
     * Launches the export job on a new thread.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Thread serverThread = new Thread(new ExportJob());
        serverThread.start();
    }
}
2. 服务消费者
使用动态代理封装socket细节
/**
 * Invocation handler that turns a call on a proxy into one request over a
 * fresh socket connection.
 *
 * <p>The request is written in this order: service class name (UTF), method
 * name (UTF), parameter types ({@code Class<?>[]}), arguments
 * ({@code Object[]}). The response is a single serialized object.
 */
public class EchoInvocationHandler implements InvocationHandler {

    private final Class<?> serviceClass;
    private final InetSocketAddress addr;

    /**
     * @param serviceClass the service interface whose name is sent to the server
     * @param addr         the address of the RPC server
     */
    public EchoInvocationHandler(Class<?> serviceClass, InetSocketAddress addr) {
        this.serviceClass = serviceClass;
        this.addr = addr;
    }

    /**
     * Sends the call to the server and returns the deserialized result.
     * {@code equals}, {@code hashCode} and {@code toString} are answered
     * locally without opening a connection.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked method
     * @param args   the call arguments, or {@code null} for a no-arg method
     * @return the object sent back by the server
     * @throws Throwable if connecting, writing the request or reading the response fails
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        // Closing the socket also closes both of its streams; managing only the
        // socket means a failing close() cannot skip other cleanup or hide the
        // exception thrown by the call itself.
        try (Socket socket = new Socket()) {
            socket.connect(addr);
            ObjectOutputStream os = new ObjectOutputStream(socket.getOutputStream());
            os.writeUTF(serviceClass.getName());
            os.writeUTF(method.getName());
            os.writeObject(method.getParameterTypes());
            os.writeObject(args);
            // Make sure the whole request is on the wire before waiting for the reply.
            os.flush();
            ObjectInputStream is = new ObjectInputStream(socket.getInputStream());
            return is.readObject();
        }
    }

    /** Handles the java.lang.Object methods that a proxy forwards to its handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        if ("toString".equals(name)) {
            return "RpcProxy[" + serviceClass.getName() + " -> " + addr + "]";
        }
        throw new UnsupportedOperationException(method.toString());
    }
}
RPC调用
/**
 * Client-side factory for service proxies.
 *
 * @param <S> the type the created proxy is cast to
 */
public class RpcImporter<S> {

    /**
     * Creates a dynamic proxy implementing {@code serviceClass} whose calls are
     * dispatched to an {@link EchoInvocationHandler} built from
     * {@code serviceClass} and {@code addr}.
     *
     * @param serviceClass the interface the proxy implements
     * @param addr         the address passed on to the invocation handler
     * @return the proxy, cast (unchecked) to {@code S}
     */
    public S importer(final Class<?> serviceClass, final InetSocketAddress addr) {
        final ClassLoader loader = serviceClass.getClassLoader();
        final Class[] proxiedTypes = new Class[]{serviceClass};
        final EchoInvocationHandler dispatcher = new EchoInvocationHandler(serviceClass, addr);
        final Object proxyInstance = Proxy.newProxyInstance(loader, proxiedTypes, dispatcher);
        return (S) proxyInstance;
    }
}
/**
 * Consumer-side demo: imports {@link EchoService} from the server at
 * localhost:8080, calls {@code echo("edgar")} and prints the reply.
 */
public class RpcTest {

    /**
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8080);
        // Parameterizing the importer removes the raw type and the explicit cast.
        EchoService echoService =
                new RpcImporter<EchoService>().importer(EchoService.class, serverAddr);
        System.out.println(echoService.echo("edgar"));
    }
}