# hagworm **Repository Path**: wsb310/hagworm ## Basic Information - **Project Name**: hagworm - **Description**: Hagworm是原生框架、原生库的中间层,对它们进行了更高层次的抽象,用来屏蔽直接的调用,达到不改变使用习惯的情况下可以随意更换框架或库。 Hagworm整合了它支持的各种框架和库,使它们成为一个整体,屏蔽了底层细节,简化了使用方式。 Hagworm提供了一个打包的环境,建立了工程质量的底线,开发者只需要关注业务逻辑本身,不需要再关注底层的性能和安全等问题。 - **Primary Language**: Python - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 14 - **Forks**: 6 - **Created**: 2020-08-28 - **Last Updated**: 2026-05-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Hagworm > Network Development Suite - 异步网络开发套件 ## 项目理念 Hagworm 是一个面向异步编程的 Python 工具库,旨在提供高效、可靠的网络开发基础设施。项目以 `asyncio` 为核心,封装了从基础工具函数到分布式组件的全栈开发能力,帮助开发者快速构建高性能的异步应用。 核心理念: - **异步优先**:所有 I/O 操作均基于 `asyncio`,支持 `uvloop` 加速 - **开箱即用**:提供 FastAPI/gRPC/RabbitMQ/Nacos 等框架的集成方案 - **生产就绪**:包含日志、配置、事务、缓存、限流等生产级组件 - **优雅抽象**:通过上下文管理器、装饰器等模式简化资源管理 ## 安装 ```bash pip install hagworm ``` 要求 Python >= 3.10。 --- ## 工程目录结构 ``` hagworm/ ├── hagworm/ # 核心代码 │ ├── __init__.py # 版本信息与品牌标识 │ ├── extend/ # 基础扩展模块 │ │ ├── asyncio/ # 异步扩展模块 │ │ │ ├── base.py # 异步基础工具类 │ │ │ ├── buffer.py # 队列缓冲与文件缓冲 │ │ │ ├── command.py # 多进程命令框架 │ │ │ ├── event.py # 异步事件总线 │ │ │ ├── file.py # 带缓存的文件加载器 │ │ │ ├── future.py # Future/线程/子进程工具 │ │ │ ├── mail.py # SMTP 邮件客户端 │ │ │ ├── mongo.py # MongoDB 连接管理 │ │ │ ├── mysql.py # MySQL 连接管理 │ │ │ ├── net.py # HTTP 客户端 │ │ │ ├── pool.py # 通用对象池 │ │ │ ├── redis.py # Redis 连接管理 │ │ │ ├── socket.py # TCP/Unix Socket 服务端与客户端 │ │ │ ├── task.py # 定时任务/并发任务/限流器 │ │ │ └── transaction.py # 异步事务 │ │ ├── base.py # 同步基础工具类 │ │ ├── cache.py # 堆栈缓存 │ │ ├── compile.py # PYC 编译工具 │ │ ├── config.py # 配置管理 │ │ ├── crypto.py # RSA 加解密 │ │ ├── error.py # 异常处理 │ │ ├── event.py # 同步事件总线 │ │ ├── igraph.py # 图数据结构 │ │ ├── interface.py # 接口定义 │ │ ├── logging.py # 日志系统 │ │ ├── media.py # M3U8 媒体解析 │ │ ├── metaclass.py # 单例元类 │ │ ├── ntp.py # NTP 时间同步 │ │ ├── process.py # 多进程管理 │ │ ├── qrcode.py # 二维码生成 │ │ ├── struct.py # 数据结构 │ │ ├── text.py # 文本处理 │ │ ├── trace.py # 链路追踪 │ │ ├── transaction.py # 同步事务 │ │ └── validator.py # 验证器 │ ├── frame/ # 应用框架 │ │ ├── fastapi/ # FastAPI 集成 │ │ │ ├── base.py # FastAPI 应用创建与中间件 │ │ │ ├── field.py # 自定义字段类型 │ │ │ ├── model.py # 数据模型 │ │ │ └── response.py # 响应封装 │ │ ├── gunicorn.py # Gunicorn Worker │ │ └── stress_tests.py # 压力测试框架 │ ├── static/ # 静态资源(Swagger UI) │ └── third/ # 第三方集成 │ ├── grpc/ # gRPC 框架 │ │ ├── bridge.py # gRPC 网桥 │ │ ├── client.py # gRPC 客户端 │ │ └── server.py # gRPC 服务端 │ ├── nacos/ # Nacos 服务发现 │ │ ├── client.py # Nacos 客户端 │ │ └── config.py # Nacos 配置 │ └── rabbitmq/ # RabbitMQ 消息队列 │ ├── consume.py # 消费者 │ ├── publish.py # 生产者 │ └── rpc.py # RPC 实现 ├── example/ # 使用示例 │ ├── fastapi_demo/ # FastAPI 示例项目 │ ├── grpc_demo/ # gRPC 示例 │ ├── main_rabbitmq.py # RabbitMQ 示例 │ ├── main_tcp.py # TCP 服务示例 │ └── main_test.py # 压力测试示例 ├── testing/ # 单元测试 └── pyproject.toml # 项目配置 ``` --- ## 模块详解 --- ### hagworm.extend.base — 同步基础工具类 #### `Utils` 集成常用工具函数的静态工具类。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `get_class_name(val)` | `val: Type` | `str` | 获取类的全限定名 | | `environment()` | 无 | `Dict` | 获取 Python 和系统环境信息 | | `print_slogan()` | 无 | `None` | 打印 Hagworm 品牌 Logo | | `deprecation_warning(val)` | `val: str` | `None` | 发出弃用警告 | | `get_node_id()` | 无 | `str` | 获取基于 PID 和 MAC 的节点 ID(MD5) | | `utf8(val)` | `val: AnyStr` | `bytes` | 将字符串或 bytes 转为 UTF-8 bytes | | `basestring(val)` | `val: AnyStr` | `str` | 将 bytes 或 str 转为 str | | `vague_string(val, vague_str, vague_len, min_vague, max_vague)` | `val: str` 等 | `str` | 字符串脱敏,如 `"13812345678"` → `"138*****678"` | | `json_encode(val, **kwargs)` | `val: Any` | `str` | JSON 编码(使用 ujson) | | `json_decode(val, **kwargs)` | `val: AnyStr` | `Any` | JSON 解码(ujson 优先,回退 json5) | | `msgpack_encode(val, **kwargs)` | `val: Any` | `bytes` | MessagePack 编码 | | `msgpack_decode(val, **kwargs)` | `val: AnyStr` | `Any` | MessagePack 解码 | | `today()` | 无 | `datetime` | 获取今天零点时间 | | `yesterday()` | 无 | `datetime` | 获取昨天零点时间 | | `localnow()` | 无 | `datetime` | 获取当前本地时间(秒级截断) | | `localnow_on_hour()` | 无 | `datetime` | 获取当前本地时间(小时级截断) | | `utcnow()` | 无 | `datetime` | 获取当前 UTC 时间(秒级截断) | | `utcnow_on_hour()` | 无 | `datetime` | 获取当前 UTC 时间(小时级截断) | | `timestamp()` | 无 | `int` | 获取当前秒级时间戳 | | `timestamp_ms()` | 无 | `int` | 获取当前毫秒级时间戳 | | `convert_bool(val)` | `val: Any` | `bool` | 将值转为布尔值,`'false'/'0'/''/False/0` 为 False | | `convert_int(val, default)` | `val: Any, default: int=0` | `int` | 将 float 转为 int | | `convert_float(val, default)` | `val: Any, default: float=0` | `float` | 将值转为 float | | `interval_limit(val, min_val, max_val)` | `val, min_val, max_val` | `int/float` | 将值限制在区间内 | | `split_float(val, sep, min_split, max_split)` | `val: str` 等 | `List[float]` | 按分隔符拆分字符串为浮点数列表 | | `join_float(val, sep)` | `val: Iterable, sep: str` | `str` | 将浮点数列表合并为字符串 | | `split_int(val, sep, min_split, max_split)` | `val: str` 等 | `List[int]` | 按分隔符拆分字符串为整数列表 | | `join_int(val, sep)` | `val: Iterable, sep: str` | `str` | 将整数列表合并为字符串 | | `split_str(val, sep, min_split, max_split)` | `val: str` 等 | `List[str]` | 按分隔符拆分字符串 | | `join_str(val, sep)` | `val, sep` | `str` | 将值列表合并为字符串 | | `list_extend(iterable, val)` | `iterable: List, val: Any` | `None` | 智能扩展列表,可迭代则 extend,否则 append | | `str_len(str_val)` | `str_val: str` | `int` | 计算字符串显示宽度(中文占2) | | `sub_str(val, length, suffix)` | `val: str, length: int` 等 | `str` | 按显示宽度截断字符串 | | `re_match(pattern, value)` | `pattern: str, value: str` | `bool` | 正则匹配 | | `rand_hit(val, prob)` | `val: List, prob: List/Callable` | `Any` | 按概率随机命中 | | `urandom_seed()` | 无 | `int` | 从 /dev/urandom 读取随机种子 | | `read_urandom(size)` | `size: int` | `AnyStr` | 从 /dev/urandom 读取随机字节 | | `get_host_ip()` | 无 | `str` | 获取本机外网 IP | | `ip2int(val)` | `val: str` | `int` | IP 地址转整数 | | `int2ip(val)` | `val: int` | `str` | 整数转 IP 地址 | | `time2stamp(time_str, format_type, timezone)` | `time_str: str` 等 | `int` | 时间字符串转时间戳 | | `stamp2time(time_int, format_type, timezone)` | `time_int: int` 等 | `str` | 时间戳转时间字符串 | | `radix24(val, align)` | `val: int, align: int=0` | `str` | 10 进制转 24 进制 | | `radix24_to_10(val)` | `val: str` | `int` | 24 进制转 10 进制 | | `radix36(val, align)` | `val: int, align: int=0` | `str` | 10 进制转 36 进制 | | `radix36_to_10(val)` | `val: str` | `int` | 36 进制转 10 进制 | | `radix62(val, align)` | `val: int, align: int=0` | `str` | 10 进制转 62 进制 | | `radix62_to_10(val)` | `val: str` | `int` | 62 进制转 10 进制 | | `radix_n(val, base, radix, align)` | `val: int` 等 | `str` | 通用进制转换 | | `radix_n_to_10(val, base, radix)` | `val: str` 等 | `int` | 通用进制转 10 进制 | | `xml_encode(dict_val, root_tag)` | `dict_val: Dict` 等 | `xml.dom.minidom.Document` | 字典转 XML | | `xml_decode(val)` | `val: str` | `Dict` | XML 转字典 | | `b32_encode(val, standard)` | `val: AnyStr, standard: bool=False` | `str` | Base32 编码 | | `b32_decode(val, standard, for_bytes)` | `val: AnyStr` 等 | `AnyStr` | Base32 解码 | | `b64_encode(val, standard, for_bytes)` | `val: AnyStr` 等 | `AnyStr` | Base64 编码(默认 urlsafe) | | `b64_decode(val, standard, for_bytes)` | `val: AnyStr` 等 | `AnyStr` | Base64 解码(默认 urlsafe) | | `jwt_encode(val, key, algorithms)` | `val: str, key: str` 等 | `str` | JWT 编码 | | `jwt_decode(val, key, algorithms)` | `val: str, key: str` 等 | `str` | JWT 解码 | | `yaml_encode(data, stream)` | `data: Any` 等 | `str` | YAML 编码 | | `yaml_decode(stream)` | `stream: Any` | `Any` | YAML 解码 | | `pickle_dumps(val)` | `val: Any` | `bytes` | Pickle 序列化(带 zlib 压缩) | | `pickle_loads(val)` | `val: bytes` | `Any` | Pickle 反序列化(带 zlib 解压) | | `uuid1(node, clock_seq)` | `node, clock_seq` | `str` | 生成 UUID1(hex 格式) | | `uuid1_urn(node, clock_seq)` | `node, clock_seq` | `str` | 生成 UUID1(URN 格式) | | `crc32(val)` | `val: AnyStr` | `int` | CRC32 校验 | | `md5(val)` | `val: AnyStr` | `str` | MD5 哈希 | | `md5_u32(val)` | `val: AnyStr` | `int` | MD5 取前 32 位整数 | | `md5_u64(val)` | `val: AnyStr` | `int` | MD5 取前 64 位整数 | | `sha1(val)` | `val: AnyStr` | `str` | SHA1 哈希 | | `sha256(val)` | `val: AnyStr` | `str` | SHA256 哈希 | | `sha512(val)` | `val: AnyStr` | `str` | SHA512 哈希 | | `hmac_md5(key, data, b64_std)` | `key, data: AnyStr` 等 | `str` | HMAC-MD5 签名 | | `hmac_sha1(key, data, b64_std)` | `key, data: AnyStr` 等 | `str` | HMAC-SHA1 签名 | | `ordered_dict(val)` | `val: Dict=None` | `OrderedDict` | 创建按键排序的有序字典 | | `is_iterable(obj, standard)` | `obj: Any, standard: bool=False` | `bool` | 判断是否可迭代 | | `luhn_valid(val)` | `val: str` | `bool` | Luhn 算法校验(银行卡) | | `luhn_sign(val)` | `val: str` | `str` | Luhn 算法签名 | | `identity_card(val)` | `val: str` | `bool` | 中国身份证号校验 | | `params_join(params, excludes)` | `params: Dict, excludes: List` | `str` | 参数排序拼接 | | `params_sign(*args, **kwargs)` | `*args, **kwargs` | `str` | 参数签名(MD5) | | `get_today_region(today)` | `today: datetime=None` | `Tuple[int, int]` | 获取当天的时间戳范围 | | `get_month_region(today)` | `today: datetime=None` | `Tuple[int, int]` | 获取当月的时间戳范围 | | `get_week_region(today)` | `today: datetime=None` | `Tuple[int, int]` | 获取本周的时间戳范围 | | `zip_file(zip_file, *file_paths)` | `zip_file: str, *file_paths: str` | `None` | 压缩文件 | | `unzip_file(zip_file, file_paths)` | `zip_file: str, file_paths: str` | `None` | 解压文件 | | `get_cpu_percent()` | 无 | `float` | 获取 CPU 使用率(1秒缓存) | | `kill_process(*names)` | `*names: str` | `None` | 按名称杀死进程 | > ⚠️ **易混淆点**:`b64_encode`/`b64_decode` 默认使用 urlsafe 模式且去除 `=` 填充,如果需要标准 Base64 请设置 `standard=True`。 ```python from hagworm.extend.base import Utils # 字符串脱敏 Utils.vague_string("13812345678") # "138*****678" # 进制转换 Utils.radix62(12345) # "3D5" Utils.radix62_to_10("3D5") # 12345 # 时间范围 start, end = Utils.get_today_region() start, end = Utils.get_month_region() # Base64 编码(注意 urlsafe 与 standard 的区别) Utils.b64_encode("hello") # urlsafe, 无填充 Utils.b64_encode("hello", standard=True) # 标准 Base64 # JSON 编解码 Utils.json_encode({"key": "value"}) Utils.json_decode('{"key": "value"}') # 身份证校验 Utils.identity_card("110101199001011234") # True/False # 参数签名 Utils.params_sign("arg1", "arg2", key1="val1", key2="val2") ``` #### `FuncWrapper` 将多个函数包装成一个可调用对象,调用时依次执行所有注册的函数。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__call__(*args, **kwargs)` | 任意参数 | `None` | 依次调用所有注册函数 | | `is_valid` (属性) | 无 | `bool` | 是否有注册函数 | | `add(func)` | `func: Callable` | `bool` | 添加函数,已存在返回 False | | `remove(func)` | `func: Callable` | `bool` | 移除函数,不存在返回 False | ```python from hagworm.extend.base import FuncWrapper wrapper = FuncWrapper() def handler1(data): print(f"handler1: {data}") def handler2(data): print(f"handler2: {data}") wrapper.add(handler1) wrapper.add(handler2) wrapper("test") # 依次调用 handler1 和 handler2 ``` --- ### hagworm.extend.struct — 数据结构 #### `Enum` / `IntEnum` 增强版的枚举类,提供字典转换和键值查询功能。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `items()` | 无 | `Iterable` | 返回 `(name, value)` 对 | | `keys()` | 无 | `Iterable` | 返回所有 name | | `values()` | 无 | `Iterable` | 返回所有 value | | `to_dict()` | 无 | `Dict` | 返回 `{name: value}` 字典 | | `to_keys_dict()` | 无 | `Dict` | 返回 `{name: enum_item}` 字典 | | `to_values_dict()` | 无 | `Dict` | 返回 `{value: enum_item}` 字典 | | `has_key(key)` | `key` | `bool` | 是否包含指定 name | | `has_value(value)` | `value` | `bool` | 是否包含指定 value | ```python from hagworm.extend.struct import Enum, IntEnum class Color(Enum): RED = "red" GREEN = "green" BLUE = "blue" Color.to_dict() # {"RED": "red", "GREEN": "green", "BLUE": "blue"} Color.has_key("RED") # True Color.has_value("red") # True class Status(IntEnum): ACTIVE = 1 INACTIVE = 0 Status.items() # [("ACTIVE", 1), ("INACTIVE", 0)] ``` #### `RoundRobin` / `RoundRobinSimple` 轮询调度器。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `RoundRobin(members)` | `members: Dict[str, T]=None` | `RoundRobin` | 创建命名轮询器 | | `append(name, member)` | `name: str, member: T` | `Optional[T]` | 添加成员,返回被替换的旧成员 | | `clear(names)` | `names: List[str]=None` | `Dict` | 清除成员,返回被清除的成员 | | `reset(members)` | `members: Dict=None` | `Dict` | 重置成员列表 | | `get()` | 无 | `Tuple[str, T]` | 获取下一个成员 | ```python from hagworm.extend.struct import RoundRobin, RoundRobinSimple # 命名轮询 rr = RoundRobin({"server1": "10.0.0.1", "server2": "10.0.0.2"}) name, member = rr.get() # 轮询获取 # 简单轮询 rrs = RoundRobinSimple([1, 2, 3]) val = rrs.get() # 轮询获取 ``` #### `ThreadList` / `ThreadDict` 多线程安全的列表和字典,基于 `threading.local` 实现。 ```python from hagworm.extend.struct import ThreadList, ThreadDict tls_list = ThreadList() tls_list.data.append("value") tls_dict = ThreadDict() tls_dict.data["key"] = "value" ``` #### `ByteArray` 扩展的 `BytesIO` 类,支持结构化二进制数据的读写。 支持的字段读写方法:`read_char`/`write_char`、`read_short`/`write_short`、`read_int`/`write_int`、`read_long`/`write_long`、`read_float`/`write_float`、`read_double`/`write_double`、`read_bool`/`write_bool`、`read_bytes`/`write_bytes`、`read_string`/`write_string` 等。 支持字节序:`NETWORK`(!)、`NATIVE`(=)、`LITTLE_ENDIAN`(<)、`BIG_ENDIAN`(>)。 ```python from hagworm.extend.struct import ByteArray buf = ByteArray() buf.set_endian(ByteArray.NETWORK) buf.write_int(42) buf.write_string("hello") buf.write_float(3.14) # 读取需要重新定位 buf.seek(0) val_int = buf.read_int() # 42 val_str = buf.read_string(5) # "hello" val_float = buf.read_float() # 3.14 ``` #### `KeyLowerDict` 将字典的 CamelCase 键自动转为 snake_case 小写键。 ```python from hagworm.extend.struct import KeyLowerDict data = KeyLowerDict({"MySQLHost": "localhost", "RedisPort": 6379}) # {"my_sql_host": "localhost", "redis_port": 6379} ``` > ⚠️ **注意**:`KeyLowerDict` 会递归处理嵌套字典,所有层级的键都会被转换。 --- ### hagworm.extend.cache — 堆栈缓存 #### `StackCache` 基于 `TTLCache` 的内存高速缓存。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__init__(maxsize, ttl)` | `maxsize: float=0xff, ttl: float=60` | `StackCache` | 初始化缓存 | | `has(key)` | `key: str` | `bool` | 是否存在缓存 | | `get(key, default)` | `key: str, default: Any=None` | `Any` | 获取缓存值 | | `set(key, val)` | `key: str, val: Any` | `None` | 设置缓存 | | `incr(key, val)` | `key: str, val: float=1` | `float` | 自增 | | `decr(key, val)` | `key: str, val: float=1` | `float` | 自减 | | `delete(key)` | `key: str` | `None` | 删除缓存 | | `size()` | 无 | `int` | 缓存大小 | | `clear()` | 无 | `None` | 清空缓存 | ```python from hagworm.extend.cache import StackCache cache = StackCache(maxsize=100, ttl=60) cache.set("user:1", {"name": "Alice"}) cache.get("user:1") # {"name": "Alice"} cache.has("user:1") # True cache.incr("counter") # 1 cache.incr("counter") # 2 cache.decr("counter") # 1 ``` --- ### hagworm.extend.config — 配置管理 #### `Field` 配置字段定义,用于声明配置项所属 section 和默认值。 ```python from hagworm.extend.config import Field, HostType, StrListType Port: int = Field("Base", default=8080) Debug: bool = Field("Base", default=False) Server: HostType = Field("Server") # 自动解析 "host:port" 为 (host, port) Tags: StrListType = Field("App") # 自动按 "|" 分割为字符串列表 ``` #### `Configure` 配置管理类,支持从 INI、JSON、YAML、环境变量等多种来源加载配置。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `read(path, encoding)` | `path: str, encoding: str` | `None` | 从 INI 文件读取 | | `read_env()` | 无 | `None` | 从环境变量读取 | | `read_str(val)` | `val: str` | `None` | 从字符串读取 | | `read_dict(val)` | `val: Dict` | `None` | 从字典读取 | | `read_json(path, encoding)` | `path: str` 等 | `None` | 从 JSON 文件读取 | | `read_yaml(path, encoding)` | `path: str` 等 | `None` | 从 YAML 文件读取 | | `set_options(section, **options)` | `section: str, **options: str` | `None` | 动态设置配置 | | `to_dict(section)` | `section=None` | `Dict` | 导出为字典 | | `to_yaml(section)` | `section=None` | `str` | 导出为 YAML | > ⚠️ **注意**:环境变量优先级高于配置文件。环境变量格式为 `{SECTION}_{KEY}` 全大写。 ```python from hagworm.extend.config import Configure, Field, HostType, StrListType class AppConfig(Configure): Port: int = Field("Base", default=8080) Debug: bool = Field("Base", default=False) Server: HostType = Field("Server") Tags: StrListType = Field("App") config = AppConfig() config.read_yaml("./config.yaml") # config.Port, config.Debug, config.Server 等可直接访问 ``` --- ### hagworm.extend.error — 异常处理 #### `Ignore` 可忽略的异常,用于 `with` 语句块跳出或多层逻辑跳出。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__init__(*args, layers)` | `layers: int=1` | `Ignore` | `layers` 控制跳出几层 with | | `log()` | 无 | `None` | 打印 warning 日志 | | `throw()` | 无 | `bool` | 返回是否继续抛出 | > ⚠️ **易混淆点**:`layers=1`(默认)表示跳出当前 with 块后不再继续抛出;`layers=2` 表示跳出两层 with 块。 #### `catch_warning()` / `catch_error()` 上下文管理器,捕获异常并打印日志。 ```python from hagworm.extend.error import Ignore, catch_error, catch_warning # 基本用法 with catch_error(): result = 1 / 0 # 异常被捕获并打印 error 日志 # 使用 Ignore 跳出 with 块 with catch_error(): if should_skip: raise Ignore("skip this block") # 安全跳出,打印 warning # 多层跳出 with catch_error(): with catch_error(): raise Ignore(layers=2) # 跳出两层 with 块 ``` --- ### hagworm.extend.event — 同步事件总线 #### `EventDispatcher` 同步事件总线,基于发布-订阅模式。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `dispatch(_type, *args, **kwargs)` | `_type: str` 等 | `None` | 触发事件 | | `add_listener(_type, _callable)` | `_type: str, _callable: Callable` | `bool` | 添加事件监听器 | | `remove_listener(_type, _callable)` | `_type: str, _callable: Callable` | `bool` | 移除事件监听器 | ```python from hagworm.extend.event import EventDispatcher dispatcher = EventDispatcher() def on_login(user_id): print(f"User {user_id} logged in") dispatcher.add_listener("login", on_login) dispatcher.dispatch("login", "user_123") dispatcher.remove_listener("login", on_login) ``` --- ### hagworm.extend.interface — 接口定义 #### `RunnableInterface` 可运行接口,需实现 `run()` 方法。 #### `TaskInterface` 任务接口,需实现 `start()`、`stop()`、`is_running()` 方法。 #### `ContextManager` / `AsyncContextManager` 上下文资源管理器基类,子类实现 `_context_release()` 即可使用 `with`/`async with` 语句管理资源。支持 `Ignore` 异常的安全跳出。 ```python from hagworm.extend.interface import ContextManager class MyResource(ContextManager): def _context_release(self): print("资源释放") with MyResource() as res: print("使用资源") # 自动调用 _context_release ``` --- ### hagworm.extend.metaclass — 单例元类 #### `Singleton` / `SafeSingleton` 单例基类。`Singleton` 非线程安全,`SafeSingleton` 线程安全。 > ⚠️ **注意**:在多进程 fork 场景下,单例可能会因内存复制导致资源冲突,会发出警告。 ```python from hagworm.extend.metaclass import Singleton, SafeSingleton class Database(Singleton): def __init__(self): self.connection = "connected" db1 = Database() db2 = Database() assert db1 is db2 # True,同一个实例 ``` --- ### hagworm.extend.text — 文本处理 #### `StrUtils` 字符串工具类。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `to_half_width(value)` | `value: str` | `str` | 全角转半角 | | `to_full_width(value)` | `value: str` | `str` | 半角转全角 | | `to_camel(val)` | `val: str` | `str` | snake_case → camelCase | | `to_snake(val)` | `val: str` | `str` | CamelCase → snake_case | | `to_upper_camel(val)` | `val: str` | `str` | snake_case → UpperCamelCase | | `to_camel_dict(val)` | `val: Any` | `Any` | 递归转换字典键为 camelCase | | `to_snake_dict(val)` | `val: Any` | `Any` | 递归转换字典键为 snake_case | | `to_upper_camel_dict(val)` | `val: Any` | `Any` | 递归转换字典键为 UpperCamelCase | ```python from hagworm.extend.text import StrUtils StrUtils.to_camel("hello_world") # "helloWorld" StrUtils.to_snake("helloWorld") # "hello_world" StrUtils.to_upper_camel("hello_world") # "HelloWorld" StrUtils.to_half_width("Hello") # "Hello" StrUtils.to_snake_dict({"firstName": "Alice", "lastName": "Bob"}) # {"first_name": "Alice", "last_name": "Bob"} ``` #### `TextFinder` 基于 Aho-Corasick 算法的关键词查找与替换。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__init__(words)` | `words: List[str]` | `TextFinder` | 初始化关键词列表 | | `find_all(content)` | `content: str` | `List[str]` | 查找所有匹配关键词 | | `replace_all(content, _chars)` | `content: str, _chars: str='*'` | `str` | 替换所有匹配关键词 | ```python from hagworm.extend.text import TextFinder finder = TextFinder(["敏感词1", "敏感词2", "测试"]) finder.find_all("这是一个测试文本") # ["测试"] finder.replace_all("这是一个测试文本") # "这是一个**文本" ``` --- ### hagworm.extend.validator — 验证器 提供常用格式的正则验证函数。 | 函数 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `asc_visible(val)` | `val: str` | `bool` | ASC 可见字符验证 | | `uuid(val)` | `val: str` | `bool` | UUID 格式验证 | | `email(val)` | `val: str` | `bool` | 邮箱格式验证 | | `domain(val)` | `val: str` | `bool` | 域名格式验证 | | `url(val)` | `val: str` | `bool` | URL 格式验证 | | `mac_addr(val)` | `val: str` | `bool` | MAC 地址验证 | | `ipv4(val)` | `val: str` | `bool` | IPv4 地址验证 | | `ipv4_cidr(val)` | `val: str` | `bool` | IPv4 CIDR 验证 | | `ipv6(val)` | `val: str` | `bool` | IPv6 地址验证 | | `ipv6_cidr(val)` | `val: str` | `bool` | IPv6 CIDR 验证 | ```python from hagworm.extend.validator import email, url, ipv4 email("test@example.com") # True url("https://example.com") # True ipv4("192.168.1.1") # True ipv4_cidr("192.168.1.0/24") # True ``` --- ### hagworm.extend.trace — 链路追踪 | 函数/装饰器 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `get_trace_id()` | 无 | `str` | 获取当前 trace_id | | `refresh_trace_id(trace_id)` | `trace_id: str=None` | `str` | 刷新 trace_id | | `@trace_wrapper` | 函数 | 函数 | 为函数自动生成 trace_id | | `@tracing` | 函数 | 函数 | 同步函数耗时追踪日志 | | `@async_tracing` | 函数 | 函数 | 异步函数耗时追踪日志 | ```python from hagworm.extend.trace import trace_wrapper, async_tracing, refresh_trace_id @trace_wrapper async def handle_request(): # 自动生成新的 trace_id pass @async_tracing async def slow_operation(): # 自动记录执行耗时 pass refresh_trace_id("custom-trace-id-123") ``` --- ### hagworm.extend.logging — 日志系统 #### `init_logger` 初始化日志系统。 | 入参 | 类型 | 默认值 | 说明 | |------|------|--------|------| | `level` | `str` | 必填 | 日志级别 | | `handler` | `logging.Handler` | `None` | 自定义日志处理器 | | `file_path` | `str` | `None` | 日志文件路径 | | `file_name` | `str` | `runtime_{time}.log` | 日志文件名 | | `file_rotation` | `Callable` | 默认轮转器 | 文件轮转策略 | | `file_retention` | `int` | `0xff` | 保留文件数 | | `extra` | `Dict` | `None` | 额外上下文 | | `enqueue` | `bool` | `False` | 是否队列化 | | `debug` | `bool` | `False` | 调试模式 | #### `LogFileRotator` 日志文件轮转策略,支持按大小和时间轮转。 ```python from hagworm.extend.logging import init_logger, LogFileRotator init_logger( "INFO", file_path="/var/log/app", file_rotation=LogFileRotator.make(500, "00:00"), # 500MB 或每天0点轮转 file_retention=30, ) ``` #### `KafkaSink` / `ElasticsearchSink` 日志投递到 Kafka 或 Elasticsearch。 ```python from hagworm.extend.logging import KafkaSink, ElasticsearchSink # Kafka 日志投递 kafka_sink = KafkaSink("localhost:9092", "app-logs") # Elasticsearch 日志投递 es_sink = ElasticsearchSink("http://localhost:9200", "app-logs") ``` --- ### hagworm.extend.crypto — RSA 加解密 #### `RsaUtil` | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `gen_rsa_key(rsa_key, private)` | `rsa_key: str, private: bool=False` | `str` | 生成 PEM 格式密钥 | | `rsa_sign(rsa_key, sign_data)` | `rsa_key: str, sign_data: str` | `str` | RSA 签名 | | `rsa_verity(pubic_key, verity_data, verity_sign)` | 三个 `str` | `bool` | RSA 验签 | ```python from hagworm.extend.crypto import RsaUtil private_key = "MIIEvQ..." # 裸密钥字符串 public_key = "MIIBIj..." pem_private = RsaUtil.gen_rsa_key(private_key, private=True) pem_public = RsaUtil.gen_rsa_key(public_key) signature = RsaUtil.rsa_sign(private_key, "data to sign") is_valid = RsaUtil.rsa_verity(public_key, "data to sign", signature) ``` --- ### hagworm.extend.transaction — 同步事务 #### `Transaction` 基于上下文管理器的事务对象,未显式 commit 则自动 rollback。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `add_commit_callback(_callable, *args, **kwargs)` | `_callable: Callable` 等 | `None` | 添加提交回调 | | `add_rollback_callback(_callable, *args, **kwargs)` | `_callable: Callable` 等 | `None` | 添加回滚回调 | | `commit()` | 无 | `None` | 提交事务,执行所有提交回调 | | `rollback()` | 无 | `None` | 回滚事务,执行所有回滚回调 | | `bind(trx)` | `trx: TransactionAbstract` | `None` | 绑定另一个事务(级联提交/回滚) | ```python from hagworm.extend.transaction import Transaction with Transaction() as trx: trx.add_commit_callback(lambda: print("committed")) trx.add_rollback_callback(lambda: print("rolled back")) # 如果不调用 trx.commit(),退出 with 时自动 rollback trx.commit() ``` --- ### hagworm.extend.compile — PYC 编译 #### `compile_pyc` 将 Python 源码编译为 PYC 字节码。 | 入参 | 类型 | 说明 | |------|------|------| | `file_path` | `str` | 源码目录 | | `pyc_file_path` | `str` | 输出目录(默认同源码目录) | | `exclude` | `str` | 排除目录(用 `\|` 分隔) | ```bash python -m hagworm.extend.compile -i ./src -o ./dist -e "test|tmp" ``` --- ### hagworm.extend.qrcode — 二维码 #### `QRCode` | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `make(data, **kwargs)` | `data: Any` | `bytes` | 生成 PNG 二维码 | | `make_svg(data, **kwargs)` | `data: Any` | `str` | 生成 SVG 二维码 | ```python from hagworm.extend.qrcode import QRCode png_data = QRCode.make("https://example.com") svg_str = QRCode.make_svg("https://example.com") ``` --- ### hagworm.extend.igraph — 图数据结构 #### `Graph` / `DiGraph` 基于 `python-igraph` 的图数据结构封装。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `add_vertex(name, **kwargs)` | `name: str` | `Vertex` | 添加顶点 | | `find_vertex(name)` | `name: str` | `Vertex` | 查找顶点 | | `add_edge(source, target)` | 顶点标识 | `None` | 添加边 | | `del_edge(source, target)` | 顶点标识 | `None` | 删除边 | | `deep_search(name, mode)` | `name: str, mode: str` | `List[str]` | 深度搜索 | `DiGraph` 额外方法: | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `find_in(name)` | `name: str` | `List[Vertex]` | 查找入边顶点 | | `find_out(name)` | `name: str` | `List[Vertex]` | 查找出边顶点 | | `find_tree(name, depth, mode)` | `name: str` 等 | `Dict` | 查找依赖树 | ```python from hagworm.extend.igraph import DiGraph graph = DiGraph("service_deps") graph.add_vertex("service_a") graph.add_vertex("service_b") graph.add_vertex("service_c") graph.add_edge("service_a", "service_b") graph.add_edge("service_b", "service_c") graph.find_out("service_a") # [service_b] graph.deep_search("service_a", mode="out") # ["service_b", "service_c"] ``` --- ### hagworm.extend.media — M3U8 媒体解析 #### `M3U8` M3U8 播放列表解析与裁剪。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__init__(value, host)` | `value: str, host: str=''` | `M3U8` | 解析 M3U8 内容 | | `split(_start, _end)` | `_start: int, _end: int` | `str` | 按时间裁剪 | | `__add__(other)` / `__iadd__(other)` | `other: M3U8` | `M3U8` | 合并播放列表 | ```python from hagworm.extend.media import M3U8 m3u8 = M3U8(m3u8_content, host="https://cdn.example.com/") clip = m3u8.split(10, 30) # 裁剪 10-30 秒的片段 ``` --- ### hagworm.extend.ntp — NTP 时间同步 #### `NTPClient` 异步 NTP 客户端,定期同步时间偏移。 | 入参 | 类型 | 默认值 | 说明 | |------|------|--------|------| | `host` | `str` | `time.windows.com` | NTP 服务器 | | `interval` | `int` | `3600` | 同步间隔(秒) | | `sampling` | `int` | `5` | 每次采样次数 | | 属性 | 类型 | 说明 | |------|------|------| | `offset` | `float` | 时间偏移量 | | `timestamp` | `float` | 校准后的秒级时间戳 | | `timestamp_ms` | `int` | 校准后的毫秒级时间戳 | ```python from hagworm.extend.ntp import NTPClient ntp = NTPClient("ntp.aliyun.com", interval=1800, sampling=5) ntp.start() calibrated_time = ntp.timestamp calibrated_time_ms = ntp.timestamp_ms ``` --- ### hagworm.extend.process — 多进程管理 #### `Daemon` 守护进程管理器,管理多个子进程。 | 入参 | 类型 | 说明 | |------|------|------| | `target` | `Callable` | 子进程入口函数 | | `sub_process_num` | `int` | 子进程数量 | | `cpu_affinity` | `bool` | 是否绑定 CPU 亲和性 | | `join_timeout` | `int` | 等待子进程退出超时 | #### `SharedByteArray` 基于共享内存的字节数组,支持跨进程读写。 ```python from hagworm.extend.process import SharedByteArray # 创建模式 with SharedByteArray(name="shared_buf", create=True, size=1024) as buf: buf.write_int(42) # 读取模式(另一个进程) with SharedByteArray(name="shared_buf", create=False) as buf: val = buf.read_int() # 42 ``` --- ### hagworm.extend.asyncio.base — 异步基础工具类 #### `Utils`(异步版) 继承同步 `Utils`,增加异步工具方法。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `sleep(delay)` | `delay: float` | `None` | 异步等待 | | `is_coroutine_function(func)` | `func: Callable` | `bool` | 判断是否协程函数(支持 partial) | | `awaitable_wrapper(obj)` | `obj: Any` | `Any` | 自适应 awaitable 对象 | | `wait_frame(count)` | `count: int=10` | `Generator` | 暂停指定帧数 | | `get_event_loop()` | 无 | `AbstractEventLoop` | 获取事件循环 | | `loop_time()` | 无 | `float` | 获取事件循环时间 | | `wait_any_completed(*args)` | 协程/任务 | `None` | 等待任一完成,取消其余 | | `call_soon(callback, *args, loop, context)` | `callback: Callable` 等 | `Handle` | 延时调用(隔离上下文) | | `call_later(delay, callback, *args, loop, context)` | `delay: float` 等 | `Handle` | 延时指定秒数调用 | | `call_at(when, callback, *args, loop, context)` | `when: float` 等 | `Handle` | 指定时间调用 | | `create_task(coro)` | `coro: Coroutine` | `Task` | 创建任务 | | `run_until_complete(coro)` | `coro: Coroutine` | `Any` | 运行直到完成 | | `async_timeout(timeout)` | `timeout: float` | 异步上下文管理器 | 异步超时等待 | #### `install_uvloop()` 尝试安装 uvloop 加速事件循环。 #### `async_adapter(func)` 异步函数适配装饰器,使异步函数可以在同步代码中非阻塞调用。 #### `WeakContextVar` 弱引用版的 `ContextVar`,避免上下文变量阻止对象回收。 #### `AsyncCirculatory` / `AsyncCirculatoryForSecond` 异步循环器,用于需要重复执行的场景。 | 入参 | 类型 | 默认值 | 说明 | |------|------|--------|------| | `timeout` | `int/float` | `0` | 超时时间(0=不限) | | `interval` | `int/float` | `0xff`/`1` | 间隔帧数/秒数 | | `max_times` | `int` | `0` | 最大执行次数(0=不限) | ```python from hagworm.extend.asyncio.base import Utils, AsyncCirculatoryForSecond, ShareFuture, FuncCache # 异步循环 async for index in AsyncCirculatoryForSecond(timeout=10, interval=1): # 每秒执行一次,最多10秒 await check_status() # 共享 Future(并发调用共享计算结果) @ShareFuture() async def fetch_data(key): # 同一时刻相同参数的调用共享结果 return await expensive_operation(key) # 函数缓存 @FuncCache(maxsize=100, ttl=10) async def get_config(key): return await load_config(key) ``` #### `AsyncConstructor` 支持异步构造函数的基类。 ```python from hagworm.extend.asyncio.base import AsyncConstructor class AsyncClient(AsyncConstructor): async def __async_init__(self): self.connection = await create_connection() client = await AsyncClient() # 异步构造 ``` #### `AsyncFuncWrapper` 阻塞式异步函数包装器,将多个同步/异步函数包装为一个可 await 调用对象。 ```python from hagworm.extend.asyncio.base import AsyncFuncWrapper wrapper = AsyncFuncWrapper() wrapper.add(async_handler1) wrapper.add(sync_handler2) await wrapper("event_data") # 依次 await 所有处理器 ``` --- ### hagworm.extend.asyncio.future — Future 与线程工具 #### `WaitForever` 永远等待的 Future,直到收到 SIGINT/SIGTERM 信号。 #### `FutureWithTimeout` 带超时功能的 Future。 | 入参 | 类型 | 说明 | |------|------|------| | `delay` | `float` | 超时秒数 | | `default` | `Any` | 超时后的默认返回值 | #### `FutureWithCoroutine` Future 与 Coroutine 的桥接,使一个 Future 可被多个协程 await。 #### `Thread` 增强版线程,支持优雅退出。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `stop(timeout)` | `timeout: int` | `None` | 设置退出标记 | | `is_stopped()` | 无 | `bool` | 是否已标记退出 | #### `ThreadPool` 线程池,桥接线程与协程。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `__init__(max_workers)` | `max_workers: int=None` | `ThreadPool` | 初始化 | | `run(_callable, *args, **kwargs)` | `_callable: Callable` | `Any` | 在线程中执行函数并 await 结果 | #### `ThreadWorker` 装饰器,将同步函数转为非阻塞异步函数。 ```python from hagworm.extend.asyncio.future import ThreadWorker, ThreadPool @ThreadWorker(max_workers=4) def blocking_io(path): with open(path) as f: return f.read() # 在异步代码中调用 result = await blocking_io("/path/to/file") # 也可以直接使用 ThreadPool pool = ThreadPool(max_workers=8) result = await pool.run(os.path.exists, "/path/to/file") ``` #### `SubProcess` 异步子进程管理。 ```python from hagworm.extend.asyncio.future import SubProcess proc = await SubProcess.create("python", "script.py", stdout=asyncio.subprocess.PIPE) await proc.wait(timeout=30) ``` --- ### hagworm.extend.asyncio.task — 定时任务与并发控制 #### `IntervalTask` 间隔任务,按固定秒数重复执行。 ```python from hagworm.extend.asyncio.task import IntervalTask task = IntervalTask.create(60, my_func, arg1, arg2) task.start() ``` #### `CronTask` Cron 定时任务。 ```python from hagworm.extend.asyncio.task import CronTask task = CronTask.create("0 8 * * *", morning_job) # 每天8点 task.start() ``` #### `DCSCronTask` 分布式 Cron 任务,基于 Redis 实现分布式锁,确保多实例环境下任务只执行一次。 ```python from hagworm.extend.asyncio.task import DCSCronTask from hagworm.extend.asyncio.redis import RedisPool dcs_task = DCSCronTask(redis_pool, "daily_report", "0 9 * * *", generate_report) dcs_task.start() ``` #### `MultiTasks` 多任务并发管理器,通过信号量控制并发数。 ```python from hagworm.extend.asyncio.task import MultiTasks tasks = MultiTasks(tasks_num=10) tasks.append(fetch_url("http://example.com/1")) tasks.append(fetch_url("http://example.com/2")) tasks.append(fetch_url("http://example.com/3")) results = await tasks # 最多 10 个并发 ``` #### `RateLimiter` 流量控制器,保护计算资源。 | 入参 | 类型 | 说明 | |------|------|------| | `task_limit` | `int` | 最大并发任务数 | | `wait_limit` | `int` | 等待队列长度 | | `timeout` | `int` | 等待超时 | | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `append(func, *args, **kwargs)` | `func: Callable` | `bool` | 提交任务(不等待结果) | | `call(func, *args, **kwargs)` | `func: Callable` | `Any` | 提交任务并等待结果 | | `join()` | 无 | `None` | 等待所有任务完成 | | `close()` | 无 | `None` | 关闭限流器 | ```python from hagworm.extend.asyncio.task import RateLimiter limiter = RateLimiter(task_limit=5, wait_limit=10, timeout=60) # 不等待结果 await limiter.append(process_data, data1) await limiter.append(process_data, data2) # 等待结果 result = await limiter.call(process_data, data3) await limiter.join() limiter.close() ``` --- ### hagworm.extend.asyncio.buffer — 缓冲区 #### `QueueBuffer` 异步队列缓冲区,批量消费数据。 | 入参 | 类型 | 说明 | |------|------|------| | `handler` | `Callable` | 批量处理函数 | | `slice_size` | `int` | 每批大小 | | `slice_time` | `float` | 最大等待时间 | | `task_limit` | `int` | 并发处理数 | | `wait_limit` | `int` | 等待队列长度 | | `timeout` | `int` | 超时 | ```python from hagworm.extend.asyncio.buffer import QueueBuffer async def batch_insert(records): await db.insert_many(records) buffer = QueueBuffer(batch_insert, slice_size=100, slice_time=5, task_limit=4) await buffer.append(record1) await buffer.append(record2) # 满100条或5秒后自动批量处理 await buffer.close() ``` #### `FileBuffer` 文件缓冲区,支持大文件分片读写。 ```python from hagworm.extend.asyncio.buffer import FileBuffer with FileBuffer(slice_size=0x1000000) as fb: fb.write(b"large data...") data = fb.read(1024) ``` --- ### hagworm.extend.asyncio.event — 异步事件总线 #### `EventDispatcher`(异步版) 支持异步函数的事件总线。 #### `EventWaiter` 带超时的临时消息接收器。 ```python from hagworm.extend.asyncio.event import EventDispatcher, EventWaiter dispatcher = EventDispatcher() async def on_message(msg): print(msg) dispatcher.add_listener("message", on_message) dispatcher.dispatch("message", "hello") ``` #### `DistributedEvent` 基于 Redis Pub/Sub 的分布式事件总线。 ```python from hagworm.extend.asyncio.event import DistributedEvent from hagworm.extend.asyncio.redis import RedisPool dist_event = DistributedEvent(redis_pool, "my_app_events", channel_count=4) dist_event.add_listener("user_login", handle_login) await dist_event.dispatch("user_login", user_id="123") ``` --- ### hagworm.extend.asyncio.net — HTTP 客户端 #### `HTTPClientPool` 基于 `httpx` 的异步 HTTP 客户端,内置重试机制。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `get(url, **kwargs)` | `url: str` 等 | `bytes` | GET 请求 | | `post(url, **kwargs)` | `url: str` 等 | `bytes` | POST 请求 | | `put(url, **kwargs)` | `url: str` 等 | `bytes` | PUT 请求 | | `patch(url, **kwargs)` | `url: str` 等 | `bytes` | PATCH 请求 | | `delete(url, **kwargs)` | `url: str` 等 | `bytes` | DELETE 请求 | | `send_request(method, url, **kwargs)` | `method: str` 等 | `Result` | 通用请求 | #### `HTTPTextClientPool` 返回文本响应的 HTTP 客户端。 #### `HTTPJsonClientPool` 返回 JSON 响应的 HTTP 客户端。 #### `Downloader` 流式文件下载器。 ```python from hagworm.extend.asyncio.net import HTTPClientPool, HTTPJsonClientPool, Downloader # 基本用法 client = HTTPClientPool(retries=3) body = await client.get("https://api.example.com/data") # JSON 客户端 json_client = HTTPJsonClientPool() data = await json_client.get("https://api.example.com/json") # 返回 dict/list # 文件下载 downloader = Downloader() with open("output.zip", "wb") as f: await downloader.fetch(f, "https://example.com/file.zip") await client.close() ``` --- ### hagworm.extend.asyncio.redis — Redis 连接管理 #### `RedisPool` / `RedisClusterPool` Redis 单机/集群连接池管理。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `open()` | 无 | `self` | 初始化连接池 | | `close()` | 无 | `None` | 关闭连接池 | | `set_key_prefix(value)` | `value: str` | `None` | 设置键前缀 | | `get_safe_key(key, *args, **kwargs)` | `key: str` 等 | `str` | 生成安全键名 | | `allocate_lock(name, **kwargs)` | `name: str` 等 | `LuaLock` | 分配分布式锁 | | `get_obj(name)` | `name: str` | `Any` | 获取 Pickle 对象 | | `set_obj(name, value, seconds)` | `name: str` 等 | `None` | 存储 Pickle 对象 | #### `RedisDelegate` Redis 功能组件,简化初始化。 #### `ShareCache` 基于分布式锁的共享缓存,保证同一时刻业务逻辑只执行一次。 #### `PeriodCounter` 周期计数器,基于 Redis 的 incrby/decrby。 ```python from hagworm.extend.asyncio.redis import RedisPool, RedisDelegate, ShareCache # 直接使用 redis = await RedisPool("localhost", 6379, max_connections=32).open() await redis.set("key", "value") val = await redis.get("key") # 使用 Delegate class MyService(RedisDelegate): async def init(self): await self.init_redis_single("localhost", 6379) async def get_data(self, key): return await self.redis_pool.get(key) # 共享缓存 async with ShareCache(redis, "expensive_result", lock_timeout=60) as cache: result = await cache.get() if result is None: result = await expensive_computation() await cache.set(result, expire=300) ``` --- ### hagworm.extend.asyncio.mysql — MySQL 连接管理 #### `MySQLPool` MySQL 连接池管理。 | 入参 | 类型 | 说明 | |------|------|------| | `host` | `str` | 主机 | | `port` | `int` | 端口 | | `db` | `str` | 数据库名 | | `user` | `str` | 用户名 | | `password` | `str` | 密码 | | `minsize` | `int` | 最小连接数 | | `maxsize` | `int` | 最大连接数 | | `readonly` | `bool` | 是否只读 | #### `MySQLClient` MySQL 客户端,支持 SQLAlchemy 查询构建器。 | 方法 | 入参 | 返回值 | 说明 | |------|------|--------|------| | `execute(query, args)` | `query: str, args` | `Cursor` | 执行 SQL | | `select(query)` | `query: Select` | `List` | 查询列表 | | `find(query)` | `query: Select` | `Any` | 查询单条 | | `count(column, where_clause)` | `column, where` | `int` | 计数 | | `insert(query)` | `query: Insert` | `int` | 插入,返回 insert_id | | `update(query)` | `query: Update` | `int` | 更新,返回影响行数 | | `delete(query)` | `query: Delete` | `int` | 删除,返回影响行数 | #### `MySQLTransaction` MySQL 事务对象,未显式 commit 则自动 rollback。 #### `MySQLDelegate` MySQL 功能组件。 ```python from hagworm.extend.asyncio.mysql import MySQLPool, MySQLClient, MySQLDelegate import sqlalchemy as sa # 直接使用 pool = MySQLPool("localhost", 3306, "mydb", "root", "password", minsize=8, maxsize=32) await pool.open() async with pool.get_client() as client: users = await client.select(sa.select(sa.table("users"))) user = await client.find(sa.select(sa.table("users")).where(sa.column("id") == 1)) # 事务 async with pool.get_transaction() as trx: await trx.insert(sa.insert(sa.table("users")).values(name="Alice")) await trx.insert(sa.insert(sa.table("logs")).values(action="create")) await trx.commit() # 不调用则自动 rollback # 使用 Delegate class MyService(MySQLDelegate): async def init(self): await self.init_mysql_rw("localhost", 3306, "mydb", "root", "password") await self.init_mysql_ro("localhost", 3307, "mydb", "reader", "password") ``` > ⚠️ **易混淆点**:`readonly=True` 的连接池调用 `insert`/`update`/`delete` 会抛出 `MySQLReadOnlyError`。 --- ### hagworm.extend.asyncio.mongo — MongoDB 连接管理 #### `MongoPool` MongoDB 连接池管理。 ```python from hagworm.extend.asyncio.mongo import MongoPool, MongoDelegate pool = MongoPool( host=["mongo1:27017", "mongo2:27017"], username="admin", password="secret", min_pool_size=8, max_pool_size=32 ) db = pool.get_database("mydb") collection = db["users"] await collection.insert_one({"name": "Alice"}) ``` --- ### hagworm.extend.asyncio.mail — SMTP 邮件 #### `SMTPClient` 异步 SMTP 邮件客户端。 ```python from hagworm.extend.asyncio.mail import SMTPClient, EmailBody client = SMTPClient("user@gmail.com", "password", "smtp.gmail.com", 587) # 简单发送 await client.send( sender="sender@example.com", recipients=["receiver@example.com"], subject="Hello", content="

Hi there!

" ) # 带附件或自定义消息 message = EmailBody.create_message( sender="sender@example.com", recipients=["receiver@example.com"], subject="Report", content="

Please find the report attached.

" ) await client.send_message("sender@example.com", ["receiver@example.com"], message) ``` --- ### hagworm.extend.asyncio.socket — TCP/Unix Socket #### `AsyncTcpServer` 异步 TCP 服务器。 ```python from hagworm.extend.asyncio.socket import AsyncTcpServer async def handle_connection(reader, writer): while True: data = await reader.readline() if not data: break writer.write(data) await writer.drain() writer.close() await writer.wait_closed() server = AsyncTcpServer(handle_connection, ("0.0.0.0", 8080)) server.run() ``` #### `RobustConnection` 自动重连的 TCP 客户端连接。 #### `UnixSocketServer` / `UnixSocketClient` Unix Domain Socket 服务端与客户端,使用 msgpack + base64 编码通信。 --- ### hagworm.extend.asyncio.command — 多进程命令框架 #### `MainProcess` / `SubProcess` 异步多进程框架,主进程管理子进程的生命周期。 ```python from hagworm.extend.asyncio.command import MainProcess, SubProcess class MyWorker(SubProcess): async def initialize(self): print(f"Worker {self._process_id} initializing...") async def _execute(self): # 业务逻辑 pass async def release(self): print(f"Worker {self._process_id} releasing...") class MyMain(MainProcess): async def initialize(self): print("Main process initializing...") async def release(self): print("Main process releasing...") MyMain(MyWorker.create, sub_process_num=4).run() ``` --- ### hagworm.extend.asyncio.transaction — 异步事务 #### `Transaction`(异步版) 异步事务对象,支持同步和异步回调。 ```python from hagworm.extend.asyncio.transaction import Transaction async with Transaction() as trx: trx.add_commit_callback(lambda: print("committed")) trx.add_commit_callback(async_cleanup) await trx.commit() ``` --- ### hagworm.extend.asyncio.file — 文件加载器 #### `FileLoader` 带缓存的网络/本地文件加载器。 ```python from hagworm.extend.asyncio.file import FileLoader loader = FileLoader(maxsize=100, ttl=3600, thread=32) content = await loader.read("/path/to/file") html = await loader.fetch("https://example.com/page.html") ``` --- ### hagworm.extend.asyncio.pool — 通用对象池 #### `ObjectPool` 通用异步对象池,需子类实现 `_create()` 方法。 ```python from hagworm.extend.asyncio.pool import ObjectPool, ObjectInterface class MyConnection(ObjectInterface): async def open(self): self.conn = await create_connection() async def close(self): await self.conn.close() class MyConnectionPool(ObjectPool): def _create(self): return MyConnection() pool = MyConnectionPool(maxsize=10) await pool.open() async with pool.get() as conn: # 使用连接 pass await pool.close() ``` --- ### hagworm.frame.fastapi — FastAPI 集成 #### `create_fastapi` 创建 FastAPI 应用,自动配置日志、uvloop、中间件和异常处理。 ```python from hagworm.frame.fastapi.base import create_fastapi, uvicorn_run app = create_fastapi( log_level="INFO", log_file_path="/var/log/app", debug=False, ) if __name__ == "__main__": uvicorn_run(lambda: app, port=8080) ``` #### `APIRouter` 支持末尾斜杠兼容的路由器。 #### `Request` 增强的请求对象,提供 `client_ip`、`referer` 等便捷属性。 #### `Response` / `ErrorResponse` 统一响应格式:`{"code": 0, "data": ..., "trace_id": ...}` / `{"code": -1, "error": ..., "trace_id": ...}`。 #### `BaseModel` / `Depends` 增强的 Pydantic 模型,支持 `params` 方法用于 FastAPI 依赖注入。 #### `field` — 自定义字段类型 提供 `IDCardType`、`BankCardType`、`UUIDType`、`EmailType`、`URLType`、`IPv4Type`、`IPv6Type`、`JsonType`、`Base64Type`、`StrListType`、`IntListType`、`FloatListType` 等验证字段。 ```python from hagworm.frame.fastapi.base import APIRouter, Request from hagworm.frame.fastapi.response import ErrorResponse from hagworm.frame.fastapi.field import EmailType, IntListType from hagworm.frame.fastapi.model import BaseModel, Depends router = APIRouter() class QueryModel(BaseModel): email: EmailType ids: IntListType @router.get("/users") async def get_users(request: Request): return request.client_ip @router.get("/error") async def error(request: Request): raise ErrorResponse(-1, "something went wrong", 400) @router.get("/search") async def search(params: Depends(QueryModel)): # params.email 已验证为合法邮箱 # params.ids 已解析为整数列表 return {"email": params.email, "ids": params.ids} ``` --- ### hagworm.frame.gunicorn — Gunicorn Worker #### `Worker` 基于 `uvicorn-worker` 的 Gunicorn Worker 类。 ```python # gunicorn.conf.py worker_class = "hagworm.frame.gunicorn.Worker" workers = 4 bind = "0.0.0.0:8080" ``` --- ### hagworm.frame.stress_tests — 压力测试框架 #### `Launcher` / `RunnerAbstract` 多进程压力测试框架,自动收集统计数据。 ```python from hagworm.frame.stress_tests import Launcher, RunnerAbstract, TimerMS class MyRunner(RunnerAbstract): async def _execute(self): for _ in range(100): with TimerMS() as timer: await Utils.sleep(Utils.randint(10, 99) / 1000) if Utils.rand_hit([True, False], [80, 20]): await self.success("API", timer.value) else: await self.failure("API", timer.value) Launcher(MyRunner.create, sub_process_num=4).run() ``` --- ### hagworm.third.grpc — gRPC 框架 #### `GRPCServer` gRPC 服务端,使用 msgpack 序列化。 #### `Router` gRPC 路由器,支持装饰器注册方法。 #### `GRPCClient` gRPC 客户端,支持 RoundRobin 负载均衡。 #### `GRPCBridge` gRPC 网桥,实现请求转发。 #### `GRPCMainProcess` / `GRPCWorker` gRPC 多进程框架。 ```python from hagworm.third.grpc.server import Router, StreamHandler from hagworm.third.grpc.bridge import GRPCMainProcess, GRPCWorker from hagworm.third.grpc.client import GRPCClient # 服务端 router = Router("demo") @router.unary_unary async def echo(request, context): return {"echo": request} @router.stream_stream async def stream_echo(request, context): await StreamEchoHandler(request, context).join() GRPCMainProcess(("0.0.0.0", 8080), [router], GRPCWorker, 2).run() # 客户端 client = GRPCClient() await client.open(["127.0.0.1:8080"]) result = await client.unary_unary("/demo/echo", {"msg": "hello"}) ``` --- ### hagworm.third.rabbitmq — RabbitMQ 消息队列 #### `RabbitMQConsumer` / `RabbitMQConsumerForExchange` RabbitMQ 消费者。 #### `RabbitMQProducer` / `RabbitMQProducerForExchange` RabbitMQ 生产者。 #### `RabbitMQProducerPool` / `RabbitMQProducerForExchangePool` RabbitMQ 生产者连接池。 #### `RpcServer` / `RpcClient` 基于 RabbitMQ 的 RPC 实现,支持 Unary 和 Stream 两种模式。 ```python from hagworm.third.rabbitmq import create_connection from hagworm.third.rabbitmq.publish import RabbitMQProducerForExchange from hagworm.third.rabbitmq.consume import RabbitMQConsumerForExchange # 创建连接 connection = create_connection("amqp://guest:guest@localhost/") await connection.connect() # 生产者 producer = RabbitMQProducerForExchange(connection, "my_exchange") await producer.open(exchange_type=aio_pika.ExchangeType.TOPIC) await producer.publish(b"hello", "routing_key") # 消费者 async def handle_message(message: aio_pika.IncomingMessage): print(message.body) await message.ack() consumer = RabbitMQConsumerForExchange(connection, "my_queue", "my_exchange") await consumer.open(consume_func=handle_message, routing_key="routing_key") ``` --- ### hagworm.third.nacos — Nacos 服务发现 #### `NacosConfig` Nacos 配置中心客户端,支持长轮询监听配置变更。 #### `NacosInstanceRegister` Nacos 服务实例注册,自动发送心跳。 #### `NacosInstanceQuery` Nacos 服务实例查询,监听实例变更。 #### `Configure`(Nacos 版) 从 Nacos 配置中心加载配置。 ```python from hagworm.third.nacos.client import NacosConfig, NacosInstanceRegister, NacosInstanceQuery from hagworm.third.nacos.config import Configure # 配置中心 def on_config_change(content): print(f"Config updated: {content}") nacos_config = NacosConfig( on_config_change, "localhost:8848", "my-app.yaml", group="DEFAULT_GROUP", namespace="dev" ) nacos_config.start() # 服务注册 register = NacosInstanceRegister( "localhost:8848", "my-service", "10.0.0.1", 8080, namespace="dev" ) register.start() # 服务发现 query = NacosInstanceQuery("localhost:8848", "my-service", namespace="dev") host = query.get_host() # 按权重随机选择实例 ``` --- ## 完整示例 ### FastAPI 项目示例 参见 `example/fastapi_demo/` 目录,包含完整的 FastAPI 项目结构: - [main.py](example/fastapi_demo/main.py) — 应用入口 - [setting.py](example/fastapi_demo/setting.py) — 配置定义 - [controller/home.py](example/fastapi_demo/controller/home.py) — 路由控制器 ### gRPC 示例 参见 `example/grpc_demo/` 目录: - [server.py](example/grpc_demo/server.py) — gRPC 服务端 - [client.py](example/grpc_demo/client.py) — gRPC 客户端 ### TCP 服务示例 参见 [example/main_tcp.py](example/main_tcp.py): ```python from hagworm.extend.asyncio.base import Utils from hagworm.extend.asyncio.socket import AsyncTcpServer async def connection_handle(reader, writer): while True: request = await reader.readline() if not request: break writer.write(request) await writer.drain() writer.close() await writer.wait_closed() server = AsyncTcpServer(connection_handle, ("0.0.0.0", 8080)) server.run() ``` ### RabbitMQ 示例 参见 [example/main_rabbitmq.py](example/main_rabbitmq.py): ```python from hagworm.third.rabbitmq import create_connection from hagworm.third.rabbitmq.publish import RabbitMQProducerForExchange from hagworm.third.rabbitmq.consume import RabbitMQConsumerForExchange connection = create_connection("amqp://guest:guest@localhost/") await connection.connect() # 生产者 producer = RabbitMQProducerForExchange(connection, "test_exchange") await producer.open(exchange_type=aio_pika.ExchangeType.TOPIC) await producer.publish(b"hello", "test_0") # 消费者 async def on_message(message): print(message.body) await message.ack() consumer = RabbitMQConsumerForExchange(connection, "test_queue", "test_exchange") await consumer.open(routing_key="test_0") await consumer.block_pull(on_message) ``` ### 压力测试示例 参见 [example/main_test.py](example/main_test.py): ```python from hagworm.frame.stress_tests import Launcher, RunnerAbstract, TimerMS class Runner(RunnerAbstract): async def _execute(self): for index in range(5): with TimerMS() as timer: await Utils.sleep(Utils.randint(10, 99) / 1000) if Utils.rand_hit([True, False], [50, 50]): await self.success(f"Test{index}", timer.value) else: await self.failure(f"Test{index}", timer.value) Launcher(Runner.create, 5).run() ``` --- ## 单元测试 运行测试: ```bash cd testing pytest ``` 测试覆盖模块: - `test_extend_base.py` — 基础工具、异常处理、上下文管理器、FuncWrapper - `test_extend_cache.py` — StackCache 缓存 - `test_extend_struct.py` — KeyLowerDict 数据结构 - `test_extend_asyncio_base.py` — 异步工具、ShareFuture、FuncCache、AsyncCirculatory、事务 - `test_extend_asyncio_future.py` — ThreadWorker 线程转协程 - `test_extend_asyncio_task.py` — MultiTasks 并发、RateLimiter 限流 - `test_extend_asyncio_net.py` — HTTP 客户端 --- ## 许可证 Apache-2.0