deepinss

Deep in shadowsocks.

View on GitHub
15 January 2018

If you want to keep a secret, you must also hide it from yourself. 👽

by

如果你想保住一个秘密, 你必须从你开始就不知道它

cryptor 模块成就了我们的***, 隐藏自己. 支持常见的加密算法以及已经不太安全的加密算法也被支持.

TL;DR

导入(import)

from __future__ import absolute_import, division, print_function, \
  with_statement

import os
import sys
import hashlib
import logging

from shadowsocks import common
from shadowsocks.crypto import rc4_md5, openssl, mbedtls, sodium, table

引入 os, sys, hashlib, logging 模块

常量(constants)

CIPHER_ENC_ENCRYPTION = 1
CIPHER_ENC_DECRYPTION = 0

加密标志位为 1, 解密标志位为 0, 不是 SS 规定的, Openssl 规定的:

EVP_CipherInit_ex(), EVP_CipherUpdate() and EVP_CipherFinal_ex() are functions that can be used for decryption or encryption. The operation performed depends on the value of the enc parameter. It should be set to 1 for encryption, 0 for decryption and -1 to leave the value unchanged (the actual value of ‘enc’ being supplied in a previous call).

引用自: https://www.openssl.org/docs/man1.1.0/crypto/EVP_CIPHER_CTX_new.html

METHOD_INFO_KEY_LEN = 0
METHOD_INFO_IV_LEN = 1
METHOD_INFO_CRYPTO = 2

method_supported = {}
method_supported.update(rc4_md5.ciphers)
method_supported.update(openssl.ciphers)
method_supported.update(mbedtls.ciphers)
method_supported.update(sodium.ciphers)
method_supported.update(table.ciphers)

在文件模块导入阶段 from shadowsocks.crypto import rc4_md5, openssl, mbedtls, sodium, table 把所有支持的方法全部导入进来.

看一下 rc4_md5 导出来的变量:

...
ciphers = {
    'rc4-md5': (16, 16, create_cipher),
}
...

变量指向:

METHOD_INFO_L_KEY_LEN => 16

METHOD_INFO_IV_LEN => 16

METHOD_INFO_CRYPTO => create_cipher

把所有支持的加密算法放到 method_supported 变量里面.

辅助函数/变量

random_string

def random_string(length):
    return os.urandom(length)

根据 length 获取随机字符串.

os.urandom

cached_keys

cached_keys = {}

根据 password 扩展成长字符串.

try_cipher

def try_cipher(key, method=None, crypto_path=None):
    Cryptor(key, method, crypto_path)

尝试算法, 是为了在检查配置文件的时候检查加密算否是否被支持, 如果不被支持会报错导致服务无法启动.

EVP_BytesToKey

def EVP_BytesToKey(password, key_len, iv_len):
    # equivalent to OpenSSL's EVP_BytesToKey() with count 1
    # so that we make the same key and iv as nodejs version
    cached_key = '%s-%d-%d' % (password, key_len, iv_len)
    r = cached_keys.get(cached_key, None)
    if r:
        return r
    m = []
    i = 0
    while len(b''.join(m)) < (key_len + iv_len):
        md5 = hashlib.md5()
        data = password
        if i > 0:
            data = m[i - 1] + password
        md5.update(data)
        m.append(md5.digest())
        i += 1
    ms = b''.join(m)
    key = ms[:key_len]
    iv = ms[key_len:key_len + iv_len]
    cached_keys[cached_key] = (key, iv)
    return key, iv

根据 password 和 key_len, iv_len 来扩展 password, 算法采用的是openssl-EVP_BytesToKey

根据注释来看, 没有使用 openssl 内置的 EVP_BytesToKey 加密算法是为了和 nodejs 版本的 shadowsocks 保持一致, 不过现在的 nodejs 版本的 shadowsocks 已经停止维护了.

cached_key = '%s-%d-%d' % (password, key_len, iv_len) 生成缓存的key, 同样的 password 生成同样的 (key, iv), 所以可以做缓存.

加密类 Crypto

class Crypto(object):
    ...

各类加密算法的 wrapper, 传递加密算法名, 然后调用其对用的加密方法进行加密.

__init__

def __init__(self, password, method, crypto_path=None):
    """
    Crypto wrapper
    :param password: str cipher password
    :param method: str cipher
    :param crypto_path: dict or none
        {'openssl': path, 'sodium': path, 'mbedtls': path}
    """
    self.password = password
    self.key = None
    self.method = method
    self.iv_sent = False # 加密向量需要双方共享, 是否被发送的标志位
    self.cipher_iv = b''
    self.decipher = None # 解密
    self.decipher_iv = None # 解密向量
    self.crypto_path = crypto_path
    method = method.lower()
    self._method_info = Cryptor.get_method_info(method)
    if self._method_info:
        self.cipher = self.get_cipher(
            password, method, CIPHER_ENC_ENCRYPTION,
            random_string(self._method_info[METHOD_INFO_IV_LEN])
        )
    else:
        logging.error('method %s not supported' % method)
        sys.exit(1)

参数:

  1. self: 实例
  2. password 加密秘钥, 就是配置文件(json 格式)里面的 password 字段
  3. method 加密算法名, 就是配置文件中的 method 字段
  4. crypto_path 自定义加密库的路径, 如果为配置, 将从系统路径中去找

cipher_iv 为初始向量, 见维基百科解释(英文版本)

                       |-------------> password 用户加密秘钥
                       |-------------> key 生成的加密秘钥
instance(实例) ---------|------------> method 加密算法
                       |-------------> iv_sent 初始化向量是否已经被发送
                       |-------------> cipher_iv 初始化向量
                       |-------------> decipher 解密
                       |-------------> decipher_iv 解密的初始化向量
                       |-------------> cryptor_path 加密库的地址
                       |-------------> _method_info 根据 method 获得的关于该加密算法的信息
                       |-------------> cipher method 加密类实例

if self._method_info: 如果关于指定的该加密算法不支持, sys.exit(1) 抛出错误已经不能解决问题了, 直接退出进程.

get_method_info

@staticmethod
def get_method_info(method):
    method = method.lower()
    m = method_supported.get(method)
    return m

@staticmethod 装饰器让该方法成为类的静态方法

参数:

  1. method: 根据加密算法名获取关于该加密算法的信息
  2. @return (ken_len, iv_len, 创建该加密算法的方法)

iv_len

def iv_len(self):
    return len(self.cipher_iv)

获取实例的加密向量(cipher_iv)的长度

get_cipher

def get_cipher(self, password, method, op, iv):
    password = common.to_bytes(password)
    m = self._method_info
    if m[METHOD_INFO_KEY_LEN] > 0:
        key, _ = EVP_BytesToKey(password,
                                m[METHOD_INFO_KEY_LEN],
                                m[METHOD_INFO_IV_LEN])
    else:
        # key_length == 0 indicates we should use the key directly
        key, iv = password, b''
    self.key = key
    iv = iv[:m[METHOD_INFO_IV_LEN]]
    if op == CIPHER_ENC_ENCRYPTION:
        # this iv is for cipher not decipher
        self.cipher_iv = iv
    return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)

参数:

  1. self: 实例本身
  2. password: 加密秘钥
  3. method: 加密算法
  4. op: 操作(是加密(CIPHER_ENC_ENCRYPTION)还是解密(CIPHER_ENC_DECRYPTION))
  5. iv: 调用 random_string 生成的随机字符串

if m[METHOD_INFO_KEY_LEN] > 0: 如果该加密算法需要的加密秘钥长度大于零, 说明需要根据 password 扩展出来 (key, iv), key, _ = EVP_BytesToKey(password, m[METHOD_INFO_KEY_LEN], m[METHOD_INFO_IV_LEN]) 调用 EVP_BytesToKey 扩展 password.

else: 指示应该直接使用 password

self.key = key 将 key 存储在 self 上

iv = iv[:m[METHOD_INFO_IV_LEN]] 取出该加密算法需要的向量长度

if op == CIPHER_ENC_ENCRYPTION: 如果是加密过程, self.cipher_iv = iv 将该向量存储在 self 上

return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path) 取出 m 中的加密函数类然后传入参数创建加密实例, 等到后面读加密模块的代码会提到这里.

encrypt

def encrypt(self, buf):
    if len(buf) == 0:
        return buf
    if self.iv_sent:
        return self.cipher.encrypt(buf)
    else:
        self.iv_sent = True
        return self.cipher_iv + self.cipher.encrypt(buf)

参数:

  1. self: 实例本身
  2. buf: 需要加密的数据

if len(buf) == 0: 如果 buf 为空, 就不用加密了, 什么也没有就是最好的加密方式

if self.iv_sent: 如果 iv 向量已经被发送了, 直接把数据加密, 然后发送就好

else: 如果 iv 向量没有被发送, 拼接在有效负载数据的前端, 后面拼接上加密的数据, 在这里会标识 iv 向量已经被发送(后面会解释为什么需要 iv_sent 变量).

decrypt

def decrypt(self, buf):
    if len(buf) == 0:
        return buf
    if self.decipher is None:
        decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
        decipher_iv = buf[:decipher_iv_len]
        self.decipher_iv = decipher_iv
        self.decipher = self.get_cipher(
            self.password, self.method,
            CIPHER_ENC_DECRYPTION,
            decipher_iv
        )
        buf = buf[decipher_iv_len:]
        if len(buf) == 0:
            return buf
    return self.decipher.decrypt(buf)

参数:

  1. self: 实例本身
  2. buf: 加密的数据(完全不可读)

if len(buf) == 0: 如果什么都没有, 不必要解密

if self.decipher is None: 如果 self.decipher 为 None, 说明是第一次解密关于该次连接的数据, 需要创建解密的类实例

decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN] 取出该加密算法的初始化向量(iv)的长度

decipher_iv = buf[:decipher_iv_len] 从加密的数据中取出来解密向量

self.decipher_iv = decipher_iv 将解密向量存储在 self 上

self.decipher = self.get_cipher( self.password, self.method, CIPHER_ENC_DECRYPTION, decipher_iv) 获取解密类实例, 传入的是 CIPHER_ENC_DECRYPTION 解密标志位常量

buf = buf[decipher_iv_len:] 从 buf 中删除初始化向量(iv), 因为初始化向量只是用来解密数据, 并不是数据的一部分

if len(buf) == 0: 如果删除之后没有了其他数据, 说明没有任何数据, ssclient 只是传递了初始化向量, return buf 直接返回删除了出事黄两项的 buf

return self.decipher.decrypt(buf) 调用 decipherdecrypt 解密被加密的数据

测试代码

该文件的其余部分都是测试部分的代码, 暂时不与解释.

class Cryptor(object):
    def __init__(self, password, method, crypto_path=None):
        """
        Crypto wrapper
        :param password: str cipher password
        :param method: str cipher
        :param crypto_path: dict or none
            {'openssl': path, 'sodium': path, 'mbedtls': path}
        """
        self.password = password
        self.key = None
        self.method = method
        self.iv_sent = False
        self.cipher_iv = b''
        self.decipher = None
        self.decipher_iv = None
        self.crypto_path = crypto_path
        method = method.lower()
        self._method_info = Cryptor.get_method_info(method)
        if self._method_info:
            self.cipher = self.get_cipher(
                password, method, CIPHER_ENC_ENCRYPTION,
                random_string(self._method_info[METHOD_INFO_IV_LEN])
            )
        else:
            logging.error('method %s not supported' % method)
            sys.exit(1)

    @staticmethod
    def get_method_info(method):
        method = method.lower()
        m = method_supported.get(method)
        return m

    def iv_len(self):
        return len(self.cipher_iv)

    def get_cipher(self, password, method, op, iv):
        password = common.to_bytes(password)
        m = self._method_info
        if m[METHOD_INFO_KEY_LEN] > 0:
            key, _ = EVP_BytesToKey(password,
                                    m[METHOD_INFO_KEY_LEN],
                                    m[METHOD_INFO_IV_LEN])
        else:
            # key_length == 0 indicates we should use the key directly
            key, iv = password, b''
        self.key = key
        iv = iv[:m[METHOD_INFO_IV_LEN]]
        if op == CIPHER_ENC_ENCRYPTION:
            # this iv is for cipher not decipher
            self.cipher_iv = iv
        return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)

    def encrypt(self, buf):
        if len(buf) == 0:
            return buf
        if self.iv_sent:
            return self.cipher.encrypt(buf)
        else:
            self.iv_sent = True
            return self.cipher_iv + self.cipher.encrypt(buf)

    def decrypt(self, buf):
        if len(buf) == 0:
            return buf
        if self.decipher is None:
            decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
            decipher_iv = buf[:decipher_iv_len]
            self.decipher_iv = decipher_iv
            self.decipher = self.get_cipher(
                self.password, self.method,
                CIPHER_ENC_DECRYPTION,
                decipher_iv
            )
            buf = buf[decipher_iv_len:]
            if len(buf) == 0:
                return buf
        return self.decipher.decrypt(buf)
					
跳转函数列表
tags: