Vert.x Eventbus

最后更新:2019-03-14

Eventbus

Vert.x提供类三种类型的事件总线

  • 发布/订阅
  • 点对点
  • 请求/响应

Vert.x在创建Vertx对象时会创建一个Eventbus对象,我们先看EventBusImpl(单机版的Eventbus)的实现

  private void createAndStartEventBus(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
    if (options.isClustered()) {
      eventBus = new ClusteredEventBus(this, options, clusterManager, haManager);
    } else {
      eventBus = new EventBusImpl(this);
    }
    eventBus.start(ar2 -> {
      if (ar2.succeeded()) {
	// If the metric provider wants to use the event bus, it cannot use it in its constructor as the event bus
	// may not be initialized yet. We invokes the eventBusInitialized so it can starts using the event bus.
	metrics.eventBusInitialized(eventBus);

	if (resultHandler != null) {
	  resultHandler.handle(Future.succeededFuture(this));
	}
      } else {
	log.error("Failed to start event bus", ar2.cause());
      }
    });
  }

在创建EventBus之后,会通过EventBus的start方法注册一个回调函数,在EventBus真正启动成功之后会执行这个回调函数.

  public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
    if (started) {
      throw new IllegalStateException("Already started");
    }
    started = true;
    completionHandler.handle(Future.succeededFuture());
  }

Message接口

Message接口用于定义Eventbus中传递的消息.它主要有下列属性: address:事件发送的地址 replyAddress:事件回应的地址 headers:消息头,通常包含一些公共参数,例如再service-proxy组件中可以将调用对方法放在消息头中 body:消息体,它可以是任意类型的对象.Vertx.x默认仅支持基础类型,JsonObject,JsonArray这几种类型.如果需要支持其他类型的对象需要实现该对象的编解码类并注册到eventbus中.

MessageImpl是Message的实现类,里面主要包括3个参数,sentBody,receivedBody,messageCodec。其中receivedBody是通过messageCodec.transfrom(sentBody)得到 message的reply方法会调用eventbus的sendReply方法发送响应数据

发送事件

EventBus的publish(…)方法和send(…)用于发送事件,两种直接对区别是send方法可以注册一个回调函数用于处理事件消费者发回的响应. 这些方法最终会调用createMessage方法来创建一个Message对象.

  protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
    Objects.requireNonNull(address, "no null address accepted");
    MessageCodec codec = codecManager.lookupCodec(body, codecName);
    @SuppressWarnings("unchecked")
    MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, this);
    return msg;
  }   该方法首先要从CodecManager中找到对应对Codec对象.然后创建一个Message对象.

再创建完Message之后, 如果回调函数replyHandler不为null,createReplyHandlerRegistration方法会创建一个HandlerRegistration对象将回调函数与回应地址绑定.回应地址是一个自增长的序列.

处理事件

EventBus的consumer方法可以为事件注册一个处理类.它首先会创建一个MessageConsumer对象,然后将处理类注册到该对象

CodecManager

HandlerRegistration

SendContextImpl

a

Edgar

Edgar
一个略懂Java的小菜比