tcprelay 💌
by
Introduce to TCPRelay
EventLoop 事件发生了, 需要处理器进行处理, 在这里处理 TCP 的流程. 在 local.py
里面有这么一行代码 tcp_server = tcprelay.TCPRelay(config, dns_resolver, True)
, 创建 TCP 的服务器. 还有这么一行 tcp_server.add_to_loop(loop)
, 将其添加进入事件轮训器, 发生事件的时候, 调用这里的方法来处理 TCP 请求.
TL;DR
术语规定
- 浏览器: socks5 的客户端
- local: Shadowsocks 的 local 端
- server: Shadowsocks 的 server 端
- 服务器: 请求的服务端(e.g. google.com服务)
- local socket: 代表 local 端创建的用来和 浏览器 通讯的 socket
- remote socket: 代表 local 端创建的用来和 server 通讯的 socket
Notes: 记住, 我们讨论的是 local 端的代码, 所以我们的数据是这样流动的, 不涉及 服务器.
|--------------->| |------------>| |\
浏览器 ------| |---- local 端 ------| | ---- server 端---|\ 这边和服务器通讯
|<---------------| |<------------| |\
导入模块
from __future__ import absolute_import, division, print_function, \
with_statement
import time
import socket
import errno
import struct
import logging
import traceback
import random
from shadowsocks import cryptor, eventloop, shell, common
from shadowsocks.common import parse_header, onetimeauth_verify, \
onetimeauth_gen, ONETIMEAUTH_BYTES, ONETIMEAUTH_CHUNK_BYTES, \
ONETIMEAUTH_CHUNK_DATA_LEN, ADDRTYPE_AUTH
引入内置模块, 这里尤其要注意 struct
模块, 又是和 C 打交道的 👀 模块. 从 shadowsocks
模块引入 crypto
加密模块, 导入 eventloop
模块, 主要是用到里面的一些常量(POLL_IN, POLL_OUT, etc.)
模块常量定义
# we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time
TIMEOUTS_CLEAN_SIZE = 512
一次最多清除的过期 socket 的数量, 如果超时的 socket 超过了这个临界点, 则只处理 512 个, 剩余等待下次处理, 主要是为了能及时处理发生的事件, 不至于在超时请求过多的时候导致后续事件的处理被延迟.
MSG_FASTOPEN = 0x20000000
# SOCKS METHOD definition
METHOD_NOAUTH = 0
# SOCKS command definition # 这里的变量是由 socks 协议规定的协议头的常量
CMD_CONNECT = 1
CMD_BIND = 2
CMD_UDP_ASSOCIATE = 3
SOCKS5 协议规定的认证方法和连接行为的常量值, 可以看看 Wikipedia-socks5 关于 SOCKS5 握手协议的详细介绍.
英文版介绍, 中文版的 Wikipedia 正常是无法访问的.
# for each opening port, we have a TCP Relay
# for each connection, we have a TCP Relay Handler to handle the connection
# for each handler, we have 2 sockets:
# local: connected to the client
# remote: connected to remote server
# for each handler, it could be at one of several stages:
# as sslocal:
# stage 0 auth METHOD received from local, reply with selection message
# stage 1 addr received from local, query DNS for remote
# stage 2 UDP assoc
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote
# as ssserver:
# stage 0 just jump to stage 1
# stage 1 addr received from local, query DNS for remote
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote
STAGE_INIT = 0
STAGE_ADDR = 1
STAGE_UDP_ASSOC = 2
STAGE_DNS = 3
STAGE_CONNECTING = 4
STAGE_STREAM = 5
STAGE_DESTROYED = -1
对于每一个监听的端口, 都会有一个 TCPrelay, 对于每一次连接, 都有 TCPRelayHandler 来处理这个请求.
对于每一个 TCPRelayHandler, 我们有两个 socket:
- local: 连接 client 端
- remote: 连接 server 服务器
对于每一个 TCPRelayHandler, 它必须处在这些状态之一(这就是一个状态机, 如果你读过 TCP 的原理, 会发现 TCP 连接也是一个非常复杂的状态机, 有兴趣可以看下IBM的官方文档, 非常棒, 在这里):
sslocal 状态:
- stage 0 收到 浏览器 发过来的认证 METHOD, 然后回复给 浏览器 选择的信息
- stage 1 收到 local 发过来的 addr(请求地址), 开始查询 server(也就是说在配置文件里面你可以填写ssserver的域名, 而不仅仅填写 IP 地址) 的 DNS信息
- stage 3 DNS 查询成功, 开始连接 server
- stage 4 仍在和 server 连接中, 可以从 浏览器 获取更多信息了, 等 server 一旦连接成功, 就将这些数据一起发给 server
- stage 5 server 连接建立成功, piping 浏览器 和 server
STAGE_DESTORED
说明请求已经被销毁, 系统资源已经被释放.
# for each handler, we have 2 stream directions:
# upstream: from client to server direction
# read local and write to remote
# downstream: from server to client direction
# read remote and write to local
STREAM_UP = 0 # 00000000
STREAM_DOWN = 1 # 00000001
对于每一个 TCPRelayHandler, 我们有 2 个数据流方向
- upstream: 从 浏览器 到 server 方向 →, 读取 浏览器 发过来的数据, 写向 server 端
- downstream: 从 server 到 浏览器 方向 ←, 读取 server 发过来的数据, 写向 浏览器 端
Notes: 请查看上面的图来理解这两个数据流方向
# for each stream, it's waiting for reading, or writing, or both
WAIT_STATUS_INIT = 0 # 00000000
WAIT_STATUS_READING = 1 # 00000001
WAIT_STATUS_WRITING = 2 # 00000010
WAIT_STATUS_READWRITING = WAIT_STATUS_READING | WAIT_STATUS_WRITING # 00000011
对于每一个数据流, 它要么是在等待读, 等待写, 或者是同时在等待读和写
上面的这一句话迷惑了我理解 local 端的通讯的整个流程, 先行解释一下:
- upstream
- upstream 等待读, 是等待从 local socket 读取数据
- upstream 等待写, 是等待向 remote socket 写入数据
- upstream 等待读写, 是等待从 local socket 读取数据, 也等待向 remote socket 写入数据
- downstream
- downstream 等待读, 是等待从 remote socket 读取数据
- downstream 等待写, 是等待向 local socket 写入数据
- downstream 等待读写, 是等待从 remote socket 读取数据, 也等待向 local socket 写入数据
BUF_SIZE = 32 * 1024 # 缓冲区大小
UP_STREAM_BUF_SIZE = 16 * 1024
DOWN_STREAM_BUF_SIZE = 32 * 1024
BUF_SIZE: 缓冲区大小, 暂时没有地方用到该常量
UP_STREAM_BUF_SIZE: 从 upstream 读取的数据(也就是从 local socket 读取数据)上限
DOWN_STREAM_BUF_SIZE: 从 downstream 读取的数据(也就是从 remote socket 读取数据)上限
# helper exceptions for TCPRelayHandler
class BadSocksHeader(Exception):
pass
class NoAcceptableMethods(Exception):
pass
辅助类, 在解析 socks5 协议的时候出错会抛出来
TCPRelayHandler
class TCPRelayHandler(object):
...
TCPRelay
class TCPRelay(object):
...
创造一个 TCP 服务器, 负责监听端口, TCP 请求来了, eventloop 产生事件, 传递到这里, 是服务器 socket 发生的事件? 说明 浏览器 请求连接. 不是? 说明是一个已经被接受的请求发生了不可描述.
__init__
def __init__(self, config, dns_resolver, is_local, stat_callback=None):
self._config = config
self._is_local = is_local
self._dns_resolver = dns_resolver
self._closed = False
self._eventloop = None
self._fd_to_handlers = {} # file descriptor(全局唯一的一个长整数L) => 相对应的处理器
self._is_tunnel = False
self._timeout = config['timeout']
self._timeouts = [] # a list for all the handlers
# we trim the timeouts once a while
self._timeout_offset = 0 # last checked position for timeout
self._handler_to_timeouts = {} # key: handler value: index in timeouts 为了高效删除 list 里的元素
if is_local:
listen_addr = config['local_address']
listen_port = config['local_port']
else:
listen_addr = config['server']
listen_port = config['server_port']
self._listen_port = listen_port
addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
socket.SOCK_STREAM, socket.SOL_TCP) # socket.SOCK_STREAM 指定为 tcp
if len(addrs) == 0:
raise Exception("can't get addrinfo for %s:%d" %
(listen_addr, listen_port))
af, socktype, proto, canonname, sa = addrs[0] # family(AF_INET) type(socket.SOCK_STREAM) protocol(IPPROTOTCP) 权威回复 (address, port)
server_socket = socket.socket(af, socktype, proto)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(sa)
server_socket.setblocking(False)
if config['fast_open']:
try:
server_socket.setsockopt(socket.SOL_TCP, 23, 5)
except socket.error:
logging.error('warning: fast open is not available')
self._config['fast_open'] = False
server_socket.listen(1024)
self._server_socket = server_socket
self._stat_callback = stat_callback
参数:
- config: 配置文件, 需要用到里面的
local_address
,local_port
等 - dns_resolver: dns 查询器, 用来查询 server 的ip地址
- is_local: 是不是 local 端, 不过我们只站在 local 端, 此参数可以忽略
- stat_callback: 数据统计函数, 用来统计流量的函数, 默认为
None
self._config = config
self._is_local = is_local
self._dns_resolver = dns_resolver
将参数存到 self
里面
self._closed = False
标识该 TCP 服务器是不是已经被关闭
self._eventloop = None
该 TCP 服务器所处的 事件循环器, 初始化为 None
self._fd_to_handlers = {}
文件描述符对应其处理器(TCPRelayHandler), 图形描述一下:
eventloop 事件发生 ----> 发现是文件描述符`xx`发生的事件, 找出对应的处理器(e.g. TCPRelay), 调用 handle_event
|
|------------|handle_event
| |
| \ /
| 这是一个请求连接我们监听的端口的请求 ¬
| accept, 然后将生成的处理该请求的 socket 传递给 TCPRelayHandler ¬
| __init__, 创建 TCPRelayHandler 实例, 会将处理该请求的 socket 的文件 ¬
| 描述符作为 key, self 作为 value, 存放在 _fd_to_handlers 中
\ /
不是请求连接我们监听的端口的请求, 说明这是一个已经正在处理中的 socket 发生的事件 ¬
从 _fd_to_handlers 取出来它对应的 TCPRelayHandler 实例, 调用其 handler_event 处理
这就是 _fd_to_handlers
对象的作用, 变量名也可以望文生义, fd(file descriptor) 对应 handler(TCPRelayHandler实例)
self._is_tunnel = False
TODO: 没懂暂不解释
self._timeout = config['timeout']
从配置文件取出来 timeout
字段, 存下来. 超时时间, 过了这个超时时间, 即便请求没有处理完毕, 也要销毁.
self._timeouts = [] # a list for all the handlers
这里面存放了所有的 handler, 从变量名字上来看, 假设所有的都已经过期, 然后处理的时候判断是不是真的过期了.
self._timeout_offset = 0 # last checked position for timeout
在 timeouts
里面最后一次检查的位置.
self._handler_to_timeouts = {}
key: handler value: index in timeouts key 是 TCPRelayHandler 的 hash 值, value 为其在 timeouts
中的索引值, 该变量是为了高效删除 timeouts
里的元素.
listen_addr = config['local_address']
listen_port = config['local_port']
从配置文件里面取出来需要监听的本地IP地址(注意, 这里不能填写域名)和端口号.
self._listen_port = listen_port
存下来需要监听的端口号
addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
socket.SOCK_STREAM, socket.SOL_TCP) # socket.SOCK_STREAM 指定为 tcp
这里的 getaddrinfo
是来获取监听这个 IP 地址和端口号需要的所有信息, 有了这些信息, 我们才能监听配置的端口号, 该方法由 socket
模块提供, 其实就是封装了一份 C 里面的方法, 后面可以详细的介绍 C 里面的 socket, 和 python 官方解释器 cpython
是如何封装的, 这里我们知道就就好.
if len(addrs) == 0:
raise Exception("can't get addrinfo for %s:%d" %
(listen_addr, listen_port))
如果 addr
的 len
为 0 说明获取该IP地址的时候出错, 无法进行下一步的行为, 抛出错误.
af, socktype, proto, canonname, sa = addrs[0] # family(AF_INET) type(socket.SOCK_STREAM) protocol(IPPROTOTCP) 权威回复 (address, port)
af: address family 地址家庭
type: type
prototocal
canonname: 权威回复
sa: socket address 包含监听该地址的所有信息
server_socket = socket.socket(af, socktype, proto)
创建 socket 对象, 传入在上方拿到的信息.
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
设置 socket 选项.
server_socket.bind(sa)
绑定地址, 需要一个元组 (address, port)
server_socket.setblocking(False)
非阻塞式监听
server_socket.listen(1024)
开始监听, 传入最大的监听队列数量.
self._server_socket = server_socket
存储下来 socket
self._stat_callback = stat_callback
存储下来 stat
统计函数.
add_to_loop
def add_to_loop(self, loop):
if self._eventloop:
raise Exception('already add to loop')
if self._closed:
raise Exception('already closed')
self._eventloop = loop
self._eventloop.add(self._server_socket,
eventloop.POLL_IN | eventloop.POLL_ERR, self)
self._eventloop.add_periodic(self.handle_periodic)
将自身加入循环器, 传递过来的就是循环器. 如果已经在一个循环器中了, 就抛出错误. 如果已经被关闭, 抛出错误. 存储下来循环器, 然后将自身的 _server_socket
也就是服务器 socket (listen 的 socket), 监听模式为有错误产生(POLL_ERR)和有数据流入(POLL_IN), 然后将自身的周期性函数加入循环器中.
remove_handler
def remove_handler(self, handler):
index = self._handler_to_timeouts.get(hash(handler), -1) # 从全部的处理器中找到需要移除的处理器
if index >= 0: # 如果找到了
# delete is O(n), so we just set it to None
self._timeouts[index] = None # 将其在_timeouts中的引用置为 None 释放资源
del self._handler_to_timeouts[hash(handler)]
根据 handler
拿出来该 handler
在 _timeouts
里面的索引值, 这里是为了高效的删除 handler
, 因为如果没有这个索引值, 那就要循环整个 _timeouts
数组来找到这个 handler
, 速度会慢! 如果找到了, 就置其为 None
注意
- 置为
None
, 而不是删除, 是为了不影响保存下来的其他的handler
在_handler_to_timeouts
中的索引值 hash(handler)
因为_handler_to_timeouts
里面存储的是handler
的 hash 值
update_activity
def update_activity(self, handler, data_len): # handler: TCPReplyHandler, data_len: 活跃的时候传送的字节数
if data_len and self._stat_callback: # 由实例化 TCPReply 时传递进来的函数, 为了统计需要, 不过该版本的 shadowsocks 并没有用到该统计函数
self._stat_callback(self._listen_port, data_len)
# set handler to active
now = int(time.time())
if now - handler.last_activity < eventloop.TIMEOUT_PRECISION:
# thus we can lower timeout modification frequency
return
handler.last_activity = now # 更新 TCPReply 的 last_activity
index = self._handler_to_timeouts.get(hash(handler), -1) # 根据handler 的 ID 拿出来当时的 _timeouts 的长度 其实就是自己在 _timeouts 里面的索引值
if index >= 0:
# delete is O(n), so we just set it to None
self._timeouts[index] = None # 因为在取 length 值得时候自己并不在里面, 所以在自己进入这个 _timeouts 的 list 里面的时候 这个 length 值就是自己的索引值
length = len(self._timeouts) # 当前的所有 handler 的数量
self._timeouts.append(handler) # 将当前的 handler 放入 _timeouts 中
self._handler_to_timeouts[hash(handler)] = length # _handler_to_timeouts 存放的是 处理器 的 ID 对应着处理器自己在 _timeouts 里面的索引值
更新活跃, 参数:
- handler: 处理器 TCPRelayHandler
- data_len: 数据长度
|---> 数据长度有效并且统计函数存在, 将监听的端口和数据长度传递给统计函数
|
update_activity -- |---> 检测当前时间距离这个 handler 的上次活跃是否已经小于了 eventloop 的时间粒度,
| 这里的判断可以降低我们超时修改的频率, 如果没有超过, 直接返回
|
|---> 如果在 _handler_to_timeouts 里面找到了对应的索引值, 说明该 handler 已经被
| 存储下来, 将其对应的位置置为 None
|
|--- 获取当前的所有的 handler 的长度, 然后将其存储到存放所有的 handler 的数组 _timeouts 中,
| 然后将该 handler 在 _timeouts 里面的索引存储下来.
详细的解释一下 _timeouts
变量和 _handler_to_timeouts
这两个变量的对应关系:
这里 handler1, handler2, handler3, 都是在创建的时候被添加, 也就是从 TCPRelayHandler
的构造函数 __init__
的 self._update_activity()
调用的
handler1被创建 -> handler2被创建 -> handler3被创建
_timeouts _handler_to_timeouts
+-----------+ +--------------------+
| | | |
| handler1 ----------------> 0 |
| | | |
| handler2 ----------------> 1 |
| | ------>| |
| handler3 ----------------> 2 |
| | | |
| ... | | ... |
+-----------+ +--------------------|
在 TCPRelayHandler 的 _on_local_read
和 _on_remote_read
中, 会调用该函数.
handler1 调用 _on_local_read
_timeouts _handler_to_timeouts
+-----------+ +--------------------+
| | | |
| None ----------------> 0 |
| | | |
| handler2 ----------------> 1 |
| | ------>| |
| handler3 ----------------> 2 |
| | | |
| handler1 ----------------> 3 |
| ... | | ... |
+-----------+ +--------------------|
handler1 调用 _on_remote_read
_timeouts _handler_to_timeouts
+-----------+ +--------------------+
| | | |
| None ----------------> 0 |
| | | |
| handler2 ----------------> 1 |
| | ------>| |
| handler3 ----------------> 2 |
| | | |
| None ----------------> 3 |
| | | |
| handler1 ----------------> 4 |
| ... | | ... |
+-----------+ +--------------------|
handler2 调用 _on_local_read
_timeouts _handler_to_timeouts
+-----------+ +--------------------+
| | | |
| None ----------------> 0 |
| | | |
| None ----------------> 1 |
| | ------>| |
| handler3 ----------------> 2 |
| | | |
| None ----------------> 3 |
| | | |
| handler1 ----------------> 4 |
| | | |
| handler2 ----------------> 5 |
| ... | | ... |
+-----------+ +--------------------|
handler2 调用 _on_remote_read
_timeouts _handler_to_timeouts
+-----------+ +--------------------+
| | | |
| None ----------------> 0 |
| | | |
| None ----------------> 1 |
| | ------>| |
| handler3 ----------------> 2 |
| | | |
| None ----------------> 3 |
| | | |
| handler1 ----------------> 4 |
| | | |
| None ----------------> 5 |
| | | |
| handler2 ----------------> 6 |
| ... | | ... |
+-----------+ +--------------------|
以此类推…
我们从 _handler_to_timeouts
取出来索引值比我们循环整个 timeouts
来获取对应的 handler 速度会快很多, 但是会增加点内存占用, 毕竟多了一个变量.
_sweep_timeout
def _sweep_timeout(self):
# tornado's timeout memory management is more flexible than we need
# we just need a sorted last_activity queue and it's faster than heapq
# in fact we can do O(1) (大〇表示法, 算法复杂度) insertion/remove so we invent(发明, 创造) our own
if self._timeouts:
logging.log(shell.VERBOSE_LEVEL, 'sweeping timeouts')
now = time.time()
length = len(self._timeouts)
pos = self._timeout_offset
while pos < length:
handler = self._timeouts[pos]
if handler:
if now - handler.last_activity < self._timeout:
break
else:
if handler.remote_address:
logging.warn('timed out: %s:%d' %
handler.remote_address)
else:
logging.warn('timed out')
handler.destroy()
self._timeouts[pos] = None
pos += 1
else:
pos += 1
if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1:
# clean up the timeout queue when it gets larger than half
# of the queue
self._timeouts = self._timeouts[pos:]
for key in self._handler_to_timeouts:
self._handler_to_timeouts[key] -= pos
pos = 0
self._timeout_offset = pos
清除超时的 handler(TCPRelayHandler)
注释翻译:
tornado 的超时内存管理比我们需要的更具弹性 我们只需要一个根据 last_activity 排序的队列, 并且比 headq(堆排序算法) 更快 实际上我们可以做 O(1) 插入/移除, 因此我们创造自己的算法
tornado 是一个 Python 版本的 Web Server, 官网在这里
if self._timeouts:
如果 self._timeouts
数组不为空, 开始处理
now = time.time()
length = len(self._timeouts)
pos = self._timeout_offset
- 获取当前时间戳
- 获取当前的 _timeouts 数组长度
- 获取上一次在这里处理的偏移量, 初始化为0️⃣
while pos < length:
如果上次处理的地方小于了 _timeouts
的长度, 说明有新的 handler 被添加进来, 需要在这一次的 _sweep_timeout
处理.
handler = self._timeouts[pos]
取出上次处理的下一个 handler, if handler:
, 如果 handler 不存在, else: pos += 1
, pos
变量加一.
如果 handler 存在, 走向销毁流程:
if now - handler.last_activity < self._timeout: # config 中配置的过期时间, 如果超过此事件, 即便该请求没有处理完也要 destroy 释放资源
break
一旦发现活跃的 handler
没有超时, break
掉, 不再处理 timeout, 为了能快速响应请求.
if handler.remote_address:
logging.warn('timed out: %s:%d' %
handler.remote_address)
else:
logging.info('timed out')
logging.warn('timed out')
handler.destroy()
self._timeouts[pos] = None # free memory # 其实在 destroy 的时候就已经被置为了空, 这里是多此一举, 不过也可以说是为了保证的确已经被释放
pos += 1
发现该 handler 已经超时, 调用 handler 的 destory 销毁, 然后将 _timeouts
的 pos 位置 置为 None
. 然后 pos += 1
.
if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1:
如果 pos 大于了 TIMEOUTS_CLEAN_SIZE
常量, 说明超时的已经很多了, 在上面的 destory 中可以看到, 并没有删除在 _timeouts
中的位置, 而是置为了 None
, 所以如果超时过多会导致 _timeouts
中元素过多, 并且都是 None
, 所以要在这里处理一下, 这里还有一个 and
符号, 第二个判断是 pos > length >> 1
说明 pos 的值是不是超过了 _timeouts
的长度的一半, 这里是位运算符 ».
位运算符对于奇偶数的处理不尽相同, 如果length是偶数则是一半, 奇数则是一半减一
位运算:
/* 偶数 */
const foo = 4
foo >> 1
// 4 / 2 result: 2
/* 奇数 */
const bar = 7
bar >> 1
// 7 / 2 - 1 result: 3
self._timeouts = self._timeouts[pos:]
移除 None 值
for key in self._handler_to_timeouts:
self._handler_to_timeouts[key] -= pos
每一个处理器存着自己被update当时的处理器的数量, 这里移除了 None 也要将所有的处理器 对应的 减去这些被删除的 None 值的数量
pos = 0
重置 pos 值为0
self._timeout_offset = pos
更新 TCPReply 上的变量, 来为下一次到这里用到. 分段式处理思维.
handle_event
def handle_event(self, sock, fd, event):
if sock:
logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
eventloop.EVENT_NAMES.get(event, event))
if sock == self._server_socket:
if event & eventloop.POLL_ERR:
# TODO
raise Exception('server_socket error')
try:
logging.debug('accept')
conn = self._server_socket.accept()
TCPRelayHandler(self, self._fd_to_handlers,
self._eventloop, conn[0], self._config,
self._dns_resolver, self._is_local)
except (OSError, IOError) as e:
error_no = eventloop.errno_from_exception(e)
if error_no in (errno.EAGAIN, errno.EINPROGRESS,
errno.EWOULDBLOCK):
return
else:
shell.print_exception(e)
if self._config['verbose']:
traceback.print_exc()
else: # 发生事件的 socket 是一个非服务端 socket, 这是处理 socks5 客户端请求的一个 socket
if sock:
handler = self._fd_to_handlers.get(fd, None)
if handler:
handler.handle_event(sock, event)
else:
logging.warn('poll removed fd')
最重要的方法, TCP 请求都会传递到这里, 并且传入参数:
- sock: 发生 tcp 事件的 socket
- fd: 发生 tcp 事件的 文件描述符, 就是一个 float 数字
- event: 发生 tcp 事件的类型, 是可读事件发生还是可写事件发生异或错误事件发生
如果 sock 存在, 打印请求的 fd
以及发生的事件模式
然后判断发生事件的是不是我们的服务器 socket, 如果是服务器的 socket 发生了事件, 说明这是有请求流入, 我们走向服务器流程.
下面是处理的代码:
if sock == self._server_socket: # 如果发生事件的 socket 是我们监听的 socket
if event & eventloop.POLL_ERR:
# TODO
raise Exception('server_socket error')
try:
logging.debug('accept')
conn = self._server_socket.accept() # socks5 服务端的 socket 接受连接, 将会创建一个新的 socket 来处理客户端的请求 conn[0]是新创建的这个 socket conn[1]是连接过来的客户端的ip和port example: ('127.0.0.1', 9999)
TCPRelayHandler(self, self._fd_to_handlers, # 这里将 _fd_to_handlers 传递给 TCPRelayHandler, 他会将请求的 socket 映射到这个 dict 里面
self._eventloop, conn[0], self._config,
self._dns_resolver, self._is_local)
except (OSError, IOError) as e:
error_no = eventloop.errno_from_exception(e)
if error_no in (errno.EAGAIN, errno.EINPROGRESS,
errno.EWOULDBLOCK):
return
else:
shell.print_exception(e)
if self._config['verbose']:
traceback.print_exc()
图形化流程
|------> 如果 event 是 POLL_ERR, 说明我们监听的 socket 出现了错误, 无法继续, 抛出来错误❌
|
is server socket ---------|----|--> try 接受客户端的连接请求, 并创建和该请求相关的 TCPRelayHandler
|
|--> except 接受请求的时候发生了错误
如果发生事件的不是 server socket, 说明这是一个已有的请求发生的事件
else: # 发生事件的 socket 是一个非服务端 socket, 这是处理 socks5 客户端请求的一个 socket
if sock:
handler = self._fd_to_handlers.get(fd, None)
if handler:
handler.handle_event(sock, event)
else:
logging.warn('poll removed fd')
图形化流程:
|-----> 如果发生事件的 socket 存在, 根据文件描述符取出来和该请求相关的 TCPRelayHandler
| |
not server socket --------| |-----> 如果handler存在, 调用其 handler_event 并且传入 socket 和 事件模式
|
|-----> 如果发生事件的 socket 不存在, 说明 poll 已经移除了该文件描述符的监听
handle_periodic
周期性的处理 TCPRelay 中的数据, 每隔几秒之后都会执行到这里.
def handle_periodic(self):
logging.info('周期性的处理 tcpreply 的 timeouts')
if self._closed:
if self._server_socket:
self._eventloop.remove(self._server_socket)
self._server_socket.close()
self._server_socket = None
logging.info('closed TCP port %d', self._listen_port)
if not self._fd_to_handlers:
logging.info('stopping')
self._eventloop.stop()
self._sweep_timeout()
图形化过程:
|--- YES ----> 当前的 server socket 是否存在 -- YES ---> 从 eventloop中移除 server socket,
| | 调用 close 关闭 socket,
| | 去除和该 socket 的引用
| |---- NO ---> CONTINUE
| |
| |-----> _fd_to_handlers 如果为空,
| 则证明已经处理完所有的请求,
| 可以关闭 eventloop了
|
当前是否处于关闭状态 -------|--- NO -----> CONTINUE
清扫超时的请求 --> 可以看上面 _sweep_timeout 方法
close
关闭 TCP 监听服务器
def close(self, next_tick=False):
logging.debug('TCP close')
self._closed = True
if not next_tick:
if self._eventloop:
self._eventloop.remove_periodic(self.handle_periodic)
self._eventloop.remove(self._server_socket)
self._server_socket.close()
for handler in list(self._fd_to_handlers.values()):
handler.destroy()
参数:
- self 自身
- next_tick 是马上关闭服务器还是等待处理完所有的请求连接之后再关闭
logging.debug('TCP close')
打印日志
self._closed = true
服务器已经关闭的标志位
if not next_tick:
如果不等待处理已经建立的请求, 直接强制关闭 TCP 服务器
if self._eventloop:
self._eventloop.remove_periodic(self.handle_periodic)
self._eventloop.remove(self._server_socket)
从 _eventloop
中移除周期性的函数 self.handle_periodic
从 _eventloop
中移除对于 self._server_socket
socket 的监听
for handler in list(self._fd_to_handlers.values()):
handler.destroy()
destroy 掉所有存在的 处理器(TCPRelayHandler).
tags: