分类 默认分类 下的文章

python flask 实现 SSE

这是最近2年第二次需要 server-sent events(SSE). 需求都是一样, 某个页面的需要后台去处理一个很长的请求, 后台处理至少要30秒钟. 这30多秒可以让用户去等, 但是有点长. 所以, 可以通过不断的把处理的进度和处理的中间结果尽快的展示到页面上显得尤为重要. 因此可以通过SSE把服务端到最新更新及时发送到页面, 让用户等等不是那么烦躁.

什么是 server-sent events

server-sent events 主要用来解决服务端要实时发送数据到客户端到情况. 比如最新的股票实时行情, 网页游戏服务端的实时数据, 视频/音频信息流等数据. 服务端随时可以推送数据到客户端, 客户端接收并处理. 但是客户端在第一次发送请求到服务端之后, 不会再发送新请求到服务端, 只能被动接收服务端到推送数据. 但是客户端可以选择随时关闭连接, 不再接收新数据. 这也是SSE 区分于 WebSockets 的一个重大区别.

server-sent events 客户端的接口

完全的文档可以看这里(https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events), 讲的很清楚.

客户端发起连接请求

const evtSource = new EventSource("//api.example.com/sse", {
  withCredentials: true,
});

客户端接收到匿名event

客户端处理:

evtSource.onmessage = (event) => {
  console.log(event);
};

这个时候, 服务端发送的数据是:

data: some text

# 或者是:
data: {"name": "eric", "age": 55}

客户端接收到命名event

客户端处理:

evtSource.addEventListener("myEventName", (event) => {
  console.log(event);
});

这个时候, 服务端发送的数据是:

event: myEventName
data: {"name": "eric", "age": 55}

客户端收到的注释(comment)

服务器可以发送以 : 开头的数据, 这时候客户端会认为这是没有任何意义的数据, 只是注释, 这通常只是用来保持这个连接, 不让它断掉. 比如

: this is a comment

服务端和客户端通信的协议

通过上面的例子可以看到, 2端其实都是纯文本行的数据发送与接收. 每行都是以 key: value 的形式表示的, 除了注释行, 它没有key, 是以 : 开头的一行.

允许的key 只有以下几种可能:

  1. event: event 的名字, 没有名字就默认是匿名事件.
  2. data: event的内容, 可以连续2行或多行都是 event: value 的形式, 这时候客户端浏览器会自动拼接2行或多行成一行, 拼接处加上换行符.
  3. id: event 的id, 可以没有.
  4. retry: 服务端发送给客户端, 当它侦测到断连之后, 多久才能发起重连断时间毫秒数.

所有不符合 key: value (这里的key是上面的4种) 和 : some comment 的形式, 都被认为是无效的.

客户端关闭连接

客户端通过下面的形式关闭连接:

evtSource.close();

客户端发现错误

客户端通过下面的代码做出错处理:

evtSource.onerror = (err) => {
  console.error("EventSource failed:", err);
};

关于客户端 EventSource 的所有API: https://developer.mozilla.org/en-US/docs/Web/API/EventSource

服务端的代码

其实客户端的代码相对来说比较统一, 浏览器都是统一规范. 服务端有不同的服务器语言, 实现起来却可能有差异. 比如 Java 的 JAX-RS 里面就有专门处理 SSE 的API.

今天我们就看一下 Python 里面是如何实现的.

python Flask 的实现

对于大家经常用到的 Flask, 如果你搜索 Flask SSE, 结果第一的是 https://flask-sse.readthedocs.io/en/latest/quickstart.html, 但是你点进去看, 发现它竟然使用 redis 去实现 SSE, 这相当于我又要安装一个 redis server. 这可都重的.

其实不需要这么实现, 也能达到 SSE 的效果. 服务端的代码如下:

import time
from flask import Flask, stream_with_context

@app.route('/sse')
def handle_sse_stream():
    def generate_event():
        while True:
           yield f'data: {"time": time.time()}\n\n'
           sleep(1000)
        
    return Response(stream_with_context(generate_event()), mimetype='text/event-stream')

上面的函数里面通过一个生成器, 不断的生成新的事件, 然后发送给客户端.

改进

