python 基础语法 编码 1 2 3 4 5 gb2312:专门为中文设计的编码 gbk:gb2312的扩展 ascii :英文和符号 一个字节unicode:所有语言设计的编码 大于等于两个字节 缺点:像英文这种符号也变得占空间 utf-8 :动态编码 长度不固定
优先级 1 2 3 4 5 6 7 8 () 指数运算符** 乘法*,除法/ //,取模% 加减法 赋值运算符= 比较运算符 <=, <, >, >=, ==, != 赋值运算符:=, %=, /=, //=, -=, +=, *=, **= 逻辑运算符:not, or, and(& | ^是位运算符)
除
变量 引用 id() 在python中,值是靠引用来传递来的。
我们可以用id()来判断两个变量是否为同一个值的引用。我们可以将id值理解为那块内存的地址标 识。
不可变类型 整型、浮点、字符串、元组都是不可变类型
1 2 3 4 5 6 7 8 a = 1 b = a print(id(a)) print(id(b)) # 两个地址相同 a=2 print(id(a)) print(id(b)) # 两个地址不同
可变类型 列表、字典、集合都是可变类型
1 2 3 4 5 6 7 8 a = [1 ,2 ] b = a print (id (a))print (id (b)) a.append(5 ) print (id (a))print (id (b))
引用当实参 1 2 3 4 5 6 7 8 9 def test (a ): print (id (a)) a+=a print (id (a)) b=1 print (id (b)) test(b)
1 2 3 4 5 6 7 8 9 def test (a ): print (id (a)) a+=a print (id (a)) b=[1 ,3 ] print (id (b)) test(b)
全局变量global 1 2 3 4 5 在Python中,对全局变量进行赋值操作时(特别是对不可变变量修改值时,内存地址会变,所以要加global),需要在函数内部用global声明 明确告诉阅读代码的人,这里修改的是全局变量 避免混淆,特别是在复杂的代码中
1 2 3 4 5 6 7 8 9 10 11 epsg_cache = None # 模块级全局变量 def example1(): # 仅仅读取全局变量的值,不需要global if epsg_cache: print(epsg_cache) def example2(): # 需要修改全局变量,必须用global声明 global epsg_cache epsg_cache = "new value" # 重新赋值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import multiprocessingimport osimport timeg_list = [] def add_data (): for i in range (5 ): g_list.append(i) time.sleep(0.3 ) print ("add完成:" ,g_list) def read_data (): print ("read:" ,g_list) if __name__ == '__main__' : main_pid = os.getpid() process_name = multiprocessing.current_process() print (f"当前进程id:{main_pid} " ,f"进程:{process_name} " ) sub_process1 = multiprocessing.Process(target=add_data) sub_process2 = multiprocessing.Process(target=read_data) sub_process1.start() sub_process1.join() print ("主进程g_list:" ,g_list) sub_process2.start()
交换变量的值 1 2 3 4 5 6 7 8 9 10 11 12 复杂版: a=1 b=2 c=a a=b b=c 简洁版: a,b = 1,2 b,a = a,b
数据类型转换 int()
float()
str()
list()
set()
eval():将python中的有效表达式转换成原本类型
tuple():将序列转换成元组 tuple([1,2])
repr():将对象转换成字符串
chr():将整数转换成unicode字符
ord():将一个字符转换成它的ascii码
hex():将整数转换成十六进制字符串
1 2 3 4 5 6 7 8 str1 = '1' eval () eval (str1) str2 = '1.1' eval (str2) str3 = '(1,2,3)' eval (str3) str4 = '[1,2,3]' eval (str4)
字节数据(待)
1 2 3 4 5 # 你的字节字符串数据 data = b'{"unitName": "CD701_LS6C3G0Y6RA400437_2025-10-09_21-34-22", "unitIndex": "", "dataversion": "v0.0.5.1", "vehicleType": "CD701", "vehicleId": "LS6C3G0Y6RA400437", "tileId": [], "dataGenerationTime": 1769998914, "dataDurationTime": null, "basicInfo": ["Collect_dat", "Hd_Map"], "sceneTags": [{"labelCode": "Wet_road", "labelSignalValue": "", "labelPositionCoordinates": "", "labelStartTime": 1760016863780, "labelEndTime": 1760016920780}]}' # 解码字节字符串并转换为字典 data_dict = json.loads(data.decode('utf-8'))
1 2 如果你确定数据总是 UTF-8 编码,也可以使用更简洁的方式: data_dict = json.loads(data) # json.loads 可以直接处理字节字符串
for else 循环执行完会执行else。循环未完成break结束,不会执行else
1 2 3 4 5 for i in range (5 ): break else : print (1 )
1 2 3 4 5 for i in range (5 ): pass else : print (1 )
range 1 2 3 range(1,13) 1-12 range(1,13,3) 1 4 7 10
isinstance和type isinstance(i, dict)
match语句 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 try : inp = int (input ("请输入功能序号:" )) except : inp = None match inp: case 1 : clean_screen() add_info() clean_screen() case 2 : clean_screen() del_info() clean_screen() case 3 : clean_screen() modify_info() clean_screen() case 4 : clean_screen() search_info() clean_screen() case 5 : clean_screen() show_all_info() clean_screen() case 6 : clean_screen() exit() case _: clean_screen() print ("输入功能需要有误,请重新输入!" ) time.sleep(3 ) clean_screen()
前向引用 1 2 3 4 5 6 7 8 这是Python的前向引用(Forward Reference),当类型在定义时还未完全定义时使用引号包裹 实际使用场景: 返回同模块后面定义的类 返回自身类(在类方法中) 是另一个还未导入的模块中的类 @classmethod def get_cos_util(cls, bucket_name: Optional[str] = None) -> 'COSBrowser':
三元运算符
pass等同于… 1 2 3 4 5 6 pass等同于... def fun(): pass def fun(): ...
\ 1 2 3 4 if agent_ts_idx - ego_ts_idx > agent_ahead_frame or \ ego_ts_idx - agent_ts_idx > ego_ahead_frame or \ agent_polygon.distance(ego_polygon) > IOU_MAX_DISTANCE: ego_agent_iou_map[ego_ts_idx][agent_ts_idx] = 0.0
\对应windows目录符号,同时也是python转义字符
/linux目录符号
debug print(xx,end=’\n’) # print默认结束符是\n
input(“xxxx”) # input会把输入的任何数据当成字符串
assert 1 2 3 assert语句用于在代码中设置断言,即声明某个条件必须为真。如果条件为假,则程序会抛出 AssertionError异常并终止执行。 assert 条件, "错误提示信息"
1 2 3 4 5 6 7 8 args = parse_args() assert args.out or args.eval or args.format_only or args.show \ or args.show_dir, \ ('Please specify at least one operation (save/eval/format/show the ' 'results / save the results) with the argument "--out", "--eval"' ', "--format-only", "--show" or "--show-dir"')
-O 1 2 3 4 5 6 7 8 9 10 11 12 13 在正式生产环境中,有时会使用 -O(优化)标志运行 Python,这会禁用所有 assert语句。因此,对于关键性检查,通常建议使用 if条件判断并手动抛出异常(如 ValueError)。 在开发和调试阶段,assert是非常有用的工具。 # test.py def calculate(x): assert x > 0, "x必须大于0" return x * 2 print(calculate(5)) # 使用 -O 参数,assert会被忽略 python -O test.py # 输出: 10(即使传入负数也不会触发断言)
推导式 使用范围列表、字典、集合
列表推导式(列表生成式) 1 2 3 l = [i*i for i in range(5) if i%2==0] l = [(i,n) for i in range(1) for n in range(1)]
字典推导式 1 2 3 4 d = {i:i**2 for i in range (1 ,5 )} d_info = {'MBP' :268 ,'HP' :80 ,'DELL' :500 ,'Lenovo' :800 } l = {key:value for key,value in d_info.items() if value>200 }
合并两个列表为字典 1 2 3 4 5 l1 = ["name","age","gender"] l2 = ["tom",28,'male'] d = {l1[i]:l2[i] for i in range(len(l1))} # 或者直接使用zip()函数
集合推导式 1 s = {i**2 for i in range(5)}
生成器generator 在自动化测试框架的夹具那一章节涉及生成器yield
1 2 3 4 5 6 7 8 def fun (): print (xxx) yield print (xxx) generate_obj = fun() generate_obj.next () generate_obj.next ()
next()和.send() 恢复执行,yield之前的变量还可以使用
案例 1 2 3 4 5 6 7 8 def generator (n ): for i in range (n): yield i g = generator(100 ) for v in g: print (v)
模块、包、库(待) .py module模块 单个py文件就是一个模块
模块不能以数字开头,否则其它模块不能import(不管是import xx还是from xx import xx都不行)
main 主模块 程序入口的模块叫主模块,最好使用if –name– == “–main–”:
它的作用:
防止别人导入文件的时候执行main里面的代码
防止windows系统无限递归创建子进程
–name– name的两个核心值
当直接运行 某个 Python 文件时,这个文件就是程序的”入口点”,它的 __name__被设置为 "__main__"。
1 2 3 4 5 6 7 print (f"我是 {__name__} " )$ python script.py
当某个 Python 文件被其他文件导入 时,它只是一个普通模块,它的 __name__被设置为其模块名 (通常是文件名去掉 .py后缀)
1 2 3 4 5 6 7 8 9 10 11 12 print (f"module_a: 我是 {__name__} " )import module_aprint (f"module_b: 我是 {__name__} " )$ python module_b.py
1 2 3 4 5 6 7 def add (a, b ): return a + b result = add(2 , 3 ) print (f"测试结果: {result} " )
1 2 3 4 5 6 7 8 9 10 11 12 13 def add (a, b ): return a + b def multiply (a, b ): return a * b if __name__ == "__main__" : print ("正在运行测试..." ) print (f"2 + 3 = {add(2 , 3 )} " ) print (f"2 * 3 = {multiply(2 , 3 )} " )
导入模块 1 2 3 4 5 6 7 8 import 模块名 # from 路径 import 模块名 import 模块名1,模块名2,.... from [路径.]模块名 import 功能1,功能2,功能3,... # 可以是函数,类,变量等 from [路径.]模块名 import * # 使用别名 import 模块名 as 别名 # import 模块名.功能 是错误不行的 from 模块名 import 功能 as 别名
模块搜索顺序 当导入一个模块,Python解析器对模块位置的搜索顺序是: 1.当前目录 2.如果不在当前目录,Python则搜索在shell变量PYTHONPATH下的每个目录。 3.如果都找不到,Python会察看默认路径。UNIX下,默认路径一般为/usr/local/lib/python/含__init__.py文件的就是包,还有一些模块文件.py
模块搜索路径存储在 :system模块的sys.path变量中。变量里包含当前目录,PYTHONPATH和由安装过程决定的默认目录。
注意:
–all– 如果一个模块文件中有–all–列表,当使用from xxx import *导入时,只能导入这个列表中的元素。(其它的导入方式,如直接导入模块、导入具体功能还是能够导入其它非列表里面的)
init .py package包包是将有联系的模块组织在一起,即放到同一个文件夹下,并且在这个文件夹创建一个名字为–init–.py文件,那么这个文件夹就称之为包。
1 2 3 4 5 6 7 8 9 10 11 my_project/ ├── my_package/ │ ├── __init__.py │ ├── module_a.py │ └── module_b.py └── main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 在 __init__.py 中(如果它内容为空,还是可以from 包名 import 模块): from .module_a import function_a from .module_b import function_b 在 main.py 中: # main.py from my_package import function_a, function_b # 直接从包名导入模块里的具体东西 function_a() function_b()
作用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 标记目录为 Python 包 如果没有 __init__.py,Python 不会将目录识别为包。无法通过from 包 import 模块里的具体东西 初始化代码 可以在 __init__.py 中执行初始化操作,例如: # __init__.py def init_db(): print("数据库初始化") init_db() 简化 API 接口 将常用模块或函数集中导入到 __init__.py,方便用户直接调用。 控制通配符导入 使用 __all__ 定义允许导入的模块或变量: # __init__.py __all__ = ['function_a', 'function_b'] 延迟加载 按需导入较大的模块以提高性能: # __init__.py def get_large_module(): from .large_module import LargeClass return LargeClass()
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #!/usr/bin/env python # -*- coding: utf-8 -*- """ dataengine 包初始化文件 """ from .xxx import xxx from .xxx import xxx __version__ = "1.0.0" __author__ = "liumin" __all__ = [ "xx", "xxx" ] __description__ = "提供本地存储和COS存储的统一接口封装"
1 2 3 4 5 6 7 8 导入时更方便 # 没有 __init__.py 时的导入 from data_engine.data_engine import DataEngine from data_engine.storage import IStorage from data_engine.cos_storage import COSStorage # 有 __init__.py 时的导入(更简洁) from data_engine import DataEngine, IStorage, COSStorage
导入包 1 2 3 4 5 6 7 8 9 10 11 12 import 包名.模块名使用:包名.模块名.目标 from [路径.]包名 import *from [路径.]包名 import 具体内容from [路径.]包名.模块 import 具体内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from .data_engine import DataEnginefrom .storage import IStoragefrom .local_storage import LocalStoragefrom .cos_storage import COSStoragefrom .cos_factory import CosFactoryfrom .cos_browser import COSBrowserfrom .exceptions import StorageError,IOManagerError,FileNotFoundError,PermissionError,UnsupportedProtocolError__version__ = "1.0.0" __author__ = "xxxxx" __all__ = [ "IStorage" , "LocalStorage" , "COSStorage" , "CosFactory" , "DataEngine" , 'COSBrowser' ] __description__ = "xxxxxx"
–all– 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # module.py def public_func(): return "public" def _private_func(): return "private" class PublicClass: pass class _PrivateClass: pass __all__ = ['public_func', 'PublicClass'] # 只导出这两个 # 另一个文件 from module import * # 现在只会导入: public_func 和 PublicClass # _private_func 和 _PrivateClass 不会被导入 ✅
library库 库是多个包和模块的集合,提供特定领域的功能。Python 的库分为标准库和第三方库。
with 数据库 1 2 3 with是一个 Python 的关键字,用于上下文管理器。 自动资源管理:当你用 with语句打开一个数据库连接(这里是通过 connection.cursor()获取一个游标)时,它会确保在代码块执行完毕后,无论是否发生异常,都会自动、正确地关闭游标,释放相关资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 connection = context.resources.database with connection.cursor() as cursor: cursor.execute(query) 等同于 connection = context.resources.database cursor = connection.cursor() # 手动获取游标 try: # 在这里执行你的数据库操作... # cursor.execute(...) pass finally: cursor.close() # 无论如何,最后都要手动关闭游标
1 2 3 4 5 6 7 import sqlite3with sqlite3.connect('database.db' ) as conn: cursor = conn.cursor() cursor.execute('SELECT * FROM users' ) results = cursor.fetchall()
文件资源管理 1 2 with open('xxx','xxx') as f: xxxx
异常捕获 except: 大多数时候用的都是Exception这个异常。但是像键盘按键终止程序、sys.exit(1)是捕获不到的,except:可以
1 2 3 4 5 6 7 8 9 BaseException ├── Exception ✅ 你抓的是这一类 │ ├── ValueError │ ├── TypeError │ ├── ZeroDivisionError │ └── ... ├── SystemExit ❌ 抓不到 ├── KeyboardInterrupt❌ 抓不到 └── GeneratorExit ❌ 抓不到
捕获单个异常 1 2 3 4 5 6 7 8 9 try: pass except Exception as e: context.log.error(f"数据分组失败: {str(e)}") raise else: pass # 如果没有发生异常,执行此块 finally: pass # 无论是否发生异常,都会执行 如:关闭文件等
1 2 3 4 5 6 7 8 def learn_python(): try: a = 1/0 except Exception as e: print(str(e)) ZeroDivisionError: division by zero
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def learn_python(): try: a = 1/0 except Exception as e: raise e # raise 差不多,前者报错多当前这行 Traceback (most recent call last): File "/workspace/production_system/tag_test/test2.py", line 293, in <module> learn_python() File "/workspace/production_system/tag_test/test2.py", line 78, in learn_python raise e File "/workspace/production_system/tag_test/test2.py", line 75, in learn_python a = 1/0
捕获多个异常 捕获多个用元组,捕获所有常见的异常用Exception。
1 2 3 4 5 6 7 8 try: pass except (xxx,xxx) as e: pass else: pass # 如果没有发生异常,执行此块 finally: pass # 无论是否发生异常,都会执行
捕获异常描述信息 错误类型冒号后面的描述信息,直接打印就可以了
1 2 3 4 5 6 7 8 try: pass except (xxx,xxx) as e: print(e) else: pass # 如果没有发生异常,执行此块 finally: pass # 无论是否发生异常,都会执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def learn_python (): try : d = {"a" :1 ,"b" :False } l = [] d["c" ] l[1 ] except (IndexError,KeyError) as e: raise e Traceback (most recent call last): File "/workspace/production_system/tag_test/test2.py" , line 295 , in <module> learn_python() File "/workspace/production_system/tag_test/test2.py" , line 80 , in learn_python raise e File "/workspace/production_system/tag_test/test2.py" , line 77 , in learn_python d["c" ] ~^^^^^ KeyError: 'c' print (e)结果为:'c'
e.get_origin_msg() 获取整个错误信息包括:错误位置、代码、错误类型、错误描述
1 2 3 4 except CosServiceError as e: downLoad_dir_from_cos_json['msg'] = e.get_error_msg() e.get_origin_msg() downLoad_dir_from_cos_json['response'] = e
1 2 3 4 5 Traceback (most recent call last): File "d:\code\PYTHON\src\继承\私有属性.py", line 51 , in <module> print (e.get_error_msg()) ^^^^^^^^^^^^^^^ AttributeError: 'ZeroDivisionError ' object has no attribute 'get_error_msg '
异常传递 异常可以嵌套,而且内层raise的异常,外层可以捕获
1 2 3 4 5 6 7 try : try : 1 /0 except : raise except Exception as e: print (e.get_error_msg())
自定义异常 集成Exception自定义异常类,比如密码长度等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class ShortInputError (Exception ): def __init__ (self,length,min_len ): self .length = length self .min_len = min_len def __str__ (self ): return f'输入长度为{self.length} ,不能小于{self.min_len} ' try : con = input () if len (con)<3 : raise ShortInputError(len (con),3 ) except : pass else : pass
转换异常类型 1 2 3 4 5 6 7 except (IndexError, KeyError) as e: raise ValueError(f"路径模板格式化失败: {path_template}, clip_name: {clip_name}") from e 用括号表示捕获这两种异常中的任意一种 将底层技术性异常(IndexError/KeyError)转换为更语义化的业务异常(ValueError) 使用 from e保留原始异常链,便于调试时查看完整堆栈(有无from e其实效果一样的) 提供更友好的错误信息,包含相关上下文数据
raise 1 2 3 4 #等同于raise e或者说这里没有Exception as e也是可以直接raise的 except Exception as e: print(f"Error getting JSON object from {data_path}: {e}") raise
数据流 流和块 1 2 3 4 5 6 7 8 9 10 11 流: 以字节为最小单位,数据按顺序连续传输,常用于逐个字符或字节处理的场景,如文件逐行读取、网络套接字通信等。 通常顺序访问,只能从头到尾依次读取,不能直接跳转到任意位置(除非底层支持seek)。 数据在传输过程中可即时处理,适合实时数据(如传感器、HTTP响应流)。 块: 以固定大小的数据块为单位(如4KB、8KB),一次传输多个字节,适合批量数据处理,如磁盘I/O、数据库分页读取。 支持随机访问,可直接跳到指定块偏移位置进行读写,效率高 数据通常先缓存到内存,再整体处理,适合需要多次访问或复杂计算的场景。 用流:当数据量大、需节省内存、实时处理时(如日志监控、网络流)。 用块:当需要高效随机访问或批量处理时(如数据库文件、视频解码)。
1 2 3 4 5 6 7 8 9 # 流式读取(逐行处理,节省内存) with open('large_file.txt', 'r', encoding='utf-8') as f: for line in f: # 每次只读取一行 process(line) # 块读取(一次读取固定大小字节) with open('large_file.txt', 'rb') as f: while chunk := f.read(4096): # 每次读取4KB process(chunk)
流
numpy 1 2 float(np.sqrt(np.power(agent.vel[0], 2) + np.power(agent.vel[1], 2)))
pandas 1 2 3 4 5 import pandas as pd df1 = pd.read_csv() df2 = pd.read_excel() 如何读取某一列,直接使用df取列的方法就行
1 2 3 4 5 6 7 8 from io import StringIO,BytesIO data = """ CD701_LS6C3G0Y2RA400869_2025-09-14_20-39-34 Wet_road 1757853574346 1757853632426 CD701_LS6C3G0Y6RA401183_2025-09-17_08-06-56 ------------ ------------- --------- ----------- CD701_LS6C3G0Y2RA400922_2025-09-13_11-53-24 ------------ ------------- --------- ----------- CD701_LS6C3G0Y9RA401081_2025-09-30_02-10-59 Wet_road 1759169460779 1759169512559 """ df = pd.read_csv(StringIO(data), sep='\s+', header=None)
read_excel 1 2 3 bdata = skdata_dataengine.data_read("project-data-1329117257/spatiotemporal_data_application_datasets_auto/sub_dataset/lable/标签可视化分类.xlsx") excel_file = io.BytesIO(bdata) df = pd.read_excel(excel_file, sheet_name=1, header=None)
read_csv 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 def read_csv( filepath_or_buffer: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], *, sep: str | None | lib.NoDefault = ..., delimiter: str | None | lib.NoDefault = ..., header: int | Sequence[int] | None | Literal["infer"] = ..., names: Sequence[Hashable] | None | lib.NoDefault = ..., index_col: IndexLabel | Literal[False] | None = ..., usecols: UsecolsArgType = ..., dtype: DtypeArg | None = ..., engine: CSVEngine | None = ..., converters: Mapping[Hashable, Callable] | None = ..., true_values: list | None = ..., false_values: list | None = ..., skipinitialspace: bool = ..., skiprows: list[int] | int | Callable[[Hashable], bool] | None = ..., skipfooter: int = ..., nrows: int | None = ..., na_values: Hashable | Iterable[Hashable] | Mapping[Hashable, Iterable[Hashable]] | None = ..., na_filter: bool = ..., verbose: bool | lib.NoDefault = ..., skip_blank_lines: bool = ..., parse_dates: bool | Sequence[Hashable] | None = ..., infer_datetime_format: bool | lib.NoDefault = ..., keep_date_col: bool | lib.NoDefault = ..., date_parser: Callable | lib.NoDefault = ..., date_format: str | dict[Hashable, str] | None = ..., dayfirst: bool = ..., cache_dates: bool = ..., iterator: Literal[True], chunksize: int | None = ..., compression: CompressionOptions = ..., thousands: str | None = ..., decimal: str = ..., lineterminator: str | None = ..., quotechar: str = ..., quoting: int = ..., doublequote: bool = ..., escapechar: str | None = ..., comment: str | None = ..., encoding: str | None = ..., encoding_errors: str | None = ..., dialect: str | csv.Dialect | None = ..., on_bad_lines=..., delim_whitespace: bool | lib.NoDefault = ..., low_memory: bool = ..., memory_map: bool = ..., float_precision: Literal["high", "legacy"] | None = ..., storage_options: StorageOptions = ..., dtype_backend: DtypeBackend | lib.NoDefault = ..., ) -> TextFileReader:
1 2 3 4 5 6 7 8 from io import StringIO space_data = """name age city Alice 25 Beijing Bob 30 Shanghai""" # 正则表达式匹配一个或多个空格 df = pd.read_csv(StringIO(space_data), sep='\s+', engine='python')
1 2 3 4 5 6 7 8 engine='c'(默认) 用C语言编写,速度更快 功能相对有限 支持基本的分隔符解析 engine='python' 用Python编写,速度较慢 功能更丰富 支持复杂的正则表达式分隔符
1 2 3 4 5 不指定列名(自动生成数字列名) df = pd.read_csv('data.csv', header=None) # 指定自定义列名 df = pd.read_csv('data.csv', header=None, names=['A', 'B', 'C'])
打印列不要行标签 1 print(r.iloc[:,get_index].to_string(index=False))
1 2 3 直接遍历Series的值 for value in df['A']: print(value)
1 2 3 转换为列表后遍历 for value in df['A'].tolist(): print(value)
1 2 3 4 5 6 7 8 9 使用values或to_numpy() # 方法3: 用numpy数组 for value in df['A'].values: print(value) # 或 for value in df['A'].to_numpy(): print(value)
1 print('\n'.join(map(str, df['A'])))
set 集合没有顺序,不支持使用下标取值。值不重复,所以有去重功能
set() 创建空集合可以用s=set(),不能用空{},因为空字典是使用的{}
set(“dhfidhjfdi”) # 字符串里的单个字符作为集合元素 列表、元组也可以
1 2 3 4 >>> set(1,5,8) # 不可以这样创建集合 Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: set expected at most 1 argument, got 3
.add() 只能追加单个数据,不能追加数据序列
1 2 3 4 5 6 7 8 9 10 tmp = set () tmp.add(1 ) print (my_set) my_set.add(2 ) print (my_set) my_set.add(1 ) print (my_set)
1 2 3 4 5 6 7 8 9 set ().add(1 ) my_set = set ().add(1 ) my_set = set () my_set.add(1 )
1 tmp.add([xxxx]) # 添加整个列表会报错 TypeError:unhashable type 'list'
.update() 追加数据序列,不能追加不可迭代数据
1 2 3 4 5 6 my_set = {1 , 2 , 3 } my_set.update([4 , 5 , 6 ]) print (my_set) my_set.update(100 )
.remove() 删除集合中的指定数据,如果数据不存在则报错。
.discard() 删除集合中的指定数据,如果数据不存在也不会报错。
.pop() 随机删除集合中的某个数据,并返回这个数据。
.clear() in和not in in:判断数据在集合序列
not in:判断数据不在集合序列
差操作 1 2 set().difference(set()) set() - set()
对称差集操作
交操作
并操作
dict 1 2 3 4 5 python 3.7+ 字典是有序的 .items() .keys() .values()
1 2 3 4 5 6 d = {"a":1,"b":False} for k,v in enumerate(d): print(k,v) # 打出来的顺序和之前一样 0 "a" 1 "b"
in和not in dict()和{}创建空字典 1 2 3 4 5 6 d1 = {} d2 = dict() >>> dict(name="ty",age=50 ) {'name': 'ty', 'age': 50 } {'name': 'ty'}
del/del() 1 2 3 4 5 6 7 8 # 删除字典或字典指定键值对(键值对不存在报错) d = {"a":1,"b":2} del d["a"] # 删除指定键值对 >>d {"b":2} del d # 删除整个字典
.clear() 1 2 d = dict() d.clear() # 清空字典键值对
.get() 和[] 查字典键值:
使用.get(“key”,”不存在使用该值值”) # 默认不存在返回None
或者 d[key]不存在报错
for feature in features:
navigation= feature['properties'].get('navigation',0)
.items() 1 2 3 4 5 6 7 8 9 10 items()的作用是把字典中的每对key和value组成一个元组,并把这些元祖放在列表中返回。 举个例子: d = {'a': 1, 'b': 2, 'c': 3, 'd': 4} d.items() dict_items([('a', 1), ('b', 2), ('c', 3), ('d', 4)]) for key,value in d.items(): print(key,value)
.keys() 返回一个关于key值类似列表的可迭代对象
.values() 返回一个关于value值类似列表的可迭代对象
拆包 1 2 3 4 5 d = {"name":"tom","age":18} a,b = d print(d[a]) print(d[b])
defaultdict 1 2 3 daily_groups = defaultdict(list) daily_groups[time_period].append({'data_name':dat_name,'sorce_path':sorce_path}) keys = list(daily_groups.keys())
循环生成键值 1 2 3 >>> d = { str(i):i for i in range(5)} >>> d {'0': 0, '1': 1, '2': 2, '3': 3, '4': 4}
数值作为key 1 2 3 4 5 6 7 8 9 DEFAULT_TYPE = -1 EGO_LENGTH = 5.0 EGO_WIDTH = 2.0 EPSILON = 10 ** -6 IOU_MAX_DISTANCE = 50.0 MAP_OBJECT_TYPE_LW = { "pedestrian": {'value': {'length': 0.8, 'width': 0.8}}, DEFAULT_TYPE:{'scale': {'length': 1.2, 'width': 1.0}}, }
len() 返回键值对个数
list list[n,m] 左闭右开
+ 乘*
in和not in 列表生成式 1 2 3 [i for i in x if i xxx] # 如果是圆括号则是生成器 # sum(1 for r in results if r["success"])
切片 1 file_infos = file_infos[::num]
list()和[] 创建空列表
或者list(可迭代对象) 集合、元组可以
.sort() 1 2 3 .sort(key=None,reverse=False) l = [] l.sort()
.reverse() .append() .extend() 将序列中的元素逐一增加到目标列表
[].extend(“hidiid”) 字符串中的元素逐一增加到列表
.insert() .count()
.index() 1 2 3 4 和字符串的.index()方法是一样的,不存在也报错 .index(值,开始位置,结束位置) # 结束位置是取不到的 .index(值) temp = group_keys.index(context.cursor)
del/del() 1 2 3 4 5 6 7 del xx 或者 del (xx)my_list = [] del my_list my_list = [0 ,1 ] del my_list[0 ]
.pop() 默认删除最后一个元素,或者指定下标,都返回被删除元素
.remove() 传入具体值删除
.clear() 清空列表
.copy() 是一个浅拷贝(和list([xxx])一样)
1 2 3 4 5 6 7 >>> l1 = [1,2,[1,4,5]] >>> l2 = l1.copy() >>> l2 [1, 2, [1, 4, 5]] >>> l1[2][0] = 1004 >>> l2 [1, 2, [1004, 4, 5]]
tuple 元组不可修改(但是里面有列表这种可变类型则可变),元组和列表都可以存储不同类型的数据
1 2 3 4 (1,2,3) (3,) # 单个元素要加逗号,否则为单个元素的类型 return dir_info,file_info
1 2 3 t = (1 ,[5 ,7 ,9 ]) t[1 ] = 0 t[1 ][1 ] = 500
+ 1 2 字符串、列表、元组支持直接+运算 dict、set不支持+运算
* del/del() 可以删除整个,但是具体元素不可以
in和not in tuple() 创建空元组
tuple([]) # 或者传入数据序列 列表或者集合
.index() 和字符串、列表的.index()一模一样
.count() 拆包 1 2 3 4 5 def fun (): return 1 ,5 t = fun() a,b = fun()
字符串 字符串不可变,涉及到修改的方法都是返回新的字符串
* + in和not in del/del() 切片 序列[开始下标位置:结束下标位置:步长] (左闭右开,结束位置取不到;下标和步长正数和负数都可以)
字符串、列表、元组都支持
1 2 3 4 5 6 7 8 9 10 11 12 13 14 str1 = "012345678" >> str1[2 :5 :2 ] 24 >> str1[::-1 ] 876543210 >>str1[-4 :-1 ] 567 >> str1[-4 :-1 :-1 ] >> str1[-1 :-4 :-1 ] 876
.encode()和.decode() 1 2 3 4 encode()函数用于将字符串按照指定的编码格式进行编码,返回一个bytes 类型的对象。默认的编码格式是'utf-8' 。 str .encode(encoding='utf-8' , errors='strict' )errors:指定不同的错误处理方案。默认是'strict' ,表示编码错误会引发一个UnicodeEncodeError。
1 str .decode(encoding='utf-8' )
格式化 1 2 3 %03d 数字对齐,最大3位,不足前面补零,超出的原样输出 %s 原本是格式化字符串的,但是整形或者浮点型也可以用它格式化 %.3f
1 2 3 4 5 转义字符 \ \反斜杠 /斜杠 \n # print(xxx,end="\n") print默认结束符号是\n \t
1 2 3 4 5 6 7 8 9 "xx%sxx%s" % (xx,xx)"{}" .format (x)f"{v} xx"
.strip() .lstrip() .rstrip() 1 2 3 字符串.strip()去除左右两侧空格 字符串.rstrip('/') 移除末尾斜杠 字符串.lstrip('/') 移除头部斜杠
.repalce() 1 2 3 4 5 filename = filename.replace("/./", "/") 字符串.replace('xxx','替代成什么',次数) # 返回的新数据 字符串.replace('xxx','替代成什么')
.upper() .lower() .swapcase() .capitalize() 1 2 3 4 5 .upper() 全大写 .lower() 全小写 .swapcase() 大小写互换 .capitalize() 整个字符串第一个字符大写 .title() 整个字符串中所有单词第一个字符大写 "My Name Is Tome"
1 def format(self, *args: object, **kwargs: object) -> str
1 2 3 # 这也可以格式化 l = [1,2,3] "{}".format(l)
.title() 1 2 3 4 转换成每个单词首字母大写 s = "hello world" print(s.title()) # 输出: Hello World
.center() .ljust() .rjust() 1 2 3 4 5 6 使用 center()、ljust()、rjust() 对字符串进行对齐(第一个参数是长度,第二个是填充字符,默认是使用空格填充对齐) 字符串长度大于指定长度时不填充,显示完整长度 s = "Python" print(s.center(10, "*")) # 输出: **Python** print(s.ljust(10, "-")) # 输出: Python---- print(s.rjust(10, "-")) # 输出: ----Python
.count() 1 2 3 4 "xxx".count(值,开始位置,结束位置) # 结束位置是取不到的 "xxx".count("/") 'a{]dfef{}ddf'.count('{}') 计算{}的数量
.endswith() 1 2 3 .endswith(xxx) .endswith(xxx,开始下标位置,结束下标位置) # 结束位置是取不到的 "".endswith()
.startswith() 1 2 3 4 5 6 7 name.startswith(xxx) name.startswith(xxx,开始下标位置,结束下标位置) # 结束位置是取不到的 name.startswith("_") name.startswith("_") >>> s.startswith("Name",3,-1) True
.find() .rfind() .index() .rindex() 1 2 3 4 5 6 7 8 找对应子字符串第一次出现的位置 .xx(子串,开始下标位置,结束下标位置) .xx(子串) .find() 不存在返回-1 .rfind() 从右边找第一次,返回位置仍是从左到右,不存在返回-1 .index() 不存在报错 .rindex() 从右边找第一次,返回位置仍是从左到右,不存在报错
.isdigit() .isalpha() .isalnum() .isspace() 1 2 3 4 .isdigit() 判断字符串是否全是数字 .isalpha() 判断是否全是字母 各种语言的字母都可以 .isalnum() 判断字符串是否都由字母或者数字构成 .isspace() 判断字符串是否都由空白字符构成
.join([])
.split() 1 2 3 .strip()可以将字符串两边的\n和空格去掉 字符串.split('xxx') 所有分割 字符串.split('xxx',1) 从左开始分割一次
1 2 3 4 5 ''.split(" ") >> [''] 'a\n'.split("\n") >> ['a','']
函数 函数说明文档 1 2 3 4 5 def fun (): """解释说明,只能在最前面且是多行注释""" pass help (fun)
函数传参 1 2 3 4 5 6 7 8 9 10 11 12 13 14 位置参数:个数相同且顺序传入(调用是形参不写明) 默认参数(缺省参数):位置参数必须在默认参数前面(包括函数的定义和调用),且默认参数只能是不可变对象 关键字参数(调用时):调用时位置参数必须写在关键字参数前面。关键字参数就是把形参写明,关键字参数没有顺序要求 不定长参数之位置参数:*args,函数内部会获得一个tuple。如果本来传的参就是list或者tuple等则在前加* 不定长参数之关键字参数:**kwargs 如果原本传入的就是字典则字典前加** 参数定义的顺序必须是:位置参数、默认参数、可变参数、命名关键字参数和关键字参数。 *args和**kwargs不一定是要是这个名字(默认最好使用这两个名字) 调用时如果不传参数则得到空元组或者空字典
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 model_dump_json有很多参数(12+),但 dumps方法想要: 不重复所有参数定义 不限制可传递的参数 自动传递所有参数 解决方案:使用 *args, **kwargs def get_agents_info(self, data, **kwargs) -> Dict: """ 获取目标信息 :param data: 链路的DatData :return: 包含所需信息的所有目标物的字典 """ agents_info = defaultdict(dict) pop_list = set() filter_static = kwargs.get('filter_static', False) agent_min_exist_num = kwargs.get('agent_min_exist_num', 10) agent_min_speed = kwargs.get('agent_min_speed', None) perception_range = kwargs.get('perception_range', [(5, 50), (-20, 20)])
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class A: def model_dump_json( self, *, indent: int | None = None, ensure_ascii: bool = False, include: IncEx | None = None, exclude: IncEx | None = None, context: Any | None = None, by_alias: bool | None = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, exclude_computed_fields: bool = False, round_trip: bool = False, warnings: bool | Literal['none', 'warn', 'error'] = True, fallback: Callable[[Any], Any] | None = None, serialize_as_any: bool = False, ) -> str: class B: def dumps(self, *args, **kwargs) -> str: """ output json string """ return self.model_dump_json(*args, **kwargs)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 假设 model_dump_json 有这些参数 def model_dump_json(indent=2, ensure_ascii=False, ...): pass # 包装方法 def dumps(*args, **kwargs): return model_dump_json(*args, **kwargs) # 调用时: # 位置参数通过 *args 传递 dumps(4, True) # → model_dump_json(4, True) # 关键字参数通过 **kwargs 传递 dumps(indent=4, ensure_ascii=True) # → model_dump_json(indent=4, ensure_ascii=True)
打包(组包) 定义函数时直接使用*args,**kwargs。此时调用时传入的不定长参数或者键值对会转换成tuple和dict
解包 *可以对列表,tuple解包,**可以对字典解包
是在函数调用 时使用的
1 2 3 4 5 6 7 print (*[1 , 2 , 3 ]) def func (a, b, c ): return a + b + c func(*[1 , 2 , 3 ])
1 2 3 4 def func (*args ): print (args) func(1 , 2 , 3 )
1 2 3 4 *l *[1 , 2 , 3 ] *(1 , 2 , 3 )
返回值return 遇到return 当前函数执行结束,后续代码不执行
lambda表达式(匿名函数) 如果一个函数只有一个返回值,并且只有一句代码,可以用lambda简化。对于符合这个条件的函数和lambda表达式,前者更占内存。
lambda [参数列表]:表达式 # lambda的参数和函数参数一样
注意:
lambda表达式的参数可有可无,函数的参数在lambda表达式中完全适用。
lambda表达式能接收任何数量的参数但只能返回一个表达式的值。
1 2 3 4 5 6 7 # 调用 fn = lambda x:x fn(2) 或者 (lambda x:x)(2)
lambda的参数形式 无参 1 2 3 4 5 6 7 8 9 10 def fn1 (): return 100 fn1 fn2 = lambda :100 调用 fn2()
有参 1 2 3 4 5 def add (a,b ): return a+b add1 = lambda a,b:a+b print (add1(1 ,2 ))
默认参数 1 fn = lambda a,b,c=1 :a+b+c
可变参数 *args
1 2 3 4 5 fn = lambda *args:args print (fn(1 ,5 ,7 )) print (fn()
**kwargs
1 (lambda **kwargs:kwargs)(name="tom" ,age=18 )
带判断的lambda 1 (lambda a,b: a if a>b else b)(100 ,500 )
列表数据按字典key值排序 1 2 students = [{"name" :"tom" ,"age" :20 },{"name" :"jerry" ,"age" :50 }] students.sort(key=lambda x:x['age' ],reverse=True )
递归Recursion 函数内部自己调用自己
必须有出口
1 2 如果要遍历一个文件夹下面所有的文件,通常会使用递归来实现; 快速排序算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 0 1 1 2 3 5 8 def g(n:int): num = 1 c = 0 a = 0 b = 1 while num <= n c = a + b if n == 1: yield 0 if n == 2: yield 1 yield c a = b b = c num += 1
案例2 1 2 3 4 def fun {n}: if n == 1 : return 1 return n + fun(n-1 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def fun {n}: return n + fun(n-1 ) def fun1 (n ): if n == 1 : return 1 return n + fun(n-1 ) if __name__ == "__main__" : print (fun1(1000 )) [Previous line repeated 996 more times] RecursionError: maximum recursion depth exceeded
1 2 3 4 def fun (n ): if n == 1 : return 1 return n*fun(n-1 )
尾递归 解决递归调用栈溢出的方法是通过尾递归 优化,尾递归是指,在函数返回的时候,调用自身本身,并且,return语句不能包含表达式
遗憾的是,大多数编程语言没有针对尾递归做优化,Python解释器也没有做优化
1 2 3 4 5 6 7 8 9 10 def fact_iter (n,res ): if n == 1 : return res return fact_iter(n-1 ,n*res) def fact (n ): return fact_iter(n,1 ) print (fact(10000 ))
1 2 [Previous line repeated 995 more times] RecursionError: maximum recursion depth exceeded
高阶函数 把函数作为参数传入这样的函数称为高阶函数
1 2 3 4 def sum_num (a,b,f ): return f(a) + f(b) result = sum_num(-1 ,2 ,abs )
map map(func, lst),将传入的函数变量func作用到lst变量的每个元素中,并将结果组成新的列表(Python2)/迭代器(Python3)返回。
1 2 3 4 5 6 7 def func (x ): return x**2 l = [4 ,1 ,3 ] result = map (lambda x:x**2 ,l) print (list (result))
reduce reduce(func,lst),其中func必须有两个参数。每次func计算的结果继续和序列的下一个元素做累积计算。返回的就是结果。
1 2 3 4 5 6 7 8 import functoolsdef func (a,b ) return a+b l = [5 ,4 ,8 ,1 ] result = functools.reduce(func,l)
filter filter(func,Ist)函数用于过滤序列,过滤掉不符合条件的元素,返回一个filter 对象。如果要转换为列表,可以使用list()来转换。
1 2 3 4 5 6 7 def fun (x ): return x%2 ==0 l=[5 ,6 ,4 ,1 ] f = filter (fun,l) print (list (f))
zip 1 2 3 配对机制:将多个可迭代对象中相同位置的元素组合成元组 结果长度:以最短的可迭代对象为准 惰性计算:返回的是一个迭代器,而不是列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # 基础示例 names = ['小明', '小红', '小刚'] scores = [85, 92, 78] # 将两个列表配对 zipped = zip(names, scores) print(list(zipped)) # [('小明', 85), ('小红', 92), ('小刚', 78)] # 长度不等的处理 list1 = [1, 2, 3, 4] list2 = ['a', 'b', 'c'] print(list(zip(list1, list2))) # [(1, 'a'), (2, 'b'), (3, 'c')] 以最短的为准 # 解压(反向操作) pairs = [('a', 1), ('b', 2), ('c', 3)] unzipped = zip(*pairs) print(list(unzipped)) # [('a', 'b', 'c'), (1, 2, 3)] # 遍历多个列表 for name, score in zip(names, scores): print(f"{name}: {score}分") # 创建字典 dict_from_zip = dict(zip(names, scores)) print(dict_from_zip) # {'小明': 85, '小红': 92, '小刚': 78}
any() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 列表中存在至少一个 True 元素 my_list = [False, False, True, False] print(any(my_list)) # 输出: True 元组中不存在 True 元素 my_tuple = (False, False, False) print(any(my_tuple)) # 输出: False 字典中存在至少一个 True 值 my_dict = {'a': False, 'b': True, 'c': False} print(any(my_dict.values())) # 输出: True 集合中不存在 True 元素 my_set = {False, False, False} print(any(my_set)) # 输出: False
enumerate() 1 for i,e in enumerate(list) # 列表、元组、字符串都可以
dir() 可以查看函数、类、字典等里有什么东西
dir(dict())
min() 1 closest_time = min(original_data.keys(), key=lambda x: abs(x - original_time))
abs()
round() 可以四舍五入至指定的小数位数
round(1.9) # 2
round(1.11,1) # 1.1
reversed 1 2 3 4 l = [5,4,1,3] ll = reversed(l) # 整个列表地址变了,和l.sort(不同) >>list(ll) [3,1,4,5]
sorted() sorted() 函数用于对所有可迭代对象进行排序操作,返回新列表。
列表的l.sort()不同,是原地排列不返回新列表
1 2 3 4 5 6 ey: None = None, reverse: bool = False sorted(file_infos, key=lambda file_info: file_info["Key"])
eval()和exec() 和ast eval 1 2 eval函数是一个内置函数,用于将字符串解析并执行为Python表达式 用于执行单个表达式,并返回计算结果
1 2 3 4 5 6 x = 1 print(eval('x+1')) # 输出:2 print(eval('x+y', {'x': 1, 'y': 2})) # 输出:3 print(eval('[x**2 for x in range(5)]')) # 输出:[0, 1, 4, 9, 16]
1 2 3 # 危险的使用示例 user_input = "os.system('rm -rf /')" # 一段恶意代码 eval(user_input) # 这将执行恶意代码
exec 1 2 3 4 5 6 7 8 9 10 eval函数和exec函数在一定程度上是相似的,都可以执行字符串形式的Python代码。 eval函数返回表达式的结果,而exec函数不返回任何结果。用于执行多行代码或语句块 exec可以执行更复杂的Python代码结构,比如类定义、函数定义和多行语句,而eval只能解析单个表达式。 eval('x = 5') # 这会导致语法错误,因为'x = 5'不是一个表达式 exec('x = 5') # 这可以正常执行,因为'x = 5'是一个语句 print(x) # 输出:5
ast 1 2 3 ast.parse()将Python代码字符串解析为抽象语法树 如果代码有语法错误,会抛出 SyntaxError异常
1 2 3 4 5 6 7 def _validate_py_syntax(filename): with PathManager.open(filename, "r") as f: content = f.read() try: ast.parse(content) except SyntaxError as e: raise SyntaxError(f"Config file {filename} has syntax error!") from e
类 self self是指调用该函数的对象
1 2 3 4 5 6 7 8 class Washer (): def wash (self ): print (self ) w = Washer() print (w) 1 和2 完全一样
self.–class–() 1 2 3 4 5 6 def _create_new_instance(self) -> 'SensorData': """工厂方法:创建当前类型的实例""" return self.__class__(self.clipvistor, self.sensor_name_) self.__class__获取当前对象的类 ()调用类的构造函数
return self 这里假如有一个实例化对象o = xx(),然后s=o.align_and_conver_line()返回的是还是o,s和o完全一样。如果有多个这样的方法还可以使用链式调用。
1 2 3 def align_and_conver_line(self, al_time_line: TimeLine, target_time_line: TimeLine, delt_time: int) -> SensorData: return self
if self: 如果类定义了 __bool__()方法,调用它
如果没有定义 __bool__(),但有 __len__()方法,调用它(长度非0为True)
如果都没有,自定义对象始终为True
None、0、0.0、””、[]、{}、()的bool(x)值都为False
1 2 3 4 5 6 7 8 9 10 11 def to_json(self)->dict: out_object={ 'ticks':None, 'intervals':None, 'ticksize':0 } if self: out_object['ticks']=self.time_line_tickslist() out_object['intervals']=self.intervals out_object['ticksize']=self.time_line_ticksize() return out_object
1 2 3 4 5 6 7 8 9 10 # 可以通过下面的方法设计,判断实例真值而不是是否存在 class TimeLine: def __init__(self, is_valid=False): self._is_valid = is_valid def __bool__(self): # 或者__len__(self)方法 return self._is_valid def to_json(self): if self: # 只有明确标记为有效的实例才处理
类外面操作属性 对象.属性
对象.属性 = 值
类里面操作属性 init和new方法 1 2 3 __init__():不是一定要有的。后调用,第一个参数self,初始化器,初始化实例(设置属性),不返回任何值(返回None) __new__():先调用,第一个参数cls,构造器,创建实例(分配内存),必须返回实例
延迟初始化类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class IoProcess: _instance = None _initialized = False def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if not self._initialized: pass def init(self, yaml_file): # 加载配置文 configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader) configs = configs['output_reloc_result'] self.config_base = configs["config_base"] self.match_dict = configs["match_dict"] self._initialized = True # 初始化对象 self.cos = CosUtils(self.config_base)
1 2 3 4 5 # 1. 获取单例实例 ioProcess = IoProcess() # 此时 __init__ 是空的 # 2. 在需要时进行实际初始化 ioProcess.init(config_yaml) # 延迟加载配置
类方法注释 1 2 3 4 5 6 7 8 9 10 11 12 class AgentBackwardBehavior: def __init__(self): self.window_len = 5 # 0.5s滑窗时长 self.velocity_threshold = 0.1 # 速度阈值(m/s) def extract(self, data: DatData) -> DatData: """ 倒车标签提取 :param data: DatData :param slices: None :return: """
继承 继承父类所有的属性和方法
1 2 3 4 5 6 7 class A(B): def __init__(self, config, ioprocess, args): super().__init__(config=config , ioprocess=ioprocess, args=args) self.xxx = args.xxx
单继承 多继承 注意:当一个类有多个父类的时候,默认使用第一个父类的同名属性和方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (object ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class Prentice (School,Master): pass daqiu = Prentice() print (daqiu.kongfu)daqiu.make_cake()
1 2 【黑马煎饼果子配方】 运用【黑马煎饼果子配方】制作煎饼果子
mro 方法解析顺序(Method Resolution Order),简称 MRO
查看类的继承关系:类名.–mro–
1 2 >>print(Prentice.__mro__) (<class '__main__.Prentice'>, <class '__main__.School'>, <class '__main__.Master'>, <class 'object'>)
重写父类方法 如果子类和父类拥有同名的属性和方法,子类调用的是自己的属性和方法。
只要子类的方法名和父类方法名一样,不管参数是否一样都是调用的子类自己的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (object ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class Prentice (School,Master): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) daqiu = Prentice() print (daqiu.kongfu)daqiu.make_cake()
1 2 3 (venv) D:\code\PYTHON>d:/code/PYTHON/venv/Scripts/python.exe d:/code/PYTHON/src/继承/多继承.py 【独创煎饼果子配方】 运用【独创煎饼果子配方】制作煎饼果子
多层继承 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (object ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class Prentice (School,Master): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) class Tusun (Prentice ): pass tusun = Tusun() tusun.make_cake() tusun.make_master_cake() tusun.make_school_cake()
调用父类的方法 调用父类的方法需要加self,为了接收将来调用的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (object ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class Prentice (School,Master): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) daqiu = Prentice() daqiu.make_school_cake() print (Prentice.__mro__)
super() super用于调用父类方法
1 2 3 4 super().__init__()的作用是调用父类的 __init__构造函数 这确保了父类 ABC的初始化逻辑能够正确执行,这是多重继承中正确的初始化方式 如果 SensorData有多个父类,super()能确保所有父类的构造函数按正确的顺序被调用 从代码规范角度来说,显式调用父类构造函数是最佳实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class ABC(metaclass=ABCMeta): """Helper class that provides a standard way to create an ABC using inheritance. """ __slots__ = () class SensorData(ABC): # 传感器基类 def __init__(self, clipvistor:DataEngine,sensor_name:str) -> None: super().__init__() self.sensor_name_=sensor_name self.raw_data = {} # 存储原始帧数据 self.frame_datas = None # 存储TimeLine实例化后的数据 self.clipvistor = clipvistor @abstractmethod # DataLoad()加载数据实现函数, 调用ClipVistor 格式转换 FrameData def load_sensor_data(self,sensor_name:str, path_list)->bool: pass
1 2 3 class AgentRefine(BaseModule): def __init__(self, cfg: EasyDict, output_root: str): super().__init__(cfg, output_root)
1 2 3 4 5 6 7 8 9 10 11 12 class CosUtils(): _instance = None def __new__(cls, *args, **kwargs): """ 单例 :param args: :param kwargs: """ if not cls._instance: cls._instance = super().__new__(cls) return cls._instance
带参super() super(当前类名,self).xxx()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (Master ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) super (School,self ).__init__() super (School,self ).make_cake() class Prentice (School ): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) def make_old_cake (self ): super (Prentice,self ).__init__() super (Prentice,self ).make_cake() daqiu = Prentice() daqiu.make_old_cake() print (Prentice.__mro__)
上下两种效果一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (Master ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) super (School,self ).__init__() super (School,self ).make_cake() class Prentice (School ): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) def make_old_cake (self ): super (Prentice,self ).__init__() super (Prentice,self ).make_cake() daqiu = Prentice() daqiu.make_old_cake() print (Prentice.__mro__)
不带参super() 注意:使用super()可以自动查找父类。调用顺序遵循mro类的继承顺序。比较适合单继承 使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (Master ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) super ().__init__() super ().make_cake() class Prentice (School ): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) def make_old_cake (self ): super ().__init__() super ().make_cake() daqiu = Prentice() daqiu.make_old_cake() print (Prentice.__mro__)
私有权限 在Python中,可以为实例属性和方法设置私有权限,通过本身实例对象获取属性或者调用方法,即设置某个实例属性或实例方法不继承给子类。
私有属性和私有方法在其前面加__
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class Master (object ): def __init__ (self ): self .kongfu = "【古法煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class School (object ): def __init__ (self ): self .kongfu = "【黑马煎饼果子配方】" def make_cake (self ): print (f'运用{self.kongfu} 制作煎饼果子' ) class Prentice (School,Master): def __init__ (self ): self .kongfu = "【独创煎饼果子配方】" self .__money = 10000000 def __info_print (self ): print (self .kongfu,self .__money) def make_cake (self ): self .__init__() print (f'运用{self.kongfu} 制作煎饼果子' ) def make_master_cake (self ): Master.__init__(self ) Master.make_cake(self ) def make_school_cake (self ): School.__init__(self ) School.make_cake(self ) class Tusun (Prentice ): pass tusun = Tusun() print (tusun.__money)tusun.__info_print()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 (venv) D:\code\PYTHON> d:/code /PYTHON /venv /Scripts /python.exe d :/code /PYTHON /src /继承/私有属性.py Traceback (most recent call last ): File "d :\code \PYTHON \src \继承\私有属性.py ", line 45, in <module > print (tusun.__money ) ^^^^^^^^^^^^^ AttributeError : 'Tusun ' object has no attribute '__money '(venv ) D :\code \PYTHON >d :/code /PYTHON /venv /Scripts /python.exe d :/code /PYTHON /src /继承/私有属性.py Traceback (most recent call last ): File "d :\code \PYTHON \src \继承\私有属性.py ", line 46, in <module > tusun.__info_print () ^^^^^^^^^^^^^^^^^^ AttributeError : 'Tusun ' object has no attribute '__info_print '
多态 传入不同对象,产生不同结果
定义:多态是一种使用对象的方式,子类重写父类方法,调用不同子类对象的相同父类方法,可以产生不同的执行结果
实现步骤:
定义父类,并提供公共方法
定义子类,并重写父类方法
传递子类对象给调用者,可以看到不同子类执行效果不同
类属性和实例属性 类属性就是类对象所拥有的属性,它被该类的所有实例对象所共有。 类属性可以使用类对象或实例对象访问。
class xx():
属性 = 值
修改类属性 类属性只能通过类对象修改,不能通过实例对象修改,如果通过实例对象修改类属性,表示的是创建了一个实例属性。
类.属性 = 值
类方法和静态方法 类方法 特点:
需要用装饰器@classmethod来标识其为类方法,对于类方法,第一个参数必须是类对象,一般以 cls作为第一个参数。
实例对象或者类都可以调用类方法
使用场景:
当方法中需要使用类对象(如访问私有类属性等)时,定义类方法类方法一般和类属性配合使用
1 2 3 4 5 6 7 8 9 10 class Dog (): __tooth = 10 @classmethod def get_tooth (cls ): return cls.__tooth d = Dog() result = d.get_tooth()
静态方法 特点:
需要通过装饰器@staticmethod来进行修饰,静态方法既不需要传递类对象也不需要传递实例对象(形参没有self/cls) 静态方法也能够通过实例对象和类对象去访问。
使用场景:
当方法中既不需要用实例对象(如实例对象,实例属性),也不需要使用类对象(如类属性、类方法、创建实例等)时,定义静态方法取消不需要的参数传递,有利于减少不必要的内存占用和性能消耗
1 2 3 4 5 6 7 8 class Dog (): @staticmethod def info_print (): print ('xxx' ) d = Dog() d.info_print() Dog.info_print()
案例 1 2 3 4 5 class utils: @classmethod def SaveJSON(cls, file_path, json_obj): with open(file_path, "w") as json_file: json.dump(json_obj, json_file)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class IoProcess: _instance = None # 类变量,存储单例实例 def __init__(self, yaml_file=None): """初始化方法,但实际初始化是延迟的""" self._initialized = False if yaml_file: self._init_from_yaml(yaml_file) def _init_from_yaml(self, yaml_file): """实际执行初始化的私有方法""" if not self._initialized: # 加载配置 with open(yaml_file) as f: config = yaml.safe_load(f) self.config = config self._initialized = True print(f"初始化完成: {yaml_file}") @classmethod def get_instance(cls, yaml_file=None): """获取单例实例的工厂方法""" if cls._instance is None: # 第一次调用:创建实例 print("创建新实例") cls._instance = cls(yaml_file) elif yaml_file and not cls._instance._initialized: # 实例存在但未初始化,且提供了配置文件 print("初始化现有实例") cls._instance._init_from_yaml(yaml_file) # 其他情况:直接返回现有实例 return cls._instance
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 cls是 类本身(class object),而不是类的实例。类方法第一个参数必须是它 class IoProcess: _instance = None # 类变量,属于类本身 @classmethod def get_instance(cls, yaml_file=None): # cls 指向 IoProcess 类 print(cls) # <class '__main__.IoProcess'> print(cls.__name__) # "IoProcess" # cls 可以用来: # 1. 访问类变量 print(cls._instance) # 访问类变量 _instance # 2. 创建新实例 instance = cls() # 等价于 IoProcess() # 3. 调用其他类方法 cls.other_classmethod() # 通过类调用 IoProcess.get_instance() # cls 自动传入 IoProcess
@classmethod和@staticmethod和实例方法 1 2 3 4 5 6 7 8 9 10 11 @classmethod:参数cls 实例和类都可以访问类方法 @staticmethod:无特殊参数 实例和类都可以访问类方法 实例方法:参数self 实例可以访问 只有实例才可以访问实例属性 @classmethod和@staticmethod都可以访问类属性
hasattr 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class MyClass: def __init__(self): self.name = "Alice" self.age = 30 obj = MyClass() # 检查属性是否存在 print(hasattr(obj, "name")) # 输出: True print(hasattr(obj, "age")) # 输出: True print(hasattr(obj, "email")) # 输出: False
1 2 3 4 5 6 7 8 9 class Calculator: def add(self, a, b): return a + b calc = Calculator() # 检查方法是否存在 if hasattr(calc, "add"): print(calc.add(5, 3)) # 输出: 8 else: print("方法不存在")
1 2 if hasattr(obj, "name"): print(getattr(obj, "name")) # 输出: Alice
iter() 1 内置函数,用于生成迭代器。迭代器是一种对象,它可以记住遍历的位置,允许程序在数据集合中进行遍历操作。迭代器对象从集合的第一个元素开始访问,直到所有元素被访问完毕。迭代器只能向前移动,不能后退。
1 2 3 4 5 6 7 8 9 # 创建一个字典 person = {"name": "Bob", "age": 25, "city": "London"} # 获取视图对象并转换为迭代器 items_view = person.items() items_iterator = iter(items_view) # 使用 next() 获取元素 print(next(items_iterator)) # ('name', 'Bob') print(next(items_iterator)) # ('age', 25) print(next(items_iterator)) # ('city', 'London')
–xx– Python 魔法方法(三) __getattr__,__setattr__, delattr -CSDN博客
1 --len()--() --repr--() --str--() hasattr() setattr() getattr() --setattr--() --setitem--() --getitem--() --delattr--() --delitem--() --eq--()
–str–和–repr– 更改打印时默认的内存地址
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class PythonCS: def __init__(self, name): self.name = name def __str__(self): return '公众号: ' + self.name def __repr__(self): return "PythonCS('{}')".format(self.name) object = PythonCS('Python禅师') # str调用 print(object) print(str(object)) # repr调用 ex = repr(object) print(ex) # 重新创建对象 object2 = eval(ex) print(object.name == object2.name)
1 2 3 4 公众号: Python禅师 公众号: Python禅师 PythonCS('Python禅师') True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Product: def __init__(self, name, price, discount=0): self.name = name self.price = price self.discount = discount def __str__(self): """给用户看的商品信息""" final_price = self.price * (1 - self.discount/100) if self.discount > 0: return f"{self.name}: 原价¥{self.price},现价¥{final_price:.2f}({self.discount}%折扣)" return f"{self.name}: ¥{self.price}" def __repr__(self): """给开发者看的完整信息""" return f"Product('{self.name}', {self.price}, discount={self.discount})" product = Product("iPhone 15", 6999, discount=10) print(str(product)) # iPhone 15: 原价¥6999,现价¥6299.10(10%折扣) print(repr(product)) # Product('iPhone 15', 6999, discount=10)
–dict– 类.–dict–:返回类内部所有类属性(没有实例属性)、类方法、实例方法对应的字典
实例对象.–dict–:返回实例对象里实例属性组成的dict
–call–() 给类实现–call–()方法,让对象变成可以调用的对象,可调用的对象能够像函数一样使用
类装饰器有用到
–getattr–()和–getattribute–() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 # 当我们访问一个不存在的属性的时候,用--getattr--()处理 class A(object): def __init__(self, x): self.x = x def hello(self): return 'hello func' def __getattr__(self, item): print('in __getattr__') return 100 def __getattribute__(self, item): print('in __getattribute__') return super(A, self).__getattribute__(item) a = A(10) print(a.x) print(a.y) 运行代码,得到下面输出: in __getattribute__ 10 in __getattribute__ in __getattr__ 100 可以看出,在获到对象属性时,__getattribute__()是一定会被调用的,无论属性存不存在,首先都会调用这个魔法方法。 如果调用像a.y这种不存在的对象时,调用__getattribute__()找不到y这个属性,就会再调用__getattr__()这个魔法方法,可以通过在这个方法里实 来设置属性不存在时的默认值。使用上面的getattr()方法获取属性时,也是同样的调用关系,只不过只有在getattr()带第三个参数作为默认值时,才会调用 __getattr__()方法。
–setattr–() 1 2 3 4 5 6 7 8 9 10 11 # --setattr--() 在对一个属性设置值的时候,会调用到这个函数,每个设置值的方式都会进入这个方法。 需要注意的地方是,在重写__setattr__方法的时候千万不要重复调用造成死循环。 class MyClass: def __setattr__(self, name, value): print(f"设置属性 {name} 为 {value}") # 避免递归调用,直接操作 `__dict__` self.__dict__[name] = value obj = MyClass() obj.name = "Alice" # 输出: 设置属性 name 为 Alice
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class A(object): def __init__(self, value): print "into __init__" self.value = value def __setattr__(self, name, value): print "into __setattr__" if value == 10: print "from __init__" object.__setattr__(self, name, value) a = A(10) # into __init__ # into __setattr__ # from __init__ print a.value # 10 a.value = 100 # into __setattr__ print a.value # 100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 在实例化的时候,会进行初始化,在__init__里,对value的属性值进行了设置,这时候会调用__setattr__方法。 在对a.value重新设置值100的时候,会再次进入__setattr__方法。 需要注意的地方是,在重写__setattr__方法的时候千万不要重复调用造成死循环。 class A(object): def __init__(self, value): self.value = value def __setattr__(self, name, value): self.name = value 这是个死循环。当我们实例化这个类的时候,会进入__init__,然后对value进行设置值,设置值会进入__setattr__方法,而__setattr__方法里面又有一个self.name=value设置值的操作,会再次调用自身__setattr__,造成死循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 除了上面调用object类的__setattr__避开死循环,还可以如下重写__setattr__避开循环。 class A(object): def __init__(self, value): self.value = value def __setattr__(self, name, value): self.__dict__[name] = value a = A(10) print a.value # 10 __dict__:Python 中,几乎每个对象(类、实例、函数等)都有一个 __dict__属性。它是一个字典,存储了这个对象自身拥有的属性和方法的映射。
–setitem–() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 __setitem__ 用于拦截容器类型对象(如字典、列表)的索引赋值操作。当通过 obj[key] = value 设置键值对时,会自动调用此方法。 class MyDict: def __init__(self): self.data = {} def __setitem__(self, key, value): print(f"设置键 {key} 为 {value}") self.data[key] = value d = MyDict() d["age"] = 30 # 输出: 设置键 age 为 30 在 __setattr__ 和 __setitem__ 内部,避免直接调用 self.attr = value 或 self[key] = value,否则会导致递归调用。 推荐使用 self.__dict__[name] = value 或自定义的内部存储结构(如字典)来完成赋值操作。
实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 if platform.system() == 'Windows' : import regex as re else : import re class ConfigDict (Dict ): """A dictionary for config which has the same interface as python's built- in dictionary and can be used as a normal dictionary. The Config class would transform the nested fields (dictionary-like fields) in config file into ``ConfigDict``. """ def __missing__ (self, name ): raise KeyError(name) def __getattr__ (self, name ): try : value = super ().__getattr__(name) except KeyError: raise AttributeError(f"'{self.__class__.__name__} ' object has no " f"attribute '{name} '" ) except Exception as e: raise e else : return value
–del–() del\del() 对象时,会调用该方法。但是程序结束完也会默认调用该方法
1 2 3 4 5 6 7 8 9 10 class Washer (): def __init__ (self,width,height ): self .width = width self .height = height def __del__ (self ): print (f'{self} 对象已被删除' ) haier = Wash(10 ,20 )
–delitem–() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Tag: def __init__(self): self.change = {'python': 'This is python', 'php': 'PHP is a good language'} def __getitem__(self, item): print('调用 __getitem__') return self.change[item] def __setitem__(self, key, value): print('调用 __setitem__') self.change[key] = value def __delitem__(self, key): print('调用 __delitem__') del self.change[key] # 创建对象 a = Tag() # 访问元素 print(a['php']) # 删除元素 del a['php'] # 查看删除后的字典 print(a.change)
–delattr–() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # --delattr--() class A(object): def __init__(self, value): self.value = value def __delattr__(self, item): object.__delattr__(self, item) def __getattr__(self, item): return "when can not find attribute into __getattr__" a = A(10) print a.value # 10 del a.value print a.value # when can not find attribute into __getattr__
–eq–() 1 2 3 4 5 6 def __eq__(self, other:'TimeLine'): if other is None: return False if not isinstance(other, TimeLine): return False return ( self.time_points == other.time_points)
–sub–() 1 2 def __sub__(self,other:'TimeLine')->bool: return (self.time_points[0]-other.time_points[0])
–getitem–() 通过索引访问 1 2 3 4 5 6 7 8 9 10 11 12 13 __getitem__ 是 Python 中的一个特殊方法,用于使对象支持通过索引或键访问其元素,类似于列表、字典等内置数据结构的行为。通过灵活使用 __getitem__,可以让自定义类像内置数据结构一样直观且易用。 示例:自定义类实现 __getitem__ class MyList: def __init__(self, data): self.data = data def __getitem__(self, index): return self.data[index] my_list = MyList([1, 2, 3, 4, 5]) print(my_list[2]) # 输出: 3
切片操作 1 2 3 4 5 6 7 8 9 class Sequence: def __init__(self, seq): self.seq = seq def __getitem__(self, key): if isinstance(key, slice): return self.seq[key.start:key.stop:key.step] return self.seq[key] seq = Sequence([10, 20, 30, 40, 50]) print(seq[1:4]) # 输出: [20, 30, 40]
字典式访问 1 2 3 4 5 6 7 8 9 class MyDict: def __init__(self): self.data = {"a": 1, "b": 2} def __getitem__(self, key): if key in self.data: return self.data[key] raise KeyError(f"Key '{key}' not found.") d = MyDict() print(d["a"]) # 输出: 1
–bool–() 1 2 3 4 5 6 7 8 9 class TimeLine: def __init__(self, is_valid=False): self._is_valid = is_valid def __bool__(self): return self._is_valid def to_json(self): if self: # 只有明确标记为有效的实例才处理
闭包 定义 在函数嵌套的前提下,内部函数使用了外部函数的变量,并且外部函数返回了内部函数,我们把这个使用 外部函数变量的内部函数称为闭包。
作用 可保存外部函数的变量,不会随外部函数的调用完成而销毁
注意点:由于闭包引用了外部函数的变量或者参数,则外部函数的变量没有及时释放,消耗内存。
闭包的形成条件 1.函数嵌套 2.内部函数使用了外部函数的变量或者参数
3.外部函数返回内部函数,这个使用了外部函数变量的内部函数称为闭包
案例 1 2 3 4 5 6 7 8 9 def func (): a = 15 def func_inner (num ): print (a+num) return func_inner if __name__ == '__main__' : new_func = func() new_func(100 )
1 2 3 4 5 6 7 8 9 10 11 def test (a ): b = 10 def test2 (): return a+b return test2 if __name__ == '__main__' : t = test(10 ) print (t())
1 2 3 4 5 6 7 8 9 10 11 def func (name ): def func_inner (msg ): return f'{name} :{msg} ' return func_inner tom = func("tom" ) jerry = func("jerry" ) tom("你过来!" ) jerry("NO!" ) tom("放心我不吃你!" ) jerry("我不信!" )
用闭包修改外部函数的变量 1 2 3 4 5 6 7 8 9 10 11 def func (): num = 10 def func_inner (): nonlocal num num = 20 func_inner() print (num) return func_inner func()()
装饰器 定义 就是给已有函数增加额外功能的函数,它本质上就是一个闭包函数。 装饰器的功能特点: 1.不修改已有函数的源代码 2.不修改已有函数的调用方式 3.给已有函数增加额外的功能
闭包和装饰器区分:如果的闭包函数的参数有且只有一个且为函数类型,那这个闭包函数是装饰器
使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import timedef decorator (func ): def inner (): print (time.ctime()) func() return inner def comment (): print ("发表评论" ) comment = decorator(comment) comment()
装饰器的语法糖写法 装饰器的语法糖写法:@装饰器名称,装饰器的语法糖就是在装饰以后函数的时候写法更加简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timedef decorator (func ): def inner (): print (time.ctime()) func() return inner @decorator def comment (): print ("发表评论" ) comment()
装饰器执行的时机 当当前模块加载完成以后,装饰器会立即执行对函数进行封装
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 def decorator (func ): def func_inner (): start_time = time.time() func() end_time = time.time() print (end_time-start_time) return func_innner @decorator def func (): for i in range (1000 ): print (i)
通用装饰器 原函数带参数 1 2 3 4 5 6 7 8 9 10 11 12 13 def decorator (func ): def func_inner (a,b ): print ("正在练习加法运算!" ) func(a,b) return func_inner @decorator def func (a,b ): print (a+b) func(400 ,5 )
原函数带参数和返回值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 def decorator (func ): def func_inner (a,b ): print ("正在练习加法运算!" ) r = func(a,b) return r return func_inner @decorator def func (a,b ): return a+b print (func(400 ,5 ))
原函数带不定长参数和返回值(通用装饰器) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def decorator (func ): def func_inner (*args,**kwargs ): print ("正在练习加法运算!" ) r = func(*args,**kwargs) return r return func_inner @decorator def func (*args,**kwargs ): result = 0 for v in args: result += v for e in kwargs.values(): result += e return result print (func(400 ,5 ))
多个装饰器一起使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def make_div (func ): def inner (): return f"<div>{func()} </div>" return inner def make_p (func ): def inner (): return f"<p>{func()} </p>" return inner @make_div @make_p def func (): return "人生苦短,多用PYTHON!" print (func())
带参数的装饰器(重要) 带参数的装饰器,其实就是定义了一个函数,让函数接受参数,在函数内部返回的一个是装饰器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def return_decorator (flag ): def decorator (func ): def inner (a,b ): if flag == "+" : print ("正在执行加法运输!" ) if flag == "-" : print ("正在执行减法运算!" ) return func(a,b) return inner return decorator @return_decorator("+" ) def add (a,b ): return a+b @return_decorator("-" ) def sub (a,b ): return a-b print (add(5 ,10 ))print (sub(15 ,10 ))
类装饰器 使用类装饰器装饰已有函数
–call–() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class MyDecorator (object ): def __init__ (self,func ): self .__func = func def __call__ (self, *args, **kwds ): print ("课已讲完!" ) self .__func() @MyDecorator def show (): print ("快要下课了!" ) show()
dataclass库 1 2 3 4 5 6 7 8 # 它可以自动生成常用方法(如 __init__、__repr__ 等),减少重复代码。 from dataclasses import dataclass @dataclass class Gnss: lng: float lat: float height: float
1 2 3 4 5 6 7 from dataclasses import dataclass @dataclass class Person: name: str age: int person = Person(name="Alice", age=30) print(person) # 输出:Person(name='Alice', age=30)
默认值 1 2 3 4 5 6 7 8 9 10 from dataclasses import dataclass from typing import Optional @dataclass class Person: name: str age: Optional[int] = None person1 = Person(name="Tom", age=20) person2 = Person(name="Jerry") print(person1) # 输出:Person(name='Tom', age=20) print(person2) # 输出:Person(name='Jerry', age=None)
frozen 1 2 3 4 5 6 7 # 通过 frozen=True 参数,可以将数据类设置为不可变 @dataclass(frozen=True) class ImmutablePerson: name: str age: int person = ImmutablePerson(name="Alice", age=30) # person.age = 40 # 会抛出 FrozenInstanceError
转换为字典或元组 1 2 3 4 5 # dataclass 提供了 asdict 和 astuple 方法,方便将对象转换为字典或元组 from dataclasses import asdict, astuple person = Person(name="Alice", age=30) print(asdict(person)) # 输出:{'name': 'Alice', 'age': 30} print(astuple(person)) # 输出:('Alice', 30)
单例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class CosUtils(): _instance = None def __new__(cls, *args, **kwargs): """ 单例 :param args: :param kwargs: """ if not cls._instance: cls._instance = super().__new__(cls) return cls._instance def __init__(self, cos_config): self.client = CosS3Client(CosConfig(Region=cos_config['Region'], stId=cos_config['stId'], stKey=cos_config['stKey'])) self.Region = cos_config['Region'] # code<0,都是错误 # 需要copy.deepcopy,用于多线程 self.res_json = { 'code': -1, 'msg': 'fail', 'response': None } super().__new__(cls): __new__是Python的构造方法,在 __init__之前执行 super().__new__(cls)调用父类(object)的 __new__方法 创建并返回一个 cls类的新实例对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # 非单例使用 class COSBrowser: def __init__(self, cos_config,pool_thread_size=10): self.client = CosS3Client(CosConfig(Region=cos_config['Region'], stId=cos_config['stId'], stKey=cos_config['stKey'], ServiceDomain=cos_config['ServiceDomain'])) self.cos_threadpool=SimpleThreadPool(num_threads= pool_thread_size) self.Region = cos_config['Region'] # code<0,都是错误 # 需要copy.deepcopy,用于多线程 self.res_json = { 'code': -1, 'msg': 'fail', 'response': None }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class IoProcess: _instance = None _initialized = False def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if not self._initialized: # # 加载配置文件 # configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader) def init(self, yaml_file): # 加载配置文 xxx self._initialized = True
1 2 3 4 def __new__(cls, *args, **kwargs): if cls._instance is None: # 检查是否已存在实例 cls._instance = super().__new__(cls) # 首次调用时创建实例 return cls._instance # 总是返回同一个实例
1 先执行__new__() 后执行__init__()
1 ioprocess=IoProcess()->ioprocess.init()
抽象类 abc.ABC 1 2 Python 的抽象基类(Abstract Base Class,简称 ABC)是通过 abc 模块提供的一种工具,用于定义接口并强制子类实现特定方法。 通过继承 abc.ABC 或指定元类为 abc.ABCMeta,可以定义抽象基类。使用@abstractmethod 装饰器标记抽象方法,子类必须实现这些方法,否则无法实例化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from abc import ABC, abstractmethod class FileHandler(ABC): @abstractmethod def read(self, filename: str): """读取文件内容""" pass @abstractmethod def write(self, filename: str, data: any): """写入文件内容""" pass class JsonHandler(FileHandler): def read(self, filename: str): import json with open(filename, 'r') as f: return json.load(f) def write(self, filename: str, data: any): import json with open(filename, 'w') as f: json.dump(data, f)
1 2 abc.ABCMeta是 Python 抽象基类的元类 抽象基类(Abstract Base Class, ABC)是不能被实例化,只能被继承的类。它定义了一个接口,强制子类实现特定的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import abc class Animal(abc.ABC): # 抽象基类 @abc.abstractmethod def speak(self): pass # 子类必须实现 class Dog(Animal): def speak(self): # 必须实现 return "Woof!" class Cat(Animal): def speak(self): # 必须实现 return "Meow!" # 可以实例化子类 dog = Dog() cat = Cat() # 但不能实例化抽象基类 # animal = Animal() # TypeError!
1 2 3 4 5 6 7 8 9 10 class Data(BaseModel, metaclass=abc.ABCMeta): Data类继承自 BaseModel(Pydantic 基类) 同时使用 ABCMeta元类,使其成为抽象基类 元类(metaclass)是什么? 元类是"类的类" 控制类的创建行为 ABCMeta元类让类具有抽象基类的特性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 1. 是的,它是抽象基类,但可以有实例方法! 关键点:抽象基类 ≠ 只有抽象方法 抽象基类可以包含: 抽象方法(用 @abc.abstractmethod装饰) 具体方法(普通方法) 属性 类属性 类方法 静态方法 2. 抽象基类的定义标准 一个类是抽象基类,只需要满足以下任一条件: 包含至少一个抽象方法 元类是 ABCMeta 继承了抽象基类 你的 Data类满足 1 和 2,所以是抽象基类。
两者关系 1 2 3 4 5 6 7 8 9 10 11 # abc.ABC是使用 abc.ABCMeta的语法糖(更方便的写法) # 方式1:使用 abc.ABC(推荐) class MyClass1(abc.ABC): pass # 方式2:使用 metaclass=abc.ABCMeta(传统) class MyClass2(metaclass=abc.ABCMeta): pass # 两者完全等价!
1 2 3 4 5 6 # abc.py 中的实际定义 class ABC(metaclass=ABCMeta): """Helper class that provides a standard way to create an ABC using inheritance. """ __slots__ = ()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import abc # 方式A:使用 abc.ABC(更简洁) class AnimalA(abc.ABC): @abc.abstractmethod def speak(self): pass # 方式B:使用 metaclass=abc.ABCMeta(更明确) class AnimalB(metaclass=abc.ABCMeta): @abc.abstractmethod def speak(self): pass # 两者效果完全相同
原类
type 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 1.查看对象类型 2.创建类 >>> def fn(self, name='world'): # 先定义函数 ... print('Hello, %s.' % name) ... >>> Hello = type('Hello', (object,), dict(hello=fn)) # 创建Hello class >>> h = Hello() >>> h.hello() Hello, world. >>> print(type(Hello)) <class 'type'> >>> print(type(h)) <class '__main__.Hello'> class的名称; 继承的父类集合,注意Python支持多重继承,如果只有一个父类,别忘了tuple的单元素写法; class的方法名称与函数绑定,这里我们把函数fn绑定到方法名hello上。
1 2 3 4 5 除了使用type()动态创建类以外,要控制类的创建行为,还可以使用metaclass元类 当我们定义了类以后,就可以根据这个类创建出实例,所以:先定义类,然后创建实例。 但是如果我们想创建出类呢?那就必须根据metaclass创建出类,所以:先定义metaclass,然后创建类。 连接起来就是:先定义metaclass,就可以创建类,最后创建实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # metaclass是类的模板,所以必须从`type`类型派生: class ListMetaclass(type): def __new__(cls, name, bases, attrs): attrs['add'] = lambda self, value: self.append(value) return type.__new__(cls, name, bases, attrs) class MyList(list, metaclass=ListMetaclass): pass Python解释器在创建MyList时,要通过ListMetaclass.__new__()来创建,在此,我们可以修改类的定义,比如,加上新的方法,然后,返回修改后的定义。 __new__()方法接收到的参数依次是: 当前准备创建的类的对象; 类的名字; 类继承的父类集合; 类的方法集合。
1 2 3 4 5 6 >>> L = MyList() >>> L.add(1) >> L [1] 而普通的list没有add()方法
ORM 时间库 time .strptime() 1 2 3 >>>time.strptime('2025-12-12_13-59-30', '%Y-%m-%d_%H-%M-%S').tm_hour time.struct_time(tm_year=2025, tm_mon=12, tm_mday=12, tm_hour=13, tm_min=59, tm_sec=30, tm_wday=4, tm_yday=346, tm_isdst=-1)
.gmtime() 1 2 >>>time.gmtime(time.time()) # UTC时间 是一个对象 time.struct_time(tm_year=2026, tm_mon=1, tm_mday=12, tm_hour=10, tm_min=0, tm_sec=51, tm_wday=0, tm_yday=12, tm_isdst=0)
.strftime() 1 2 3 4 import time curr_time = time.strftime("%Y-%m-%d_%H-%M-%S") # 返回字符串 '2026-01-08_11-14-36'
1 2 3 4 time.strftime('%Y-%m-%d %H:%M:%S', utc_time) >>> time.strftime('%Y-%m-%d_%H-%M-%S', time.strptime('2025-12-12_13-59-30', '%Y-%m-%d_%H-%M-%S')) '2025-12-12_13-59-30'
时间戳 1 2 time.time() # UTC时间 秒小数点15位 与时区无关,全球统一 time.localtime(time.time()) # 时区不同时间不同
1 2 3 4 time.mktime(time.strptime('时间字符串','%Y-%m-%d也就是时间格式')) >>> time.mktime(time.strptime('2025-12-12_13-59-30','%Y-%m-%d_%H-%M-%S')) 1765519170.0
datetime 1 from datetime import datetime
now() 1 2 3 4 5 from datetime import datetime >>>clip_create_time= datetime.now().isoformat() '2026-01-08T11:19:16.862023'
时间戳 ->时间戳 1 2 3 4 datetime.now().timestamp() # year month day hour minute second 1768210913.418071 startDate = datetime.strptime(startDate, "%Y-%m-%d %H:%M:%S").timestamp()
时间戳-> 1 2 3 4 timestamp = 1750665600 # 这是示例,实际值需要计算 dt_object = datetime.fromtimestamp(timestamp) # 这里需要是秒单位,毫秒的时间戳报错 formatted_date = dt_object.strftime("%Y-%m-%d")
.strftime() 1 2 >>> datetime.now().strftime("%Y-%m-%d") '2026-01-08'
1 2 >>> datetime.now() datetime.datetime(2026, 1, 8, 11, 22, 14, 313776)
1 2 shanghai_time = datetime.now(pytz.timezone('Asia/Shanghai')) partition = shanghai_time.strftime("%Y-%m-%d %H:%M:%S")
.strptime 1 2 from datetime import datetime, timedelta datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)
.timedelta 1 (datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
读写文件with open(待) 1 with open打开一个文件时,文件路径与python打开的工作路径有关!!!
1 2 3 4 5 6 7 8 9 10 11 12 13 #动态获取脚本所在目录、可以确保无论从哪里运行脚本,都能正确定位到同目录下的文件。 import os # 获取当前执行的 Python 文件所在的目录 script_dir = os.path.dirname(os.path.abspath(__file__)) # 构建文件的绝对路径 file_path = os.path.join(script_dir, 'data.txt') # 安全地打开文件 with open(file_path, 'r', encoding='utf-8') as f: content = f.read()
访问模式 r:默认模式,文件不存在报错。文件指针在文件开头
w:文件不存在创建并写入,存在则覆盖。文件指针在文件开头
a:文件不存在创建并写入,存在追加。指针在末尾
rb:以二进制格式打开一个文件用于只读。文件指针将会放在文件的开头。这是默认模式。
r+:打开一个文件用于读写。文件指针将会放在文件的开头。
wb:以二进制格式打开一个文件只用于写入。如果该文件已存在则打开文件,并从开头开始编辑(文件指针在文件开头),即原有内容会被删除。如果该文件不存在,创建新文件。
w+:打开一个文件用于读写。如果该文件已存在则打开文件,并从开头开始编辑(文件指针在文件开头),即原有内容会被删除。如果该文件不存在,创建新文件。
wb+:以二进制格式打开一个文件用于读写。如果该文件已存在则打开文件,并从开头开始编辑(文件指针在文件开头) ,即原有内容会被删除。如果该文件不存在,创建新文件。
ab:以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾
a+:打开一个文件用于读写。如果该文件已存在,文件指针将会放在文件的结尾。文件打开时会是追加模式。如果该文件不存在,创建新文件用于读写。
ab+:以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。如果该文件不存在,创建新文件用于读写。
主访问模式 r w a:是主访问模式,另外的模式都会保留其特点 。带有r的文件不存在报错,带w或a的文件不存在创建
只需要要记住访问模式的特征,还有指针位置便很好理解和记忆
w+ w+虽然有读写功能,但是读的时候先写,由于写入空内容覆盖,所以没有读取到
1 2 3 4 # text.txt假如这里文件里有实际内容,但是这里打印没有 f = open("text.txt","w+") f.read() f.close
a+ 由于指针在末尾,所以读不到内容
1 2 3 4 f = open ("text.txt" ,"w+" ) f.read() f.close
文件指针seek() 作用:用来移动文件指针位置
语法:文件对象.seek(偏移量,起始位置)
起始位置: 0:文件开头 1:当前位置 2:文件结尾
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 """ data.txt aaaaa bbbbb ccccc """ f = open ("data.txt" ,"r+" ) f.seek(2 ,1 ) f.read() f.close() """ aaa bbbbb ccccc """
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 """ data.txt aaaaa bbbbb ccccc """ f = open ("data.txt" ,"r+" ) f.seek(0 ,2 ) f.read() f.close() """ """
读 readlines 1 2 3 readlines()函数适用于需要一次性读取所有行,并将它们保存在列表中的情况。它返回一个列表,每个元素代表文件中的一行(包含换行符),便于对整个文件内容进行操作。 with open(filter_file_name,'r') as f: filter_contents = [line.strip() for line in f.readlines() if line.strip()]
readline 1 2 3 4 5 6 7 8 readline()函数适用于按行读取文件的情况。如果文件较大,或者只需要处理文件的一部分内容,可以使用readline()逐行读取,节省内存。 with open('example.txt', 'r') as file: line = file.readline() while line: print(line, end='') # 打印每一行 line = file.readline()
read 1 2 3 read()函数适用于文件较小且可以一次性读取到内存的情况。它将整个文件内容读取到一个字符串中,适合用于对文件内容进行整体处理。 参数size:字节数,没有指定则读取全部
用循环读 1 2 3 4 5 6 7 8 9 def read_data_from_txt(txt_path): data_list=[] with open(txt_path, mode='r', newline='', encoding='utf-8') as txt_file: # 逐行读取txt文件 for row in txt_file: data_list.append(row.strip()) return data_list
1 2 3 with open('failed_tasks.txt', 'w', encoding='utf-8') as f: for task_name, error in failure_list: f.write(f"{task_name}: {error}\n")
写 write 1 2 3 4 5 6 7 8 9 10 11 12 13 with open('example.txt', 'w') as file: file.write('Hello, Python!') 写入模式: 'w': 写入模式,如果文件存在,则覆盖原有内容;如果文件不存在,则创建新文件。 'a': 追加模式,如果文件存在,则在文件末尾追加内容;如果文件不存在,则创建新文件。 'wb': 二进制写入模式,用于写入二进制数据,如图片或视频等。 注意事项: 在写入文本时,如果需要指定特定的编码格式,可以通过encoding参数来设置。 在处理非UTF-8编码的文本时,可能会遇到编码错误。此时,可以通过设置errors='ignore'来忽略这些错误。 当处理大型文件或需要频繁写入时,应该注意内存和资源的管理。
1 2 3 4 5 dumped = yaml.dump( dict, default_flow_style=None, allow_unicode=True, width=9999 ) with PathManager.open(filename, "w") as f: f.write(dumped)
循环写 1 2 3 4 with open(success_file, 'w') as f: for dataset in sorted(clip_success_list): f.write(dataset + '\n') # success_file存在先清空,然后写入clip_success_list中所有内容
writelines 1 2 3 4 tasks_list = [f"{tasks_dir}/task_{idx:03d}.json\n" for idx in range(len(tasks))] tasks_list_path = f"{task_infos_dir}/filtered_tasks.txt" # 列表文件名标记为“筛选后” with open(tasks_list_path, "w", encoding="utf-8") as f: f.writelines(tasks_list)
用json.dump() 1 2 3 4 5 6 7 8 with open("./output.json", "w", encoding="utf-8") as f: json.dump(result_data, f, indent=2, ensure_ascii=False) # indent 用于美化格式 indent=None(默认)时输出紧凑格式;设置为整数时表示每层缩进的空格数。 ensure_ascii 参数: 默认值 True:所有非 ASCII 字符会被转义为 \uXXXX 格式。 设置为 False:保留原始字符(如中文),便于直接阅读。
案例(备份文件) 1 2 3 4 5 6 7 8 9 10 11 12 13 def bak (): old_name = input ("请输入备份文件名:" ) index = old_name.rfind("." ) new_name = old_name[:index] +'备份' + old_name[index:] f1 = open (old_name,'rb' ) f2 = open (new_name,'wb' ) while True : c = f1.read(1024 ) if len (c) == 0 : break f2.write(c) f1.close() f2.close()
open 1 在读取文件时,我们需要记得关闭文件,以释放资源(不然一直占用内存)。为了避免忘记关闭文件,可以使用with语句来自动关闭文件。
1 2 3 4 5 6 7 8 9 10 11 # 打开文件 file_path = "data.txt" file = open(file_path, "r") # 使用readline()函数逐行读取文件内容 line1 = file.readline() line2 = file.readline() # 关闭文件 file.close() # 打印文件内容 print("Line 1:", line1) print("Line 2:", line2)
打包 1 data_engine>data_engine、build.sh、requirements.txt、setup.py
1 2 # build.sh python setup.py bdist_wheel
1 2 3 4 # requirements.txt cos_python_sdk_v5==1.9.38 setuptools==65.5.1 PyYAML==6.0.3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from setuptools import setup, find_packagesimport oshere = os.path.abspath(os.path.dirname(__file__)) with open (os.path.join(here, 'requirements.txt' ), 'r' ) as f: requirements = f.read().splitlines() requirements = [req for req in requirements if req and not req.startswith('#' )] setup( name="dataengine" , version="1.0.1" , description="xxx" , author="" , author_email="" , packages=["data_engine" ], install_requires=requirements, python_requires=">=3.8" , classifiers=[ "Programming Language :: Python :: 3" , "License :: OSI Approved :: MIT License" , "Operating System :: OS Independent" , ], )
1 2 3 4 # 自动查找 packages packages=["data_engine"] # 包含的包目录 # 或者使用 find_packages() 自动查找所有包 # packages=find_packages() # 自动发现所有包
1 2 3 4 5 1. 安装项目到当前环境 # 开发模式安装(可编辑模式) pip install -e . # 普通安装 pip install .
1 2 3 4 5 6 7 2. 构建发布包 # 构建源码包 python setup.py sdist # 构建二进制 wheel 包 python setup.py bdist_wheel # 两者都构建 python setup.py sdist bdist_wheel
1 2 3 4 5 6 3. 检查包信息 # 验证 setup.py 语法 python setup.py check # 查看包信息 python setup.py --name python setup.py --version
1 2 3 4 5 6 7 8 9 10 11 pip install -e . 在系统的 site-packages/中创建链接文件(.egg-link 或 .pth 文件) 链接指向源代码目录 修改源代码立即生效(无需重新安装) 适合开发和调试 # 普通安装 pip install . 将包复制到系统的 site-packages/目录 修改源代码不会影响已安装的包 两种方式都需要 setup.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 # 创建源代码分发包 python setup.py sdist 生成的文件: dist/ └── dataengine-1.0.1.tar.gz dataengine-1.0.1.tar.gz ├── PKG-INFO # 包元信息 ├── setup.cfg # 额外配置 ├── setup.py # 构建脚本 └── dataengine/ # 源代码 ├── __init__.py ├── data_engine.py └── ... ✅ 需要编译(如果有C扩展) ✅ 最原始的发布格式 ❌ 安装速度较慢 ✅ 兼容性好(所有平台和Python版本) pip install dataengine-1.0.1.tar.gz # 1. 解压tar.gz # 2. 运行setup.py # 3. 编译(如果有C扩展) # 4. 安装到site-packages 只包含源代码,没有编译好的字节码 安装时在目标机器上实时编译 编译时使用目标机器的Python版本 # setup.py # 源代码是通用的,编译时适应目标环境 # 用户机器上的Python 3.8编译 -> Python 3.8字节码 # 用户机器上的Python 3.9编译 -> Python 3.9字节码 pip install dataengine-1.0.1.tar.gz # 1. 解压源代码 # 2. 在用户机器上运行:python setup.py install # 3. 用用户机器的Python版本编译.py为.pyc # 4. 安装到用户环境的site-packages
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 # 创建wheel包 python setup.py bdist_wheel dist/ └── dataengine-1.0.1-py3-none-any.whl # 文件名格式:{包名}-{版本}-{Python标签}-{ABI标签}-{平台标签}.whl dataengine-1.0.1-py3-none-any.whl ├── dataengine-1.0.1.dist-info/ │ ├── METADATA # 元数据 │ ├── WHEEL # Wheel规范 │ └── RECORD # 文件清单 └── data_engine/ # 预编译的包文件 ├── __init__.py ├── data_engine.py └── ... ✅ 安装速度快(无需编译) ✅ 包含编译好的文件 ❌ 可能需要多个版本(不同平台) ❌ 不包含源代码 ✅ 现代推荐的发布格式 pip install dataengine-1.0.1-py3-none-any.whl # 1. 直接解压到site-packages # 2. 无需编译 # 3. 完成安装 包含预编译的字节码(.pyc文件) 字节码是特定Python版本生成的 直接复制到site-packages,无需编译
读取配置文件 LazyConfig 1 2 cfg = LazyConfig.load(config_yaml) cfg = cfg.output_reloc_result
Config 1 2 from mmcv import Config, DictAction cfg = Config.fromfile(args.config)
yaml
读 yaml.load 1 2 configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader) configs = configs['xxxxxx']
yaml.safe_load和yaml.unsafe_load和yaml.full_load 1 2 3 4 5 6 yaml.safe_load() 高 基本类型 加载可信YAML配置文件 同 yaml.load(file, Loader=yaml.SafeLoader)
1 2 3 4 yaml.unsafe_load() 低 任意Python对象 加载完全可信的内部数据
1 2 3 4 5 6 yaml.full_load() 中 扩展类型 有选择的扩展对象 同 yaml.load(open(yaml_file), Loader=yaml.FullLoader)
写 yaml.dump safe_dump 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import yaml # Data to be written data = { "name": "John Doe", "age": 30, "skills": ["Python", "JavaScript", "SQL"] } # Writing to a YAML file with open("output.yaml", "w") as file: yaml.dump(data, file, allow_unicode=True, default_flow_style=False) age: 30 name: John Doe skills: - Python - JavaScript - SQL
1 2 3 将 Python 对象(例如字典、列表)转换为 YAML 格式 Unicode 支持:使用 allow_unicode=True 处理非 ASCII 字符 格式化:设置 default_flow_style=False 以使用块状格式(更易读)
1 2 3 4 5 6 7 8 9 10 11 12 示例:将 YAML 作为字符串返回 yaml_string = yaml.dump(data, allow_unicode=True, default_flow_style=False) print(yaml_string) Output: 输出: age: 30 年龄:30 name: John Doe 姓名:约翰·多伊 skills: 技能: - Python - JavaScript - SQL
1 2 3 4 5 6 yaml.dump() 支持最多对象类型 低(可能序列化任意对象) yaml.safe_dump() 只序列化基本类型 高
实例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 dataset: base_path: "models-changan-1322983810/lidarextcalib" cosfacotyconfiger: account: - name: '100038458456' congiger: xx: 'xx' xx: 'xx' Region: 'ap-shanghai-adc' ServiceDomain: 'cos.ap-shanghai-adc.myqcloud.com' - name: '100036871817' congiger: xx: 'xx' xx: 'xx' Region: 'ap-shanghai-adc' ServiceDomain: 'cos.ap-shanghai-adc.myqcloud.com'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 with open(config_path, 'r', encoding='utf-8') as f: config_data = yaml.safe_load(f) if not config_data or 'cosfacotyconfiger' not in config_data: print(f"Invalid config file format: {config_path}") return False cos_configer = config_data['cosfacotyconfiger'] if 'account' not in cos_configer: print(f"No accounts found in config file: {config_path}") return False # 加载所有账号 for account_info in cos_configer['account']: account_name = account_info.get('name') account_config = account_info.get('congiger', {})
单元测试 1 2 import unittest from unittest.mock import Mock, patch
日志 python输出日志你会使用啥?print还是logging ,这几个模块更加不错哦。 - 知乎
loguru 1 from loguru import logger
.info
.warning logging 1 2 3 4 5 6 7 8 9 #控制日志输出级别 root = RootLogger(WARNING) Logger.root = root Logger.manager = Manager(Logger.root) # 输出样式 WARNING:root:这是一条warning日志 ERROR:root:这是一条error日志 总结起来就是: 日志级别:日志名称:日志内容
1 2 3 4 5 6 7 8 9 10 11 12 # 日志类型 logging.debug("") logging.warning("") logging.error("") logging.info("") logging.critical("") 或者 logging.log(logging.DEBUG,"") logging.log(logging.WARNING,"") logging.log(logging.ERROR,"") logging.log(logging.INFO,"") logging.log(logging.CRITICAL,"")
1 2 logging.basicConfig(filename='有这个参数就不会输出在终端',filemode='默认追加模式',format='设置日志格式',level='日志级别',stream='不能与filename同时存在,可以是sys.stdout、sys.stderr、网络stream')
warnings 1 2 3 import warnings if args.options: warnings.warn('xxxx')
@property和setter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class MyClass: def __init__(self, value, items=None): self.value = value self._items = [] if items is not None: self._items = items @property def items(self): return self._items.copy() @items.setter def items(self, value): self._items = list(value) def __repr__(self): return f"MyClass(value={self.value}, items={self._items})" # 测试 if __name__ == "__main__": # 1. 测试属性保护 obj = MyClass(1, [1, 2, 3]) print(f"初始化: {obj}") # 获取items(返回的是副本) items_copy = obj.items print(f"获取的items: {items_copy}") print(f"items_copy is obj._items: {items_copy is obj._items}") # False # 尝试修改副本 items_copy.append(999) print(f"修改副本后: {obj}") # 内部未改变 # 2. 测试设置器 external_list = [10, 20, 30] obj.items = external_list print(f"\n设置新列表后: {obj}") # 修改外部列表 external_list.append(999) print(f"修改外部列表后: {obj}") # 内部未改变 # 3. 直接操作内部列表(不推荐,但可能发生) obj._items.append(888) # 可以,但违反封装原则 print(f"直接修改_items后: {obj}")
库 pip 1 pip install -r xxx.txt -i http.... --trusted-host mirrors.tencentyun.com
os getenv 1 2 3 4 5 这是一个专门的函数,用于安全地获取环境变量的值。 特点: 只读:仅用于获取值,不能修改环境变量。 更安全:当变量不存在时,不会抛出异常,而是返回默认值(默认为 None)。 返回字符串或 None:如果变量存在则返回字符串值,不存在则返回默认值。
1 2 os.getenv('ENV_NAME',''): 这行代码用于从系统环境变量中获取值,并设置默认值,环境变量名为ENV_NAME
environ 1 2 3 4 5 这是一个类似字典的对象,包含了当前进程的所有环境变量。 特点: 可读写:你可以读取、修改、添加或删除环境变量。 影响当前进程:修改会直接影响当前 Python 进程及其子进程的环境。 始终返回字符串:键和值都是字符串类型。
1 st_key = os.environ['COS_st_KEY']
os.name 1 2 3 4 5 6 7 os.name != 'nt' 返回值的含义: 'nt': 这表示你正在运行 Windows 系统。 'posix': 这表示你正在运行一个符合 POSIX 标准 的类 Unix 系统。
.listdir() 1 os.listdir(PATH_TO_REQUESTS)
.getcwd() 1 2 import os os.getcwd() 获取当前工作路径
.chdir() os.chdir更改CWD当前工作目录,影响加载文件路径
.rename() 重命名文件名、或者文件夹
os.rename(文件名,文件名)
os.rename(文件夹名,文件夹名)
.remove() os.remove(文件)
os.path 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 os.path.abspath(__file__) 获取文件路径包含文件名 # 用当前打开的工作目录拼接 os.path.dirname() 获取文件的文件夹目录 os.path.join("dir","file") os.path.isfile(file_path) os.path.isdir(file_path) os.path.splitext() 是 Python 中用于分离文件名和扩展名的函数。 os.path.splitext("a/b.txt") 文件名包含路径时返回 ["a/b",".txt"] os.path.basename() os.path.exists("xxx")和Path("xxx").exists()一样可以判断目录是否存在
os.path.isdir os.path.isfile os.path.join os.path.splitext os.path.exists os.path.abspath 1 with open (f"{os.path.abspath(os.path.dirname(__file__))} /../version_info.txt" ) as f
os.path.dirname 1 os.path.dirname(os.path.abspath(__file__)) # 这里os.path.abspath不是多余的,可以避免(如符号链接、特殊导入方式)不是指向真正文件的问题
os.path.basename 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import os # 示例:获取文件名 file_path = "/home/user/documents/file.txt" print(os.path.basename(file_path)) # 输出: file.txt # 示例:获取目录名 dir_path = "/home/user/documents/" print(os.path.basename(dir_path)) # 输出: documents # 示例:仅文件名 print(os.path.basename("file.txt")) # 输出: file.txt # 示例:根目录 print(os.path.basename("/")) # 输出: ''
–file– 1 2 3 4 5 6 7 8 9 10 11 12 13 PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../", "data/requests") __file__ - 当前Python脚本文件的完整路径 print(__file__) 比如:/home/user/project/src/utils/config.py os.path.dirname(__file__) - 获取当前文件所在的目录路径 比如:/home/user/project/src/utils "../" - 相对路径,表示上一级目录 从 /home/user/project/src/utils回到 /home/user/project/src 最终结果:/home/user/project/src/data/requests
1 filename.endswith(".json")
1 2 3 with open(f"{os.path.abspath(os.path.dirname(__file__))}/../version_info.txt") as f 获取的是当前文件位置,而不是调用它的函数文件的位置
os.remove(file) 1 os.remove(file):只能删除单个文件
os.rmdir(dir) os.rmdir(path):只能删除空目录
os.mkdir() os.mkdir(文件夹名字)
os.makedirs() 1 2 os.makedirs(os.path.dirname(out_label_path), exist_ok=True) # exist_ok=True,当文件夹存在不会报错
pathlib.Path 1 2 3 from pathlib import Path Path("/a//b//") >> /a/b
.cwd() # 获取当前工作目录
cwd = Path.cwd()
.joinpath() 1 2 3 4 from pathlib import Path dir = Path(xxx)/partition+'/' # 得到的是path相关的对象 dir = str( Path(xxx).joinpath(xxx))+'/'+check_path # 字符串路径
1 2 3 str(Path(dir).joinpath("lidar_left/")) # 这里会将末尾的‘/’删除 str(Path(dir).joinpath("lidar_left/"))+‘/’ 可以这样
.exists() 1 Path(config_yaml).exists()
.mkdir() 1 通过设置参数 parents=True 和 exist_ok=True,可以递归创建多级目录,并忽略已存在的目录。
1 2 logs_dir = Path(cfg.temp_dir) logs_dir.mkdir(parents=True, exist_ok=True)
.rmdir() 删除单目录
path(xxx).rmdir()
.parent 1 self._mkdir(Path(local_file).parent)
.is_file() /.is_dir()
.is_dir()
.suffix 1 2 3 Path.suffix:获取扩展名(包括点),例如 '.txt' path.suffix in ['.yaml', '.yml']
.suffixes 1 Path.suffixes:获取所有扩展名列表,例如 ['.tar', '.gz']
.resolve 1 2 3 4 5 # 用当前打开的工作目录拼接 pathlib.Path.resolve() 会将一个路径对象转换为绝对路径,并解析路径中所有的符号链接(os.path.abspath(xxxx)也有同样的效果),直到得到最终的物理路径 strict=True:路径不存在时抛出异常。 strict=False(默认):即使路径不存在,也返回一个尽可能解析的绝对路径。
1 BASE_DIR = Path(__file__).resolve().parent # 获取一个文件的父目录
.absolute 用于返回当前路径的绝对路径 ,但不会解析符号链接 。它返回一个新的 Path 对象,适合在需要获取完整路径但保留符号链接时使用。
1 2 3 abs_path = Path(path).absolute() if not abs_path.exists(): os.makedirs(str(abs_path))
.is_absolute is_absolute() 判断是否为绝对路径
.name 1 Path.name:获取完整的文件名(包括扩展名),例如 'document.txt'
.stem 1 2 3 Path.stem:获取不包含扩展名的文件名,例如 'document' 如果文件名以点开头(例如 '.gitignore')或有多个点(例如 'archive.tar.gz'),它只去除最后一个点之后的部分
shutil 1 2 3 4 5 6 shutil.rmtree(down_path)是Python标准库中的一个函数,用于递归删除整个目录树。 删除指定路径 down_path下的所有文件和子目录 删除是永久性的,不会移动到回收站 会删除目录本身及其所有内容 shutil.rmtree(down_path, ignore_errors=True)
1 2 os.remove(path):只能删除单个文件 os.rmdir(path):只能删除空目录
sys.path 1 2 workspace_path = "/workspace" sys.path.insert(0, workspace_path) # sys.path 是一个列表 插入前面可以避免标准库的覆盖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # sys.path 示例 sys.path = [ '/usr/local/lib/python3.8/site-packages', '/usr/lib/python3.8', '/home/user/project' ] # 在索引 0 位置插入 sys.path.insert(0, '/workspace') # 插入后的 sys.path print(sys.path) # 输出: # [ # '/workspace', # 新插入的路径 # '/usr/local/lib/python3.8/site-packages', # 原来的第一个元素 # '/usr/lib/python3.8', # 原来的第二个元素 # '/home/user/project' # 原来的第三个元素 # ]
1 2 # 导入不同位置的模块代码 workspace>production_system->src>workspace_tests>test>test.py
1 2 3 4 5 6 7 8 9 10 # 获取当前文件的路径 current_file = Path(__file__).resolve() # workspace>production_system->src>workspace_tests>test>test.py # 向上跳三级,到 src 目录 src_dir = current_file.parent.parent.parent sys.path.insert(0, str(src_dir.parent)) # 添加 production_system 到路径 使用append不行,是添加到列表最末尾。insert添加到0位置 # 从 src 开始导入 from src.workspace.resources.base_sources import MongoDBResource
调试源码 1 2 3 4 5 6 7 8 9 10 # 在终端中执行 cd /workspace/src/workspace/sensors/test # 创建指向源码的符号链接 ln -s /workspace/data_engine/data_engine ./data_engine # 现在可以直接导入 # test.py import data_engine from data_engine import DataEngine, COSStorage
1 除了之前用的创建软连接方法,因为sys.path里第一个是优先读取当前目录。还可以使用sys.path
1 2 3 4 5 productition_system>data_engine>data_engine>init.py productition_system>data_engine>test>test.py sys.path.insert(0,str(Path(__file__).parent.parent)) from data_engine import DataEngine,LocalStorage,COSStorage
sys.path和CWD sys.path 模块导入:取决于 sys.path
当你执行 import my_module时,Python 解释器会去一个列表(sys.path)里按顺序查找名为 my_module的模块。
sys.path的生成规则
模块导入不关心 你终端的当前目录,只关心 sys.path。
如果你在 /home/user/目录下运行 python /project/src/main.py,那么 main.py中导入模块的基准目录是 /project/src/,而不是 /home/user/。
CWD 当你在代码中使用相对路径操作文件(如 open('data.txt')、pd.read_csv('./data.csv'))时,这个路径是相对于当前工作目录 (Current Working Directory, CWD) 的。
CWD 的确定规则:
继承自终端 :默认情况下,Python 进程会继承启动它的那个终端(Terminal/Shell)的当前目录。
如果你在终端 /home/user下执行 python script.py,CWD 就是 /home/user。
可以被代码改变:使用 os.chdir('/new/path')可以改变当前进程的 CWD。
⚠️ 常见坑点:
pickle 1 2 3 4 5 6 7 # 序列化为二进制 binary_content = pickle.dumps(dynamic_pkl) # 上传到COS upload_result = self.cos_utils.upload_from_binary( cos_file_path, binary_content, self.bucket_name_ )
1 2 3 4 5 6 7 pickle.load() pickle.loads() pickle.dump() pickle.dumps() with open(pkl_file, 'rb') as f: data = pickle.load(f)
pydantic 1 2 3 4 5 6 7 自动类型转换:即使输入的类型不完全正确,也会自动尝试转换(比如字符串 "123" 自动转为整数)。 数据验证:根据字段的类型和限制条件自动校验。 这正是 Pydantic 的神奇之处——它完全不需要写 __init__方法和 self.xxx! from pydantic import BaseModel, ConfigDict, Field
不需–init– 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 # 传统Python类(需要写__init__) class TraditionalClass: def __init__(self, agent_ids=None, is_abnormal=False): self.agent_ids = agent_ids or [] self.is_abnormal = is_abnormal # Pydantic类(完全自动 自动生成) class BehaviorData(Data): agent_ids: List[str] = Field(default=[]) is_abnormal: bool = Field(default=False) # 不需要写__init__! # 使用方式完全相同 data = BehaviorData(agent_ids=["car1"], is_abnormal=True) print(data.agent_ids) # ["car1"] print(data.is_abnormal) # True
ConfigDict 1 2 3 4 5 6 7 class Data(BaseModel, metaclass=abc.ABCMeta): model_config = ConfigDict(...) # 父类配置 class BehaviorData(Data): # 继承Data # 自动继承父类的model_config! # 不需要重复写配置 pass
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 from pydantic import BaseModel, ConfigDict, Field from enum import Enum class Color(Enum): RED = "red" GREEN = "green" # strict=True 的效果 class StrictModel(BaseModel): model_config = ConfigDict(strict=True) number: int # 传统Python会进行隐式转换 # 但strict=True时不会: # StrictModel(number="123") # 错误!字符串不能转int # StrictModel(number=123) # 正确 # use_enum_values=False 的效果 class EnumModel(BaseModel): model_config = ConfigDict(use_enum_values=False) color: Color obj = EnumModel(color=Color.RED) print(obj.color) # Color.RED(枚举对象) print(obj.model_dump()) # {'color': Color.RED} # 如果use_enum_values=True: # print(obj.model_dump()) # {'color': 'red'}
model_fields 1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 创建实例 data = BehaviorData() # 查看所有字段 print(data.model_fields) # 输出:包含所有字段定义的字典 # 查看字段值 print(data.agent_ids) # [](默认值) print(data.is_abnormal) # False # 查看字段配置 print(data.model_fields['agent_ids'].default) # [] print(data.model_fields['agent_ids'].title) # '自车:存在交互目标的ids_list'
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from pydantic import BaseModel, Fieldclass User (BaseModel ): id : int name: str age: int = Field(gt=0 , lt=120 ) user = User(id ="123" , name="Alice" , age=25 ) print (user.id , type (user.id )) User(id ="abc" , name="Bob" , age=-5 )
Filed 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Field是 Pydantic 的字段定义函数,用于为模型字段添加额外的元数据和验证规则。 Field( default=..., # 默认值 title=..., # 标题/描述 description=..., # 详细描述 examples=..., # 示例值 gt/ge/lt/le=..., # 数值范围限制 min_length/max_length=..., # 长度限制 pattern=..., # 正则表达式 frozen=True/False, # 是否可修改 alias=..., # 字段别名 validate_default=..., # 是否验证默认值 repr=True/False, # 是否在repr中显示 # ... 更多参数 )
field_validator 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 class MapData(Data): """ 高精地图 """ coords: str = Field(default="ENU", title='坐标系类型') meta: MetaData = Field(default=None, title='地图元数据', description='采集时间 天气 数据来源等') lanes: Dict[int, LaneData] = Field(default={}, title='中心线') lines: Dict[int, LineData] = Field(default={}, title='车道线') paths: Dict[int, PathData] = Field(default={}, title='对应lanev1,一条路径上的所有车道') links: Dict[int, CityMapLinkData] = Field(default={}, title='道路级信息') # 车道信息 ground_markings: Dict[int, GroundMarkingData] = Field(default={}, title='车信') cross_walks: Dict[int, CrossWalkData] = Field(default={}, title='人行横道') stop_lines: Dict[int, StopLineData] = Field(default={}, title='停止线') arrows: Dict[int, ArrowData] = Field(default={}, title='地面箭头') boundaries: Dict[int, BoundariesData] = Field(default={}, title='障碍物/边线') trafficlights: Dict[int, TrafficLightData] = Field(default={}, title='地图交通灯') routing: Union[RoutingLocalData, RoutingGlobalData] = Field(default=None, title='导航数据') ego_lane_id: int = Field(default=-1, title='自车所在的车道中线id') seg_dict: Dict[int, List[List[List[float]]]] = Field(default={}, title='用于local_map中path的计算,seg_roi的id信息') # 路口信息 intersections: Dict[int, IntersectionData] = Field(default={}, title='路口') # trafficsign_speed trafficsign pole信息 trafficsign: Dict[int, TrafficSignData] = Field(default={}, title='交通标志') pole: Dict = Field(default={}, title='pole') # 矩阵的数据格式 是否使用二维数组 affinity_matrix: List[List[int]] = Field( default=[[]], title='按照lanes_order_list_for_matrix存储的lane的邻接矩阵') associate_lane_lane: List[List[int]] = Field( default=[[]], title='中心线左右相邻关系') associate_lane_line: List[List[int]] = Field( default=[[]], title='车道线左右相邻关系') lanes_order_list_for_matrix: List[int] = Field( default=[], title='对应到中心线邻接矩阵的顺序') lines_order_list_for_matrix: List[int] = Field( default=[], title='对应到车道线矩阵的顺序') # 地图标签(from citymap) map_tags: List[MapTagData] = Field( default=[], title='地图标签原始数据') @field_validator('lanes', 'lines', 'paths', 'ground_markings', 'boundaries', 'cross_walks', 'stop_lines', 'trafficlights', 'intersections') @classmethod def _check_field_valid(cls, var: dict) -> dict: """ validation decorator of the Dict logit: the key and the id of value must be the same """ for k, v in var.items(): if v is None: raise ValueError('the value of Map.var_dict is None') if v is None or k != v.id: raise ValueError('the value of the key in Map.var_dict and the id is different') return var @field_validator('links', mode='before') @classmethod def _check_links(cls, var: dict) -> dict: if 'global_links' in var.keys(): del var['global_links'] return var
实例1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 class SensorData(ABC): # 传感器基类 def __init__(self, clipvistor:DataEngine,sensor_name:str) -> None: super().__init__() self.sensor_name_=sensor_name self.raw_data = {} # 存储原始帧数据 self.frame_datas = None # 存储TimeLine实例化后的数据 self.clipvistor = clipvistor @abstractmethod # DataLoad()加载数据实现函数, 调用ClipVistor 格式转换 FrameData def load_sensor_data(self,sensor_name:str, path_list)->bool: pass # AlignSensor(,time_line_base,delt) 单个传感器数据与基线对齐 def align_time_line(self,refrence_time:TimeLine,delt_time:int) -> Optional['SensorData']: or_time_line= self.frame_datas ref_time_dic= refrence_time.align_time(or_time_line,delt_time) suf_key_value={v:k for k,v in ref_time_dic.items() if v!=None} ref_sensor_data= self._create_new_instance() # 新初始化一个对象 temp_key_frame_data={} for or_time,al_time in suf_key_value.items(): temp_key_frame_data[al_time]=self.raw_data[or_time] ref_sensor_data.raw_data=temp_key_frame_data # 对齐后的结果传入新对象 对齐后的对象 if len(ref_sensor_data.raw_data)==0: return None time_points=[] for key,frame_data in ref_sensor_data.raw_data.items(): time_points.append(key) ref_sensor_data.frame_datas = TimeLine(time_points) return ref_sensor_data # align_and_conver_line(target_time_line: TimeLine, delt_time: int) 单个传感器数据转换为utc时间 def align_and_conver_line(self, al_time_line: TimeLine, target_time_line: TimeLine, delt_time: int) -> Optional['SensorData']: or_time_line= self.frame_datas al_time_dic= or_time_line.align_time(al_time_line,delt_time) # TimeLine中的方法 #转换到已对其的时间为体系 al_key_value={al_time:or_time for or_time,al_time in al_time_dic.items() if al_time!=None} #获取对应参考实际的开始点 al_time_ticks= list(al_key_value.keys()) al_time_temp_line=TimeLine(al_time_ticks) anch_time_index=al_time_line.get_tick_index(al_time_temp_line[0]) temp_target_time_line= al_time_temp_line.convert_by_timeline_to_dict(target_time_line,anch_time_index) ref_sensor_data= self._create_new_instance() # 新初始化一个对象 temp_key_frame_data={} for al_time_,tra_get_time_ in temp_target_time_line.items(): or_time_= al_key_value[al_time_] temp_key_frame_data[tra_get_time_]=self.raw_data[or_time_] ref_sensor_data.raw_data=temp_key_frame_data if len(ref_sensor_data.raw_data)==0: return None time_points=[] for key,frame_data in ref_sensor_data.raw_data.items(): time_points.append(key) ref_sensor_data.frame_datas = TimeLine(time_points) return ref_sensor_data def _create_new_instance(self) -> 'SensorData': """工厂方法:创建当前类型的实例""" return self.__class__(self.clipvistor, self.sensor_name_)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class CameraData(SensorData): def __init__(self,clipvistor:DataEngine,sensor_name:str) -> None: super().__init__(clipvistor,sensor_name) def load_sensor_data(self,sensor_name:str, path_list)->bool: # 包含数据加载和TimeLine类实例化 if len(self.raw_data)>0 : return False # for framedata in path_list: # time_str = Path(framedata).stem # timestamps =int(time_str) # self.raw_data[timestamps]=str(framedata) # 对读取到的数据先进行排序 data_pairs = [] for frame_data in path_list: time_str = Path(frame_data).stem timestamp = int(time_str) data_pairs.append((timestamp, str(frame_data))) if len(data_pairs) == 0: return False data_pairs_sorted = sorted(data_pairs, key=lambda x: x[0]) #使用sorted()函数进行排序(默认升序) for timestamp, file_path in data_pairs_sorted: # 将排序后的数据存入raw_data字典 self.raw_data[timestamp] = file_path if len(self.raw_data)==0: return False time_points=[] for key,frame_data in self.raw_data.items(): # 后期可以把frame_data加入到TimeLine类里 time_points.append(key) self.frame_datas = TimeLine(time_points) return True # 单个传感器数据转换为utc时间 def align_and_conver_line(self, al_time_line: TimeLine, target_time_line: TimeLine, delt_time: int) -> SensorData: return self
实例2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 class Data(BaseModel, metaclass=abc.ABCMeta): """ Abstract class for data structure. All data's implementations of data entity should extend `Data`. """ # Global settings of Data # strict: 按照严格类型检查,不允许默认类型转换 # use_enum_values: 使用枚举类(Enum)的值 # validate_assignment: 是否在数据被更改时进行检查 # extra: 如何处理未定义字段(新增字段):`allow`、`forbid`、`ignore` # frozen: 数据在实例化之后被冻结 # arbitrary_types_allowded: 允许任意类型赋值 # allow_inf_nan: 允许inf或者nan的结果 # json_encoders: 对不同类型返回序列化的值 model_config = ConfigDict( strict=True, use_enum_values=False, validate_assignment=True, extra='ignore', frozen=False, arbitrary_types_allowded=False, allow_inf_nan=False, json_encoders=JsonSerialize() ) def dump(self, *args, **kwargs) -> dict: """ output data dict """ return self.model_dump(*args, mode='json', **kwargs) def dumps(self, *args, **kwargs) -> str: """ output json string """ return self.model_dump_json(*args, **kwargs) @abc.abstractmethod def _check_data(self, *args, **kwargs) -> bool: """ check the value of each property """ ...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 class BehaviorData(Data): agent_ids: List[str] = Field(default=[], title='自车:存在交互目标的ids_list') is_abnormal: bool = Field(default=False, title='是否属于异常行为状态') is_ego_interactive: bool = Field(default=None, title='目标:行为是否与自车发生互动') is_key_frame: bool = Field(default=False, title='是否为行为关键帧') subtypes: List[Union[e_BehaviorType, e_InteractionType, e_SceneType]] = Field(default=[], title='细分行为类型') type: Union[e_BehaviorType, e_InteractionType, e_SceneType] = Field(default=None, title='行为类型') def _check_data(self, *args, **kwargs) -> bool: """ check the value of each property """ raise NotImplementedError class AgentData(Data): id: str = Field(default=None, title='目标ID') timestamp: str = Field(default=None, title='目标感知时间戳') type: e_ObjectClassType = Field(default=None, title='目标类型') confidence: float = Field(default=None, title='目标置信度') fcwconfidence: float = Field(default=None, title='FCW置信度') is_static: bool = Field(default=False, title='目标是否为静态目标, 用类别进行区分') is_moving: bool = Field(default=True, title='目标是否发生移动,用运动状态进行区分') ego_interactive: float = Field(default=0.0, title='目标是否与自车交互') # 位置信息 pos_vcs: Tuple[float, float] = Field( default=(None, None), title='VCS坐标', description='以当前帧自车位置和姿态为原点和坐标系的相对位置') pos_enu: Tuple[float, float] = Field( default=(None, None), title='平滑ENU坐标', description='通过卡尔曼滤波平滑全局ENU轨迹坐标') pose_odo: PoseData = Field( default=None, title='车辆运动估计轨迹 Frame0 ENU 位置和姿态', description='车辆的运动估计轨迹,提供短时间内的高精度相对位置关系') pose_refined: PoseData = Field( default=PoseData(), title='用拟合等手段平滑过后的 Frame0 ENU 位置和姿态', description='平滑后的轨迹,主要是满足模型训练的需求') pos_enu_origin: Tuple[float, float] = Field( default=(None, None), title='原始ENU坐标', description='以东北天为坐标系, 第0帧自车位置为原点的相对位置') vel: Tuple[float, float] = Field( default=(None, None), title='绝对速度', description='[x, y]') rel_vel: Tuple[float, float] = Field( default=(None, None), title='相对速度', description='[x, y]') acc: Tuple[float, float] = Field( default=(None, None), title='绝对加速度', description='[x, y]') heading_angle_local: float = Field( default=None, title='自车坐标系下航向角') heading_angle_global: float = Field( default=None, title='东北天全局航向角') heading_angle_origin: float = Field( default=None, title='自车坐标系下航向角, 原始数据的透传') yaw_rate: float = Field( default=None, title='偏航率') ego_angle: int = Field( default=None, title='目标角度', description='在自车车头方向算逆时针角度0~360') orientation: e_Orientation = Field( default=e_Orientation.UNKNOWN, title='目标方位') ttc_from_obj: float = Field( default=-1.0, title='目标与主车的TTC', description="目标位于自车、左邻、右邻车道时有输出,否则为-1,仅目标在自车车道上时值有实际物理意义") ttc_from_ego: float = Field( default=-1.0, title='主车与目标的TTC', description="目标位于自车、左邻、右邻车道时有输出,否则为-1,仅目标在自车车道上时值有实际物理意义") thw_from_obj: float = Field( default=-1.0, title="目标与主车的THW", description="目标位于自车、左邻、右邻车道时有输出,否则为-1,仅目标在自车车道上时值有实际物理意义") thw_from_ego: float = Field( default=-1.0, title="主车与目标的THW", description="目标位于自车、左邻、右邻车道时有输出,否则为-1,仅目标在自车车道上时值有实际物理意义") agent_dis2_ego: List[list] = Field( default=[], title="目标与主车四个角点的距离", description="目标的每个角点与主车每个角点之间的距离,以前左、前右、后右、后左的顺时针顺序排列") # 几何信息 width: float = Field(default=None, title='目标宽度') length: float = Field(default=None, title='目标长度') height: float = Field(default=None, title='目标高度') # 车道相关 lane_info: AgentLaneBindData = Field(default=None, title='目标车道信息') agent_lane_angle: float = Field( default=None, title="目标车与当前车道的夹角度数(角度制)", description="目标车与车道夹角偏左时逆时针为正(0~180°) 偏右顺时针为负(0~-180°)") # 行为相关 behaviors: List[BehaviorData] = Field(default=[], title='目标行为') # dragon_drive: bool = Field(default=False, title="画龙行驶") # driving_cross_boundary: bool = Field(default=None, title='目标是否跨硬边界') # 云端链路目标 visible 结果 visible_in_2d: List[str] = Field(default=[], title='目标在哪个相机中可见') visible_in_at128_l: bool = Field(default=None, title='目标在 AT128-left 中可见') visible_in_at128_r: bool = Field(default=None, title='目标在 AT128-right 中可见') # 摄像头观测来源 cam_sourses: List[e_SensorFlag] = Field(default=[], title='摄像头观测来源') interpolate: bool = Field(default=False, title="该帧是否为插值帧") # obj_3dinfo origin_vertices: List[Dict[str, float]] = Field(default=[], title='原始顶点位置') motion_vertices: List[Dict[str, float]] = Field(default=[], title='运动补偿后的顶点位置') motion_heading_angle: float = Field(default=None, title='运动补偿到图像时间的heading') cloud_point_number: int = Field(default=None, title='目标的点数量') # obj_2dinfo obj_2d_info: List[Obj2dData] = Field(default=[], title='关联的2D信息') # 轨迹 traj_valid: Dict[str, bool] = Field( default={ "dragon_drive": False, # 画龙行驶,True为画龙 "driving_cross_boundary": False # 目标是否跨硬边界,True为出边界 }, title='轨迹状态的标志位') traj_quality: Dict[str, float] = Field(default={}, title='轨迹质量评价质量和traj_valid对应') def _check_data(self, *args, **kwargs) -> bool: """ check the value of each property """ raise NotImplementedError def enu2vcs(self, points: Union[List, np.ndarray]) -> np.ndarray: """ 将enu坐标转到当前帧agent的vcs坐标 """ new_points = CoordsConverter.enu2vcs( trans=self.pos_enu, rotate=self.heading_angle_global, data=np.asarray(points) ) return new_points def vcs2enu(self, points: Union[List[List], np.ndarray]) -> np.ndarray: """ 将当前帧agent的vcs坐标转到enu坐标 """ new_points = CoordsConverter.vcs2enu( trans=self.pos_enu, rotate=self.heading_angle_global, data=np.asarray(points) ) return new_points def vcs2vcs(self, obj, points: Union[List, np.ndarray]) -> np.ndarray: """ 将他车的vcs坐标转到当前帧agent的vcs坐标 """ new_points = CoordsConverter.vcs2vcs( src_trans=obj.pos_enu, src_rotate=obj.heading_angle_global, tgt_trans=self.pos_enu, tgt_rotate=self.heading_angle_global, data=np.asarray(points) ) return new_points
typing 1 类型提示不会在运行时强制执行,它们主要用于静态分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from typing import ( List, Tuple, TYPE_CHECKING, Any, Callable, ClassVar, Dict, Generic, Literal, TypeVar, Union, Set, cast, overload, )
Literal 1 2 3 from typing import Literal def fun(statuses:Literal["FAILURE", "SUCCESS"]):
cast 1 2 3 4 5 6 from typing import cast # 类型声明 第一个参数类型,第二个参数实际对象 o = cast(CafsUtilsV1,context.resources.cafs_file_manager) skdata_dataengine = cast(DataEngine,context.resources.skdata_dataengine)
List、Tuple、Union、Any、Dict 1 from typing import List, Tuple, Union, Any
List 1 2 3 4 def fun(a: List[str]): ego_clips: List[List[int]] = self._ego_lane_keep_slice(data=data, ego_lane_route=ego_lane_route)
Union 1 def load(filename: str, keys: Union[None, str, Tuple[str, ...]] = None):
1 2 3 4 from typing import Union,Tuple # Union[Type1, Type2, ...] 表示可以是 Type1 或 Type2 或 ... def load(filename: str, keys: Union[None, str, Tuple[str, ...]] = None):
Tuple 1 -> Tuple[int, List[int]]:
1 2 3 4 5 6 7 8 9 # 元组: Tuple[str, int] # 2个元素:第一个是str,第二个是int Tuple[str, str, str] # 3个元素,都是str Tuple[str, ...] # 任意多个元素,都是str ✅ Tuple[Any, ...] # 所有元素可以是任何类型 # 元组是不可变的序列 t1: Tuple = (1, 2, 3) # 泛型元组 t2: Tuple[int] = (1, 2, 3) # Python 3.9+ 简化写法
Dict 1 2 3 4 5 6 7 8 9 10 11 12 13 from typing import Union, Tuple, List, Dict, Any # 复杂的联合类型 ComplexType = Union[ None, str, int, Tuple[str, ...], List[str], Dict[str, Union[str, int, float]] ] def process(data: ComplexType):
1 def update_label() -> Dict[str, Any]:
Optional 1 2 3 4 5 from typing import Optional 在函数参数或返回值的类型提示中使用 Optional,表示该值可以是指定类型或 None 在代码中处理可能为 None 的情况,确保逻辑安全。 使用 Optional 时,避免在类型中包含多个数据类型。例如: 正确: Optional[str] 错误: Optional[str, int]
1 2 3 4 def get_user_name(user: Optional[dict]) -> Optional[str]: if user is not None: return user.get("name") return None
1 2 3 4 def greet(name: Optional[str]) -> str: if name is None: return "Hello!" return f"Hello, {name}!"
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @classmethod def get_cos_util(cls, bucket_name: Optional[str] = None) -> 'COSBrowser': """ 根据桶名获取对应的cosutil实例 如果未提供桶名,则使用默认配置创建cosutil实例 线程安全的实现,使用双重检查锁定模式优化性能 Args: bucket_name: 桶名称 Returns: CosUtils实例 """ # 根据桶名确定账号名称 account_name = cls.get_account_by_bucket(bucket_name) if bucket_name else 'default' # 双重检查锁定模式,提高多线程环境下的性能 # 第一次检查,无锁 if account_name not in cls._cos_util_instances: # 如果实例不存在,获取锁 # with cls._lock: # # 再次检查,避免在获取锁期间其他线程已创建实例 # if account_name not in cls._cos_util_instances: # # 获取账号配置 # config = cls._account_configs.get(account_name, cls._default_config) # # 创建新的cosutil实例并缓存 # cls._cos_util_instances[account_name] = CosUtils(config) return None return cls._cos_util_instances[account_name]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def align_time_line(self,refrence_time:TimeLine,delt_time:int) -> Optional['SensorData']: or_time_line= self.frame_datas ref_time_dic= or_time_line.align_time(refrence_time,delt_time) suf_key_value={v:k for k,v in ref_time_dic.items() if v!=None} ref_sensor_data= self._create_new_instance() # 新初始化一个对象 temp_key_frame_data={} for al_time, or_time in suf_key_value.items(): temp_key_frame_data[al_time]=self.raw_data[or_time] ref_sensor_data.raw_data=temp_key_frame_data # 对齐后的结果传入新对象 对齐后的对象 if len(ref_sensor_data.raw_data)==0: return None time_points=[] for key,frame_data in ref_sensor_data.raw_data.items(): time_points.append(key) ref_sensor_data.frame_datas = TimeLine(time_points) return ref_sensor_data
Callable 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Callable[[Any], Any] 这是可调用对象的类型注解: Callable[[参数类型列表], 返回类型] Callable[[Any], Any] # 拆解: # Callable # 表示"可调用的"(函数、方法等) # [[Any], # 参数:接受一个任意类型的参数 # Any] # 返回值:返回任意类型 等价于: from typing import Callable, Any # 定义一个函数类型 def some_function(x: Any) -> Any: return x # 接受这样的函数作为参数 fallback: Callable[[Any], Any] = some_function
1 2 3 4 5 6 7 8 from typing import Callable, Any # 定义一个函数类型 def some_function(x: Any) -> Any: return x # 接受这样的函数作为参数 fallback: Callable[[Any], Any] = some_function
| 1 这是 Python 3.10+ 的联合类型语法(Union Type)
1 2 3 4 5 6 7 8 9 10 # Python 3.10+ 新语法 int | None # 等价于旧语法 from typing import Union, Optional Union[int, None] # 或 Optional[int] # 可以组合多个类型 int | str | None # 等价于 Union[int, str, None]
Enum
1 2 3 4 5 6 7 from enum import Enum # 默认情况下,值可以是任意类型(int、str、tuple、object 等) class LIDAR_TYPE(Enum): CORNER = 1 PANDAR = 2 ALL = 3 INVALID=4
1 2 3 4 5 6 7 8 9 10 # 如果继承了 IntEnum、StrEnum 等,则值必须是对应类型,并且成员可直接参与该类型的运算或比较。 from enum import IntEnum class Status(IntEnum): OK = 200 NOT_FOUND = 404 print(Status.OK + 100) # 输出: 300 print(isinstance(Status.OK, int)) # True
1 2 3 4 5 6 # 访问方式 按名称访问:Color['RED'] 按值访问:Color(1) print(Color['GREEN']) # <Color.GREEN: 2> print(Color(3)) # <Color.BLUE: 3>
1 2 3 4 5 6 7 8 9 10 # 获取名称和值:通过 .name 和 .value 属性。 # 自动赋值 如果值不重要,可以用 auto() 自动生成: from enum import auto class Weekday(Enum): MONDAY = auto() TUESDAY = auto() print(Weekday.MONDAY.value) # 1 print(Weekday.TUESDAY.value) # 2
@unique 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # @unique确保枚举值唯一,重复会报错 from enum import Enum, unique # 自车-驾驶状态 # 链路自定义 @unique class e_DrivingMode(Enum): # 未知 UNKNOWN: int = 0 # 领航 NAVIGATION: int = 1 # iacc IACC: int = 2 # 自车-定位状态 # 链路自定义 @unique class e_EgoLocStatus(Enum): # 未知 UNKNOWN: int = 0 # 较差 POOR: int = 1 # 正常 NORMAL: int = 2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class BehaviorData(Data): agent_ids: List[str] = Field(default=[], title='自车:存在交互目标的ids_list') is_abnormal: bool = Field(default=False, title='是否属于异常行为状态') is_ego_interactive: bool = Field(default=None, title='目标:行为是否与自车发生互动') is_key_frame: bool = Field(default=False, title='是否为行为关键帧') subtypes: List[Union[e_BehaviorType, e_InteractionType, e_SceneType]] = Field(default=[], title='细分行为类型') type: Union[e_BehaviorType, e_InteractionType, e_SceneType] = Field(default=None, title='行为类型') def _check_data(self, *args, **kwargs) -> bool: """ check the value of each property """ raise NotImplementedError # type必须是 这三个枚举类中任意一个的枚举成员
实例1 1 agent.lane_info.driving_direction == e_TrafficDriveDirection.REVERSE:
实例2 1 2 3 4 5 6 7 class BehaviorData(Data): agent_ids: List[str] = Field(default=[], title='自车:存在交互目标的ids_list') is_abnormal: bool = Field(default=False, title='是否属于异常行为状态') is_ego_interactive: bool = Field(default=None, title='目标:行为是否与自车发生互动') is_key_frame: bool = Field(default=False, title='是否为行为关键帧') subtypes: List[Union[e_BehaviorType, e_InteractionType, e_SceneType]] = Field(default=[], title='细分行为类型') type: Union[e_BehaviorType, e_InteractionType, e_SceneType] = Field(default=None, title='行为类型')
1 2 3 4 5 6 7 for agent_ts_idx in agents_info[agent_id].keys(): if tag_start_idx <= agent_ts_idx <= tag_end_idx: for agent_idx, agent_data in enumerate(data.frame[agent_ts_idx].agents): if agent_data.id != agent_id: continue beh_data = BehaviorData(agent_ids=[agent_id], type=e_BehaviorType.AGENT_CROSS, subtypes=[subtype]) data.frame[agent_ts_idx].agents[agent_idx].behaviors.append(beh_data)
1 2 3 4 @unique class e_BehaviorType(Enum): # 横穿 AGENT_CROSS: int = 4120
实例3 1 2 3 4 5 6 7 8 9 10 11 12 MAP_OBJECT_TYPE_LW = { "pedestrian": {'value': {'length': 0.8, 'width': 0.8}}, DEFAULT_TYPE:{'scale': {'length': 1.2, 'width': 1.0}}, } if agent.type in MAP_OBJECT_TYPE_LW: length = MAP_OBJECT_TYPE_LW[agent.type]['value']['length'] width = MAP_OBJECT_TYPE_LW[agent.type]['value']['width'] else: scale = MAP_OBJECT_TYPE_LW[DEFAULT_TYPE]['scale'] length = agent.length * scale['length'] width = agent.length * scale['width']
uuid 1 2 3 import uuid str(uuid.uuid4())[:4]
tqdm 1 2 3 4 5 6 from tqdm import tqdm import time # 最简单的用法:包装任何可迭代对象 for i in tqdm(range(100)): time.sleep(0.01) # 模拟耗时操作
1 2 3 4 5 6 7 8 9 from tqdm import tqdm import time # 手动控制进度 pbar = tqdm(total=100) # 设置总进度 for i in range(100): time.sleep(0.01) pbar.update(1) # 每次更新1 pbar.close()
1 2 3 4 5 # 自定义描述 for i in tqdm(range(100), desc="处理中"): time.sleep(0.01) 处理中: 8%|███████████
1 2 3 4 # 设置进度条单位 for i in tqdm(range(100), unit="个文件"): time.sleep(0.01) 18%|█████████████████████████▌ | 18/100 [00:18<01:26, 1.06s/个文件]
案例 1 2 3 4 5 6 from tqdm import tqdm with tqdm(clip_names) as clip_bar: #2.按照给的数据包进行对齐 for clip_name in clip_bar: clip_bar.set_description_str(f'Process:{clip_name}')
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 with tqdm(data_dir_names) as pbar: for clip_name in pbar: outchceck = None if clip_name in error_list: continue pbar.set_description_str(f'Process:{clip_name}') sensor_info = clipvistor.get_clip_data_by_info(clip_data, clip_name) pre_processor=AlignSensorDataProcessor(sensor_info, clipvistor) outchceck= pre_processor.anys_time_framework() if outchceck: rigth_clip.append(clip_name) # outchceck.save_to_file(f'/workspace/caclip2nuspkl/temp_data/{clip_name}_{outchceck.ticks_size}.json') alig_data_out=convert_algininfo_to_json(outchceck) alig_data_content = json.dumps(alig_data_out) # with open(f's_t/{clip_name}_{outchceck["time_line"].ticks_size}.json', 'w', encoding='utf-8') as f: with open(f's_t/{clip_name}.json', 'w', encoding='utf-8') as f: f.write(alig_data_content) pbar.set_postfix_str(clip_name+' Done!!!') sleep(0.5)
argparse 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import argparse def get_args(): parser = argparse.ArgumentParser( epilog=None or f""" Examples: Run relocation: $ {sys.argv[0]} --config ../configs/config.yaml """, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--config", default="./settings.yaml", metavar="xxx", help="xxxx", ) parser.add_argument( "--config", default=None, help="xxxx" ) parser.add_argument( "--xx", default="" ) parser.add_argument( "--xx", default="" ) return parser.parse_args()
1 2 args = get_args() config_yaml = args.config
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def get_args(): parser = argparse.ArgumentParser(description='xxx') parser.add_argument( "--config", default="./settings.yaml", metavar="FILE", help="xxx", ) parser.add_argument( "--src-type", default=1, help="xxx", ) ... return parser.parse_args()
.add_argument 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (method) def add_argument( *name_or_flags: str, action: _ActionStr | Type[Action] = ..., nargs: int | _NArgsStr | _SUPPRESS_T = ..., const: Any = ..., default: Any = ..., type: ((str) -> _T@add_argument) | FileType = ..., choices: Iterable[_T@add_argument] | None = ..., required: bool = ..., help: str | None = ..., metavar: str | tuple[str, ...] | None = ..., dest: str | None = ..., version: str = ..., **kwargs: Any ) -> Action
*name_or_flags 1 2 3 4 5 6 *name_or_flags 位置参数,可以是参数名(如 'filename')或选项标志(如 '-f', '--file') 例如:add_argument('filename')或 add_argument('-f', '--file') *name_or_flags: str 可变参数
1 2 3 4 5 parser.add_argument('-o', '--output', help='输出文件路径', default='output.txt') parser.add_argument('input_file', help='输入文件路径')
type 1 2 3 将命令行参数转换为指定类型 可以是内置类型:int, float, str 也可以是自定义函数:type=lambda x: x.upper()
default 1 2 当参数未在命令行中指定时的默认值 如果指定了default,通常参数不再是必需的
nargs 1 2 3 4 5 6 7 8 指定应消耗的命令行参数数量 常用值: '?'- 0或1个参数 '*'- 0或多个参数 '+'- 1或多个参数 整数 - 精确指定参数数量 argparse.REMAINDER- 所有剩余参数
action 1 2 3 4 5 6 7 8 9 10 11 12 action="count" 指定当命令行遇到此参数时的动作 常用值: 'store'- 存储参数值(默认) 'store_const'- 存储常量值 'store_true'/'store_false'- 存储布尔值 'append'- 将值添加到列表 'append_const'- 将常量添加到列表 'count'- 统计参数出现次数 'help'- 显示帮助信息 'version'- 显示版本信息
1 2 在帮助信息中显示的参数名称 例如:metavar='FILE'会在帮助中显示为 FILE
1 2 3 const 某些action(如'store_const')使用的常量值 例如:action='store_const', const=42
1 2 3 choices 限制参数的允许值 例如:choices=['small', 'medium', 'large']
1 2 3 required 是否必须提供此参数(对可选参数有效) 默认:False(对于-f/--flag风格参数)
1 2 3 help 参数的帮助信息描述 可以在帮助文本中使用%(default)s, %(type)s等占位符
1 2 3 dest 解析结果中属性的名称 默认从参数名推导(去掉-,将-替换为_)
1 2 version 与action='version'一起使用,指定版本字符串
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 parser = argparse.ArgumentParser() # 位置参数 parser.add_argument('input_file', help='输入文件路径') # 可选参数 parser.add_argument('-o', '--output', help='输出文件路径', default='output.txt') # 带类型转换 parser.add_argument('-n', '--number', type=int, default=1, help='重复次数 (默认: %(default)d)') # 布尔标志 parser.add_argument('-v', '--verbose', action='store_true', help='详细输出模式') # 多值参数 parser.add_argument('--sizes', nargs='+', choices=['S', 'M', 'L'], help='一个或多个尺寸') # 存储常量 parser.add_argument('--debug', action='store_const', const=True, default=False)
copy
赋值 1 2 3 4 list1 = [1, 2, [3, 4]] list2 = list1 # 只是创建新引用,指向同一对象 list2[0] = 99 print(list1) # [99, 2, [3, 4]] # 原对象也被修改
浅拷贝 1 2 3 4 5 6 7 8 9 10 import copy list1 = [1, 2, [3, 4]] list2 = copy.copy(list1) # 或 list1.copy() 或 list1[:] list2[0] = 99 print(list1) # [1, 2, [3, 4]] # 第一层没被影响 # 但是嵌套对象仍然共享! list2[2][0] = 999 print(list1) # [1, 2, [999, 4]] # 嵌套列表被修改了
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class MyClass: def __init__(self, value, items=None): self.value = value self.items = items if items is not None else [] # 如果传入的 items 是列表,两个对象会共享这个列表! # 测试 original_list = [1, 2, 3] obj1 = MyClass(1, original_list) obj2 = MyClass(2, original_list) obj1.items.append(999) print(original_list) # [1, 2, 3, 999] # 被修改了! print(obj2.items) # [1, 2, 3, 999] # 也被修改了!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import copy class MyClass: def __init__(self, value, items=None): self.value = value if items is None: self.items = [] # 创建新列表 else: # 创建列表的副本,避免共享 self.items = list(items) # 浅拷贝 # 或者 self.items = copy.copy(items) def deep_copy_init(self, value, items=None): """如果需要深拷贝的版本""" self.value = value if items is None: self.items = [] else: self.items = copy.deepcopy(items) # 深拷贝
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import copy from typing import List, Any, Optional class MyClass: """一个正确处理可变默认参数的类示例""" def __init__(self, value: Any, items: Optional[List[Any]] = None): """ 初始化 MyClass Args: value: 存储的值 items: 可选的列表,如果不提供则创建新列表 (注意:会创建副本,不会修改传入的列表) """ self.value = value if items is None: self._items = [] # 创建新列表 else: # 创建传入列表的副本 # 使用 list() 进行浅拷贝,或者 copy.deepcopy() 进行深拷贝 self._items = list(items) # 浅拷贝 def add_item(self, item: Any) -> None: """添加项目""" self._items.append(item) def get_items(self) -> List[Any]: """获取项目列表的副本""" return self._items.copy() # 返回副本,保护内部数据 def __repr__(self) -> str: return f"MyClass(value={self.value}, items={self._items})" def copy(self) -> 'MyClass': """创建对象的深拷贝""" return copy.deepcopy(self) # 测试 if __name__ == "__main__": # 测试1:默认参数 obj1 = MyClass(1) obj2 = MyClass(2) obj1.add_item("A") print(f"obj1: {obj1}") # MyClass(value=1, items=['A']) print(f"obj2: {obj2}") # MyClass(value=2, items=[]) # 测试2:传入列表 my_list = [1, 2, 3] obj3 = MyClass(3, my_list) obj3.add_item(4) print(f"my_list: {my_list}") # [1, 2, 3] # 原列表未改变 print(f"obj3: {obj3}") # MyClass(value=3, items=[1, 2, 3, 4])
深拷贝 1 2 import copy res_json = copy.deepcopy(res_json_origin)
1 2 3 4 5 6 7 8 import copy list1 = [1, 2, [3, 4]] list2 = copy.deepcopy(list1) # 完全独立的副本 list2[0] = 99 list2[2][0] = 999 print(list1) # [1, 2, [3, 4]] # 原对象完全不变 print(list2) # [99, 2, [999, 4]]
deepcopy方法 1 可以使用 __deepcopy__方法自定义深拷贝行为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import copy class MyClass: def __init__(self, data): self.data = data def __deepcopy__(self, memo): # 定义对象的深拷贝行为 new_object = MyClass(copy.deepcopy(self.data, memo)) return new_object # 创建对象 obj = MyClass([1, 2, 3]) # 进行深拷贝 deep_copied_obj = copy.deepcopy(obj)
EasyDict 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from easydict import EasyDict cfg = EasyDict() cfg.num_worker = 16 cfg.traj_refine = EasyDict() cfg.traj_refine.method= "segment" # steer_acc, bezier, bezier+steer_acc, segment cfg.traj_refine.sdt = 0.5 # 参考点采样间隔 cfg.traj_refine.tdt = 0.1 # 轨迹原本的采样时间 cfg.traj_refine.min_time=2.0 # 轨迹分段的最小片段时间长度 单位:s cfg.traj_refine.overlap_time=1.0 # 轨迹片段首尾的重叠时间长度(为了保证轨迹在分割点的平滑性) 单位:s cfg.moving_thr = 5 # m cfg.moving_min_vel = 2 # m/s cfg.moving_min_dist = 10 # m cfg.traj_check = EasyDict() cfg.traj_check.his = 40 cfg.traj_check.fut = 80 cfg.traj_check.dis_thr = 1.0
1 2 3 4 5 6 7 8 9 from easydict import EasyDict as edict # 创建一个 EasyDict 对象 d = edict({'foo': 3, 'bar': {'x': 1, 'y': 2}}) # 通过属性访问字典值 print(d.foo) # 输出: 3 print(d.bar.x) # 输出: 1 # 直接通过属性设置值 d.new_key = "Hello" print(d.new_key) # 输出: Hello
re match = re.search(r’([^ ]+)_’, filename) if match: return match.group(1)
序列化和反序列化 1 2 import pickle pickled_obj = pickle.dumps(obj) 序列化
1 obj = pickle.loads(xxx) 反序列化
配置加载 yaml文件 1 2 3 4 5 6 7 import yaml with open(config_yaml) as f: config = yaml.safe_load(f) # 直接加载为字典 # 访问 model_path = config["output_reloc_result"]["model_path"]
1 configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader)
1 2 3 4 5 6 7 dumped = yaml.dump( dict, default_flow_style=None, allow_unicode=True, width=9999 ) with PathManager.open(filename, "w") as f: f.write(dumped) _ = yaml.unsafe_load(dumped)
json load 1 2 3 4 5 6 7 8 9 10 Python 中的 json.load 方法从文件中读取 JSON 数据并将其解析为 Python 对象 import json with open(filename, 'r') as f: config = json.load(f) with open(link_path, 'r', encoding='utf-8') as f: data = json.load(f) features= data.get('features',[])
loads 1 2 3 json.loads方法是一个非常重要的工具,它允许我们将JSON格式的字符串转换为Python对象。这个过程被称为反序列化。json.loads方法接受一个JSON字符串,并返回一个相应的Python数据结构,如字典或列表。 json.loads(context.cursor) 加载成dict
dumps 1 2 3 json.dumps() 是 Python 中用于将 Python 对象编码成 JSON 字符串的函数。它可以将字典、列表等 Python 数据结构转换为 JSON 格式,非常适合数据持久化、网络传输等场景 json.dumps(result_data,indent=2, ensure_ascii=False)
dump 1 2 3 4 5 json.dump() 这个方法用来将Python对象转换为JSON格式的字符串并保存到文件中 with open(file_path, "w") as json_file: json.dump(json_obj, json_file)
1 2 3 4 5 6 7 8 9 10 # 可以是传统 YAML model: type: "ResNet" layers: 50 pretrained: true train: batch_size: 32 lr: 0.001 epochs: 100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # config.py (LazyConfig 通常使用 .py 文件) from my_project.model import ResNet from my_project.optimizer import AdamW model = ResNet( layers=50, pretrained=True ) train = dict( batch_size=32, lr=0.001, epochs=100, optimizer=AdamW(lr=0.001) )
1 2 3 4 5 6 7 8 9 10 11 12 13 cfg = LazyConfig.load(config_yaml) # 1. 加载配置文件 cfg = LazyConfig.load("path/to/config.py") # 或 cfg = LazyConfig.load("path/to/config.yaml") # 2. 访问配置 print(cfg.model.type) # "ResNet" print(cfg.train.batch_size) # 32 # 3. 可以像字典一样访问 print(cfg["model"]["type"])
k8s 第21关 从零到一:在 K8s 中部署 Job 和 CronJob 的全面指导_job部署-CSDN博客
1 2 3 4 5 Kubernetes的简称 容器编排平台,主要用于自动化部署、扩展和管理容器化应用程序 腾讯汽车云容器服务是腾讯云面向汽车行业推出的容器化应用管理平台,主要服务于汽车企业的数字化应用开发和运维场景。基于Kubernetes技术
vscode 快捷键
1 2 3 4 5 6 7 8 9 10 11 ctrl + shift + p -> Preferences: Configure User Snippets # vs默认没有if __name__ == '__main__'提示。大括号内加入下列代码: "Python Main Guard": { "prefix": "main", // 触发词,输入 main 后按 Tab 或 Enter "body": [ "if __name__ == '__main__':", " $0" // $0 表示补全后光标停留的位置 ], "description": "Insert main guard (if __name__ == ...)" }
git git配置 1 2 3 4 5 6 7 git config --global user.name "xxx" git config --global user.email "xxx" git config user.name git config --list --global
创建一个项目 1 2 3 4 5 6 7 git init git remote add 远程仓库别名(origin) http......(https://github.com/你的用户名/你的仓库名.git) git branch -M main (将当前所在的分支重命名为main) git push 远程仓库别名 main
克隆命令 1 git clone -b dev_lk http://liuke:Liuke1999@10.0.0.29:9091/9room/spatiotemporal-data-project.git
https://blog.csdn.net/techforward/article/details/133203445 https://blog.csdn.net/m0_73745224/article/details/149708684
git branch 创建分支
显示分支 1 2 显示所有分支(本地+远程) git branch -a
1 2 3 4 5 6 7 8 git branch 只显示本地分支 git branch -r 只显示远程分支 git branch -vv 显示分支及其跟踪的远程分支
1 2 3 4 5 6 7 git branch -v 显示本地分支及其最近提交 [liuj@VM-16-2-tencentos spatiotemporal-data-project]$ git branch -v dev 88a148a [behind 217] Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev dev_asstercheck 6a30df7 [ahead 1] test修改 * dev_ty 021de64 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty
提交记录 git log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 git log commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:41 2026 +0800 加了点注释 commit 080ce0f73632085555dab8b792f27c0311425368 Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:25 2026 +0800 加了点注释 commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 Author: 202226460 <202226460@any3.com> Date: Thu Feb 5 21:28:27 2026 +0800
git log –oneline 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # 一行显示一条提交(简洁) 456005a (HEAD -> dev_ty, origin/dev_ty) 加了点注释 080ce0f 加了点注释 6b09ab6 init漏写schedule 9ed8aa4 add lable import to mongodb b23c536 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_pre f885f1b Merge branch 'dev_pre' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr 3a64d57 Merge branch 'dev_yr' into 'dev' cd169cb Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr a8cb1a6 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_pre a0f2083 添加带数据版本的系统调度逻辑 7362b6c Merge branch 'dev_ty' into 'dev' bd1a240 label拆之后新增失败统计,labelchecker路径获取方式修改 a2abac9 update det3d 6f9ed97 修改监控job 1e7dc4c label资产修改了mongoclient使用方式 fd583dc temp update 99b29da Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr 5dfc0e6 重构资产代码 a45243d Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty 11d8070 update mongo resource
git log –graph 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # 显示图形化分支结构 git log --graph * commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) | Author: 202226460 <202226460@any3.com> | Date: Fri Feb 6 09:07:41 2026 +0800 | | 加了点注释 | * commit 080ce0f73632085555dab8b792f27c0311425368 | Author: 202226460 <202226460@any3.com> | Date: Fri Feb 6 09:07:25 2026 +0800 | | 加了点注释 | * commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 | Author: 202226460 <202226460@any3.com> | Date: Thu Feb 5 21:28:27 2026 +0800 | | init漏写schedule | * commit 9ed8aa4434fd20a7bbe3d7c07e8e080d3033720f | Author: 202226460 <202226460@any3.com> | Date: Thu Feb 5 19:36:17 2026 +0800 | | add lable import to mongodb | * commit 04b0002cf1076058322bc3affcef67e4c5e5c865 |\ Merge: 0ddde82 b3a1799 | | Author: 202226460 <202226460@any3.com> | | Date: Thu Feb 5 19:03:58 2026 +0800 | | | | Merge branch 'dev_yr' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty | | | * commit b3a1799d36b020fe5d1f03f29a84647814bbc9ba (origin/dev_yr) | | Author: yr <2521275899@qq.com> | | Date: Thu Feb 5 18:47:17 2026 +0800 | | | | update label
1 2 3 4 # 显示完整信息 git log --stat # 显示文件变更统计 git log -p # 显示具体修改内容(diff) git log --name-only # 只显示修改的文件名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 [liuj@VM-16-2-tencentos spatiotemporal-data-project]$ git log -p commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:41 2026 +0800 加了点注释 diff --git a/production_system/src/workspace/partitions/partition_defs.py b/production_system/src/workspace/partitions/partition_defs.py index ad99ec1..1961ccc 100644 --- a/production_system/src/workspace/partitions/partition_defs.py +++ b/production_system/src/workspace/partitions/partition_defs.py @@ -18,7 +18,7 @@ time_window_task_partitions_for_statistical_analysis = TimeWindowPartitionsDefin #cron_schedule="*/3 * * * *", 设置成这样,分区数量不符合要求 timezone="Asia/Shanghai", fmt="%Y-%m-%d", - end_offset=1, # 提前显示 + end_offset=1, # DailyPartitionsDefinition会延迟一天,time_window_task_partitions_for_statistical_analysis的end_offset=1提前显示当天 ) __all__ = [ "clip_partitions",'daily_task_partitions','lable_job_clip_partitions', commit 080ce0f73632085555dab8b792f27c0311425368 Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:25 2026 +0800 加了点注释 diff --git a/.vscode/settings.json b/.vscode/settings.json index 3516cb9..0ae70d2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,5 @@ { - "python.formatting.provider": "autopep8" + "python.formatting.provider": "autopep8", + "workbench.editor.enablePreviewFromCodeNavigation": true, + "workbench.editor.enablePreviewFromQuickOpen": true } \ No newline at end of file commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 Author: 202226460 <202226460@any3.com> Date: Thu Feb 5 21:28:27 2026 +0800
筛选 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # 按数量限制 git log -5 # 显示最近5条提交 git log -n 10 # 显示最近10条提交 # 按时间筛选 git log --since="2026-01-01" git log --until="2026-01-31" git log --since="2 weeks ago" # 按作者筛选 git log --author="张三" git log --author="zhangsan" # 按提交信息筛选 git log --grep="bugfix" # 搜索包含"bugfix"的提交信息 # 按文件筛选 git log -- README.md # 查看README.md的修改历史 git log -- src/ # 查看src目录的修改历史
1 2 3 4 5 6 7 8 # 最实用的组合:图形化+一行显示 git log --oneline --graph --all # 显示统计信息+图形 git log --stat --graph # 漂亮的格式化输出 git log --pretty=format:"%h - %an, %ar : %s"
1 2 3 4 5 # 查看在dev分支但不在main分支的提交 git log main..dev # 查看两个分支的差异 git log branch1..branch2
1 2 3 4 5 # 显示所有分支的时间线 git log --oneline --graph --all # 按时间倒序 git log --reverse
1 2 3 4 5 6 7 8 9 10 # 自定义输出格式 git log --pretty=format:"%C(yellow)%h %Creset%s %Cgreen(%cr) %C(bold blue)<%an>%Creset" # 常用占位符: # %h - 短哈希 # %H - 完整哈希 # %s - 提交说明 # %an - 作者名 # %ar - 相对时间 # %ad - 日期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 分页模式下的常用操作键 按键 功能 说明 空格键 向下翻一页 最常用 回车键 向下翻一行 逐行查看 b 向上翻一页 回看内容 q 退出分页模式 返回命令行 /关键词 搜索内容 输入/后输入关键词搜索 n 查找下一个匹配项 配合搜索使用 N 查找上一个匹配项 g 跳到第一行 G 跳到最后一行
切换分支 1 2 git checkout 分支名 切换分支 git checkout -b 分支名 创建和切换
1 2 3 4 5 这个命令是错误的,因为它试图切换到origin这个分支,但origin是一个远程仓库的别名,不是一个分支。 正确的应该是: git checkout branch_name- 切换到指定分支 git checkout -b new_branch- 创建并切换到新分支 git checkout origin/dev_yr- 创建一个跟踪远程分支的本地分支
删除分支
合并分支 1 2 3 4 5 6 7 git merge origin/dev_yr 将远程分支 origin/dev_yr合并到当前分支 或者 git pull origin dev_yr 这个命令实际上是两个操作的组合: git fetch origin dev_yr- 获取远程 dev_yr分支的最新代码 git merge FETCH_HEAD- 将获取的远程分支合并到当前分支
1 2 3 4 5 6 7 8 # 1. 先将你的修改暂存起来(执行下面操作,要先暂存,不然代码会丢失) git stash # 2. 拉取远程最新代码 git pull origin dev # 3. 将你暂存的修改恢复回来 git stash pop
1 2 3 4 5 6 7 8 git config pull.rebase false git config pull.rebase false的作用是将 git pull的默认行为设置为合并(merge)而不是变基(rebase) 合并(merge,默认策略):拉取远程更新后,会创建一个额外的合并提交,保留完整的分支历史。 变基(rebase):将你的本地提交“移动”到远程最新提交之后,使历史线呈直线(避免合并提交) 查看当前配置 git config --get pull.rebase
git fetch和git pull 1 2 3 git fetch和git pull的主要区别如下: git fetch: 仅更新远程分支的信息,不会自动合并到本地分支。用户需要手动执行合并操作。 git pull: 同时执行 git fetch 和 git merge,直接将远程分支的更新合并到当前分支,可能隐藏过程细节。
FETCH_HEAD与HEAD的区别 1 HEAD是一个指向当前工作分支的引用,而FETCH_HEAD则是指向最后一次从远程仓库抓取分支的最新状态。HEAD通常用于表示当前工作分支的状态,而FETCH_HEAD则用于表示最近一次从远程仓库拉取的数据。
发布分支 1 2 3 4 5 6 7 8 9 10 11 12 13 Publish Branch 当你在本地创建了一个新分支,但远程仓库还没有这个分支时,你需要将这个本地分支"发布"(推送)到远程仓库 # 1. 在本地创建新分支 git checkout -b feature/new-feature # 2. 进行开发,做了一些提交 git add . git commit -m "Add new feature" # 3. 将本地分支发布到远程仓库 git push -u origin feature/new-feature # 这里的 -u 表示设置上游分支(upstream)
git stash 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # 当前在 feature 分支,有未提交的修改 git status # 显示有修改的文件 # 1. 暂存当前修改 git stash # 或 git stash save "描述信息" # 2. 工作区变干净了 git status # 显示没有修改 # 3. 切换到其他分支 git checkout main # 4. 修复bug后,回到 feature 分支 git checkout feature # 5. 恢复暂存的修改 git stash pop
1 2 3 4 5 6 场景2:拉取最新代码前 你想拉取远程更新,但有未提交的修改: # 有本地修改,但想先拉取最新代码 git stash # 暂存修改 git pull # 拉取最新代码 git stash pop # 恢复修改
命令 1 git push origin dev_ty --force
debug 1 2 .vscode/launch.json launch.json 可以自行配置,vscode用于调试配置的文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { // Use IntelliSense to learn about possible attributes. // Hover to view descriptions of existing attributes. // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ { "name": "Python: Current File", "type": "python", "request": "launch", "program": "${file}", "console": "integratedTerminal", "justMyCode": true } ] }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 { "version": "0.2.0", "configurations": [ { "name": "Python: run_output", "type": "python", "request": "launch", "program": "${workspaceFolder}/run_output.py", "args": [ "--tasks", "CD701_LS6C3G0Y2RA400208_2025-01-01_13-42-18", "--run-type", "2", "--config", "/data/config/output.yaml" ], "console": "integratedTerminal", "justMyCode": true } ] }
MongoDB MongoDB 概念解析 | 菜鸟教程
Mongodb入门到精通—> 保姆级别教程 - JamieChyi - 博客园
1 MongoDB 将数据存储为一个文档,数据结构由键值(key=>value)对组成,文档类似于 JSON 对象,字段值可以包含其他文档,数组及文档数组;
1 数据库 集合(collection) 文档(document) 字段 主键(_id) 索引(index) 表连接,MongoDB不支持
1 2 3 4 5 6 7 8 9 10 文档中的键/值对是有序的。 MongoDB区分类型和大小写。 MongoDB的文档不能有重复的键。 文档键命名规范: 键不能含有\0 (空字符)。这个字符用来表示键的结尾。 .和$有特别的意义,只有在特定环境下才能使用。 以下划线"_"开头的键是保留的(不是严格要求的)。
数据类型 1 2 文档的键是字符串。除了少数例外情况,键可以使用任意UTF-8字符。 文档中的值不仅可以是在双引号里面的字符串,还可以是其他几种数据类型(甚至可以是整个嵌入的文档)。
12.23交流 run_dev.sh -p docker -run -h 0.0.0.0
进入容器之后,外部代码修改不一定得行,chmod 777 * ip:4000
init .py和definitions.py
build.py 0.0.2
build.py学习
测试杨瑞 . env.sh env
f 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 g:1400 1300 s:900 34 y:1040 m:700 simi: 创业 债券 国债、地方债、企业债 银行利率会影响 公元 公元前 世纪 财政赤字:xxxx 通过国债、地方债、主权外债来弥补 赤字率:赤字率 = (财政赤字总额 / 国内生产总值 GDP) × 100% 3% 贸易赤字: 主权外债: 美债:只有当它被美国以外的国家和实体持有时 增值税: 期货 期权 当日收益 = 本金*当日收益率 收盘本金=当日收益+开盘本金 第二日负收益率=前一日收益率 第二日收益 = 本金-本金*第二日负收益率*1 2 3 4