deepinss

Deep in shadowsocks.

View on GitHub
26 December 2017

tcprelay 💌

by

Introduce to TCPRelay

EventLoop 事件发生了, 需要处理器进行处理, 在这里处理 TCP 的流程. 在 local.py 里面有这么一行代码 tcp_server = tcprelay.TCPRelay(config, dns_resolver, True), 创建 TCP 的服务器. 还有这么一行 tcp_server.add_to_loop(loop), 将其添加进入事件轮训器, 发生事件的时候, 调用这里的方法来处理 TCP 请求.

TL;DR

术语规定

Notes: 记住, 我们讨论的是 local 端的代码, 所以我们的数据是这样流动的, 不涉及 服务器.


            |--------------->|                   |------------>|                 |\
浏览器 ------|                |---- local ------|            | ---- server 端---|\ 这边和服务器通讯
            |<---------------|                   |<------------|                 |\

导入模块


from __future__ import absolute_import, division, print_function, \
    with_statement

import time
import socket
import errno
import struct
import logging
import traceback
import random

from shadowsocks import cryptor, eventloop, shell, common
from shadowsocks.common import parse_header, onetimeauth_verify, \
    onetimeauth_gen, ONETIMEAUTH_BYTES, ONETIMEAUTH_CHUNK_BYTES, \
    ONETIMEAUTH_CHUNK_DATA_LEN, ADDRTYPE_AUTH

引入内置模块, 这里尤其要注意 struct 模块, 又是和 C 打交道的 👀 模块. 从 shadowsocks 模块引入 crypto 加密模块, 导入 eventloop 模块, 主要是用到里面的一些常量(POLL_IN, POLL_OUT, etc.)

模块常量定义


# we clear at most TIMEOUTS_CLEAN_SIZE timeouts each time
TIMEOUTS_CLEAN_SIZE = 512

一次最多清除的过期 socket 的数量, 如果超时的 socket 超过了这个临界点, 则只处理 512 个, 剩余等待下次处理, 主要是为了能及时处理发生的事件, 不至于在超时请求过多的时候导致后续事件的处理被延迟.


MSG_FASTOPEN = 0x20000000


# SOCKS METHOD definition
METHOD_NOAUTH = 0

# SOCKS command definition # 这里的变量是由 socks 协议规定的协议头的常量
CMD_CONNECT = 1
CMD_BIND = 2
CMD_UDP_ASSOCIATE = 3

SOCKS5 协议规定的认证方法和连接行为的常量值, 可以看看 Wikipedia-socks5 关于 SOCKS5 握手协议的详细介绍.

英文版介绍, 中文版的 Wikipedia 正常是无法访问的.


# for each opening port, we have a TCP Relay

# for each connection, we have a TCP Relay Handler to handle the connection

# for each handler, we have 2 sockets:
#    local:   connected to the client
#    remote:  connected to remote server

# for each handler, it could be at one of several stages:

# as sslocal:
# stage 0 auth METHOD received from local, reply with selection message
# stage 1 addr received from local, query DNS for remote
# stage 2 UDP assoc
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote

# as ssserver:
# stage 0 just jump to stage 1
# stage 1 addr received from local, query DNS for remote
# stage 3 DNS resolved, connect to remote
# stage 4 still connecting, more data from local received
# stage 5 remote connected, piping local and remote

STAGE_INIT = 0
STAGE_ADDR = 1
STAGE_UDP_ASSOC = 2
STAGE_DNS = 3
STAGE_CONNECTING = 4
STAGE_STREAM = 5
STAGE_DESTROYED = -1

对于每一个监听的端口, 都会有一个 TCPrelay, 对于每一次连接, 都有 TCPRelayHandler 来处理这个请求.

对于每一个 TCPRelayHandler, 我们有两个 socket:

  1. local: 连接 client 端
  2. remote: 连接 server 服务器

对于每一个 TCPRelayHandler, 它必须处在这些状态之一(这就是一个状态机, 如果你读过 TCP 的原理, 会发现 TCP 连接也是一个非常复杂的状态机, 有兴趣可以看下IBM的官方文档, 非常棒, 在这里):