通常我们的代码不会仅仅在那里sleep, 然后发送一个时间, 但是它可以每隔一段时间查询数据库去看最新的状态, 然后发送最新的状态给客户端. 比如上面的 yield 行的代码可以改成:

yield f'data: {"status": get_db_status()}\n\n'

进一步改进

上面的代码其实要求有一个数据库或者共享的组件来协调, 如果在同一个服务器上, 可以通过 queue.queue 这个队列来组成生产者/消费者 消息队列来传递消息.
上面的代码可以改成如下的方式:

def generate_event(queue):
    while True:
       yield f'data: {"msg": queue.get()}\n\n'
       sleep(1000)

这就解决了需要一个中间件来协调的问题.

总结

对于基于 Python flask 的 SSE, 其实可以通过 flask的 stream_with_context() 和 生成器来实现, 再通过 queue.queue 就实现了服务器内部的异步通信.

chrome 插件 SwitchyOmega 突然不能用 ERR_MANDATORY_PROXY_CONFIGURATION_FAILED

今天早上到公司, 任何网站都打不开了, 本地起的服务器本地端口都打不开. 出现如下页面:
not.png

症状

  1. 如果不用 SwitchyOmega 插件, 网站是可以打开的.
  2. 如果单单使用代理能打开墙外网站.
  3. 如果使用 Auto Switch 全都打不开.

我有一个墙外 proxy, 一个公司 PAC script. 然后使用前面2个组装一个 Auto Switch.

分析

根据症状分析 Auto Switch 里面的 公司 PAC script 出问题了. 于是检查 PAC script. 如下图:
php.png

竟然是一个 php 脚本, 这是???

说明公司的 PAC 脚本的服务器应该是 php 的, 它现在直接源文件返回了. 如果查看里面的php 内容, 发现它其实是产生 PAC 的脚本.

最后发现原来是公司这个 PAC 的服务器间歇性出问题, 有时候返回真正的 PAC, 有时候返回 php.

由 Transfer-Encoding chunked 引起的 site issue

Transfer-Encoding: chunked 介绍

Transfer-Encoding 是 HTTP 1.x 版本的一个header, 设置 payload 传输时候的一种编码. 可能的编码格式有: chunked, compress, deflate, gzip. 可以同时设置多个兼容的值. 这个header 只适用于 hop to top, 不适用于整个连接. 如果你想在整个连接上使用压缩算法, 应该使用 Content-Encoding header.

使用 Transfer-Encoding 的例子:

Transfer-Encoding: gzip, chunked

为什么要使用 chunked

假如一开始就知道要传输多长的payload 数据, 就可以使用 Content-Length header.
chunked: 一般因为一开始并不知道要传输多长的payload数据, 所以要一块一块传输, 在每一块的头上标注这一块有多长.
例子:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n
11\r\n
Developer Network\r\n
0\r\n
\r\n

payload header

HTTP 1.x 的 header 分为

由 Transfer-Encoding chunked 引起的 site issue

HTTP GET 请求

http GET 请求是最简单的请求类型. 在浏览器输入一个URL, 直接回车, 就是发送一个 http GET 请求. 一个简单的例子:

GET /path/to/resource?query=string HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate, sdch
Connection: keep-alive

上面的最后一行header 之后, 会在发送一行只有\r\n 的行, 表示请求结束.

Transfer-Encoding: chunked 介绍

Transfer-Encoding 是 HTTP 1.x 版本的一个header, 设置 payload 传输时候的一种编码. 可能的编码格式有: chunked, compress, deflate, gzip. 可以同时设置多个兼容的值. 这个header 只适用于 hop to top, 不适用于整个连接. 如果你想在整个连接上使用压缩算法, 应该使用 Content-Encoding header.

使用 Transfer-Encoding 的例子:

Transfer-Encoding: gzip, chunked

为什么要使用 chunked

假如一开始就知道要传输多长的payload 数据, 就可以使用 Content-Length header.
chunked: 一般因为一开始并不知道要传输多长的payload数据, 所以要一块一块传输, 在每一块的头上标注这一块有多长.
例子:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n
11\r\n
Developer Network\r\n
0\r\n
\r\n

当 HTTP GET 遇到 Transfer-Encoding: chunked

根据上面的介绍, HTTP GET 请求不应该包含 payload. 所以如果在header 里面误发了Transfer-Encoding: chunked,会发生什么事情呢?

不同的服务器可能有不同的处理方式, 有的快速返回, 有的等待接受payload.

Tomcat 的处理方式

根据作者本地 debug 的实践, 到现在为止(20240713), Tomcat 的最新版本仍然是等待接收 chunked payload, 直到 socket read timeout.

这是某个Tomcat 版本等待读取 payload 的栈:

java.lang.Object.wait(Native Method)
org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.fillReadBuffer(NioEndpoint.java:1333)
org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.read(NioEndpoint.java:1234)
org.apache.coyote.http11.Http11InputBuffer.fill(Http11InputBuffer.java:785)
org.apache.coyote.http11.Http11InputBuffer.access$400(Http11InputBuffer.java:41)
org.apache.coyote.http11.Http11InputBuffer$SocketInputBuffer.doRead(Http11InputBuffer.java:1185)
org.apache.coyote.http11.filters.ChunkedInputFilter.readBytes(ChunkedInputFilter.java:310)
org.apache.coyote.http11.filters.ChunkedInputFilter.parseChunkHeader(ChunkedInputFilter.java:338)
org.apache.coyote.http11.filters.ChunkedInputFilter.doRead(ChunkedInputFilter.java:164)
org.apache.coyote.http11.filters.ChunkedInputFilter.end(ChunkedInputFilter.java:229)
org.apache.coyote.http11.Http11InputBuffer.endRequest(Http11InputBuffer.java:644)
org.apache.coyote.http11.Http11Processor.endRequest(Http11Processor.java:1184)
org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:430)
org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63)
org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:926)
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1791)
org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
java.lang.Thread.run(Thread.java:750) 

Tomcat 的逻辑是先逐行读取 header 信息, 直到读到 \r\n 行, 然后根据header设置需要的 InputFilter 列表(虽然是列表, 可能只有一个). 常见的 InputFilter 有:

  1. VoidInputFilter - 当 GET, HEAD 请求时用.
  2. ChunkedInputFilter - 当 chunked 的时候用.

一个请求的例子

下面是使用 python 写的一个发送 GET 请求并且设置 Transfer-Encoding: chunked 的例子:

import socket
from concurrent.futures import ThreadPoolExecutor
 
# Configuration
host = 'www.tianxiaohui.com'
port = 80
buffer_size = 4096
read_timeout = 100000  # Set read timeout to 10 seconds
 
def call():
    # Create a socket object using IPv4 and TCP protocols
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
    # Set the read timeout on the socket
    client_socket.settimeout(read_timeout)
 
    try:
        # Connect to the server
        client_socket.connect((host, port))
 
        # Prepare the HTTP request data
        http_request = ("GET /sell/marketing/v1/ad_campaign?limit=100&offset=0 HTTP/1.1\r\n"
                        f"Host: {host}\r\n"
                        "accept: application/json, text/json, text/x-json, text/javascript\r\n"
                        "accept-encoding: application/gzip, deflate\r\n"
                        "Transfer-Encoding: chunked\r\n"
                        "\r\n")
 
        # Send the HTTP request to the server
        client_socket.sendall(http_request.encode())
 
        # Receive the response from the server
        response = ''
        while True:
            part = client_socket.recv(buffer_size).decode()
            if not part:
                break
            response += part
 
    except socket.timeout:
        print("Read timed out")
        response = None
    finally:
        # Close the socket
        client_socket.close()
 
    # Return the response
    return response
 
# Number of parallel calls
num_calls = 1
 
# Use ThreadPoolExecutor to execute the calls in parallel
with ThreadPoolExecutor(max_workers=num_calls) as executor:
    # Submit all calls to the executor
    future_calls = [executor.submit(call) for _ in range(num_calls)]
     
    # Wait for all futures to complete and print their results
    for future in future_calls:
        response = future.result()
        if response is not None:
            print("Response:")
            print(response)

如果改成对着本地的 tomcat 调用, 可以看到它等在那里20ms, 这20ms 就是读取完 header 之后, 等待读取 chunked payload, 却迟迟等不来的结果, 最后只有等到 read timeout.

这是在最新的 Tomcat 10.1.25 上得到的栈:

java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(java.base@17.0.4.1/Native Method)
    - waiting on <0x000000061a3aea90> (a java.util.concurrent.Semaphore)
    at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.fillReadBuffer(NioEndpoint.java:1280)
    - locked <0x000000061a3aea90> (a java.util.concurrent.Semaphore)
    at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.read(NioEndpoint.java:1181)
    at org.apache.coyote.http11.Http11InputBuffer.fill(Http11InputBuffer.java:789)
    at org.apache.coyote.http11.Http11InputBuffer$SocketInputBuffer.doRead(Http11InputBuffer.java:1195)
    at org.apache.coyote.http11.filters.ChunkedInputFilter.readBytes(ChunkedInputFilter.java:254)
    at org.apache.coyote.http11.filters.ChunkedInputFilter.fill(ChunkedInputFilter.java:295)
    at org.apache.coyote.http11.filters.ChunkedInputFilter.parseChunkHeader(ChunkedInputFilter.java:328)
    at org.apache.coyote.http11.filters.ChunkedInputFilter.doRead(ChunkedInputFilter.java:136)
    at org.apache.coyote.http11.filters.ChunkedInputFilter.end(ChunkedInputFilter.java:181)
    at org.apache.coyote.http11.Http11InputBuffer.endRequest(Http11InputBuffer.java:646)
    at org.apache.coyote.http11.Http11Processor.endRequest(Http11Processor.java:1188)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:429)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:904)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1190)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63)
    at java.lang.Thread.run(java.base@17.0.4.1/Thread.java:833)

由http header Keep-Alive 引出的问题

由于建立一个网络连接相对于本地操作十分昂贵, 所以系统应该尽量重用一个已经建立的连接.

因此, 在http 1.0的规范中定义了 Keep-Alive header, 一个典型的例子:

Keep-Alive: timeout=5, max=1000

这里的 timeout 表示如果没有后续的重用, 让这个连接最短逗留(idle) 5秒才能关闭它. 若有重用, 这从最后一个重用重新开始计时.
这里的 max 表示如果这个连接被一直重用, 但是这个连接最多只能发送1000个请求(或响应), 一旦超过这个数目, 即使能还没等idle 5秒, 也要关闭它. 所以它表示最多可以通过这个连接发送多少请求(或响应).

http 1.1

http 1.1 默认是持久化连接, 所以默认暗含是 Keep-Alive 的, 即便不发送这个头字段. 如果不在 http 的请求和响应中发送这个头字段, 那么连接的两端又该如何控制 idle 的timeout 和 一个连接最多可以发送的请求(或响应)的数目呢?

既然是为了重用, 那就最大化重用. 是不是一直重用某个连接, 一直不关闭就好了呢?

timeout 的合理性

  1. 从资源管理的角度. 若服务器端一直保持连接, 那么每个新用户都保持一个连接, 即便用户几分钟没数据传输, 这会导致服务器资源(连接, 内存等)很快被耗尽. 若客户端程序(浏览器, 代码客户端)也会有一样的问题.
  2. 从公平的角度. 若一个连接一直保持, 那么后来的新用户肯定无法再连接上.
  3. 从安全和性能的角度. 若保持很多没有数据的连接, 可能会有潜在的性能降级或者内存泄漏问题.

设置一个合理的 timeout 值, 保证一旦一段时间没数据传输, 就关闭这个连接.

max 请求的合理性

既然有了 timeout 可以保证 idle 了就关闭, 同时为了最大化重用, 那么是不是不需要 max 了呢?
考虑一种情况: timeout 是5秒, 但是每当 idle 4秒的时候, 就有一个新的请求(或响应), 那么这个连接将一直会被重用. 那么一直重用又有什么不好呢?

在理想的情况一下, 一直有数据固定频率的传输, 永久的使用这个连接其实是非常美好的一种状态.

但是 max 也有其它合理的地方:

  1. 避免某些写的不好的代码把某些请求(request)资源绑定在这个连接之上, 导致内存泄漏, 若关掉连接则全部释放.
  2. 公平性. 比如某些抢票的网站, 避免某些人一直先保持一个连接, 导致后面的人根本连不上. max 保证多少次请求之后, 一定关闭.
  3. 对于某些负载均衡器(Loader Balancer), 如果某个特定IP上一直保持特别多的连接, 会导致不再均衡.

服务端的 timeout 和 max

虽然 http 1.1 不再设置 Keep-Alive 头字段, 默认是持久连接. 但是连接的2端仍然要对合理的这2种机制做处理. 下面列举一些常见的服务端的处理和配置.

Tomcat web 服务器

Tomcat 是一个常用的 Java web 服务器. 在今天(20240605)最新的 10.1 版本里面的 http Connector 的配置里, 就能看到关于 timeout 和 max 的配置:
keepAliveTimeout: 默认60秒.
maxKeepAliveRequests : 默认100.

nginx 服务器

nginx 的配置分别是:
keepalive_timeout: 默认75秒.
keepalive_requests: 默认 1000.
不过, nginx 还有一个 keepalive_time: 类似max, 它从时间角度约束一个连接从开始建立最长存活多久, 区别于idle timeout.

envoy proxy

对于 Envoy proxy 没有对应的完全一致的概念, 不过它有另外2个参数:
common_http_protocol_options.idle_timeout:
stream_idle_timeout:
更多内容参考: https://www.envoyproxy.io/docs/envoy/latest/faq/configuration/timeouts
https://github.com/envoyproxy/envoy/issues/8652

客户端的 timeout 和 max

既然对于 http 1.1 默认是持久的, 那么客户端也是暗含的, 不发送 Keep-Alive 的, 那么客户端是怎么处理的呢?

chrome 浏览器

没有找到相关文档.

Apache httpClient

在 4.5 版本 2.6 节 连接管理部份(https://hc.apache.org/httpcomponents-client-4.5.x/current/tutorial/html/connmgmt.html), 有下面一段描述:
“If the Keep-Alive header is not present in the response, HttpClient assumes the connection can be kept alive indefinitely”

JDK HttpClient

对于取值的大概处理:
https://github.com/openjdk/jdk/blob/e1870d360e05c372e672b519d7de2a60c333675b/src/java.base/share/classes/sun/net/www/http/HttpClient.java#L892-L937

max 的取值:

https://github.com/openjdk/jdk/blob/e1870d360e05c372e672b519d7de2a60c333675b/src/java.base/share/classes/sun/net/www/http/HttpClient.java#L902C25-L902C82

keepAliveConnections = p.findInt("max", usingProxy?50:5);

timeout 的取值

https://github.com/openjdk/jdk/blob/e1870d360e05c372e672b519d7de2a60c333675b/src/java.base/share/classes/sun/net/www/http/HttpClient.java#L906C37-L918

    /*
     * The timeout if specified by the server. Following values possible
     *  0: the server specified no keep alive headers
     * -1: the server provided "Connection: keep-alive" but did not specify a
     *     a particular time in a "Keep-Alive:" headers
     * -2: the server provided "Connection: keep-alive" and timeout=0
     * Positive values are the number of seconds specified by the server
     * in a "Keep-Alive" header
     */
OptionalInt timeout = p.findInt("timeout");
if (timeout.isEmpty()) {
    keepAliveTimeout = -1;
} else {
    keepAliveTimeout = timeout.getAsInt();
    if (keepAliveTimeout < 0) {
        // if the server specified a negative (invalid) value
        // then we set to -1, which is equivalent to no value
        keepAliveTimeout = -1;
    } else if (keepAliveTimeout == 0) {
        // handled specially to mean close connection immediately
        keepAliveTimeout = -2;
    }
}                          

max 用完之后, 关闭连接:

https://github.com/openjdk/jdk/blob/e1870d360e05c372e672b519d7de2a60c333675b/src/java.base/share/classes/sun/net/www/http/HttpClient.java#L443-L453

客户端总结

客户端也会考虑 连接idle 的timeout 和 max 请求数, 但是很多情况下, 它没有服务端那么紧迫, 但是客户端也有这些机制, 只是没有那么透明.