If you want to keep a secret, you must also hide it from yourself. 👽
by
如果你想保住一个秘密, 你必须从你开始就不知道它
cryptor 模块成就了我们的***, 隐藏自己. 支持常见的加密算法以及已经不太安全的加密算法也被支持.
TL;DR
导入(import)
from __future__ import absolute_import, division, print_function, \
with_statement
import os
import sys
import hashlib
import logging
from shadowsocks import common
from shadowsocks.crypto import rc4_md5, openssl, mbedtls, sodium, table
引入 os
, sys
, hashlib
, logging
模块
常量(constants)
CIPHER_ENC_ENCRYPTION = 1
CIPHER_ENC_DECRYPTION = 0
加密标志位为 1
, 解密标志位为 0
, 不是 SS 规定的, Openssl 规定的:
EVP_CipherInit_ex(), EVP_CipherUpdate() and EVP_CipherFinal_ex() are functions that can be used for decryption or encryption. The operation performed depends on the value of the enc parameter. It should be set to 1 for encryption, 0 for decryption and -1 to leave the value unchanged (the actual value of ‘enc’ being supplied in a previous call).
引用自: https://www.openssl.org/docs/man1.1.0/crypto/EVP_CIPHER_CTX_new.html
METHOD_INFO_KEY_LEN = 0
METHOD_INFO_IV_LEN = 1
METHOD_INFO_CRYPTO = 2
method_supported = {}
method_supported.update(rc4_md5.ciphers)
method_supported.update(openssl.ciphers)
method_supported.update(mbedtls.ciphers)
method_supported.update(sodium.ciphers)
method_supported.update(table.ciphers)
在文件模块导入阶段 from shadowsocks.crypto import rc4_md5, openssl, mbedtls, sodium, table
把所有支持的方法全部导入进来.
- METHOD_INFO_L_KEY_LEN: _method_info 索引值, 代表的是 KEY 的长度
- METHOD_INFO_IV_LEN: _method_info 索引值, 代表的是 IV(初始化向量) 的长度
- METHOD_INFO_CRYPTO: _method_info 索引值, 代表的是加密的函数
看一下 rc4_md5
导出来的变量:
...
ciphers = {
'rc4-md5': (16, 16, create_cipher),
}
...
变量指向:
METHOD_INFO_L_KEY_LEN => 16
METHOD_INFO_IV_LEN => 16
METHOD_INFO_CRYPTO => create_cipher
把所有支持的加密算法放到 method_supported
变量里面.
辅助函数/变量
random_string
def random_string(length):
return os.urandom(length)
根据 length
获取随机字符串.
cached_keys
cached_keys = {}
根据 password
扩展成长字符串.
try_cipher
def try_cipher(key, method=None, crypto_path=None):
Cryptor(key, method, crypto_path)
尝试算法, 是为了在检查配置文件的时候检查加密算否是否被支持, 如果不被支持会报错导致服务无法启动.
EVP_BytesToKey
def EVP_BytesToKey(password, key_len, iv_len):
# equivalent to OpenSSL's EVP_BytesToKey() with count 1
# so that we make the same key and iv as nodejs version
cached_key = '%s-%d-%d' % (password, key_len, iv_len)
r = cached_keys.get(cached_key, None)
if r:
return r
m = []
i = 0
while len(b''.join(m)) < (key_len + iv_len):
md5 = hashlib.md5()
data = password
if i > 0:
data = m[i - 1] + password
md5.update(data)
m.append(md5.digest())
i += 1
ms = b''.join(m)
key = ms[:key_len]
iv = ms[key_len:key_len + iv_len]
cached_keys[cached_key] = (key, iv)
return key, iv
根据 password 和 key_len, iv_len 来扩展 password, 算法采用的是openssl-EVP_BytesToKey
根据注释来看, 没有使用 openssl 内置的 EVP_BytesToKey
加密算法是为了和 nodejs
版本的 shadowsocks
保持一致, 不过现在的 nodejs 版本的 shadowsocks
已经停止维护了.
cached_key = '%s-%d-%d' % (password, key_len, iv_len)
生成缓存的key, 同样的 password 生成同样的 (key, iv), 所以可以做缓存.
加密类 Crypto
class Crypto(object):
...
各类加密算法的 wrapper, 传递加密算法名, 然后调用其对用的加密方法进行加密.
__init__
def __init__(self, password, method, crypto_path=None):
"""
Crypto wrapper
:param password: str cipher password
:param method: str cipher
:param crypto_path: dict or none
{'openssl': path, 'sodium': path, 'mbedtls': path}
"""
self.password = password
self.key = None
self.method = method
self.iv_sent = False # 加密向量需要双方共享, 是否被发送的标志位
self.cipher_iv = b''
self.decipher = None # 解密
self.decipher_iv = None # 解密向量
self.crypto_path = crypto_path
method = method.lower()
self._method_info = Cryptor.get_method_info(method)
if self._method_info:
self.cipher = self.get_cipher(
password, method, CIPHER_ENC_ENCRYPTION,
random_string(self._method_info[METHOD_INFO_IV_LEN])
)
else:
logging.error('method %s not supported' % method)
sys.exit(1)
参数:
- self: 实例
- password 加密秘钥, 就是配置文件(json 格式)里面的 password 字段
- method 加密算法名, 就是配置文件中的 method 字段
- crypto_path 自定义加密库的路径, 如果为配置, 将从系统路径中去找
cipher_iv
为初始向量, 见维基百科解释(英文版本)
|-------------> password 用户加密秘钥
|-------------> key 生成的加密秘钥
instance(实例) ---------|------------> method 加密算法
|-------------> iv_sent 初始化向量是否已经被发送
|-------------> cipher_iv 初始化向量
|-------------> decipher 解密
|-------------> decipher_iv 解密的初始化向量
|-------------> cryptor_path 加密库的地址
|-------------> _method_info 根据 method 获得的关于该加密算法的信息
|-------------> cipher method 加密类实例
if self._method_info:
如果关于指定的该加密算法不支持, sys.exit(1)
抛出错误已经不能解决问题了, 直接退出进程.
get_method_info
@staticmethod
def get_method_info(method):
method = method.lower()
m = method_supported.get(method)
return m
@staticmethod
装饰器让该方法成为类的静态方法
参数:
- method: 根据加密算法名获取关于该加密算法的信息
- @return (ken_len, iv_len, 创建该加密算法的方法)
iv_len
def iv_len(self):
return len(self.cipher_iv)
获取实例的加密向量(cipher_iv)的长度
get_cipher
def get_cipher(self, password, method, op, iv):
password = common.to_bytes(password)
m = self._method_info
if m[METHOD_INFO_KEY_LEN] > 0:
key, _ = EVP_BytesToKey(password,
m[METHOD_INFO_KEY_LEN],
m[METHOD_INFO_IV_LEN])
else:
# key_length == 0 indicates we should use the key directly
key, iv = password, b''
self.key = key
iv = iv[:m[METHOD_INFO_IV_LEN]]
if op == CIPHER_ENC_ENCRYPTION:
# this iv is for cipher not decipher
self.cipher_iv = iv
return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)
参数:
- self: 实例本身
- password: 加密秘钥
- method: 加密算法
- op: 操作(是加密(CIPHER_ENC_ENCRYPTION)还是解密(CIPHER_ENC_DECRYPTION))
- iv: 调用
random_string
生成的随机字符串
if m[METHOD_INFO_KEY_LEN] > 0:
如果该加密算法需要的加密秘钥长度大于零, 说明需要根据 password
扩展出来 (key, iv), key, _ = EVP_BytesToKey(password, m[METHOD_INFO_KEY_LEN], m[METHOD_INFO_IV_LEN])
调用 EVP_BytesToKey
扩展 password
.
else:
指示应该直接使用 password
self.key = key
将 key 存储在 self 上
iv = iv[:m[METHOD_INFO_IV_LEN]]
取出该加密算法需要的向量长度
if op == CIPHER_ENC_ENCRYPTION:
如果是加密过程, self.cipher_iv = iv
将该向量存储在 self 上
return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)
取出 m 中的加密函数类然后传入参数创建加密实例, 等到后面读加密模块的代码会提到这里.
encrypt
def encrypt(self, buf):
if len(buf) == 0:
return buf
if self.iv_sent:
return self.cipher.encrypt(buf)
else:
self.iv_sent = True
return self.cipher_iv + self.cipher.encrypt(buf)
参数:
- self: 实例本身
- buf: 需要加密的数据
if len(buf) == 0:
如果 buf
为空, 就不用加密了, 什么也没有就是最好的加密方式
if self.iv_sent:
如果 iv 向量已经被发送了, 直接把数据加密, 然后发送就好
else:
如果 iv 向量没有被发送, 拼接在有效负载数据的前端, 后面拼接上加密的数据, 在这里会标识 iv 向量已经被发送(后面会解释为什么需要 iv_sent 变量).
decrypt
def decrypt(self, buf):
if len(buf) == 0:
return buf
if self.decipher is None:
decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
decipher_iv = buf[:decipher_iv_len]
self.decipher_iv = decipher_iv
self.decipher = self.get_cipher(
self.password, self.method,
CIPHER_ENC_DECRYPTION,
decipher_iv
)
buf = buf[decipher_iv_len:]
if len(buf) == 0:
return buf
return self.decipher.decrypt(buf)
参数:
- self: 实例本身
- buf: 加密的数据(完全不可读)
if len(buf) == 0:
如果什么都没有, 不必要解密
if self.decipher is None:
如果 self.decipher
为 None, 说明是第一次解密关于该次连接的数据, 需要创建解密的类实例
decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
取出该加密算法的初始化向量(iv)的长度
decipher_iv = buf[:decipher_iv_len]
从加密的数据中取出来解密向量
self.decipher_iv = decipher_iv
将解密向量存储在 self 上
self.decipher = self.get_cipher( self.password, self.method, CIPHER_ENC_DECRYPTION, decipher_iv)
获取解密类实例, 传入的是 CIPHER_ENC_DECRYPTION
解密标志位常量
buf = buf[decipher_iv_len:]
从 buf 中删除初始化向量(iv), 因为初始化向量只是用来解密数据, 并不是数据的一部分
if len(buf) == 0:
如果删除之后没有了其他数据, 说明没有任何数据, ssclient 只是传递了初始化向量, return buf
直接返回删除了出事黄两项的 buf
return self.decipher.decrypt(buf)
调用 decipher
的 decrypt
解密被加密的数据
测试代码
该文件的其余部分都是测试部分的代码, 暂时不与解释.
class Cryptor(object):
def __init__(self, password, method, crypto_path=None):
"""
Crypto wrapper
:param password: str cipher password
:param method: str cipher
:param crypto_path: dict or none
{'openssl': path, 'sodium': path, 'mbedtls': path}
"""
self.password = password
self.key = None
self.method = method
self.iv_sent = False
self.cipher_iv = b''
self.decipher = None
self.decipher_iv = None
self.crypto_path = crypto_path
method = method.lower()
self._method_info = Cryptor.get_method_info(method)
if self._method_info:
self.cipher = self.get_cipher(
password, method, CIPHER_ENC_ENCRYPTION,
random_string(self._method_info[METHOD_INFO_IV_LEN])
)
else:
logging.error('method %s not supported' % method)
sys.exit(1)
@staticmethod
def get_method_info(method):
method = method.lower()
m = method_supported.get(method)
return m
def iv_len(self):
return len(self.cipher_iv)
def get_cipher(self, password, method, op, iv):
password = common.to_bytes(password)
m = self._method_info
if m[METHOD_INFO_KEY_LEN] > 0:
key, _ = EVP_BytesToKey(password,
m[METHOD_INFO_KEY_LEN],
m[METHOD_INFO_IV_LEN])
else:
# key_length == 0 indicates we should use the key directly
key, iv = password, b''
self.key = key
iv = iv[:m[METHOD_INFO_IV_LEN]]
if op == CIPHER_ENC_ENCRYPTION:
# this iv is for cipher not decipher
self.cipher_iv = iv
return m[METHOD_INFO_CRYPTO](method, key, iv, op, self.crypto_path)
def encrypt(self, buf):
if len(buf) == 0:
return buf
if self.iv_sent:
return self.cipher.encrypt(buf)
else:
self.iv_sent = True
return self.cipher_iv + self.cipher.encrypt(buf)
def decrypt(self, buf):
if len(buf) == 0:
return buf
if self.decipher is None:
decipher_iv_len = self._method_info[METHOD_INFO_IV_LEN]
decipher_iv = buf[:decipher_iv_len]
self.decipher_iv = decipher_iv
self.decipher = self.get_cipher(
self.password, self.method,
CIPHER_ENC_DECRYPTION,
decipher_iv
)
buf = buf[decipher_iv_len:]
if len(buf) == 0:
return buf
return self.decipher.decrypt(buf)
跳转函数列表 |
---|