Vert.x的服务发现模块

最后更新:2019-03-14

前部分内容基本来自于官方文档

Vert.x提供了服务发现组件,每个服务都被封装为一个Record对象

服务提供者:

  • 发布一个服务
  • 注销一个服务
  • 更新已发布的服务

服务消费者:

  • 查询服务
  • 使用选定的服务
  • 一旦用户用完服务,释放服务
  • 监听服务的发布、注销和修改

核心概念

Service records 服务记录

用来描述服务提供者发布的服务的对象。它包含名称,元数据,地址(描述服务在哪里)等属性

Service Provider and publisher 服务提供者和发布者

用来发布或注销一个服务

Service Consumer 服务消费者

服务消费者在服务发现中查找服务(可能返回0个或多个服务记录)。从这些记录中,消费者可以获得ServiceReference对象,用来表示消费者和提供者之间的绑定。这个对象允许消费者使用该服务,并释放服务。需要注意释放服务引用来清理对象和更新服务

Service object 服务对象

服务对象是提供访问服务的对象。它可以以各种形式出现,如代理、客户端,甚至有些不存在的服务类型。服务对象的性质取决于服务类型。

Service types 服务类型

服务可以是各种类型的服务,比如功能性服务,数据库,REST API等。每个服务类型定义了: ​

  • 这个服务的地址(URL,事件总线地址,IP/DNS)
  • 服务对象的性质(服务代理,HTTP客户端,消息消费者)

某些服务类型由服务发现组件实现并提供,我们也可以添加自己的服务类型。

Service events 服务事件

每次发布或注销服务时,总会在事件总线上触发一个announce事件。这个事件包括已经修改的服务记录。此外为了跟踪谁在使用服务,每次ServiceReference的绑定和释放都会触发一个usage事件

Backend

服务发现使用分布式结构存储服务记录,因此群集的所有成员都可以访问所有记录。也可以实现你自己的servicediscoverybackend SPI来存储服务记录.

使用

创建一个服务发现实例


	ServiceDiscovery discovery = ServiceDiscovery.create(vertx);

使用默认配置创建了一个服务发现实例,发布和注销服务时触发的事件地址为vertx.discovery.announce,绑定和释放ServiceReference时触发的事件地址为vertx.discovery.usage

我们可以使用自定义的配置


    ServiceDiscovery discovery = ServiceDiscovery.create(vertx,
                new ServiceDiscoveryOptions()
                        .setAnnounceAddress("service-announce")
                        .setName("my-name"));

Vertx.官方目前提供了下面几种类型的服务发现(ServiceType)

  • HttpEndpoint restAPI
  • EventBusService 服务代理
  • MessageSource 消息
  • JDBCDataSource JDBC

发布服务

发布服务的过程如下:

  • 为服务提供者创建服务记录(Record)
  • 发布这个服务记录
  • 保存已发布的服务记录,用来注销或者修改服务

    Record record = new Record()
            .setType("eventbus-service-proxy")
            .setLocation(new JsonObject().put("endpoint", "the-service-address"))
            .setName("my-service")
            .setMetadata(new JsonObject().put("some-label", "some-value"));
    discovery.publish(record, ar -> {
      if (ar.succeeded()) {
        Record publishedRecord = ar.result();
        System.out.println(record.getRegistration());
        System.out.println(publishedRecord.getLocation());
        System.out.println(publishedRecord.getType());
        System.out.println(publishedRecord.getMetadata());
        System.out.println(publishedRecord.getName());
      } else {

      }
    });

服务发布之后,会给服务记录生成一个唯一的ID(registration)

Vert.x也提供了一下基本服务类型用于创建服务记录(后面再描述)

注销服务

在服务发布之后,可以使用registration将服务注销


	discovery.unpublish(record.getRegistration(), ar -> {
	  if (ar.succeeded()) {
	    // Ok
	  } else {
	    // cannot un-publish the service, may have already been removed, or the record is not published
	  }
	});

查找服务

服务消费者可以通过getRecord搜索单个服务记录,或者通过getRecords搜索所有匹配的服务记录。

服务消费者可以通过一个过滤器来匹配服务,过滤器有两种:

  • Function<Record, Boolean> filter,以Record为参数,返回一个bool的函数
  • JsonObject,对JsonObject中的每个属性检查,必须所有的属性都匹配的记录才是要搜索的服务记录。

  • { “name” = “a” } => 匹配所有name属性为a的记录
  • { “color” = “*” } => 匹配所有设置了color属性的记录
  • { “color” = “red” } => 匹配所有color属性为red的记录

搜索全部记录


    discovery.getRecords(r -> true, ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
        System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

或者


    discovery.getRecords((JsonObject) null, ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

输出

	some-rest-api4:{"num":1}
	some-rest-api3:{"color":"white"}
	some-rest-api1:{}
	some-rest-api2:{"color":"red"}

搜索所有http类型的记录


    discovery.getRecords(r -> r.getType().equals("http-endpoint"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

或者


    discovery.getRecords(new JsonObject().put("type", "http-endpoint"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

输出

	some-rest-api4:{"num":1}
	some-rest-api3:{"color":"white"}
	some-rest-api1:{}
	some-rest-api2:{"color":"red"}

搜索所有名称为some-rest-api1的记录


    discovery.getRecords(r -> r.getName().equals("some-rest-api1"), ar -> {
        List<Record> records = ar.result();
        System.out.println("some-rest-api1");
        for (Record record : records) {
            System.out.println("-" + record.getName());
        }
    });

    discovery.getRecords(new JsonObject().put("name", "some-rest-api1"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

搜索所有color=red的记录


    discovery.getRecords(r -> "red".equals(r.getMetadata().getString("color")), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

    discovery.getRecords(new JsonObject().put("color", "red"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

搜索所有包含color属性的记录


    discovery.getRecords(r -> r.getMetadata().containsKey("color"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

    discovery.getRecords(new JsonObject().put("color", "*"), ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
            System.out.println(record.getName() + ":" + record.getMetadata());
        }
    });

只搜索单个记录


    discovery.getRecord(r -> true, ar -> {
        Record record = ar.result();
        System.out.println(record.getName() + ":" + record.getMetadata());
    });

getRecords和getRecord默认只会查询status=UP的记录,如果需要查询其他状态的记录,有两种方式:

  • 使用JsonObject过滤status属性,可以指定某个具体的status或者使用*表示全部
  • 使用Function过滤器,并将includeOutOfService参数设为true

获取服务引用

一旦你获取到了一个服务记录,就可以获取到一个ServiceReference以及服务对象。 ServiceReference用来表示与服务提供者之间的绑定关系


    ServiceReference serviceReference = discovery.getReference(record);
    HttpClient httpClient = serviceReference.get();
    // You need to path the complete path
    httpClient.getNow(record.getLocation().getString("root"), response -> {
      // Dont' forget to release the service
      serviceReference.release();
    });

一旦使用完ServiceReference,需要释放它.

我们也可以用discovery.getReferenceWithConfiguration来配置服务对象

Vertx.官方目前提供了下面几种类型的服务发现(ServiceType)

  • HttpEndpoint restAPI,对应的服务对象为HttpClient
  • EventBusService 异步RPC服务代理,对应的服务对象为对应的服务接口
  • MessageSource 消息,对应的服务对象为MessageConsumer
  • JDBCDataSource JDBC,对应的服务对象为JDBCClient

Vert.x还定义了一种unknown类型的服务类型,但这种类型无法获取对应的服务引用。我们可以通过服务记录中的location和metadata属性类创建服务对象。这些服务对象的使用不会触发usage事件. 当然,我们也可以实现自定义的服务类型(后面描述)

HTTP endpoints

发布


	Record record1 = HttpEndpoint.createRecord(
	    "some-http-service", // The service name
	    "localhost", // The host
	    8433, // the port
	    "/api" // the root of the service
	);

	discovery.publish(record1, ar -> {
	  // ...
	});
	
	Record record2 = HttpEndpoint.createRecord(
	    "some-other-name", // the service name
	    true, // whether or not the service requires HTTPs
	    "localhost", // The host
	    8433, // the port
	    "/api", // the root of the service
	    new JsonObject().put("some-metadata", "some value")
	);

消费


	discovery.getRecord(new JsonObject().put("name", "some-http-service"), ar -> {
	  if (ar.succeeded()  && ar.result() != null) {
	    // Retrieve the service reference
	    ServiceReference reference = discovery.getReference(ar.result());
	    // Retrieve the service object
	    HttpClient client = reference.get();

	    // You need to path the complete path
	    client.getNow("/api/persons", response -> {
	
	      // ...
	
	      // Dont' forget to release the service
	      reference.release();
	
	    });
	  }
	});

或者


	HttpEndpoint.getClient(discovery, new JsonObject().put("name", "some-http-service"), ar -> {
	  if (ar.succeeded()) {
	    HttpClient client = ar.result();

	    // You need to path the complete path
	    client.getNow("/api/persons", response -> {
	
	      // ...
	
	      // Dont' forget to release the service
	      ServiceDiscovery.releaseServiceObject(discovery, client);
	
	    });
	  }
	});

Event bus services

发布


	Record record = EventBusService.createRecord(
	    "some-eventbus-service", // The service name
	    "address", // the service address,
	    "examples.MyService", // the service interface as string
	    new JsonObject()
	        .put("some-metadata", "some value")
	);

	discovery.publish(record, ar -> {
	  // ...
	});

也可以直接使用接口的class


	Record record = EventBusService.createRecord(
        "some-eventbus-service", // The service name
        "address", // the service address,
        MyService.class // the service interface
        );

	discovery.publish(record, ar -> {
	// ...
	});

消费


	discovery.getRecord(new JsonObject().put("name", "some-eventbus-service"), ar -> {
	if (ar.succeeded() && ar.result() != null) {
        // Retrieve the service reference
        ServiceReference reference = discovery.getReference(ar.result());
        // Retrieve the service object
        MyService service = reference.get();

        // Dont' forget to release the service
        reference.release();
    }
    });

Message source

发布


	Record record = MessageSource.createRecord(
	    "some-message-source-service", // The service name
	    "some-address" // The event bus address
	);

	discovery.publish(record, ar -> {
	  // ...
	});
	
	record = MessageSource.createRecord(
	    "some-other-message-source-service", // The service name
	    "some-address", // The event bus address
	    "examples.MyData" // The payload type
	);

也可以直接使用Payload的class


	Record record1 = MessageSource.createRecord(
        "some-message-source-service", // The service name
        "some-address", // The event bus address
        JsonObject.class // The message payload type
        );

	Record record2 = MessageSource.createRecord(
	    "some-other-message-source-service", // The service name
	    "some-address", // The event bus address
	    JsonObject.class, // The message payload type
	    new JsonObject().put("some-metadata", "some value")
	);

消费


	discovery.getRecord(new JsonObject().put("name", "some-message-source-service"), ar -> {
	  if (ar.succeeded() && ar.result() != null) {
	    // Retrieve the service reference
	    ServiceReference reference = discovery.getReference(ar.result());
	    // Retrieve the service object
	    MessageConsumer<JsonObject> consumer = reference.get();

	    // Attach a message handler on it
	    consumer.handler(message -> {
	      // message handler
	      JsonObject payload = message.body();
	    });
	
	    // ...
	    // when done
	    reference.release();
	  }
	});

或者


	MessageSource.<JsonObject>getConsumer(discovery, new JsonObject().put("name", "some-message-source-service"), ar -> {
	  if (ar.succeeded()) {
	    MessageConsumer<JsonObject> consumer = ar.result();

	    // Attach a message handler on it
	    consumer.handler(message -> {
	      // message handler
	      JsonObject payload = message.body();
	    });
	    // ...
	
	    // Dont' forget to release the service
	    ServiceDiscovery.releaseServiceObject(discovery, consumer);
	
	  }
	});

JDBC Data source

发布


	Record record = JDBCDataSource.createRecord(
	    "some-data-source-service", // The service name
	    new JsonObject().put("url", "some jdbc url"), // The location
	    new JsonObject().put("some-metadata", "some-value") // Some metadata
	);

	discovery.publish(record, ar -> {
	  // ...
	});

消费


	discovery.getRecord(
	    new JsonObject().put("name", "some-data-source-service"),
	    ar -> {
	      if (ar.succeeded() && ar.result() != null) {
	        // Retrieve the service reference
	        ServiceReference reference = discovery.getReferenceWithConfiguration(
	            ar.result(), // The record
	            new JsonObject().put("username", "clement").put("password", "*****")); // Some additional metadata

	        // Retrieve the service object
	        JDBCClient client = reference.get();
	
	        // ...
	
	        // when done
	        reference.release();
	      }
	    });

或者


	JDBCDataSource.<JsonObject>getJDBCClient(discovery,
	    new JsonObject().put("name", "some-data-source-service"),
	    new JsonObject().put("username", "clement").put("password", "*****"), // Some additional metadata
	    ar -> {
	      if (ar.succeeded()) {
	        JDBCClient client = ar.result();

	        // ...
	
	        // Dont' forget to release the service
	        ServiceDiscovery.releaseServiceObject(discovery, client);
	
	      }
	    });

Redis Data source

发布


	Record record = RedisDataSource.createRecord(
	  "some-redis-data-source-service", // The service name
	  new JsonObject().put("url", "localhost"), // The location
	  new JsonObject().put("some-metadata", "some-value") // Some metadata
	);

	discovery.publish(record, ar -> {
	  // ...
	});

消费


	discovery.getRecord(
	  new JsonObject().put("name", "some-redis-data-source-service"), ar -> {
	    if (ar.succeeded() && ar.result() != null) {
	      // Retrieve the service reference
	      ServiceReference reference = discovery.getReference(ar.result());

	      // Retrieve the service instance
	      RedisClient client = reference.get();
	
	      // ...
	
	      // when done
	      reference.release();
	    }
	  });

或者


	RedisDataSource.getRedisClient(discovery,
	  new JsonObject().put("name", "some-redis-data-source-service"),
	  ar -> {
	    if (ar.succeeded()) {
	      RedisClient client = ar.result();

	      // ...
	
	      // Dont' forget to release the service
	      ServiceDiscovery.releaseServiceObject(discovery, client);
	
	    }
	  });

监听服务的发布和注销

每次服务提供者发布或者注销,都会有一个事件被发布到vertx.discovery.announce地址上(这个地址可以通过ServiceDiscoveryOptions修改)

事件中收到的状态属性表示服务记录的新状态

  • UP : 服务可用,可以使用这个服务
  • DOWN : 服务不可用,不应该再使用这个服务
  • OUT_OF_SERVICE : 服务还没有启动,不应该使用这个服务,但是可能过一段事件之后服务可用

监听服务的使用

每次一个服务引用被获取(绑定)或者释放,都会有一个事件被发布到vertx.discovery.usage地址上(这个地址可以通过ServiceDiscoveryOptions修改)

事件的JsonObject包括

  • record 服务记录
  • type 事件类型 bind或者release
  • ID 服务发现的ID,它可能是ServiceDiscovery的名字或者的节点的ID,可以通过ServiceDiscoveryOptions指定,在单机模式下默认为localhost,在集群模式下是该节点在集群下的ID

也可以将usage的地址设为null来关闭usage事件的触发

示例


    ServiceDiscovery discovery = ServiceDiscovery.create(vertx);
    vertx.eventBus().consumer("vertx.discovery.announce", msg -> {
      System.out.println("announce: " + msg.body());
    });

    vertx.eventBus().consumer("vertx.discovery.usage", msg -> {
      System.out.println("usage: " + msg.body());
    });
    
    vertx.setPeriodic(5000, l -> {
      discovery.getRecords(r -> true, ar -> {
        List<Record> records = ar.result();
        for (Record record : records) {
          ServiceReference serviceReference = discovery.getReference(record);
          HttpClient httpClient = serviceReference.get();
          serviceReference.release();
        }
      });
    });
    discovery.close();

输出

	announce: {"location":{"endpoint":"http://localhost:8080/api","host":"localhost","port":8080,"root":"/api","ssl":false},"metadata":{},"name":"some-rest-api","status":"UP","type":"http-endpoint"}
	announce: {"location":{"host":"localhost","endpoint":"http://localhost:8080/api","port":8080,"ssl":false,"root":"/api"},"metadata":{},"name":"some-rest-api","status":"DOWN","type":"http-endpoint"}
	usage: {"type":"bind","record":{"name":"some-rest-api","location":{"host":"localhost","endpoint":"http://localhost:8080/api","port":8080,"ssl":false,"root":"/api"},"metadata":{},"registration":"214bafff-2da9-4c26-b9d9-bc31f08febaa","type":"http-endpoint","status":"UP"},"id":"cfc09ae8-6a03-438c-962f-bb7b4ff9b4ae"}
	usage: {"type":"release","record":{"name":"some-rest-api","location":{"host":"localhost","endpoint":"http://localhost:8080/api","port":8080,"ssl":false,"root":"/api"},"metadata":{},"registration":"214bafff-2da9-4c26-b9d9-bc31f08febaa","type":"http-endpoint","status":"UP"},"id":"cfc09ae8-6a03-438c-962f-bb7b4ff9b4ae"}

Service discovery bridges

通过Bridges可以从其他服务发现组件上(Docker, Kubernates, Consul,Zookeeper)导入导出服务提供者。我们只需要实现ServiceImporter接口来定义自己的Bridges,并通过ServiceDiscovery.registerServiceImporter方法注册Bridges

当Bridges被注册之后,start会被执行,在导入(或导出)服务记录之后,必须将future设置为完成。 如果start方法是阻塞方法,必须使用executeBlocking来运行。

当ServiceDiscovery停止时,Bridges也被停止,stop方法会被调用用来清理对应的资源,删除导入(或导出)的服务记录。同样,在执行完毕之后不需要将future设置为完成

在集群模式下,只需要有一个节点注册bridge,所有的服务都会通过事件的方式广播给所有的成员

扩展

自定义服务类型

可以按照下面的步骤实现自定义的服务类型

  • (可选)创建一个继承ServiceType的接口.
  • 创建一个实现了上一步接口(或者ServiceType)的实现类. 这个实现类包括一个名字和对应ServiceReference的创建方法.
  • 创建一个继承了AbstractServiceReference的ServiceReference类,这个子类需要实现retrieve() 用于创建对应的服务对象.AbstractServiceReference会将创建的服务对象缓存,在release之前这个方法只会调用一次。如果服务对象需要一些清理操作,还需要重写close()方法
  • 在META-INF/services/io.vertx.ext.discovery.spi.ServiceType文件中加入自定义的ServiceType的实现类

自定义bridge

Vert.x官方的Zookeeper桥接尚未正式发布,下面是参考Consul实现的zookeeper桥接

创建一个辅助类,维护了Record和服务ID之间的关系,并且封装了服务的发布和注销方法


	public class ImportedZookeeperService {

	  private final String name;
	
	  private final Record record;
	
	  private final String id;
	
	  /**
	   * Creates a new instance of {@link ImportedZookeeperService}.
	   *
	   * @param name   the service name
	   * @param id     the service id, may be the name
	   * @param record the record (not yet registered)
	   */
	  public ImportedZookeeperService(String name, String id, Record record) {
	    Objects.requireNonNull(name);
	    Objects.requireNonNull(id);
	    Objects.requireNonNull(record);
	    this.name = name;
	    this.record = record;
	    this.id = id;
	  }
	
	  /**
	   * @return the name
	   */
	  public String name() {
	    return name;
	  }
	
	  /**
	   * Registers the service and completes the given future when done.
	   *
	   * @param publisher  the service publisher instance
	   * @param completion the completion future
	   * @return the current {@link ImportedZookeeperService}
	   */
	  public ImportedZookeeperService register(ServicePublisher publisher,
	                                           Future<ImportedZookeeperService> completion) {
	    publisher.publish(record, ar -> {
	      if (ar.succeeded()) {
	        record.setRegistration(ar.result().getRegistration());
	        completion.complete(this);
	      } else {
	        completion.fail(ar.cause());
	      }
	    });
	    return this;
	  }
	
	  /**
	   * Unregisters the service and completes the given future when done, if not {@code null}
	   *
	   * @param publiher   the service publisher instance
	   * @param completion the completion future
	   */
	  public void unregister(ServicePublisher publiher, Future<Void> completion) {
	    if (record.getRegistration() != null) {
	      publiher.unpublish(record.getRegistration(), ar -> {
	        if (ar.succeeded()) {
	          record.setRegistration(null);
	        }
	        if (completion != null) {
	          completion.complete();
	        }
	      });
	    } else {
	      if (completion != null) {
	        completion.fail("Record not published");
	      }
	    }
	  }
	
	  /**
	   * @return the id
	   */
	  String id() {
	    return id;
	  }
	}
  1. 实现Zookeeper的ServiceImporter

start方法会创建zookeeper的连接,并通过retrieveIndividualServices方法来从zookeeper中导入服务记录;


	  @Override
	  public void start(Vertx vertx, ServicePublisher publisher, JsonObject configuration,
	                    Future<Void> future) {
	    this.vertx = vertx;
	    this.publisher = publisher;
	    this.zkConnect = configuration.getString("zookeeper.connect", "localhost:2181");
	    this.basePath = configuration.getString("zookeeper.path", "/services");
	    this.sleepMsBetweenRetries = configuration.getInteger("zookeeper.retry.sleep", 1000);
	    this.retryTimes = configuration.getInteger("zookeeper.retry.times", 5);
	    // When the bridge is configured, ready and has imported / exported the initial services, it
	    // must complete the given Future. If the bridge starts method is blocking, it must use an
	    // executeBlocking construct, and complete the given future object
	    vertx.<Void>executeBlocking(
	            f -> {
	              try {
	                client = CuratorFrameworkFactory.newClient(zkConnect,
	                                                           new RetryNTimes(retryTimes,
	                                                                           sleepMsBetweenRetries));
	                client.start();

	                serviceDiscovery =
	                        ServiceDiscoveryBuilder.builder(String.class)
	                                .basePath(basePath)
	                                .watchInstances(true)
	                                .client(client).build();
	
	                serviceDiscovery.start();
	
	                cache = TreeCache.newBuilder(client, basePath).build();
	                cache.start();
	                cache.getListenable().addListener(this);
	
	                f.complete();
	              } catch (Exception e) {
	                future.fail(e);
	              }
	            },
	            ar -> {
	              if (ar.failed()) {
	                future.fail(ar.cause());
	              } else {
	                Future<Void> f = Future.future();
	                f.setHandler(x -> {
	                  if (x.failed()) {
	                    future.fail(x.cause());
	                  } else {
	                    started = true;
	                    future.complete();
	                  }
	                });
	                retrieveIndividualServices(f);
	              }
	            }
	    );
	  }

retrieveIndividualServices方法是服务导入的核心逻辑,它先从zookeeper中读取到所有的服务实例,然后将zookeeper中的服务实例发布,并将zookeeper中不存在的服务实例注销


	  private synchronized void retrieveIndividualServices(Future<Void> completed) {
	    List<ServiceInstance<String>> instances = new ArrayList<>();
	    try {
	      Collection<String> names = serviceDiscovery.queryForNames();
	      for (String name : names) {
	        instances.addAll(serviceDiscovery.queryForInstances(name));
	      }
	    } catch (KeeperException.NoNodeException e) {
	      // no services
	      // Continue
	    } catch (Exception e) {
	      if (completed != null) {
	        completed.fail(e);
	      } else {
	        LOGGER.error("Unable to retrieve service instances from Zookeeper", e);
	        return;
	      }
	    }
	    Future<List<ImportedZookeeperService>> future = Future.future();
	    importService(instances, future);
	    future.setHandler(ar -> {
	      if (ar.failed()) {
	        //completed.fail(ar.cause());
	        unregisterAllServices(completed);
	      } else {
	        List<ImportedZookeeperService> services = future.result();
	        List<String> retrievedIds =
	                services.stream().map(ImportedZookeeperService::id).collect(Collectors.toList());
	        List<String> existingIds =
	                imports.stream().map(ImportedZookeeperService::id).collect(Collectors.toList());

	        LOGGER.trace("Imported services: " + existingIds + ", Retrieved services form Zookeeper: "
	                     + retrievedIds);
	
	        services.forEach(svc -> {
	          String id = svc.id();
	
	          if (!existingIds.contains(id)) {
	            LOGGER.info("Imported service: " + id);
	            imports.add(svc);
	          }
	        });
	
	        //使用foreach删除会出现ConcurrentModificationException
	        Iterator<ImportedZookeeperService> iterator = imports.iterator();
	        while (iterator.hasNext()) {
	          ImportedZookeeperService svc = iterator.next();
	          if (!retrievedIds.contains(svc.id())) {
	            LOGGER.info("Unregistering " + svc.id());
	            iterator.remove();
	            svc.unregister(publisher, null);
	          }
	        }
	
	        completed.complete();
	      }
	    });
	  }

close方法中将所有的服务实例注销


	  @Override
	  public void close(Handler<Void> closeHandler) {
	    Future<Void> done = Future.future();
	    //删除所有服务实例
	    unregisterAllServices(done);

	    done.setHandler(v -> {
	      try {
	        if (cache != null) {
	          CloseableUtils.closeQuietly(cache);
	        }
	        if (serviceDiscovery != null) {
	          CloseableUtils.closeQuietly(serviceDiscovery);
	        }
	        if (client != null) {
	          CloseableUtils.closeQuietly(client);
	        }
	      } catch (Exception e) {
	        // Ignore them
	      }
	      closeHandler.handle(null);
	    });
	  }
	
	  private synchronized void unregisterAllServices(Future<Void> completed) {
	    List<Future> list = new ArrayList<>();
	
	    imports.forEach(svc -> {
	      Future<Void> unreg = Future.future();
	      svc.unregister(publisher, unreg);
	      list.add(unreg);
	    });
	
	    CompositeFuture.all(list).setHandler(x -> {
	      if (x.failed()) {
	        completed.fail(x.cause());
	      } else {
	        completed.complete();
	      }
	    });
	  }

同时,我们还需要监听整个服务节点的变化,来重新导入服务实例


	  @Override
	  public void childEvent(CuratorFramework curatorFramework,
	                         TreeCacheEvent treeCacheEvent) throws Exception {
	    if (started) {
	      retrieveIndividualServices(Future.future());
	    }
	  }

源码分析

ServiceDiscovery

构造方法


	  public DiscoveryImpl(Vertx vertx, ServiceDiscoveryOptions options) {
	    this.vertx = vertx;
	    this.announce = options.getAnnounceAddress();
	    this.usage = options.getUsageAddress();

	    this.backend = getBackend(options.getBackendConfiguration().getString("backend-name", null));
	    this.backend.init(vertx, options.getBackendConfiguration());
	
	    this.id = options.getName() != null ? options.getName() : getNodeId(vertx);
	    this.options = options;
	  }

DiscoveryImpl的构造方法主要做一些配置工作

  • 配置announce和usage事件的地址
  • 初始化对应的backend
  • 创建对应的ID

getBackend主要送从classpath中读取对应的ServiceDiscoveryBackend对象,如果没有找到合适的ServiceDiscoveryBackend,就使用默认值DefaultServiceDiscoveryBackend。关于ServiceDiscoveryBackend会在后面在描述

registerServiceImporter 直接调用importer.start方法启动对应的importer,在成功之后将import加入到importers中


	  public ServiceDiscovery registerServiceImporter(ServiceImporter importer, JsonObject configuration,
	                                                  Handler<AsyncResult<Void>> completionHandler) {
	    JsonObject conf;
	    if (configuration == null) {
	      conf = new JsonObject();
	    } else {
	      conf = configuration;
	    }

	    Future<Void> completed = Future.future();
	    completed.setHandler(
	        ar -> {
	          if (ar.failed()) {
	            LOGGER.error("Cannot start the service importer " + importer, ar.cause());
	            if (completionHandler != null) {
	              completionHandler.handle(Future.failedFuture(ar.cause()));
	            }
	          } else {
	            importers.add(importer);
	            LOGGER.info("Service importer " + importer + " started");
	
	            if (completionHandler != null) {
	              completionHandler.handle(Future.succeededFuture(null));
	            }
	          }
	        }
	    );
	
	    importer.start(vertx, this, conf, completed);
	    return this;
	  }

close方法 将所有import、export关闭,将所有的ServiceReference释放,并删除绑定关系。


	  public void close() {
	    LOGGER.info("Stopping service discovery");
	    List<Future> futures = new ArrayList<>();
	    for (ServiceImporter importer : importers) {
	      Future<Void> future = Future.future();
	      // TODO Change this call to call close, once the stop method has been removed
	      importer.stop(vertx, this, future);
	      futures.add(future);
	    }

	    for (ServiceExporter exporter : exporters) {
	      Future<Void> future = Future.future();
	      exporter.close(future::complete);
	      futures.add(future);
	    }
	
	    bindings.forEach(ServiceReference::release);
	    bindings.clear();
	
	    CompositeFuture.all(futures).setHandler(ar -> {
	      if (ar.succeeded()) {
	        LOGGER.info("Discovery bridges stopped");
	      } else {
	        LOGGER.warn("Some discovery bridges did not stopped smoothly", ar.cause());
	      }
	    });
	  }

publish 发布服务记录

将服务记录保存到ServiceDiscoveryBackend。如果注册有exporter,还需要调用exporter的onPublish方法向对应的Bridges发布服务。然后触发announce对象。


	  @Override
	  public void publish(Record record, Handler<AsyncResult<Record>> resultHandler) {
	    Status status = record.getStatus() != null
	        && record.getStatus() != Status.UNKNOWN
	        && record.getStatus() != Status.DOWN
	        ? record.getStatus() : Status.UP;

	    backend.store(record.setStatus(status), resultHandler);
	    for (ServiceExporter exporter : exporters) {
	      exporter.onPublish(new Record(record));
	    }
	    Record announcedRecord = new Record(record);
	    announcedRecord
	        .setRegistration(null)
	        .setStatus(status);
	    vertx.eventBus().publish(announce, announcedRecord.toJson());
	  }

unpublish 注销服务

从ServiceDiscoveryBackend中删除服务记录。如果注册有exporter,还需要调用exporter的onUnpublish方法向对应的Bridges注销服务。然后触发announce事件


	  @Override
	  public void unpublish(String id, Handler<AsyncResult<Void>> resultHandler) {
	    backend.remove(id, record -> {
	      if (record.failed()) {
	        resultHandler.handle(Future.failedFuture(record.cause()));
	        return;
	      }

	      for (ServiceExporter exporter : exporters) {
	        exporter.onUnpublish(id);
	      }
	
	      Record announcedRecord = new Record(record.result());
	      announcedRecord
	          .setRegistration(null)
	          .setStatus(Status.DOWN);
	      vertx.eventBus().publish(announce, announcedRecord.toJson());
	      resultHandler.handle(Future.succeededFuture());
	    });
	
	  }

getReferenceWithConfiguration方法

创建对应的ServiceReference,然后在bindings中保存绑定关系,发布bind类型的usage事件


	  @Override
	  public ServiceReference getReferenceWithConfiguration(Record record, JsonObject configuration) {
	    ServiceReference reference = ServiceTypes.get(record).get(vertx, this, record, configuration);
	    bindings.add(reference);
	    sendBindEvent(reference);
	    return reference;
	  }

release方法

删除bindings中的绑定关系,释放ServiceReference,发布release类型的usage事件


	  @Override
	  public boolean release(ServiceReference reference) {
	    boolean removed = bindings.remove(reference);
	    reference.release();
	    sendUnbindEvent(reference);
	    return removed;
	  }

getRecord方法比较简单,不做描述

ServiceDiscoveryBackend

服务实例的存储类,默认使用分布式内存存储(DefaultServiceDiscoveryBackend)。DefaultServiceDiscoveryBackend并不复杂,它内部使用一个AsyncMap来保存服务实例。


	  private AsyncMap<String, String> registry;

	  @Override
	  public void init(Vertx vertx, JsonObject config) {
	    this.registry = new AsyncMap<>(vertx, "service.registry");
	  }

AsyncMap的构造方法会判断节点是否在集群模式下,如果在集群模式下会使用clusterManager提供的分布式MAP,否则使用一个本地MAP(多个ServiceDiscovery共享同一个map).(LocalMapWrapper是借助ConcurrentMap对map的一个简单封装)


	//这是3.4.X版本的源码,会有性能问题,3.5.0之后的版本已经修改了实现方式
	  public AsyncMap(Vertx vertx, String name) {
	    this.vertx = vertx;
	    ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
	    if (clusterManager == null) {
	      syncMap = new LocalMapWrapper<>(vertx.sharedData().<K, V>getLocalMap(name));
	    } else {
	      syncMap = clusterManager.getSyncMap(name);
	    }
	  }

store方法 为每个服务记录生成一个唯一ID,只有ID为null的服务记录才表示未被发布


  @Override
  public void store(Record record, Handler<AsyncResult<Record>> resultHandler) {
    String uuid = UUID.randomUUID().toString();
    if (record.getRegistration() != null) {
      throw new IllegalArgumentException("The record has already been registered");
    }

    record.setRegistration(uuid);
    registry.put(uuid, record.toJson().encode(), ar -> {
      if (ar.succeeded()) {
        resultHandler.handle(Future.succeededFuture(record));
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    });
  }

remove方法 从AsyncMap中删除对应的ID


  @Override
  public void remove(String uuid, Handler<AsyncResult<Record>> resultHandler) {
    Objects.requireNonNull(uuid, "No registration id in the record");
    registry.remove(uuid, ar -> {
      if (ar.succeeded()) {
        if (ar.result() == null) {
          // Not found
          resultHandler.handle(Future.failedFuture("Record '" + uuid + "' not found"));
        } else {
          resultHandler.handle(Future.succeededFuture(
              new Record(new JsonObject(ar.result()))));
        }
      } else {
        resultHandler.handle(Future.failedFuture(ar.cause()));
      }
    });
  }

ServiceReference

所有服务类型的ServiceReference都可以继承自AbstractServiceReference。AbstractServiceReference提供了一个简单的缓存处理。ServiceReference未被释放时都从缓存中取.


	  public synchronized <X> X get() {
	    if (service == null) {
	      service = retrieve();
	    }
	    return cached();
	  }

	  public synchronized void release() {
	    ((DiscoveryImpl) discovery).unbind(this);
	    if (service != null) {
	      close();
	      service = null;
	    }
	  }

它有一个retrieve的抽象方法用于获取每个服务类型对应的服务对象,需要各个子类实现


	protected abstract T retrieve();

例如HTTP类型的实现如下


    public HttpClient retrieve() {
      HttpClientOptions options;
      if (config != null) {
        options = new HttpClientOptions(config);
      } else {
        options = new HttpClientOptions();
      }
      options.setDefaultPort(location.getPort()).setDefaultHost(location.getHost());
      if (location.isSsl()) {
        options.setSsl(true);
      }

      return vertx.createHttpClient(options);
    }
Edgar

Edgar
一个略懂Java的小菜比