deepinss

Deep in shadowsocks.

View on GitHub
1 February 2018

eventloop.py

by

eventloop.py 事件轮询器

** 2018.2.5 完结 **

该模块负责监听所有的 socket, 获取这些 socket 发生的事件, 然后将其分发给注册的监听器

import

from __future__ import absolute_import, division, print_function, \
    with_statement

import os
import time
import socket
import select
import traceback
import errno
import logging
from collections import defaultdict

from shadowsocks import shell

导出

__all__ = ['EventLoop', 'POLL_NULL', 'POLL_IN', 'POLL_OUT', 'POLL_ERR',
           'POLL_HUP', 'POLL_NVAL', 'EVENT_NAMES']

__all__ 声明从该模块导出的变量, 该变量在 from xxx import * 生效. 详细见这里.

模块常量

POLL_NULL = 0x00
POLL_IN = 0x01
POLL_OUT = 0x04
POLL_ERR = 0x08
POLL_HUP = 0x10
POLL_NVAL = 0x20

EVENT_NAMES = {
    POLL_NULL: 'POLL_NULL',
    POLL_IN: 'POLL_IN',
    POLL_OUT: 'POLL_OUT',
    POLL_ERR: 'POLL_ERR',
    POLL_HUP: 'POLL_HUP',
    POLL_NVAL: 'POLL_NVAL',
}
TIMEOUT_PRECISION = 10

POLL_* 变量标识发生的一个事件的数据状态

class KqueueLoop

简介

Kqueue 形式的轮询, 获取发生的事件. 支持的平台有 NetBSD, OpenBSD, DragonflyBSD, and OS X(数据来源自维基百科)

类常量

MAX_EVENTS = 1024

一次循环中取出来的最多的事件数量

类构造器 __init__

def __init__(self):
    self._kqueue = select.kqueue()
    self._fds = {}

select.kqueue() 创建一个 kqueue 事件获取方式

_fds 存储着 file descriptor(文件描述符) -> 对应的监听的 mode(POLL_IN 或 POLL_OUT)

_control

简介

监听或者是移除监听一个文件描述符的状态, 该函数为私有函数

def _control(self, fd, mode, flags):
    events = []
    if mode & POLL_IN:
        events.append(select.kevent(fd, select.KQ_FILTER_READ, flags))
    if mode & POLL_OUT:
        events.append(select.kevent(fd, select.KQ_FILTER_WRITE, flags))
    for e in events:
        self._kqueue.control([e], 0)

接收参数

交互式程序流

def _control(self, fd, mode, flags):
    events = []
    if mode & POLL_IN:
        events.append(select.kevent(fd, select.KQ_FILTER_READ, flags))
    if mode & POLL_OUT:
        events.append(select.kevent(fd, select.KQ_FILTER_WRITE, flags))
    for e in events:
        self._kqueue.control([e], 0)
					
跳转函数列表

poll

简介

取出来发生的事件, 一次取出来的最多的事件数量为 KqueueLoop.MAX_EVENTS, 超时时间为 timeout, 如果超过了超时时间依旧没有事件发生, 程序继续往下走

def poll(self, timeout):
    if timeout < 0:
        timeout = None  # kqueue behaviour
    events = self._kqueue.control(
        None, KqueueLoop.MAX_EVENTS, timeout
    )
    results = defaultdict(lambda: POLL_NULL)
    for e in events:
        fd = e.ident
        if e.filter == select.KQ_FILTER_READ:
            results[fd] |= POLL_IN
        elif e.filter == select.KQ_FILTER_WRITE:
            results[fd] |= POLL_OUT
    return results.items()

接收参数

注意点

defaultdict 类 在给一个 key 赋值的时候, 该 key 不存在, 就会调用 lambda: POLL_NULL, 这就是 POLL_NULL 常量的用处.

交互式程序流

def poll(self, timeout):
    if timeout < 0:
        timeout = None  # kqueue behaviour
    events = self._kqueue.control(
        None, KqueueLoop.MAX_EVENTS, timeout
    )
    results = defaultdict(lambda: POLL_NULL)
    for e in events:
        fd = e.ident
        if e.filter == select.KQ_FILTER_READ:
            results[fd] |= POLL_IN
        elif e.filter == select.KQ_FILTER_WRITE:
            results[fd] |= POLL_OUT
    return results.items()
					
跳转函数列表

register

简介

注册一个 fd(文件描述符) 进入监听序列, 监听模式为 mode