sslocal 状态:

  1. stage 0 收到 浏览器 发过来的认证 METHOD, 然后回复给 浏览器 选择的信息
  2. stage 1 收到 local 发过来的 addr(请求地址), 开始查询 server(也就是说在配置文件里面你可以填写ssserver的域名, 而不仅仅填写 IP 地址) 的 DNS信息
  3. stage 3 DNS 查询成功, 开始连接 server
  4. stage 4 仍在和 server 连接中, 可以从 浏览器 获取更多信息了, 等 server 一旦连接成功, 就将这些数据一起发给 server
  5. stage 5 server 连接建立成功, piping 浏览器 和 server

STAGE_DESTORED 说明请求已经被销毁, 系统资源已经被释放.


# for each handler, we have 2 stream directions:
#    upstream:    from client to server direction
#                 read local and write to remote
#    downstream:  from server to client direction
#                 read remote and write to local

STREAM_UP = 0 #   00000000
STREAM_DOWN = 1 # 00000001

对于每一个 TCPRelayHandler, 我们有 2 个数据流方向

Notes: 请查看上面的图来理解这两个数据流方向


# for each stream, it's waiting for reading, or writing, or both
WAIT_STATUS_INIT = 0 # 00000000
WAIT_STATUS_READING = 1  # 00000001
WAIT_STATUS_WRITING = 2 # 00000010
WAIT_STATUS_READWRITING = WAIT_STATUS_READING | WAIT_STATUS_WRITING # 00000011

对于每一个数据流, 它要么是在等待读, 等待写, 或者是同时在等待读和写

上面的这一句话迷惑了我理解 local 端的通讯的整个流程, 先行解释一下:

  1. upstream
    • upstream 等待读, 是等待从 local socket 读取数据
    • upstream 等待写, 是等待向 remote socket 写入数据
    • upstream 等待读写, 是等待从 local socket 读取数据, 也等待向 remote socket 写入数据
  2. downstream
    • downstream 等待读, 是等待从 remote socket 读取数据
    • downstream 等待写, 是等待向 local socket 写入数据
    • downstream 等待读写, 是等待从 remote socket 读取数据, 也等待向 local socket 写入数据

BUF_SIZE = 32 * 1024 # 缓冲区大小
UP_STREAM_BUF_SIZE = 16 * 1024
DOWN_STREAM_BUF_SIZE = 32 * 1024

BUF_SIZE: 缓冲区大小, 暂时没有地方用到该常量

UP_STREAM_BUF_SIZE: 从 upstream 读取的数据(也就是从 local socket 读取数据)上限

DOWN_STREAM_BUF_SIZE: 从 downstream 读取的数据(也就是从 remote socket 读取数据)上限


# helper exceptions for TCPRelayHandler
class BadSocksHeader(Exception):
    pass
class NoAcceptableMethods(Exception):
    pass

辅助类, 在解析 socks5 协议的时候出错会抛出来

TCPRelayHandler


class TCPRelayHandler(object):
    ...

TCPRelay


class TCPRelay(object):
    ...

创造一个 TCP 服务器, 负责监听端口, TCP 请求来了, eventloop 产生事件, 传递到这里, 是服务器 socket 发生的事件? 说明 浏览器 请求连接. 不是? 说明是一个已经被接受的请求发生了不可描述.

__init__


def __init__(self, config, dns_resolver, is_local, stat_callback=None):
    self._config = config
    self._is_local = is_local
    self._dns_resolver = dns_resolver
    self._closed = False
    self._eventloop = None
    self._fd_to_handlers = {} # file descriptor(全局唯一的一个长整数L) => 相对应的处理器
    self._is_tunnel = False

    self._timeout = config['timeout']
    self._timeouts = []  # a list for all the handlers
    # we trim the timeouts once a while
    self._timeout_offset = 0   # last checked position for timeout
    self._handler_to_timeouts = {}  # key: handler value: index in timeouts 为了高效删除 list 里的元素

    if is_local:
        listen_addr = config['local_address']
        listen_port = config['local_port']
    else:
        listen_addr = config['server']
        listen_port = config['server_port']
    self._listen_port = listen_port

    addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                socket.SOCK_STREAM, socket.SOL_TCP) # socket.SOCK_STREAM 指定为 tcp
    if len(addrs) == 0:
        raise Exception("can't get addrinfo for %s:%d" %
                        (listen_addr, listen_port))
    af, socktype, proto, canonname, sa = addrs[0]  # family(AF_INET) type(socket.SOCK_STREAM) protocol(IPPROTOTCP) 权威回复 (address, port)
    server_socket = socket.socket(af, socktype, proto)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind(sa)
    server_socket.setblocking(False)
    if config['fast_open']:
        try:
            server_socket.setsockopt(socket.SOL_TCP, 23, 5)
        except socket.error:
            logging.error('warning: fast open is not available')
            self._config['fast_open'] = False
    server_socket.listen(1024)
    self._server_socket = server_socket
    self._stat_callback = stat_callback

参数:


self._config = config
self._is_local = is_local
self._dns_resolver = dns_resolver

将参数存到 self 里面


self._closed = False

标识该 TCP 服务器是不是已经被关闭


self._eventloop = None

该 TCP 服务器所处的 事件循环器, 初始化为 None


self._fd_to_handlers = {}

文件描述符对应其处理器(TCPRelayHandler), 图形描述一下:


eventloop 事件发生 ----> 发现是文件描述符`xx`发生的事件, 找出对应的处理器(e.g. TCPRelay), 调用 handle_event
                            |
               |------------|handle_event
               |            |
               |           \ /
               |        这是一个请求连接我们监听的端口的请求 ¬
               |        accept, 然后将生成的处理该请求的 socket 传递给 TCPRelayHandler ¬
               |        __init__, 创建 TCPRelayHandler 实例, 会将处理该请求的 socket 的文件 ¬
               |       描述符作为 key, self 作为 value, 存放在 _fd_to_handlers 中
              \ /
              不是请求连接我们监听的端口的请求, 说明这是一个已经正在处理中的 socket 发生的事件 ¬
              从 _fd_to_handlers 取出来它对应的 TCPRelayHandler 实例, 调用其 handler_event 处理

这就是 _fd_to_handlers 对象的作用, 变量名也可以望文生义, fd(file descriptor) 对应 handler(TCPRelayHandler实例)


self._is_tunnel = False

TODO: 没懂暂不解释


self._timeout = config['timeout']

从配置文件取出来 timeout 字段, 存下来. 超时时间, 过了这个超时时间, 即便请求没有处理完毕, 也要销毁.


self._timeouts = []  # a list for all the handlers

这里面存放了所有的 handler, 从变量名字上来看, 假设所有的都已经过期, 然后处理的时候判断是不是真的过期了.


self._timeout_offset = 0   # last checked position for timeout

timeouts 里面最后一次检查的位置.


self._handler_to_timeouts = {}

key: handler value: index in timeouts key 是 TCPRelayHandler 的 hash 值, value 为其在 timeouts 中的索引值, 该变量是为了高效删除 timeouts 里的元素.


listen_addr = config['local_address']
listen_port = config['local_port']

从配置文件里面取出来需要监听的本地IP地址(注意, 这里不能填写域名)和端口号.


self._listen_port = listen_port

存下来需要监听的端口号


addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                   socket.SOCK_STREAM, socket.SOL_TCP) # socket.SOCK_STREAM 指定为 tcp

这里的 getaddrinfo 是来获取监听这个 IP 地址和端口号需要的所有信息, 有了这些信息, 我们才能监听配置的端口号, 该方法由 socket 模块提供, 其实就是封装了一份 C 里面的方法, 后面可以详细的介绍 C 里面的 socket, 和 python 官方解释器 cpython 是如何封装的, 这里我们知道就就好.


if len(addrs) == 0:
            raise Exception("can't get addrinfo for %s:%d" %
                            (listen_addr, listen_port))

如果 addrlen 为 0 说明获取该IP地址的时候出错, 无法进行下一步的行为, 抛出错误.


af, socktype, proto, canonname, sa = addrs[0]  # family(AF_INET) type(socket.SOCK_STREAM) protocol(IPPROTOTCP) 权威回复 (address, port)

af: address family 地址家庭

type: type

prototocal

canonname: 权威回复

sa: socket address 包含监听该地址的所有信息


server_socket = socket.socket(af, socktype, proto)

创建 socket 对象, 传入在上方拿到的信息.


server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

设置 socket 选项.


server_socket.bind(sa)

绑定地址, 需要一个元组 (address, port)


server_socket.setblocking(False)

非阻塞式监听


server_socket.listen(1024)

开始监听, 传入最大的监听队列数量.


self._server_socket = server_socket

存储下来 socket


self._stat_callback = stat_callback

存储下来 stat 统计函数.

add_to_loop


def add_to_loop(self, loop):
    if self._eventloop:
        raise Exception('already add to loop')
    if self._closed:
        raise Exception('already closed')
    self._eventloop = loop
    self._eventloop.add(self._server_socket,
                        eventloop.POLL_IN | eventloop.POLL_ERR, self)
    self._eventloop.add_periodic(self.handle_periodic)

将自身加入循环器, 传递过来的就是循环器. 如果已经在一个循环器中了, 就抛出错误. 如果已经被关闭, 抛出错误. 存储下来循环器, 然后将自身的 _server_socket 也就是服务器 socket (listen 的 socket), 监听模式为有错误产生(POLL_ERR)和有数据流入(POLL_IN), 然后将自身的周期性函数加入循环器中.

remove_handler


def remove_handler(self, handler):
    index = self._handler_to_timeouts.get(hash(handler), -1) # 从全部的处理器中找到需要移除的处理器
    if index >= 0: # 如果找到了
        # delete is O(n), so we just set it to None
        self._timeouts[index] = None # 将其在_timeouts中的引用置为 None 释放资源
        del self._handler_to_timeouts[hash(handler)]

根据 handler 拿出来该 handler_timeouts 里面的索引值, 这里是为了高效的删除 handler, 因为如果没有这个索引值, 那就要循环整个 _timeouts 数组来找到这个 handler, 速度会慢! 如果找到了, 就置其为 None

注意

update_activity


def update_activity(self, handler, data_len): # handler: TCPReplyHandler, data_len: 活跃的时候传送的字节数
    if data_len and self._stat_callback: # 由实例化 TCPReply 时传递进来的函数, 为了统计需要, 不过该版本的 shadowsocks 并没有用到该统计函数
        self._stat_callback(self._listen_port, data_len)

    # set handler to active
    now = int(time.time())
    if now - handler.last_activity < eventloop.TIMEOUT_PRECISION:
        # thus we can lower timeout modification frequency
        return
    handler.last_activity = now # 更新 TCPReply 的 last_activity
    index = self._handler_to_timeouts.get(hash(handler), -1) # 根据handler 的 ID 拿出来当时的 _timeouts 的长度 其实就是自己在 _timeouts 里面的索引值
    if index >= 0:
        # delete is O(n), so we just set it to None
        self._timeouts[index] = None # 因为在取 length 值得时候自己并不在里面, 所以在自己进入这个 _timeouts 的 list 里面的时候 这个 length 值就是自己的索引值
    length = len(self._timeouts) # 当前的所有 handler 的数量
    self._timeouts.append(handler) # 将当前的 handler 放入 _timeouts 中
    self._handler_to_timeouts[hash(handler)] = length # _handler_to_timeouts 存放的是 处理器 的 ID 对应着处理器自己在 _timeouts 里面的索引值

更新活跃, 参数:


                   |---> 数据长度有效并且统计函数存在, 将监听的端口和数据长度传递给统计函数
                   |
update_activity -- |---> 检测当前时间距离这个 handler 的上次活跃是否已经小于了 eventloop 的时间粒度,
                   |           这里的判断可以降低我们超时修改的频率, 如果没有超过, 直接返回
                   |
                   |---> 如果在 _handler_to_timeouts 里面找到了对应的索引值, 说明该 handler 已经被
                   |           存储下来, 将其对应的位置置为 None
                   |
                   |--- 获取当前的所有的 handler 的长度, 然后将其存储到存放所有的 handler 的数组 _timeouts 中,
                   |    然后将该 handler 在 _timeouts 里面的索引存储下来.

详细的解释一下 _timeouts 变量和 _handler_to_timeouts 这两个变量的对应关系:

这里 handler1, handler2, handler3, 都是在创建的时候被添加, 也就是从 TCPRelayHandler 的构造函数 __init__self._update_activity() 调用的

handler1被创建 -> handler2被创建 -> handler3被创建


  _timeouts           _handler_to_timeouts
