服务器向浏览器推送消息

最后更新:2019-07-29

如果我们想将服务器端的数据在实时推送给WEB客户端时(例如IM,WEB端获取支付结果),一般来说有4种实现

  • 短轮询
  • 长轮询
  • SSE(Server-Sent Events)
  • HTTP/2 Server Push(本文不考虑)
  • websocket(本文不考虑)

短轮询

因为客户端只能主动向服务器端发送请求获取数据,但是要服务端主动发送数据给客户端,就可以在客户端不断的发送请求给服务器端,服务器端不管有没有数据都会返回数据并关闭连接。保持客户端不断发送请求的方法是设置一个定时器不断调用某个函数。

短轮询开发简单,但是有大半的请求都是无用的,浪费带宽和服务器资源,而且客户端获取信息不及时

长轮询

在长轮询机制中,客户端像传统轮询一样从服务器请求数据。然而,如果服务器没有可以立即返回给客户端的数据,则不会立刻返回一个空结果,而是保持这个请求等待数据到来(或者恰当的超时:小于ajax的超时时间),之后将数据作为结果返回给客户端。客户端收到响应(或超时)后立即重新想服务器发起请求

长轮询与短轮询相比,在无数据的时候不会频繁发送请求,客户端获取信息也更及时。但是它的开发相对更复杂一点,而且服务器hold连接会占用部分资源

Spring实现

Spring提供了DeferredResult对象来允许容器线程快速释放以便可以接受更多的请求提升吞吐量,让真正的业务逻辑在其他的工作线程中去完成。在Spring Mvc的控制层中,只要有一个用户请求便会实例化一个DeferedResult对象,然后返回该对象,进行响应客户端。只要DeferedResult对象不设置result响应的内容,则控制层的容器主线程在响应客户端上就会发生阻塞。

下面看一个demo

@GetMapping(value = "/deferred")
public DeferredResult<String> deferred() {
	logger.info("Request received");
	DeferredResult<String> deferredResult = new DeferredResult<>();
	CompletableFuture.supplyAsync(() -> {
		try {
			TimeUnit.SECONDS.sleep(3);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "hello deferred";
	})
		.whenCompleteAsync((result, throwable) -> deferredResult.setResult(result));
	logger.info("Servlet thread released");

	return deferredResult;
}

@GetMapping(value = "/timeout")
public DeferredResult<String> timeout() {
	logger.info("Request received");
	DeferredResult<String> deferredResult = new DeferredResult<>(1000L, "timeout");
	deferredResult.onError(throwable -> {
		logger.error("deferred occur error");
	});
	deferredResult.onTimeout(() -> {
		logger.info("deferred timeout");
	});

	return deferredResult;
}

因为SpringMVC只会实例化一个Controller对象,无论有多少个用户请求,在堆上只有一个Controller对象,因此可以添加一个成员变量List,将这些用户请求的DeferedResult对象存放到List中,然后启动一个定时线程扫描list,从而依次执行setResult方法,响应客户端。这里就不上代码了。

SSE

SSE 就是服务器向客户端声明,接下来要发送的是流信息(streaming)。也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE是单向通道,只能服务器想浏览器发送(本质是下载)。相比websocket更轻量,而且支持短线重连,但是只能传输文本

spring SseEmitter实现

spring可以通过SseEmitter来实现,下面是一个简单的DEMO

public class MemoryInfo {
  private long heap;
  private long nonHeap;
  private long ts;
}
@Service
public class MemoryObserverJob {

  public final ApplicationEventPublisher eventPublisher;

  public MemoryObserverJob(ApplicationEventPublisher eventPublisher) {
    this.eventPublisher = eventPublisher;
  }

  @Scheduled(fixedRate = 1000)
  public void doSomething() {
    MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
    MemoryUsage heap = memBean.getHeapMemoryUsage();
    MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();

    MemoryInfo mi = new MemoryInfo();
    mi.setHeap(heap.getUsed());
    mi.setNonHeap(nonHeap.getUsed());
    this.eventPublisher.publishEvent(mi);
  }
}
@RestController
public class SSEController {

  private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

  @GetMapping("/memory")
  public SseEmitter handle(HttpServletResponse response) {
    response.setHeader("Cache-Control", "no-store");

    SseEmitter emitter = new SseEmitter();
    this.emitters.add(emitter);

    emitter.onCompletion(() -> this.emitters.remove(emitter));
    emitter.onTimeout(() -> this.emitters.remove(emitter));

    return emitter;
  }

  @EventListener
  public void onMemoryInfo(MemoryInfo memoryInfo) {
    List<SseEmitter> deadEmitters = new ArrayList<>();
    this.emitters.forEach(emitter -> {
      try {
        emitter.send(memoryInfo);
      }
      catch (Exception e) {
        deadEmitters.add(emitter);
      }
    });

    this.emitters.removeAll(deadEmitters);
  }

}

index.html

<!DOCTYPE html>
<html>
<head>
  <title>Server Memory Monitor</title>
  <script>
    function initialize() {
      const eventSource = new EventSource('http://localhost:8080/memory');
      eventSource.onmessage = e => {
        const msg = JSON.parse(e.data);
        document.getElementById("timestamp").innerHTML = new Date(msg.ts);
        document.getElementById("heap").innerHTML = msg.heap;
        document.getElementById("nonheap").innerHTML = msg.nonHeap;
      };

      eventSource.onopen = e => console.log('open');
      eventSource.onerror = e => {
        if (e.readyState == EventSource.CLOSED) {
          console.log('close');
        }
        else {
          console.log(e);
        }
      };

      eventSource.addEventListener('second', function(e) {
        console.log('second', e.data);
      }, false);
    }
    window.onload = initialize;
  </script>
</head>
<body>
<h1>Memory Observer</h1>

<h3>Timestamp</h3>
<div id="timestamp"></div>

<h3>Heap Memory Usage</h3>
<div id="heap"></div>

<h3>Non Heap Memory Usage</h3>
<div id="nonheap"></div>
</body>
</html>

访问http://localhost:8080,我们可以看到内存信息每秒都在刷新

仔细观察浏览器和后台的控制台,可以发现服务端超时(或者完成)后,浏览器就好重新发起连接

emitter.complete();

请求头为Accept: text/event-stream,响应头为:Accept: text/event-stream,响应的数据为data:{"heap":143372408,"nonHeap":143372408,"ts":0}

服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

可以通过下面的语句指定超时时间

SseEmitter emitter = new SseEmitter(180_000L); //keep connection open for 180 seconds

我们还可以通过emitter.send方法发送SseEventBuilder

SseEventBuilder builder = SseEmitter.event().name("second").data("1");
SseEventBuilder builder =
SseEmitter.event().reconnectTime(10_000L).data(memoryInfo).id("1");
emitter.send(builder);

响应内容:

event:memory
retry:10000
data:{"heap":139911144,"nonHeap":139911144,"ts":0}
id:1

每一次响应的信息,由若干个message组成,每个message之间用\n\n分隔。每个message内部由若干行组成,每一行都是如下格式。

[field]: value\n

上面的field可以取四个值。

  • data 数据内容,如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。
  • event event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件
  • id 数据标识符用id字段表示,相当于每一条数据的编号。浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制
  • retry 服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

Spring5 WebFlux实现

  @GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<MemoryInfo> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(sequence -> heap());
  }

第二种实现(没测试通过,还没搞清楚)

  @GetMapping(path = "/stream-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<ServerSentEvent<MemoryInfo>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
        .map(sequence -> ServerSentEvent.<MemoryInfo> builder()
            .id(String.valueOf(sequence))
            .event("memory")
            .data(heap())
            .build());
  }

参考资料

https://golb.hplar.ch/2017/03/Server-Sent-Events-with-Spring.html

http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html

Edgar

Edgar
一个略懂Java的小菜比