def register(self, fd, mode):
    self._fds[fd] = mode # 文件描述符和其监听模式的对应
    self._control(fd, mode, select.KQ_EV_ADD) # 添加注册 传入 Kqueue 的添加常量

接收参数

注意点

self._fds[fd] = mode 将监听的文件描述符和其监听的模式存储在 _fd dict 中, 方便在移除的时候取出来该文件描述符在监听时候的 mode

交互式程序流

def register(self, fd, mode):
    self._fds[fd] = mode # 文件描述符和其监听模式的对应
    self._control(fd, mode, select.KQ_EV_ADD) # 添加注册 传入 Kqueue 的添加常量
					
跳转函数列表

unregister

简介

移除注册在监听序列中的 fd(文件描述符), 其监听模式已经存储在 self._fds

def unregister(self, fd):
    self._control(fd, self._fds[fd], select.KQ_EV_DELETE) # KQ_EN_DELETE 删除注册 self._fds 拿出来注册该文件描述符的时候的模式
    del self._fds[fd] # 移除 KqueueLopp 关于该文件描述符的数据(mode)

接收参数

注意点

移除注册时候不需要传递 mode, 因为在监听的时候已经保存在 self._fds

交互式程序流

def unregister(self, fd):
    self._control(fd, self._fds[fd], select.KQ_EV_DELETE) # KQ_EN_DELETE 删除注册 self._fds 拿出来注册该文件描述符的时候的模式
    del self._fds[fd] # 移除 KqueueLopp 关于该文件描述符的数据(mode)
					
跳转函数列表

modify

简介

修改一个文件描述符的监听模式, 从 POLL_IN 到 POLL_OUT 或者是反过来

def modify(self, fd, mode):
    self.unregister(fd) # 删除注册 file descriptor => mode
    self.register(fd, mode) # 重新注册 file descriptor => mode

接收参数

交互式程序流

def modify(self, fd, mode):
    self.unregister(fd) # 删除注册 file descriptor => mode
    self.register(fd, mode) # 重新注册 file descriptor => mode
					
跳转函数列表

close

简介

关闭该 kqueue 事件监听器

def close(self):
    self._kqueue.close()

接收参数

class EventLoop

事件轮询器, 根据不同的平台创建不同的时间获取方式 epoll, kqueue(https://en.wikipedia.org/wiki/Kqueue), select(https://en.wikipedia.org/wiki/Select_(Unix)). 处理周期性的函数, 比如: 过期请求的销毁.将发生事件的文件描述符分发给相应的处理器.

__init__ 构造函数

简介

处理不同的平台获取事件的不同方式, 初始化一些变量

def __init__(self):
    if hasattr(select, 'epoll'):
        self._impl = select.epoll()
        model = 'epoll'
    elif hasattr(select, 'kqueue'):
        self._impl = KqueueLoop()
        model = 'kqueue'
    elif hasattr(select, 'select'):
        self._impl = SelectLoop()
        model = 'select'
    else:
        raise Exception('can not find any available functions in select '
                        'package')
    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)

接收参数

注意点

交互式程序流

def __init__(self):
    if hasattr(select, 'epoll'):
        self._impl = select.epoll()
        model = 'epoll'
    elif hasattr(select, 'kqueue'):
        self._impl = KqueueLoop()
        model = 'kqueue'
    elif hasattr(select, 'select'):
        self._impl = SelectLoop()
        model = 'select'
    else:
        raise Exception('can not find any available functions in select '
                        'package')
    self._fdmap = {}  # (f, handler)
    self._last_time = time.time()
    self._periodic_callbacks = []
    self._stopping = False
    logging.debug('using event model: %s', model)
					
跳转函数列表

poll

简介

获取事件

def poll(self, timeout=None):
    events = self._impl.poll(timeout)
    return [(self._fdmap[fd][0], fd, event) for fd, event in events]

接收参数

解读

调用 selv._impl.poll 获取事件, events 为获取出来的事件列表, 格式为

[(
  fd, # 文件按描述
  mode # 发生的事件类型
), (
  fd, # 文件描述符
  mode # 发生的事件类型
)]

数组推导式来获取关于该文件描述符的文件, 最后返回的数据格式为:

[(
  f, # 文件
  fd, # 文件描述符
  mode # 发生的事件类型
), (
  f, # 文件
  fd, # 文件描述符
  mode # 发生的事件类型
)]

add

将文件加入监听序列

def add(self, f, mode, handler):
    fd = f.fileno()
    self._fdmap[fd] = (f, handler)
    self._impl.register(fd, mode)

接收参数

交互式程序流

def add(self, f, mode, handler):
    fd = f.fileno()
    self._fdmap[fd] = (f, handler)
    self._impl.register(fd, mode)
					
跳转函数列表

remove

移除 f 的监听

def remove(self, f):
    fd = f.fileno()
    del self._fdmap[fd]
    self._impl.unregister(fd)

接收参数

程序执行流

def remove(self, f):
    fd = f.fileno()
    del self._fdmap[fd]
    self._impl.unregister(fd)
					
跳转函数列表

add_periodic

添加周期性函数

def add_periodic(self, callback):
    self._periodic_callbacks.append(callback)

接收参数

remove_periodic

移除周期性函数

def remove_periodic(self, callback):
    self._periodic_callbacks.remove(callback)

接收参数

modify

修改一个 f 的监听模式

def modify(self, f, mode):
    fd = f.fileno()
    self._impl.modify(fd, mode)

接收参数

交互式程序流

def modify(self, f, mode):
    fd = f.fileno()
    self._impl.modify(fd, mode)
					
跳转函数列表

stop

标识 self._stopping 为 True, 停止轮询器, 不再获取事件.

def stop(self):
    self._stopping = True

run

简介

启动事件轮询器, 获取事件并将其分发给相应的处理器, 调用周期性的函数

def run(self):
    events = []
    while not self._stopping:
        asap = False
        try:
            events = self.poll(TIMEOUT_PRECISION)
        except (OSError, IOError) as e:
            if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                # EPIPE: Happens when the client closes the connection
                # EINTR: Happens when received a signal
                # handles them as soon as possible
                asap = True
                logging.debug('poll:%s', e)
            else:
                logging.error('poll:%s', e)
                traceback.print_exc()
                continue

        for sock, fd, event in events:
            handler = self._fdmap.get(fd, None)
            if handler is not None:
                handler = handler[1]
                try:
                    handler.handle_event(sock, fd, event)
                except (OSError, IOError) as e:
                    shell.print_exception(e)
        now = time.time()
        if asap or now - self._last_time >= TIMEOUT_PRECISION:
            for callback in self._periodic_callbacks:
                callback()
            self._last_time = now

接收参数

注意点

交互式程序流

def run(self):
    events = []
    while not self._stopping:
        asap = False
        try:
            events = self.poll(TIMEOUT_PRECISION)
        except (OSError, IOError) as e:
            if errno_from_exception(e) in (errno.EPIPE, errno.EINTR):
                # EPIPE: Happens when the client closes the connection
                # EINTR: Happens when received a signal
                # handles them as soon as possible
                asap = True
                logging.debug('poll:%s', e)
            else:
                logging.error('poll:%s', e)
                traceback.print_exc()
                continue

        for sock, fd, event in events:
            handler = self._fdmap.get(fd, None)
            if handler is not None:
                handler = handler[1]
                try:
                    handler.handle_event(sock, fd, event)
                except (OSError, IOError) as e:
                    shell.print_exception(e)
        now = time.time()
        if asap or now - self._last_time >= TIMEOUT_PRECISION:
            for callback in self._periodic_callbacks:
                callback()
            self._last_time = now
					
跳转函数列表

del

在该变量被垃圾回收机制回收的时候做的事情

def __del__(self):
    self._impl.close()

调用 self._impl.close 关闭获取事件的实例, KqueueLoop.close

errno_from_exception

从异常对象里获取异常的 errno.

来源于 tornado

# from tornado
def errno_from_exception(e):
    """Provides the errno from an Exception object.

    There are cases that the errno attribute was not set so we pull
    the errno out of the args but if someone instatiates an Exception
    without any args you will get a tuple error. So this function
    abstracts all that behavior to give you a safe way to get the
    errno.
    """

    if hasattr(e, 'errno'):
        return e.errno
    elif e.args:
        return e.args[0]
    else:
        return None

接收参数

get_sock_error

获取 socket 文件的错误, 在异步处理一个 socket 文件的时候, 如果该 socket 出现了错误, 需要通过这个函数来获取错误信息.

# from tornado
def get_sock_error(sock):
    error_number = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    return socket.error(error_number, os.strerror(error_number))

接受参数

参考连接: https://stackoverflow.com/questions/21031717/so-error-vs-errno

在线命令行工具
tags: