笔记 python list list[n,m] 左闭右开 json.loads(str)[“data”]
列表生成式 [i for i in x if i xxx]
时间库 time .strptime() 1 2 3 >>>time.strptime('2025-12-12_13-59-30', '%Y-%m-%d_%H-%M-%S').tm_hour time.struct_time(tm_year=2025, tm_mon=12, tm_mday=12, tm_hour=13, tm_min=59, tm_sec=30, tm_wday=4, tm_yday=346, tm_isdst=-1)
.gmtime() 1 2 >>>time.gmtime(time.time()) # UTC时间 是一个对象 time.struct_time(tm_year=2026, tm_mon=1, tm_mday=12, tm_hour=10, tm_min=0, tm_sec=51, tm_wday=0, tm_yday=12, tm_isdst=0)
.strftime() 1 2 3 4 import time curr_time = time.strftime("%Y-%m-%d_%H-%M-%S") # 返回字符串 '2026-01-08_11-14-36'
1 2 3 4 time.strftime('%Y-%m-%d %H:%M:%S', utc_time) >>> time.strftime('%Y-%m-%d_%H-%M-%S', time.strptime('2025-12-12_13-59-30', '%Y-%m-%d_%H-%M-%S')) '2025-12-12_13-59-30'
时间戳 1 2 time.time() # UTC时间 秒小数点15位 与时区无关,全球统一 time.localtime(time.time()) # 时区不同时间不同
1 2 3 4 time.mktime(time.strptime('时间字符串','%Y-%m-%d也就是时间格式')) >>> time.mktime(time.strptime('2025-12-12_13-59-30','%Y-%m-%d_%H-%M-%S')) 1765519170.0
datetime .strftime() 1 2 >>> datetime.now().strftime("%Y-%m-%d") '2026-01-08'
1 2 >>> datetime.now() datetime.datetime(2026, 1, 8, 11, 22, 14, 313776)
1 2 shanghai_time = datetime.now(pytz.timezone('Asia/Shanghai')) partition = shanghai_time.strftime("%Y-%m-%d %H:%M:%S")
1 2 3 4 5 from datetime import datetime >>>clip_create_time= datetime.now().isoformat() '2026-01-08T11:19:16.862023'
时间戳 1 2 datetime.now().timestamp() # year month day hour minute second 1768210913.418071
strptime 1 2 from datetime import datetime, timedelta datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)
timedelta 1 (datetime.strptime(date_key, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")
字符串 .strip() .lstrip() .rstrip() 1 2 3 字符串.strip()去除左右两侧空格 字符串.rstrip('/') 移除末尾斜杠 字符串.lstrip('/') 移除头部斜杠
.count() 1 2 3 "xxx".count("/") 'a{]dfef{}ddf'.count('{}') 计算{}的数量
.join([])
.endswith()
.startswith()
.split() 1 2 字符串.split('xxx') 所有分割 字符串.split('xxx',1) 从左开始分割一次
.repalce() 1 2 3 4 filename = filename.replace("/./", "/") 字符串.replace('xxx','替代成什么')
.find() .index()
.isdigit() .isalpha() .isalnum() 1 2 3 .isdigit() 判断是否全是数字 .isalpha() 判断是否全是字母 各种语言的字母都可以 .isalnum() 字母或者数字
.upper() .lower() .swapcase() 1 2 3 .upper() 全大写 .lower() 全小写 .swapcase() 大小写互换
.title() 1 2 3 4 转换成每个单词首字母大写 s = "hello world" print(s.title()) # 输出: Hello World
.center() .ljust() .rjust() 1 2 3 4 5 6 使用 center()、ljust()、rjust() 对字符串进行对齐 s = "Python" print(s.center(10, "*")) # 输出: **Python** print(s.ljust(10, "-")) # 输出: Python---- print(s.rjust(10, "-")) # 输出: ----Python
函数 1 2 3 4 位置参数 默认参数:默认参数写在不选参数后面,且默认参数只能是不可变对象 可变参数:*args,函数内部会获得一个tuple。如果本来传的参就是list或者tuple等则在前加*可以正常使用 *name_or_flags: str
any()函数 列表中存在至少一个 True 元素 my_list = [False, False, True, False] print(any(my_list)) # 输出: True 元组中不存在 True 元素 my_tuple = (False, False, False) print(any(my_tuple)) # 输出: False 字典中存在至少一个 True 值 my_dict = {‘a’: False, ‘b’: True, ‘c’: False} print(any(my_dict.values())) # 输出: True 集合中不存在 True 元素 my_set = {False, False, False} print(any(my_set)) # 输出: False
enumerate() 1 for i,e in enumerate(list)
with 1 2 3 with是一个 Python 的关键字,用于上下文管理器。 自动资源管理:当你用 with语句打开一个数据库连接(这里是通过 connection.cursor()获取一个游标)时,它会确保在代码块执行完毕后,无论是否发生异常,都会自动、正确地关闭游标,释放相关资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 connection = context.resources.database with connection.cursor() as cursor: cursor.execute(query) 等同于 connection = context.resources.database cursor = connection.cursor() # 手动获取游标 try: # 在这里执行你的数据库操作... # cursor.execute(...) pass finally: cursor.close() # 无论如何,最后都要手动关闭游标
三元运算符
set() .add() 1 2 3 4 5 6 7 8 9 10 tmp = set () tmp.add(1 ) print (my_set) my_set.add(2 ) print (my_set) my_set.add(1 ) print (my_set)
1 2 3 4 5 6 7 8 9 set ().add(1 ) my_set = set ().add(1 ) my_set = set () my_set.add(1 )
.update() 1 2 3 4 my_set = {1 , 2 , 3 } my_set.update([4 , 5 , 6 ]) print (my_set)
差操作 1 2 set().difference(set()) set() - set()
对称差集操作
交操作
并操作
dict 1 2 3 .items() .keys() .values()
defaultdict 1 2 3 daily_groups = defaultdict(list) daily_groups[time_period].append({'data_name':dat_name,'sorce_path':sorce_path}) keys = list(daily_groups.keys())
dict navigation_set=set()
items=0
for feature in features:
navigation= feature['properties'].get('navigation',0)
range 1 2 3 range(1,13) 1-12 range(1,13,3) 1 4 7 10
isinstance和type isinstance(i, dict)
cast() typing 类型声明 cast(CafsUtilsV1,context.resources.cafs_file_manager)
items() 1 2 3 4 5 6 7 items()的作用是把字典中的每对key和value组成一个元组,并把这些元祖放在列表中返回。 举个例子: d = {'a': 1, 'b': 2, 'c': 3, 'd': 4} d.items() dict_items([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
list .sort()
.append() .extend() .index() 1 temp = group_keys.index(context.cursor)
os getenv 1 2 os.getenv('ENV_NAME',''): 这行代码用于从系统环境变量中获取值,并设置默认值,环境变量名为ENV_NAME
listdir 1 os.listdir(PATH_TO_REQUESTS)
getcwd 1 2 import os os.getcwd() 获取当前工作路径
os.path 1 2 3 4 5 6 7 8 9 os.path.dirname() 获取文件的文件夹目录 os.path.join("dir","file") os.path.abspath(__file__) 获取文件路径包含文件名 os.path.isfile(file_path) os.path.splitext() 是 Python 中用于分离文件名和扩展名的函数。
1 2 3 4 5 6 7 8 9 10 11 12 PATH_TO_REQUESTS = os.path.join(os.path.dirname(__file__), "../", "data/requests") __file__ - 当前Python脚本文件的完整路径 比如:/home/user/project/src/utils/config.py os.path.dirname(__file__) - 获取当前文件所在的目录路径 比如:/home/user/project/src/utils "../" - 相对路径,表示上一级目录 从 /home/user/project/src/utils回到 /home/user/project/src 最终结果:/home/user/project/src/data/requests
1 filename.endswith(".json")
1 2 3 with open(f"{os.path.abspath(os.path.dirname(__file__))}/../version_info.txt") as f 获取的是当前文件位置,而不是调用它的函数文件的位置
os.remove()
读写文件with open 1 2 with open("./output.json", "w", encoding="utf-8") as f: json.dump(result_data, f, indent=2, ensure_ascii=False) # indent 用于美化格式
json 1 2 3 json.dumps(current_state) 转换成字符串 json.dumps(result_data,indent=2, ensure_ascii=False)
1 2 3 4 5 json.loads(context.cursor) 加载成dict with open(link_path, 'r', encoding='utf-8') as f: data = json.load(f) features= data.get('features',[])
re match = re.search(r’([^ ]+)_’, filename) if match: return match.group(1)
日志 python输出日志你会使用啥?print还是logging ,这几个模块更加不错哦。 - 知乎
loguru 1 from loguru import logger
.info
.warning logging 1 2 3 4 5 6 7 8 9 #控制日志输出级别 root = RootLogger(WARNING) Logger.root = root Logger.manager = Manager(Logger.root) # 输出样式 WARNING:root:这是一条warning日志 ERROR:root:这是一条error日志 总结起来就是: 日志级别:日志名称:日志内容
1 2 3 4 5 6 7 8 9 10 11 12 # 日志类型 logging.debug("") logging.warning("") logging.error("") logging.info("") logging.critical("") 或者 logging.log(logging.DEBUG,"") logging.log(logging.WARNING,"") logging.log(logging.ERROR,"") logging.log(logging.INFO,"") logging.log(logging.CRITICAL,"")
1 2 logging.basicConfig(filename='有这个参数就不会输出在终端',filemode='默认追加模式',format='设置日志格式',level='日志级别',stream='不能与filename同时存在,可以是sys.stdout、sys.stderr、网络stream')
Path 1 from pathlib import Path
.joinpath() 1 2 3 4 from pathlib import Path dir = Path(context.resources.path_resource['dataset_path'])/partition+'/' # 得到的是path相关的对象 dir = str( Path(target_path.value).joinpath(clip_name.value))+'/'+check_path # 字符串路径
1 2 3 str(Path(dir).joinpath("lidar_left/")) # 这里会将末尾的‘/’删除 str(Path(dir).joinpath("lidar_left/"))+‘/’ 可以这样
.exists() 1 Path(config_yaml).exists()
.mkdir() 1 2 3 parents如果为 True,则会创建缺失的父目录。如果为 False,则父目录必须已存在,否则抛出 FileNotFoundError。 exist_ok如果为True,则忽略目录已存在的情况,不会抛出异常。如果为 False,当目录已存在时抛出 FileExistsError。
1 2 logs_dir = Path(cfg.temp_dir) logs_dir.mkdir(parents=True, exist_ok=True)
.parent 1 self._mkdir(Path(local_file).parent)
.is_file()
.is_dir()
.suffix 1 2 3 Path.suffix:获取扩展名(包括点),例如 '.txt' path.suffix in ['.yaml', '.yml']
.suffixes 1 Path.suffixes:获取所有扩展名列表,例如 ['.tar', '.gz']
.name 1 Path.name:获取完整的文件名(包括扩展名),例如 'document.txt'
.stem 1 2 3 Path.stem:获取不包含扩展名的文件名,例如 'document' 如果文件名以点开头(例如 '.gitignore')或有多个点(例如 'archive.tar.gz'),它只去除最后一个点之后的部分
@property和setter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 class MyClass: def __init__(self, value, items=None): self.value = value self._items = [] if items is not None: self._items = items @property def items(self): return self._items.copy() @items.setter def items(self, value): self._items = list(value) def __repr__(self): return f"MyClass(value={self.value}, items={self._items})" # 测试 if __name__ == "__main__": # 1. 测试属性保护 obj = MyClass(1, [1, 2, 3]) print(f"初始化: {obj}") # 获取items(返回的是副本) items_copy = obj.items print(f"获取的items: {items_copy}") print(f"items_copy is obj._items: {items_copy is obj._items}") # False # 尝试修改副本 items_copy.append(999) print(f"修改副本后: {obj}") # 内部未改变 # 2. 测试设置器 external_list = [10, 20, 30] obj.items = external_list print(f"\n设置新列表后: {obj}") # 修改外部列表 external_list.append(999) print(f"修改外部列表后: {obj}") # 内部未改变 # 3. 直接操作内部列表(不推荐,但可能发生) obj._items.append(888) # 可以,但违反封装原则 print(f"直接修改_items后: {obj}")
str和repr
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class PythonCS: def __init__(self, name): self.name = name def __str__(self): return '公众号: ' + self.name def __repr__(self): return "PythonCS('{}')".format(self.name) object = PythonCS('Python禅师') # str调用 print(object) print(str(object)) # repr调用 ex = repr(object) print(ex) # 重新创建对象 object2 = eval(ex) print(object.name == object2.name)
1 2 3 4 公众号: Python禅师 公众号: Python禅师 PythonCS('Python禅师') True
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Product: def __init__(self, name, price, discount=0): self.name = name self.price = price self.discount = discount def __str__(self): """给用户看的商品信息""" final_price = self.price * (1 - self.discount/100) if self.discount > 0: return f"{self.name}: 原价¥{self.price},现价¥{final_price:.2f}({self.discount}%折扣)" return f"{self.name}: ¥{self.price}" def __repr__(self): """给开发者看的完整信息""" return f"Product('{self.name}', {self.price}, discount={self.discount})" product = Product("iPhone 15", 6999, discount=10) print(str(product)) # iPhone 15: 原价¥6999,现价¥6299.10(10%折扣) print(repr(product)) # Product('iPhone 15', 6999, discount=10)
eval()和exec() eval 1 eval函数是一个内置函数,用于将字符串解析并执行为Python表达式
1 2 3 4 5 6 x = 1 print(eval('x+1')) # 输出:2 print(eval('x+y', {'x': 1, 'y': 2})) # 输出:3 print(eval('[x**2 for x in range(5)]')) # 输出:[0, 1, 4, 9, 16]
1 2 3 # 危险的使用示例 user_input = "os.system('rm -rf /')" # 一段恶意代码 eval(user_input) # 这将执行恶意代码
exec 1 2 3 4 5 6 eval函数和exec函数在一定程度上是相似的,都可以执行字符串形式的Python代码。但是,eval函数返回表达式的结果,而exec函数不返回任何结果。此外,exec可以执行更复杂的Python代码结构,比如类定义、函数定义和多行语句,而eval只能解析单个表达式。 eval('x = 5') # 这会导致语法错误,因为'x = 5'不是一个表达式 exec('x = 5') # 这可以正常执行,因为'x = 5'是一个语句 print(x) # 输出:5
库 pip 1 pip install -r xxx.txt -i http.... --trusted-host mirrors.tencentyun.com
tqdm 1 2 3 4 5 6 from tqdm import tqdm import time # 最简单的用法:包装任何可迭代对象 for i in tqdm(range(100)): time.sleep(0.01) # 模拟耗时操作
1 2 3 4 5 6 7 8 9 from tqdm import tqdm import time # 手动控制进度 pbar = tqdm(total=100) # 设置总进度 for i in range(100): time.sleep(0.01) pbar.update(1) # 每次更新1 pbar.close()
1 2 3 4 5 # 自定义描述 for i in tqdm(range(100), desc="处理中"): time.sleep(0.01) 处理中: 8%|███████████
1 2 3 4 # 设置进度条单位 for i in tqdm(range(100), unit="个文件"): time.sleep(0.01) 18%|█████████████████████████▌ | 18/100 [00:18<01:26, 1.06s/个文件]
argparse 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import argparse def get_args(): parser = argparse.ArgumentParser( epilog=None or f""" Examples: Run relocation: $ {sys.argv[0]} --config ../configs/config.yaml """, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--config", default="./settings.yaml", metavar="xxx", help="xxxx", ) parser.add_argument( "--config", default=None, help="xxxx" ) parser.add_argument( "--xx", default="" ) parser.add_argument( "--xx", default="" ) return parser.parse_args()
1 2 args = get_args() config_yaml = args.config
.add_argument 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 (method) def add_argument( *name_or_flags: str, action: _ActionStr | Type[Action] = ..., nargs: int | _NArgsStr | _SUPPRESS_T = ..., const: Any = ..., default: Any = ..., type: ((str) -> _T@add_argument) | FileType = ..., choices: Iterable[_T@add_argument] | None = ..., required: bool = ..., help: str | None = ..., metavar: str | tuple[str, ...] | None = ..., dest: str | None = ..., version: str = ..., **kwargs: Any ) -> Action
*name_or_flags 1 2 3 4 5 6 *name_or_flags 位置参数,可以是参数名(如 'filename')或选项标志(如 '-f', '--file') 例如:add_argument('filename')或 add_argument('-f', '--file') *name_or_flags: str 可变参数
1 2 3 4 5 parser.add_argument('-o', '--output', help='输出文件路径', default='output.txt') parser.add_argument('input_file', help='输入文件路径')
type 1 2 3 将命令行参数转换为指定类型 可以是内置类型:int, float, str 也可以是自定义函数:type=lambda x: x.upper()
default 1 2 当参数未在命令行中指定时的默认值 如果指定了default,通常参数不再是必需的
nargs 1 2 3 4 5 6 7 8 指定应消耗的命令行参数数量 常用值: '?'- 0或1个参数 '*'- 0或多个参数 '+'- 1或多个参数 整数 - 精确指定参数数量 argparse.REMAINDER- 所有剩余参数
action 1 2 3 4 5 6 7 8 9 10 11 12 action="count" 指定当命令行遇到此参数时的动作 常用值: 'store'- 存储参数值(默认) 'store_const'- 存储常量值 'store_true'/'store_false'- 存储布尔值 'append'- 将值添加到列表 'append_const'- 将常量添加到列表 'count'- 统计参数出现次数 'help'- 显示帮助信息 'version'- 显示版本信息
1 2 在帮助信息中显示的参数名称 例如:metavar='FILE'会在帮助中显示为 FILE
1 2 3 const 某些action(如'store_const')使用的常量值 例如:action='store_const', const=42
1 2 3 choices 限制参数的允许值 例如:choices=['small', 'medium', 'large']
1 2 3 required 是否必须提供此参数(对可选参数有效) 默认:False(对于-f/--flag风格参数)
1 2 3 help 参数的帮助信息描述 可以在帮助文本中使用%(default)s, %(type)s等占位符
1 2 3 dest 解析结果中属性的名称 默认从参数名推导(去掉-,将-替换为_)
1 2 version 与action='version'一起使用,指定版本字符串
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 parser = argparse.ArgumentParser() # 位置参数 parser.add_argument('input_file', help='输入文件路径') # 可选参数 parser.add_argument('-o', '--output', help='输出文件路径', default='output.txt') # 带类型转换 parser.add_argument('-n', '--number', type=int, default=1, help='重复次数 (默认: %(default)d)') # 布尔标志 parser.add_argument('-v', '--verbose', action='store_true', help='详细输出模式') # 多值参数 parser.add_argument('--sizes', nargs='+', choices=['S', 'M', 'L'], help='一个或多个尺寸') # 存储常量 parser.add_argument('--debug', action='store_const', const=True, default=False)
copy
赋值 1 2 3 4 list1 = [1, 2, [3, 4]] list2 = list1 # 只是创建新引用,指向同一对象 list2[0] = 99 print(list1) # [99, 2, [3, 4]] # 原对象也被修改
浅拷贝 1 2 3 4 5 6 7 8 9 10 import copy list1 = [1, 2, [3, 4]] list2 = copy.copy(list1) # 或 list1.copy() 或 list1[:] list2[0] = 99 print(list1) # [1, 2, [3, 4]] # 第一层没被影响 # 但是嵌套对象仍然共享! list2[2][0] = 999 print(list1) # [1, 2, [999, 4]] # 嵌套列表被修改了
案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 class MyClass: def __init__(self, value, items=None): self.value = value self.items = items if items is not None else [] # 如果传入的 items 是列表,两个对象会共享这个列表! # 测试 original_list = [1, 2, 3] obj1 = MyClass(1, original_list) obj2 = MyClass(2, original_list) obj1.items.append(999) print(original_list) # [1, 2, 3, 999] # 被修改了! print(obj2.items) # [1, 2, 3, 999] # 也被修改了!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import copy class MyClass: def __init__(self, value, items=None): self.value = value if items is None: self.items = [] # 创建新列表 else: # 创建列表的副本,避免共享 self.items = list(items) # 浅拷贝 # 或者 self.items = copy.copy(items) def deep_copy_init(self, value, items=None): """如果需要深拷贝的版本""" self.value = value if items is None: self.items = [] else: self.items = copy.deepcopy(items) # 深拷贝
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 import copy from typing import List, Any, Optional class MyClass: """一个正确处理可变默认参数的类示例""" def __init__(self, value: Any, items: Optional[List[Any]] = None): """ 初始化 MyClass Args: value: 存储的值 items: 可选的列表,如果不提供则创建新列表 (注意:会创建副本,不会修改传入的列表) """ self.value = value if items is None: self._items = [] # 创建新列表 else: # 创建传入列表的副本 # 使用 list() 进行浅拷贝,或者 copy.deepcopy() 进行深拷贝 self._items = list(items) # 浅拷贝 def add_item(self, item: Any) -> None: """添加项目""" self._items.append(item) def get_items(self) -> List[Any]: """获取项目列表的副本""" return self._items.copy() # 返回副本,保护内部数据 def __repr__(self) -> str: return f"MyClass(value={self.value}, items={self._items})" def copy(self) -> 'MyClass': """创建对象的深拷贝""" return copy.deepcopy(self) # 测试 if __name__ == "__main__": # 测试1:默认参数 obj1 = MyClass(1) obj2 = MyClass(2) obj1.add_item("A") print(f"obj1: {obj1}") # MyClass(value=1, items=['A']) print(f"obj2: {obj2}") # MyClass(value=2, items=[]) # 测试2:传入列表 my_list = [1, 2, 3] obj3 = MyClass(3, my_list) obj3.add_item(4) print(f"my_list: {my_list}") # [1, 2, 3] # 原列表未改变 print(f"obj3: {obj3}") # MyClass(value=3, items=[1, 2, 3, 4])
深拷贝 1 2 import copy res_json = copy.deepcopy(res_json_origin)
1 2 3 4 5 6 7 8 import copy list1 = [1, 2, [3, 4]] list2 = copy.deepcopy(list1) # 完全独立的副本 list2[0] = 99 list2[2][0] = 999 print(list1) # [1, 2, [3, 4]] # 原对象完全不变 print(list2) # [99, 2, [999, 4]]
deepcopy方法 1 可以使用 __deepcopy__方法自定义深拷贝行为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import copy class MyClass: def __init__(self, data): self.data = data def __deepcopy__(self, memo): # 定义对象的深拷贝行为 new_object = MyClass(copy.deepcopy(self.data, memo)) return new_object # 创建对象 obj = MyClass([1, 2, 3]) # 进行深拷贝 deep_copied_obj = copy.deepcopy(obj)
sys 1 2 workspace_path = "/workspace" sys.path.insert(0, workspace_path) # sys.path 是一个列表 插入前面可以避免标准库的覆盖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # sys.path 示例 sys.path = [ '/usr/local/lib/python3.8/site-packages', '/usr/lib/python3.8', '/home/user/project' ] # 在索引 0 位置插入 sys.path.insert(0, '/workspace') # 插入后的 sys.path print(sys.path) # 输出: # [ # '/workspace', # 新插入的路径 # '/usr/local/lib/python3.8/site-packages', # 原来的第一个元素 # '/usr/lib/python3.8', # 原来的第二个元素 # '/home/user/project' # 原来的第三个元素 # ]
序列化和反序列化 1 2 import pickle pickled_obj = pickle.dumps(obj) 序列化
1 obj = pickle.loads(xxx) 反序列化
异常 1 2 3 4 5 try: pass except Exception as e: context.log.error(f"数据分组失败: {str(e)}") raise
init .py1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 #!/usr/bin/env python # -*- coding: utf-8 -*- """ dataengine 包初始化文件 """ from .xxx import xxx from .xxx import xxx __version__ = "1.0.0" __author__ = "liumin" __all__ = [ "xx", "xxx" ] __description__ = "提供本地存储和COS存储的统一接口封装"
1 2 3 4 5 6 7 8 导入时更方便 # 没有 __init__.py 时的导入 from data_engine.data_engine import DataEngine from data_engine.storage import IStorage from data_engine.cos_storage import COSStorage # 有 __init__.py 时的导入(更简洁) from data_engine import DataEngine, IStorage, COSStorage
配置加载 yaml文件 1 2 3 4 5 6 7 import yaml with open(config_yaml) as f: config = yaml.safe_load(f) # 直接加载为字典 # 访问 model_path = config["output_reloc_result"]["model_path"]
1 configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader)
1 2 3 4 5 6 7 dumped = yaml.dump( dict, default_flow_style=None, allow_unicode=True, width=9999 ) with PathManager.open(filename, "w") as f: f.write(dumped) _ = yaml.unsafe_load(dumped)
json文件 1 2 3 import json with open(filename, 'r') as f: config = json.load(f)
1 2 3 4 5 6 7 8 9 10 # 可以是传统 YAML model: type: "ResNet" layers: 50 pretrained: true train: batch_size: 32 lr: 0.001 epochs: 100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # config.py (LazyConfig 通常使用 .py 文件) from my_project.model import ResNet from my_project.optimizer import AdamW model = ResNet( layers=50, pretrained=True ) train = dict( batch_size=32, lr=0.001, epochs=100, optimizer=AdamW(lr=0.001) )
1 2 3 4 5 6 7 8 9 10 11 12 13 cfg = LazyConfig.load(config_yaml) # 1. 加载配置文件 cfg = LazyConfig.load("path/to/config.py") # 或 cfg = LazyConfig.load("path/to/config.yaml") # 2. 访问配置 print(cfg.model.type) # "ResNet" print(cfg.train.batch_size) # 32 # 3. 可以像字典一样访问 print(cfg["model"]["type"])
类 函数参数 1 2 3 4 from typing import Union,Tuple # Union[Type1, Type2, ...] 表示可以是 Type1 或 Type2 或 ... def load(filename: str, keys: Union[None, str, Tuple[str, ...]] = None):
1 2 3 4 5 6 7 8 9 10 # 对比固定长度的元组: Tuple[str, int] # 2个元素:第一个是str,第二个是int Tuple[str, str, str] # 3个元素,都是str Tuple[str, ...] # 任意多个元素,都是str ✅ Tuple[Any, ...] # 所有元素可以是任何类型 # 元组是不可变的序列 t1: Tuple = (1, 2, 3) # 泛型元组 t2: Tuple[int] = (1, 2, 3) # Python 3.9+ 简化写法 t3: tuple[int] = (1, 2, 3) # Python 3.9+ 更简洁
1 2 3 4 5 6 7 8 9 10 11 12 13 from typing import Union, Tuple, List, Dict, Any # 复杂的联合类型 ComplexType = Union[ None, str, int, Tuple[str, ...], List[str], Dict[str, Union[str, int, float]] ] def process(data: ComplexType):
init和new方法 1 2 3 __init__():后调用,第一个参数self,初始化器,初始化实例(设置属性),不返回任何值(返回None) __new__():先调用,第一个参数cls,构造器,创建实例(分配内存),必须返回实例
延迟初始化类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class IoProcess: _instance = None _initialized = False def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if not self._initialized: pass def init(self, yaml_file): # 加载配置文 configs = yaml.load(open(yaml_file), Loader=yaml.FullLoader) configs = configs['output_reloc_result'] self.config_base = configs["config_base"] self.match_dict = configs["match_dict"] self._initialized = True # 初始化对象 self.cos = CosUtils(self.config_base)
1 2 3 4 5 # 1. 获取单例实例 ioProcess = IoProcess() # 此时 __init__ 是空的 # 2. 在需要时进行实际初始化 ioProcess.init(config_yaml) # 延迟加载配置
类方法 1 2 3 4 5 6 class utils: @classmethod def SaveJSON(cls, file_path, json_obj): with open(file_path, "w") as json_file: json.dump(json_obj, json_file) json_file.close()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class IoProcess: _instance = None # 类变量,存储单例实例 def __init__(self, yaml_file=None): """初始化方法,但实际初始化是延迟的""" self._initialized = False if yaml_file: self._init_from_yaml(yaml_file) def _init_from_yaml(self, yaml_file): """实际执行初始化的私有方法""" if not self._initialized: # 加载配置 with open(yaml_file) as f: config = yaml.safe_load(f) self.config = config self._initialized = True print(f"初始化完成: {yaml_file}") @classmethod def get_instance(cls, yaml_file=None): """获取单例实例的工厂方法""" if cls._instance is None: # 第一次调用:创建实例 print("创建新实例") cls._instance = cls(yaml_file) elif yaml_file and not cls._instance._initialized: # 实例存在但未初始化,且提供了配置文件 print("初始化现有实例") cls._instance._init_from_yaml(yaml_file) # 其他情况:直接返回现有实例 return cls._instance
cls 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 cls是 类本身(class object),而不是类的实例。类方法第一个参数必须是它 class IoProcess: _instance = None # 类变量,属于类本身 @classmethod def get_instance(cls, yaml_file=None): # cls 指向 IoProcess 类 print(cls) # <class '__main__.IoProcess'> print(cls.__name__) # "IoProcess" # cls 可以用来: # 1. 访问类变量 print(cls._instance) # 访问类变量 _instance # 2. 创建新实例 instance = cls() # 等价于 IoProcess() # 3. 调用其他类方法 cls.other_classmethod() # 通过类调用 IoProcess.get_instance() # cls 自动传入 IoProcess
@classmethod和@staticmethod和实例方法 1 2 3 4 5 6 7 8 9 10 11 @classmethod:参数cls 实例和类都可以访问类方法 @staticmethod:无特殊参数 实例和类都可以访问类方法 实例方法:参数self 实例可以访问 只有实例才可以访问实例属性 @classmethod和@staticmethod都可以访问类属性
继承 1 2 3 4 5 6 7 class A(B): def __init__(self, config, ioprocess, args): super().__init__(config=config , ioprocess=ioprocess, args=args) self.xxx = args.xxx
k8s 第21关 从零到一:在 K8s 中部署 Job 和 CronJob 的全面指导_job部署-CSDN博客
1 2 3 4 5 Kubernetes的简称 容器编排平台,主要用于自动化部署、扩展和管理容器化应用程序 腾讯汽车云容器服务是腾讯云面向汽车行业推出的容器化应用管理平台,主要服务于汽车企业的数字化应用开发和运维场景。基于Kubernetes技术
vscode 快捷键 ctrl + `
git git配置 1 2 3 4 5 6 7 git config --global user.name "xxx" git config --global user.email "xxx" git config user.name git config --list --global
创建一个项目 1 2 3 4 5 6 7 git init git remote add 远程仓库别名(origin) http......(https://github.com/你的用户名/你的仓库名.git) git branch -M main (将当前所在的分支重命名为main) git push 远程仓库别名 main
克隆命令 1 git clone -b dev_lk http://liuke:Liuke1999@10.0.0.29:9091/9room/spatiotemporal-data-project.git
https://blog.csdn.net/techforward/article/details/133203445 https://blog.csdn.net/m0_73745224/article/details/149708684
git branch 创建分支
显示分支 1 2 显示所有分支(本地+远程) git branch -a
1 2 3 4 5 6 7 8 git branch 只显示本地分支 git branch -r 只显示远程分支 git branch -vv 显示分支及其跟踪的远程分支
1 2 3 4 5 6 7 git branch -v 显示本地分支及其最近提交 [liuj@VM-16-2-tencentos spatiotemporal-data-project]$ git branch -v dev 88a148a [behind 217] Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev dev_asstercheck 6a30df7 [ahead 1] test修改 * dev_ty 021de64 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty
提交记录 git log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 git log commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:41 2026 +0800 加了点注释 commit 080ce0f73632085555dab8b792f27c0311425368 Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:25 2026 +0800 加了点注释 commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 Author: 202226460 <202226460@any3.com> Date: Thu Feb 5 21:28:27 2026 +0800
git log –oneline 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # 一行显示一条提交(简洁) 456005a (HEAD -> dev_ty, origin/dev_ty) 加了点注释 080ce0f 加了点注释 6b09ab6 init漏写schedule 9ed8aa4 add lable import to mongodb b23c536 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_pre f885f1b Merge branch 'dev_pre' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr 3a64d57 Merge branch 'dev_yr' into 'dev' cd169cb Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr a8cb1a6 Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_pre a0f2083 添加带数据版本的系统调度逻辑 7362b6c Merge branch 'dev_ty' into 'dev' bd1a240 label拆之后新增失败统计,labelchecker路径获取方式修改 a2abac9 update det3d 6f9ed97 修改监控job 1e7dc4c label资产修改了mongoclient使用方式 fd583dc temp update 99b29da Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_yr 5dfc0e6 重构资产代码 a45243d Merge branch 'dev' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty 11d8070 update mongo resource
git log –graph 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 # 显示图形化分支结构 git log --graph * commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) | Author: 202226460 <202226460@any3.com> | Date: Fri Feb 6 09:07:41 2026 +0800 | | 加了点注释 | * commit 080ce0f73632085555dab8b792f27c0311425368 | Author: 202226460 <202226460@any3.com> | Date: Fri Feb 6 09:07:25 2026 +0800 | | 加了点注释 | * commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 | Author: 202226460 <202226460@any3.com> | Date: Thu Feb 5 21:28:27 2026 +0800 | | init漏写schedule | * commit 9ed8aa4434fd20a7bbe3d7c07e8e080d3033720f | Author: 202226460 <202226460@any3.com> | Date: Thu Feb 5 19:36:17 2026 +0800 | | add lable import to mongodb | * commit 04b0002cf1076058322bc3affcef67e4c5e5c865 |\ Merge: 0ddde82 b3a1799 | | Author: 202226460 <202226460@any3.com> | | Date: Thu Feb 5 19:03:58 2026 +0800 | | | | Merge branch 'dev_yr' of http://10.0.0.29:9091/9room/spatiotemporal-data-project into dev_ty | | | * commit b3a1799d36b020fe5d1f03f29a84647814bbc9ba (origin/dev_yr) | | Author: yr <2521275899@qq.com> | | Date: Thu Feb 5 18:47:17 2026 +0800 | | | | update label
1 2 3 4 # 显示完整信息 git log --stat # 显示文件变更统计 git log -p # 显示具体修改内容(diff) git log --name-only # 只显示修改的文件名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 [liuj@VM-16-2-tencentos spatiotemporal-data-project]$ git log -p commit 456005affed0d488d8858a069cc37434ee2e0c3a (HEAD -> dev_ty, origin/dev_ty) Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:41 2026 +0800 加了点注释 diff --git a/production_system/src/workspace/partitions/partition_defs.py b/production_system/src/workspace/partitions/partition_defs.py index ad99ec1..1961ccc 100644 --- a/production_system/src/workspace/partitions/partition_defs.py +++ b/production_system/src/workspace/partitions/partition_defs.py @@ -18,7 +18,7 @@ time_window_task_partitions_for_statistical_analysis = TimeWindowPartitionsDefin #cron_schedule="*/3 * * * *", 设置成这样,分区数量不符合要求 timezone="Asia/Shanghai", fmt="%Y-%m-%d", - end_offset=1, # 提前显示 + end_offset=1, # DailyPartitionsDefinition会延迟一天,time_window_task_partitions_for_statistical_analysis的end_offset=1提前显示当天 ) __all__ = [ "clip_partitions",'daily_task_partitions','lable_job_clip_partitions', commit 080ce0f73632085555dab8b792f27c0311425368 Author: 202226460 <202226460@any3.com> Date: Fri Feb 6 09:07:25 2026 +0800 加了点注释 diff --git a/.vscode/settings.json b/.vscode/settings.json index 3516cb9..0ae70d2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,5 @@ { - "python.formatting.provider": "autopep8" + "python.formatting.provider": "autopep8", + "workbench.editor.enablePreviewFromCodeNavigation": true, + "workbench.editor.enablePreviewFromQuickOpen": true } \ No newline at end of file commit 6b09ab641376dab64b88bef2a03ec8394f6bf859 Author: 202226460 <202226460@any3.com> Date: Thu Feb 5 21:28:27 2026 +0800
筛选 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 # 按数量限制 git log -5 # 显示最近5条提交 git log -n 10 # 显示最近10条提交 # 按时间筛选 git log --since="2026-01-01" git log --until="2026-01-31" git log --since="2 weeks ago" # 按作者筛选 git log --author="张三" git log --author="zhangsan" # 按提交信息筛选 git log --grep="bugfix" # 搜索包含"bugfix"的提交信息 # 按文件筛选 git log -- README.md # 查看README.md的修改历史 git log -- src/ # 查看src目录的修改历史
1 2 3 4 5 6 7 8 # 最实用的组合:图形化+一行显示 git log --oneline --graph --all # 显示统计信息+图形 git log --stat --graph # 漂亮的格式化输出 git log --pretty=format:"%h - %an, %ar : %s"
1 2 3 4 5 # 查看在dev分支但不在main分支的提交 git log main..dev # 查看两个分支的差异 git log branch1..branch2
1 2 3 4 5 # 显示所有分支的时间线 git log --oneline --graph --all # 按时间倒序 git log --reverse
1 2 3 4 5 6 7 8 9 10 # 自定义输出格式 git log --pretty=format:"%C(yellow)%h %Creset%s %Cgreen(%cr) %C(bold blue)<%an>%Creset" # 常用占位符: # %h - 短哈希 # %H - 完整哈希 # %s - 提交说明 # %an - 作者名 # %ar - 相对时间 # %ad - 日期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 分页模式下的常用操作键 按键 功能 说明 空格键 向下翻一页 最常用 回车键 向下翻一行 逐行查看 b 向上翻一页 回看内容 q 退出分页模式 返回命令行 /关键词 搜索内容 输入/后输入关键词搜索 n 查找下一个匹配项 配合搜索使用 N 查找上一个匹配项 g 跳到第一行 G 跳到最后一行
切换分支 1 2 git checkout 分支名 切换分支 git checkout -b 分支名 创建和切换
1 2 3 4 5 这个命令是错误的,因为它试图切换到origin这个分支,但origin是一个远程仓库的别名,不是一个分支。 正确的应该是: git checkout branch_name- 切换到指定分支 git checkout -b new_branch- 创建并切换到新分支 git checkout origin/dev_yr- 创建一个跟踪远程分支的本地分支
删除分支
合并分支 1 2 3 4 5 6 7 git merge origin/dev_yr 将远程分支 origin/dev_yr合并到当前分支 或者 git pull origin dev_yr 这个命令实际上是两个操作的组合: git fetch origin dev_yr- 获取远程 dev_yr分支的最新代码 git merge FETCH_HEAD- 将获取的远程分支合并到当前分支
1 2 3 4 5 6 7 8 # 1. 先将你的修改暂存起来(执行下面操作,要先暂存,不然代码会丢失) git stash # 2. 拉取远程最新代码 git pull origin dev # 3. 将你暂存的修改恢复回来 git stash pop
git fetch和git pull 1 2 3 git fetch和git pull的主要区别如下: git fetch: 仅更新远程分支的信息,不会自动合并到本地分支。用户需要手动执行合并操作。 git pull: 同时执行 git fetch 和 git merge,直接将远程分支的更新合并到当前分支,可能隐藏过程细节。
FETCH_HEAD与HEAD的区别 1 HEAD是一个指向当前工作分支的引用,而FETCH_HEAD则是指向最后一次从远程仓库抓取分支的最新状态。HEAD通常用于表示当前工作分支的状态,而FETCH_HEAD则用于表示最近一次从远程仓库拉取的数据。
发布分支 1 2 3 4 5 6 7 8 9 10 11 12 13 Publish Branch 当你在本地创建了一个新分支,但远程仓库还没有这个分支时,你需要将这个本地分支"发布"(推送)到远程仓库 # 1. 在本地创建新分支 git checkout -b feature/new-feature # 2. 进行开发,做了一些提交 git add . git commit -m "Add new feature" # 3. 将本地分支发布到远程仓库 git push -u origin feature/new-feature # 这里的 -u 表示设置上游分支(upstream)
git stash 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # 当前在 feature 分支,有未提交的修改 git status # 显示有修改的文件 # 1. 暂存当前修改 git stash # 或 git stash save "描述信息" # 2. 工作区变干净了 git status # 显示没有修改 # 3. 切换到其他分支 git checkout main # 4. 修复bug后,回到 feature 分支 git checkout feature # 5. 恢复暂存的修改 git stash pop
1 2 3 4 5 6 场景2:拉取最新代码前 你想拉取远程更新,但有未提交的修改: # 有本地修改,但想先拉取最新代码 git stash # 暂存修改 git pull # 拉取最新代码 git stash pop # 恢复修改
命令 1 git push origin dev_ty --force
debug 1 2 .vscode/launch.json launch.json 可以自行配置,vscode用于调试配置的文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { // Use IntelliSense to learn about possible attributes. // Hover to view descriptions of existing attributes. // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ { "name": "Python: Current File", "type": "python", "request": "launch", "program": "${file}", "console": "integratedTerminal", "justMyCode": true } ] }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 { "version": "0.2.0", "configurations": [ { "name": "Python: run_output", "type": "python", "request": "launch", "program": "${workspaceFolder}/run_output.py", "args": [ "--tasks", "CD701_LS6C3G0Y2RA400208_2025-01-01_13-42-18", "--run-type", "2", "--config", "/data/config/output.yaml" ], "console": "integratedTerminal", "justMyCode": true } ] }
调试源码 1 2 3 4 5 6 7 8 9 10 # 在终端中执行 cd /workspace/src/workspace/sensors/test # 创建指向源码的符号链接 ln -s /workspace/data_engine/data_engine ./data_engine # 现在可以直接导入 # test.py import data_engine from data_engine import DataEngine, COSStorage
MongoDB MongoDB 概念解析 | 菜鸟教程
Mongodb入门到精通—> 保姆级别教程 - JamieChyi - 博客园
1 MongoDB 将数据存储为一个文档,数据结构由键值(key=>value)对组成,文档类似于 JSON 对象,字段值可以包含其他文档,数组及文档数组;
1 数据库 集合(collection) 文档(document) 字段 主键(_id) 索引(index) 表连接,MongoDB不支持
1 2 3 4 5 6 7 8 9 10 文档中的键/值对是有序的。 MongoDB区分类型和大小写。 MongoDB的文档不能有重复的键。 文档键命名规范: 键不能含有\0 (空字符)。这个字符用来表示键的结尾。 .和$有特别的意义,只有在特定环境下才能使用。 以下划线"_"开头的键是保留的(不是严格要求的)。
数据类型 1 2 文档的键是字符串。除了少数例外情况,键可以使用任意UTF-8字符。 文档中的值不仅可以是在双引号里面的字符串,还可以是其他几种数据类型(甚至可以是整个嵌入的文档)。
12.23交流 run_dev.sh -p docker -run -h 0.0.0.0
进入容器之后,外部代码修改不一定得行,chmod 777 * ip:4000
init .py和definitions.py
build.py 0.0.2
build.py学习
测试杨瑞 . env.sh env
clip_task_lidar_extcalib_todo:
CD701_LS6C3E2X5SK400065_2025-12-23_10-09-23
CD701_LS6C3G342SK400136_2025-12-23_15-56-57
CD701_LS6C3G347RA400116_2025-12-23_18-41-22
CD701_LS6C3G347RA400116_2025-12-23_17-14-57
批量处理sensor备份 lidar_extcalib_sensor.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 from datetime import datetime from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\ SensorEvaluationContext,\ EventLogEntry from ..assets import clip_task_check_parse_todo from typing import cast from dagster_graphql import DagsterGraphQLClient @sensor( required_resource_keys={'graphql_client'}, minimum_interval_seconds=60, job_name='lidar_extcalib_todo_job' ) def auto_excute_lidar_extcalib_sensor(context: SensorEvaluationContext): # dagster_event = event_log_entry.dagster_event # if not (dagster_event and # dagster_event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION.value): # return None # asset_materialization = dagster_event.event_specific_data.materialization # asset_key = asset_materialization.asset_key # partition = asset_materialization.partition client = cast(DagsterGraphQLClient,context.resources.graphql_client) query = """ query PartitionHealthQuery($assetKey: AssetKeyInput!) { assetNodeOrError(assetKey: $assetKey) { ... on AssetNode { id partitionKeysByDimension { name type partitionKeys __typename } assetPartitionStatuses { ... on TimePartitionStatuses { ranges { status startTime endTime startKey endKey __typename } __typename } ... on DefaultPartitionStatuses { materializedPartitions materializingPartitions failedPartitions __typename } ... on MultiPartitionStatuses { primaryDimensionName ranges { primaryDimStartKey primaryDimEndKey primaryDimStartTime primaryDimEndTime secondaryDim { ... on TimePartitionStatuses { ranges { status startTime endTime startKey endKey __typename } __typename } ... on DefaultPartitionStatuses { materializedPartitions materializingPartitions failedPartitions __typename } __typename } __typename } __typename } __typename } __typename } __typename } } """ assetkey = "clip_task_check_parse_todo" variables = { "assetKey": { "path": [ f"{assetkey}" ] } } # 获取clip_task_check_parse_todo物化成功的分区 result = client._execute(query=query,variables=variables) # dict_keys(['materializedPartitions', 'materializingPartitions', 'failedPartitions', '__typename']) assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses") check_parse_mdp = assetPartitionStatuses.get("materializedPartitions") context.log.info(f"check_parse_mdp 长度:{len(check_parse_mdp)} 值:{check_parse_mdp}") # 获取clip_task_time_algin_todo物化成功的分区 assetkey = "clip_task_time_algin_todo" variables = { "assetKey": { "path": [ f"{assetkey}" ] } } result = client._execute(query=query,variables=variables) # dict_keys(['materializedPartitions', 'materializingPartitions', 'failedPartitions', '__typename']) assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses") time_algin_mgp = assetPartitionStatuses.get("materializingPartitions") context.log.info(f"time_algin_mgp 长度:{len(time_algin_mgp)} 值:{time_algin_mgp}") if len(time_algin_mgp)>0: context.log.info(f"lidar_extcalib_todo_job有正在物化的分区,不执行新的物化!") return None time_algin_mdp = assetPartitionStatuses.get("materializedPartitions") context.log.info(f"time_algin_mdp 长度:{len(time_algin_mdp)} 值:{time_algin_mdp}") time_algin_fdp = assetPartitionStatuses.get("failedPartitions") context.log.info(f"time_algin_fdp 长度:{len(time_algin_fdp)} 值:{time_algin_fdp}") # 失败的分区单独处理 result_partitions = set(check_parse_mdp).difference(set(time_algin_mdp)).difference(set(time_algin_fdp)) context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}") if result_partitions: run_requests = [] for clip_name in (result_partitions[0:200] if result_partitions>200 else result_partitions): run_requests.append(RunRequest(partition_key=clip_name)) context.log.info(f"请求下游处理 {clip_name}") return run_requests
dynamic_annotation_sensor.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 from datetime import datetime from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\ SensorEvaluationContext,\ EventLogEntry from ..assets import clip_task_check_parse_todo from typing import cast from dagster_graphql import DagsterGraphQLClient @sensor( # asset_key=AssetKey("clip_task_check_parse_todo"), required_resource_keys={'graphql_client'}, minimum_interval_seconds=60, job_name='dynamic_annotation_todo_job' ) def dynamic_annotation_sensor(context: SensorEvaluationContext): # dagster_event = event_log_entry.dagster_event # if not (dagster_event and # dagster_event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION.value): # return None # asset_materialization = dagster_event.event_specific_data.materialization # asset_key = asset_materialization.asset_key # partition = asset_materialization.partition client = cast(DagsterGraphQLClient,context.resources.graphql_client) query = """ query PartitionHealthQuery($assetKey: AssetKeyInput!) { assetNodeOrError(assetKey: $assetKey) { ... on AssetNode { id partitionKeysByDimension { name type partitionKeys __typename } assetPartitionStatuses { ... on TimePartitionStatuses { ranges { status startTime endTime startKey endKey __typename } __typename } ... on DefaultPartitionStatuses { materializedPartitions materializingPartitions failedPartitions __typename } ... on MultiPartitionStatuses { primaryDimensionName ranges { primaryDimStartKey primaryDimEndKey primaryDimStartTime primaryDimEndTime secondaryDim { ... on TimePartitionStatuses { ranges { status startTime endTime startKey endKey __typename } __typename } ... on DefaultPartitionStatuses { materializedPartitions materializingPartitions failedPartitions __typename } __typename } __typename } __typename } __typename } __typename } __typename } } """ assetkey = "clip_find_result_file_todo" variables = { "assetKey": { "path": [ f"{assetkey}" ] } } # 获取clip_find_result_file_todo物化成功的分区 result = client._execute(query=query,variables=variables) # dict_keys(['materializedPartitions', 'materializingPartitions', 'failedPartitions', '__typename']) assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses") find_result_file_mdp = assetPartitionStatuses.get("materializedPartitions") context.log.info(f"find_result_file_mdp 长度:{len(find_result_file_mdp)} 值:{find_result_file_mdp}") # 获取clip_task_det3d_todo物化成功的分区 assetkey = "clip_task_det3d_todo" variables = { "assetKey": { "path": [ f"{assetkey}" ] } } result = client._execute(query=query,variables=variables) # dict_keys(['materializedPartitions', 'materializingPartitions', 'failedPartitions', '__typename']) assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses") det3d_mgp = assetPartitionStatuses.get("materializingPartitions") context.log.info(f"det3d_mgp 长度:{len(det3d_mgp)} 值:{det3d_mgp}") if len(det3d_mgp)>0: context.log.info(f"clip_task_det3d_todo有正在物化的分区,不执行新的物化!") return None det3d_mdp = assetPartitionStatuses.get("materializedPartitions") context.log.info(f"det3d_mdp 长度:{len(det3d_mdp)} 值:{det3d_mdp}") det3d_fdp = assetPartitionStatuses.get("failedPartitions") context.log.info(f"det3d_fdp 长度:{len(det3d_fdp)} 值:{det3d_fdp}") # 失败的分区单独处理 result_partitions = set(find_result_file_mdp).difference(set(det3d_mdp)).difference(set(det3d_fdp)) context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}") if result_partitions: run_requests = [] for clip_name in (result_partitions[0:200] if result_partitions>200 else result_partitions): run_requests.append(RunRequest(partition_key=clip_name)) context.log.info(f"请求下游处理 {clip_name}") return run_requests
sensor写法备份 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 from datetime import datetime from typing import List, Optional, Set from dagster import DagsterEventType, DagsterRunStatus, EventRecordsFilter, RunRequest, RunStatusSensorContext, build_input_context, run_status_sensor, sensor,asset_sensor,AssetKey from ..jobs.auto_task_jobs import * from ..partitions.partition_defs import daily_task_partitions,clip_partitions @run_status_sensor( run_status=DagsterRunStatus.SUCCESS, minimum_interval_seconds=60, monitored_jobs=[auto_get_task_job], request_job=clip_task_todo_job ) def one_day_clip_auto_job_sensor(context: RunStatusSensorContext): run_id = context.dagster_run.run_id instance = context.instance partition_tag = context.dagster_run.tags.get("dagster/partition") if partition_tag: context.log.info(f"检测到作业 {context.dagster_run.job_name} 成功执行") context.log.info(f"分区键: {partition_tag}") #获取动态 registered_clips: Set[str] = set(instance.get_dynamic_partitions("auto_dynamic")) # asset_key=AssetKey("oneday_clips_srouce_data") materialization_records = instance.get_event_records( EventRecordsFilter( event_type= DagsterEventType.ASSET_MATERIALIZATION, asset_key=asset_key, asset_partitions=[partition_tag] ), limit=1 ) if not materialization_records: context.log.warning(f"未找到分区 {partition_tag} 的物化记录") return None data_record= materialization_records[0] materialization= data_record.event_log_entry.dagster_event.step_materialization_data data_metadata= materialization.materialization.metadata for metadata in data_metadata: context.log.info(metadata) # 构建输入上下文并加载数据 # input_context = build_input_context( # asset_key=asset_key, # partition_key=partition_tag, # asset_partitions_def=daily_task_partitions, # instance=instance # ) # asset_data = context.resources.io_manager.load_input(input_context) # if 'clip_data' not in asset_data: # context.log.warning(f"资产数据中缺少 'clip_data' 字段") # return None # clip_data = asset_data['clip_data'] # # 提取所有剪辑名称 # all_clips: List[str] = [] # for clip_names in clip_data.values(): # if isinstance(clip_names, list): # for v in clip_names: # all_clips.append(v["data_name"]) # if not all_clips: # context.log.info("未找到任何clip数据") # return None # # 获取已注册的分区 # registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions")) # new_clips= set(all_clips)-registered_clips # if not new_clips: # context.log.info("没有新的剪辑需要添加") # else: # # 只添加新的剪辑分区 # context.instance.add_dynamic_partitions("clip_partitions", new_clips) # context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区") # # 只有在有新剪辑时才触发运行 # if new_clips: # run_requests = [] # for clip_name in new_clips: # run_requests.append(RunRequest(partition_key=clip_name,tags={'task_group':partition_tag})) # context.log.info(f"请求下游处理 {clip_name}") # return run_requests # else: return None
f 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 g:1400 1300 s:900 34 y:1040 m:700 simi: 创业 债券 国债、地方债、企业债 银行利率会影响 公元 公元前 世纪 财政赤字:xxxx 通过国债、地方债、主权外债来弥补 赤字率:赤字率 = (财政赤字总额 / 国内生产总值 GDP) × 100% 3% 贸易赤字: 主权外债: 美债:只有当它被美国以外的国家和实体持有时 增值税: 期货 期权 当日收益 = 本金*当日收益率 收盘本金=当日收益+开盘本金 第二日负收益率=前一日收益率 第二日收益 = 本金-本金*第二日负收益率*1 2 3 4