1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
| from datetime import datetime from typing import List, Optional, Set from dagster import DagsterEventType, DagsterRunStatus, EventRecordsFilter, RunRequest, RunStatusSensorContext, build_input_context, run_status_sensor, sensor,asset_sensor,AssetKey from ..jobs.auto_task_jobs import * from ..partitions.partition_defs import daily_task_partitions,clip_partitions
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=60,
    monitored_jobs=[auto_get_task_job],
    request_job=clip_task_todo_job,
)
def one_day_clip_auto_job_sensor(context: RunStatusSensorContext):
    """React to a successful ``auto_get_task_job`` run and inspect its asset output.

    Evaluated (at most every 60 seconds) for runs of ``auto_get_task_job`` that
    finished with ``DagsterRunStatus.SUCCESS``. For a partitioned run, the
    partition key is read from the ``dagster/partition`` run tag. The most
    recent ``ASSET_MATERIALIZATION`` event of the
    ``oneday_clips_srouce_data`` asset for that partition is then fetched, and
    each of its metadata entries is logged.

    Args:
        context: The run-status sensor context supplied by Dagster.

    Returns:
        Always ``None`` in the current implementation, so no run is requested.
        NOTE(review): the sensor declares ``request_job=clip_task_todo_job``
        but never returns ``RunRequest``s yet. See the draft at the bottom.
    """
    partition_tag = context.dagster_run.tags.get("dagster/partition")
    if not partition_tag:
        # Runs without a partition tag have no asset partition to inspect.
        return None

    instance = context.instance
    context.log.info(f"检测到作业 {context.dagster_run.job_name} 成功执行")
    context.log.info(f"分区键: {partition_tag}")

    # This assignment was previously commented out, which left `asset_key`
    # undefined and made the filter below raise NameError.
    # NOTE(review): "srouce" looks like a typo of "source", but the string must
    # match the asset's registered key exactly. Verify before renaming.
    asset_key = AssetKey("oneday_clips_srouce_data")

    # Newest-first with limit=1 yields only the latest materialization of this partition.
    materialization_records = instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=asset_key,
            asset_partitions=[partition_tag],
        ),
        limit=1,
        ascending=False,
    )
    if not materialization_records:
        context.log.warning(f"未找到分区 {partition_tag} 的物化记录")
        return None

    latest_record = materialization_records[0]
    step_data = latest_record.event_log_entry.dagster_event.step_materialization_data
    metadata = step_data.materialization.metadata
    # Iterating the mapping directly would log only the keys, so log each entry in full.
    for key, value in metadata.items():
        context.log.info(f"{key}: {value}")

    # TODO: load the asset data and fan out one clip_task_todo_job run per new
    # clip. The draft below is kept for reference only and is not executed.
    #
    # Build an input context and load the asset data.
    # input_context = build_input_context(
    #     asset_key=asset_key,
    #     partition_key=partition_tag,
    #     asset_partitions_def=daily_task_partitions,
    #     instance=instance,
    # )
    # asset_data = context.resources.io_manager.load_input(input_context)
    # if 'clip_data' not in asset_data:
    #     context.log.warning(f"资产数据中缺少 'clip_data' 字段")
    #     return None
    # clip_data = asset_data['clip_data']
    #
    # # Collect every clip name.
    # all_clips: List[str] = []
    # for clip_names in clip_data.values():
    #     if isinstance(clip_names, list):
    #         for v in clip_names:
    #             all_clips.append(v["data_name"])
    # if not all_clips:
    #     context.log.info("未找到任何clip数据")
    #     return None
    #
    # # Fetch the clip partitions that are already registered.
    # registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions"))
    # new_clips = set(all_clips) - registered_clips
    # if not new_clips:
    #     context.log.info("没有新的剪辑需要添加")
    # else:
    #     # Register only the clips that are new.
    #     context.instance.add_dynamic_partitions("clip_partitions", new_clips)
    #     context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区")
    #
    # # Request downstream runs only when there are new clips.
    # if new_clips:
    #     run_requests = []
    #     for clip_name in new_clips:
    #         run_requests.append(RunRequest(partition_key=clip_name, tags={'task_group': partition_tag}))
    #         context.log.info(f"请求下游处理 {clip_name}")
    #     return run_requests

    return None
|