+-----------+        +--------------------+
|           |        |                    |
| handler1 ---------------->  0           |
|           |        |                    |
| handler2 ---------------->  1           |
|           | ------>|                    |
| handler3 ---------------->  2           |
|           |        |                    |
|   ...     |        |       ...          |
+-----------+        +--------------------|

在 TCPRelayHandler 的 _on_local_read_on_remote_read 中, 会调用该函数.

handler1 调用 _on_local_read


  _timeouts           _handler_to_timeouts
+-----------+        +--------------------+
|           |        |                    |
|  None    ---------------->  0           |
|           |        |                    |
| handler2 ---------------->  1           |
|           | ------>|                    |
| handler3 ---------------->  2           |
|           |        |                    |
| handler1 ---------------->  3           |
|   ...     |        |       ...          |
+-----------+        +--------------------|

handler1 调用 _on_remote_read


  _timeouts           _handler_to_timeouts
+-----------+        +--------------------+
|           |        |                    |
|  None    ---------------->  0           |
|           |        |                    |
| handler2 ---------------->  1           |
|           | ------>|                    |
| handler3 ---------------->  2           |
|           |        |                    |
|  None    ---------------->  3           |
|           |        |                    |
| handler1 ---------------->  4           |
|   ...     |        |       ...          |
+-----------+        +--------------------|

handler2 调用 _on_local_read


  _timeouts           _handler_to_timeouts
+-----------+        +--------------------+
|           |        |                    |
|  None    ---------------->  0           |
|           |        |                    |
|  None    ---------------->  1           |
|           | ------>|                    |
| handler3 ---------------->  2           |
|           |        |                    |
|  None    ---------------->  3           |
|           |        |                    |
| handler1 ---------------->  4           |
|           |        |                    |
| handler2 ---------------->  5           |
|   ...     |        |       ...          |
+-----------+        +--------------------|

handler2 调用 _on_remote_read


  _timeouts           _handler_to_timeouts
+-----------+        +--------------------+
|           |        |                    |
|  None    ---------------->  0           |
|           |        |                    |
|  None    ---------------->  1           |
|           | ------>|                    |
| handler3 ---------------->  2           |
|           |        |                    |
|  None    ---------------->  3           |
|           |        |                    |
| handler1 ---------------->  4           |
|           |        |                    |
|  None    ---------------->  5           |
|           |        |                    |
| handler2 ---------------->  6           |
|   ...     |        |       ...          |
+-----------+        +--------------------|

以此类推…

我们从 _handler_to_timeouts 取出来索引值比我们循环整个 timeouts 来获取对应的 handler 速度会快很多, 但是会增加点内存占用, 毕竟多了一个变量.

_sweep_timeout

def _sweep_timeout(self):
    # tornado's timeout memory management is more flexible than we need
    # we just need a sorted last_activity queue and it's faster than heapq
    # in fact we can do O(1) (大〇表示法, 算法复杂度) insertion/remove so we invent(发明, 创造) our own
    if self._timeouts:
        logging.log(shell.VERBOSE_LEVEL, 'sweeping timeouts')
        now = time.time()
        length = len(self._timeouts)
        pos = self._timeout_offset
        while pos < length:
            handler = self._timeouts[pos]
            if handler:
                if now - handler.last_activity < self._timeout:
                    break
                else:
                    if handler.remote_address:
                        logging.warn('timed out: %s:%d' %
                                        handler.remote_address)
                    else:
                        logging.warn('timed out')
                    handler.destroy()
                    self._timeouts[pos] = None
                    pos += 1
            else:
                pos += 1
        if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1:
            # clean up the timeout queue when it gets larger than half
            # of the queue
            self._timeouts = self._timeouts[pos:]
            for key in self._handler_to_timeouts:
                self._handler_to_timeouts[key] -= pos
            pos = 0
        self._timeout_offset = pos

清除超时的 handler(TCPRelayHandler)

注释翻译:

tornado 的超时内存管理比我们需要的更具弹性 我们只需要一个根据 last_activity 排序的队列, 并且比 headq(堆排序算法) 更快 实际上我们可以做 O(1) 插入/移除, 因此我们创造自己的算法

tornado 是一个 Python 版本的 Web Server, 官网在这里

if self._timeouts:

如果 self._timeouts 数组不为空, 开始处理

now = time.time()
length = len(self._timeouts)
pos = self._timeout_offset
while pos < length:

如果上次处理的地方小于了 _timeouts 的长度, 说明有新的 handler 被添加进来, 需要在这一次的 _sweep_timeout 处理.

handler = self._timeouts[pos]

取出上次处理的下一个 handler, if handler:, 如果 handler 不存在, else: pos += 1, pos 变量加一.

如果 handler 存在, 走向销毁流程:


if now - handler.last_activity < self._timeout: # config 中配置的过期时间, 如果超过此事件, 即便该请求没有处理完也要 destroy 释放资源
    break

一旦发现活跃的 handler 没有超时, break 掉, 不再处理 timeout, 为了能快速响应请求.

if handler.remote_address:
        logging.warn('timed out: %s:%d' %
                        handler.remote_address)
    else:
        logging.info('timed out')
        logging.warn('timed out')
    handler.destroy()
    self._timeouts[pos] = None  # free memory # 其实在 destroy 的时候就已经被置为了空, 这里是多此一举, 不过也可以说是为了保证的确已经被释放
    pos += 1

发现该 handler 已经超时, 调用 handler 的 destory 销毁, 然后将 _timeouts 的 pos 位置 置为 None. 然后 pos += 1.

if pos > TIMEOUTS_CLEAN_SIZE and pos > length >> 1:

如果 pos 大于了 TIMEOUTS_CLEAN_SIZE 常量, 说明超时的已经很多了, 在上面的 destory 中可以看到, 并没有删除在 _timeouts 中的位置, 而是置为了 None, 所以如果超时过多会导致 _timeouts 中元素过多, 并且都是 None, 所以要在这里处理一下, 这里还有一个 and 符号, 第二个判断是 pos > length >> 1 说明 pos 的值是不是超过了 _timeouts 的长度的一半, 这里是位运算符 ».

位运算符对于奇偶数的处理不尽相同, 如果length是偶数则是一半, 奇数则是一半减一

位运算:

/* 偶数 */
const foo = 4
foo >> 1
// 4 / 2 result: 2

/* 奇数 */
const bar = 7
bar >> 1
// 7 / 2 - 1 result: 3
self._timeouts = self._timeouts[pos:]

移除 None 值

for key in self._handler_to_timeouts:
    self._handler_to_timeouts[key] -= pos

每一个处理器存着自己被update当时的处理器的数量, 这里移除了 None 也要将所有的处理器 对应的 减去这些被删除的 None 值的数量

pos = 0

重置 pos 值为0

self._timeout_offset = pos

更新 TCPReply 上的变量, 来为下一次到这里用到. 分段式处理思维.

handle_event

 def handle_event(self, sock, fd, event):
    if sock:
        logging.log(shell.VERBOSE_LEVEL, 'fd %d %s', fd,
                    eventloop.EVENT_NAMES.get(event, event))
    if sock == self._server_socket:
        if event & eventloop.POLL_ERR:
            # TODO
            raise Exception('server_socket error')
        try:
            logging.debug('accept')
            conn = self._server_socket.accept()
            TCPRelayHandler(self, self._fd_to_handlers,
                            self._eventloop, conn[0], self._config,
                            self._dns_resolver, self._is_local)
        except (OSError, IOError) as e:
            error_no = eventloop.errno_from_exception(e)
            if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                            errno.EWOULDBLOCK):
                return
            else:
                shell.print_exception(e)
                if self._config['verbose']:
                    traceback.print_exc()
    else: # 发生事件的 socket 是一个非服务端 socket, 这是处理 socks5 客户端请求的一个 socket
        if sock:
            handler = self._fd_to_handlers.get(fd, None)
            if handler:
                handler.handle_event(sock, event)
        else:
            logging.warn('poll removed fd')

最重要的方法, TCP 请求都会传递到这里, 并且传入参数:

如果 sock 存在, 打印请求的 fd 以及发生的事件模式

然后判断发生事件的是不是我们的服务器 socket, 如果是服务器的 socket 发生了事件, 说明这是有请求流入, 我们走向服务器流程.

下面是处理的代码:

if sock == self._server_socket: # 如果发生事件的 socket 是我们监听的 socket
    if event & eventloop.POLL_ERR:
        # TODO
        raise Exception('server_socket error')
    try:
        logging.debug('accept')
        conn = self._server_socket.accept() # socks5 服务端的 socket 接受连接, 将会创建一个新的 socket 来处理客户端的请求 conn[0]是新创建的这个 socket conn[1]是连接过来的客户端的ip和port example: ('127.0.0.1', 9999)
        TCPRelayHandler(self, self._fd_to_handlers, # 这里将 _fd_to_handlers 传递给 TCPRelayHandler, 他会将请求的 socket 映射到这个 dict 里面
                        self._eventloop, conn[0], self._config,
                        self._dns_resolver, self._is_local)
    except (OSError, IOError) as e:
        error_no = eventloop.errno_from_exception(e)
        if error_no in (errno.EAGAIN, errno.EINPROGRESS,
                        errno.EWOULDBLOCK):
            return
        else:
            shell.print_exception(e)
            if self._config['verbose']:
                traceback.print_exc()

图形化流程

                          |------> 如果 event 是 POLL_ERR, 说明我们监听的 socket 出现了错误, 无法继续, 抛出来错误❌
                          |
is server socket ---------|----|--> try 接受客户端的连接请求, 并创建和该请求相关的 TCPRelayHandler
                               |
                               |--> except 接受请求的时候发生了错误

如果发生事件的不是 server socket, 说明这是一个已有的请求发生的事件

else: # 发生事件的 socket 是一个非服务端 socket, 这是处理 socks5 客户端请求的一个 socket
    if sock:
        handler = self._fd_to_handlers.get(fd, None)
        if handler:
            handler.handle_event(sock, event)
    else:
        logging.warn('poll removed fd')

图形化流程:

                          |-----> 如果发生事件的 socket 存在, 根据文件描述符取出来和该请求相关的 TCPRelayHandler
                          |             |
not server socket --------|             |-----> 如果handler存在, 调用其 handler_event 并且传入 socket  事件模式
                          |
                          |-----> 如果发生事件的 socket 不存在, 说明 poll 已经移除了该文件描述符的监听

handle_periodic

周期性的处理 TCPRelay 中的数据, 每隔几秒之后都会执行到这里.

def handle_periodic(self):
    logging.info('周期性的处理 tcpreply 的 timeouts')
    if self._closed:
        if self._server_socket:
            self._eventloop.remove(self._server_socket)
            self._server_socket.close()
            self._server_socket = None
            logging.info('closed TCP port %d', self._listen_port)
        if not self._fd_to_handlers:
            logging.info('stopping')
            self._eventloop.stop()
    self._sweep_timeout()

图形化过程:

                         |--- YES ----> 当前的 server socket 是否存在 -- YES ---> 从 eventloop中移除 server socket,
                         |                                         |           调用 close 关闭 socket,
                         |                                         |           去除和该 socket 的引用
                         |                                         |---- NO ---> CONTINUE
                         |                                         |
                         |                                         |-----> _fd_to_handlers 如果为空, 
                         |                                                 则证明已经处理完所有的请求,
                         |                                                 可以关闭 eventloop了
                         |
当前是否处于关闭状态 -------|--- NO -----> CONTINUE


清扫超时的请求 --> 可以看上面 _sweep_timeout 方法

close

关闭 TCP 监听服务器

def close(self, next_tick=False):
    logging.debug('TCP close')
    self._closed = True
    if not next_tick:
        if self._eventloop:
            self._eventloop.remove_periodic(self.handle_periodic)
            self._eventloop.remove(self._server_socket)
        self._server_socket.close()
        for handler in list(self._fd_to_handlers.values()):
            handler.destroy()

参数:

  1. self 自身
  2. next_tick 是马上关闭服务器还是等待处理完所有的请求连接之后再关闭

logging.debug('TCP close') 打印日志

self._closed = true 服务器已经关闭的标志位

if not next_tick: 如果不等待处理已经建立的请求, 直接强制关闭 TCP 服务器

if self._eventloop:
    self._eventloop.remove_periodic(self.handle_periodic)
    self._eventloop.remove(self._server_socket)

_eventloop 中移除周期性的函数 self.handle_periodic_eventloop 中移除对于 self._server_socket socket 的监听

for handler in list(self._fd_to_handlers.values()):
    handler.destroy()

destroy 掉所有存在的 处理器(TCPRelayHandler).

tags: