dagster 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @asset(group_name="task_todo", required_resource_keys={"path_resource"}, partitions_def=clip_partitions, config_schema={ "image_name": Field( str, default_value="changan-map.tencentcloudcr.com/train/dat_parse_sys", description="解析镜像" ), "image_version": Field( str, default_value="v3.1.2", description="容器镜像版本" ), "exe_command": Field(list, default_value=["/workspace/parse_data.sh"], description='启动命令'), "exe_args": Field(list, default_value=[ "--task", "{}", "--type", "image", "--in_dir", "[[raw_data]]", "--out_dir", "[[raw_image]]", ], description='命令参数'),
1 2 [[raw_data]] 是从required_resource_keys中去获取值
sensor @sensor 定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def sensor( job_name: Optional[str] = None, *, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, asset_selection: Optional[CoercibleToAssetSelection] = None, required_resource_keys: Optional[set[str]] = None, tags: Optional[Mapping[str, str]] = None, metadata: Optional[RawMetadataMapping] = None, target: Optional[ Union[ "CoercibleToAssetSelection", "AssetsDefinition", "JobDefinition", "UnresolvedAssetJobDefinition", ] ] = None, ) -> Callable[[RawSensorEvaluationFunction], SensorDefinition]:
# 案例一 (Case 1): poll partition health via GraphQL and fan out RunRequests.
from datetime import datetime
from dagster import DagsterEventType, asset_sensor, AssetKey, RunRequest, sensor, \
    SensorEvaluationContext, \
    EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient


@sensor(
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='lidar_extcalib_todo_job'
)
def auto_excute_lidar_extcalib_sensor(context: SensorEvaluationContext):
    """Trigger lidar_extcalib_todo_job for every partition that
    clip_task_check_parse_todo has materialized but clip_task_time_algin_todo
    has neither materialized nor failed.

    Skips a tick entirely while any partition of the downstream asset is
    still materializing, and caps each tick at 200 RunRequests.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # Same query Dagit uses for its partition-health view; only the
    # DefaultPartitionStatuses branch is consumed below.
    query = """
query PartitionHealthQuery($assetKey: AssetKeyInput!) {
  assetNodeOrError(assetKey: $assetKey) {
    ... on AssetNode {
      id
      partitionKeysByDimension {
        name
        type
        partitionKeys
        __typename
      }
      assetPartitionStatuses {
        ... on TimePartitionStatuses {
          ranges {
            status
            startTime
            endTime
            startKey
            endKey
            __typename
          }
          __typename
        }
        ... on DefaultPartitionStatuses {
          materializedPartitions
          materializingPartitions
          failedPartitions
          __typename
        }
        ... on MultiPartitionStatuses {
          primaryDimensionName
          ranges {
            primaryDimStartKey
            primaryDimEndKey
            primaryDimStartTime
            primaryDimEndTime
            secondaryDim {
              ... on TimePartitionStatuses {
                ranges {
                  status
                  startTime
                  endTime
                  startKey
                  endKey
                  __typename
                }
                __typename
              }
              ... on DefaultPartitionStatuses {
                materializedPartitions
                materializingPartitions
                failedPartitions
                __typename
              }
              __typename
            }
            __typename
          }
          __typename
        }
        __typename
      }
      __typename
    }
    __typename
  }
}
"""
    # Partitions already materialized by the upstream asset.
    assetkey = "clip_task_check_parse_todo"
    variables = {"assetKey": {"path": [assetkey]}}
    result = client._execute(query=query, variables=variables)
    # result keys: materializedPartitions / materializingPartitions / failedPartitions
    assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses")
    check_parse_mdp = assetPartitionStatuses.get("materializedPartitions")
    context.log.info(f"check_parse_mdp 长度:{len(check_parse_mdp)} 值:{check_parse_mdp}")

    # Partition status of the downstream asset.
    assetkey = "clip_task_time_algin_todo"
    variables = {"assetKey": {"path": [assetkey]}}
    result = client._execute(query=query, variables=variables)
    assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses")
    time_algin_mgp = assetPartitionStatuses.get("materializingPartitions")
    context.log.info(f"time_algin_mgp 长度:{len(time_algin_mgp)} 值:{time_algin_mgp}")
    if len(time_algin_mgp) > 0:
        context.log.info(f"lidar_extcalib_todo_job有正在物化的分区,不执行新的物化!")
        return None
    time_algin_mdp = assetPartitionStatuses.get("materializedPartitions")
    context.log.info(f"time_algin_mdp 长度:{len(time_algin_mdp)} 值:{time_algin_mdp}")
    time_algin_fdp = assetPartitionStatuses.get("failedPartitions")
    context.log.info(f"time_algin_fdp 长度:{len(time_algin_fdp)} 值:{time_algin_fdp}")

    # Failed partitions are handled separately; exclude them here.
    result_partitions = set(check_parse_mdp).difference(set(time_algin_mdp)).difference(set(time_algin_fdp))
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")
    if result_partitions:
        # FIX: result_partitions is a set — the original `result_partitions[0:200]
        # if result_partitions > 200` raised TypeError (sets support neither
        # slicing nor comparison with int). Sort for determinism, cap at 200.
        run_requests = []
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests
# 案例二 (Case 2): same pattern as case 1 for the det3d pipeline.
from datetime import datetime
from dagster import DagsterEventType, asset_sensor, AssetKey, RunRequest, sensor, \
    SensorEvaluationContext, \
    EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient


@sensor(
    # asset_key=AssetKey("clip_task_check_parse_todo"),
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='dynamic_annotation_todo_job'
)
def dynamic_annotation_sensor(context: SensorEvaluationContext):
    """Trigger dynamic_annotation_todo_job for every partition that
    clip_find_result_file_todo has materialized but clip_task_det3d_todo
    has neither materialized nor failed.

    Skips a tick while any det3d partition is still materializing, and caps
    each tick at 200 RunRequests.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # Same partition-health query Dagit issues; only the
    # DefaultPartitionStatuses branch is consumed below.
    query = """
query PartitionHealthQuery($assetKey: AssetKeyInput!) {
  assetNodeOrError(assetKey: $assetKey) {
    ... on AssetNode {
      id
      partitionKeysByDimension {
        name
        type
        partitionKeys
        __typename
      }
      assetPartitionStatuses {
        ... on TimePartitionStatuses {
          ranges {
            status
            startTime
            endTime
            startKey
            endKey
            __typename
          }
          __typename
        }
        ... on DefaultPartitionStatuses {
          materializedPartitions
          materializingPartitions
          failedPartitions
          __typename
        }
        ... on MultiPartitionStatuses {
          primaryDimensionName
          ranges {
            primaryDimStartKey
            primaryDimEndKey
            primaryDimStartTime
            primaryDimEndTime
            secondaryDim {
              ... on TimePartitionStatuses {
                ranges {
                  status
                  startTime
                  endTime
                  startKey
                  endKey
                  __typename
                }
                __typename
              }
              ... on DefaultPartitionStatuses {
                materializedPartitions
                materializingPartitions
                failedPartitions
                __typename
              }
              __typename
            }
            __typename
          }
          __typename
        }
        __typename
      }
      __typename
    }
    __typename
  }
}
"""
    # Partitions already materialized by the upstream asset.
    assetkey = "clip_find_result_file_todo"
    variables = {"assetKey": {"path": [assetkey]}}
    result = client._execute(query=query, variables=variables)
    # result keys: materializedPartitions / materializingPartitions / failedPartitions
    assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses")
    find_result_file_mdp = assetPartitionStatuses.get("materializedPartitions")
    context.log.info(f"find_result_file_mdp 长度:{len(find_result_file_mdp)} 值:{find_result_file_mdp}")

    # Partition status of the downstream det3d asset.
    assetkey = "clip_task_det3d_todo"
    variables = {"assetKey": {"path": [assetkey]}}
    result = client._execute(query=query, variables=variables)
    assetPartitionStatuses = result.get("assetNodeOrError").get("assetPartitionStatuses")
    det3d_mgp = assetPartitionStatuses.get("materializingPartitions")
    context.log.info(f"det3d_mgp 长度:{len(det3d_mgp)} 值:{det3d_mgp}")
    if len(det3d_mgp) > 0:
        context.log.info(f"clip_task_det3d_todo有正在物化的分区,不执行新的物化!")
        return None
    det3d_mdp = assetPartitionStatuses.get("materializedPartitions")
    context.log.info(f"det3d_mdp 长度:{len(det3d_mdp)} 值:{det3d_mdp}")
    det3d_fdp = assetPartitionStatuses.get("failedPartitions")
    context.log.info(f"det3d_fdp 长度:{len(det3d_fdp)} 值:{det3d_fdp}")

    # Failed partitions are handled separately; exclude them here.
    result_partitions = set(find_result_file_mdp).difference(set(det3d_mdp)).difference(set(det3d_fdp))
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")
    if result_partitions:
        # FIX: result_partitions is a set — the original `result_partitions[0:200]
        # if result_partitions > 200` raised TypeError (sets support neither
        # slicing nor comparison with int). Sort for determinism, cap at 200.
        run_requests = []
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests
# @run_status_sensor example: react to a successful run of auto_get_task_job.
from datetime import datetime
from typing import List, Optional, Set

from dagster import DagsterEventType, DagsterRunStatus, EventRecordsFilter, RunRequest, \
    RunStatusSensorContext, build_input_context, run_status_sensor, sensor, asset_sensor, AssetKey

from ..jobs.auto_task_jobs import *
from ..partitions.partition_defs import daily_task_partitions, clip_partitions


@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=60,
    monitored_jobs=[auto_get_task_job],
    request_job=clip_task_todo_job
)
def one_day_clip_auto_job_sensor(context: RunStatusSensorContext):
    """On each successful auto_get_task_job run, look up the materialization
    event of the oneday_clips_srouce_data asset for the run's partition and
    log its metadata entries.

    Returns None in all cases (no downstream runs are requested here).
    """
    run_id = context.dagster_run.run_id
    instance = context.instance
    partition_tag = context.dagster_run.tags.get("dagster/partition")
    if partition_tag:
        context.log.info(f"检测到作业 {context.dagster_run.job_name} 成功执行")
        context.log.info(f"分区键: {partition_tag}")
        # FIX: asset_key was only present as a commented-out line but is used
        # in the EventRecordsFilter below — the original raised NameError.
        asset_key = AssetKey("oneday_clips_srouce_data")
        # Most recent materialization record of that asset for this partition.
        materialization_records = instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
                asset_partitions=[partition_tag]
            ),
            limit=1
        )
        if not materialization_records:
            context.log.warning(f"未找到分区 {partition_tag} 的物化记录")
            return None
        data_record = materialization_records[0]
        materialization = data_record.event_log_entry.dagster_event.step_materialization_data
        data_metadata = materialization.materialization.metadata
        # Log each metadata key; the commented-out io_manager loading path that
        # followed in the original has been removed (it duplicates the live
        # implementation in auto_update_clips_parts_sensor).
        for metadata in data_metadata:
            context.log.info(metadata)
    return None
@asset_sensor 定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def asset_sensor( asset_key: AssetKey, *, job_name: Optional[str] = None, name: Optional[str] = None, minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, required_resource_keys: Optional[set[str]] = None, tags: Optional[Mapping[str, str]] = None, metadata: Optional[RawMetadataMapping] = None, ) -> Callable[ [ AssetMaterializationFunction, ], AssetSensorDefinition, ]:
# @asset_sensor example: fires on each materialization of oneday_clips_srouce_data.
@asset_sensor(
    asset_key=AssetKey("oneday_clips_srouce_data"),
    required_resource_keys={'io_manager'},
    job_name='clip_task_todo_job'
)
def auto_update_clips_parts_sensor(context: SensorEvaluationContext, event_log_entry: EventLogEntry):
    """Load the just-materialized asset value through the io_manager, register
    any clip names not yet present in the "clip_partitions" dynamic partition
    set, and request one clip_task_todo_job run per newly added clip.

    Returns a list of RunRequest (one per new clip) or None.
    """
    dagster_event = event_log_entry.dagster_event
    # Guard: only react to actual ASSET_MATERIALIZATION events.
    if not (dagster_event and
            dagster_event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION.value):
        return None
    asset_materialization = dagster_event.event_specific_data.materialization
    asset_key = asset_materialization.asset_key
    partition = asset_materialization.partition
    context.log.info(f"检测到资产物化: {asset_key}, 分区: {partition}")
    try:
        context.log.info(f"尝试读取资产数据: {asset_key}, 分区: {partition}")
        # Build an input context so the io_manager can load the stored value
        # for exactly this asset/partition.
        input_context = build_input_context(
            asset_key=asset_key,
            partition_key=partition,
            asset_partitions_def=daily_task_partitions,
            instance=context.instance
        )
        asset_data = context.resources.io_manager.load_input(input_context)
        # assumes asset_data is a dict with a 'clip_data' mapping — TODO confirm
        # against the oneday_clips_srouce_data asset implementation.
        if 'clip_data' not in asset_data:
            context.log.warning(f"资产数据中缺少 'clip_data' 字段")
            return None
        clip_data = asset_data['clip_data']
        # Collect every clip name; values are expected to be lists of dicts
        # carrying a "data_name" key (non-list values are skipped).
        all_clips: List[str] = []
        for clip_names in clip_data.values():
            if isinstance(clip_names, list):
                for v in clip_names:
                    all_clips.append(v["data_name"])
        if not all_clips:
            context.log.info("未找到任何剪辑数据")
            return None
        # Clips already registered as dynamic partitions.
        registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions"))
        # Only clips not yet registered.
        new_clips = [clip for clip in all_clips if clip not in registered_clips]
        if not new_clips:
            context.log.info("没有新的剪辑需要添加")
        else:
            # Register only the new clip partitions.
            context.instance.add_dynamic_partitions("clip_partitions", new_clips)
            context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区")
        # Log the materialization's metadata, if any.
        if asset_materialization.metadata:
            metadata = {
                key: metadata_value.value
                for key, metadata_value in asset_materialization.metadata.items()
            }
            context.log.info(f"资产元数据: {metadata}")
        # Only trigger downstream runs when there are new clips.
        if new_clips:
            run_requests = []
            for clip_name in new_clips:
                run_requests.append(RunRequest(partition_key=clip_name, tags={'task_group': partition}))
                context.log.info(f"请求下游处理 {clip_name}")
            return run_requests
        else:
            return None
    except KeyError as e:
        context.log.error(f"数据格式错误,缺少必要的键: {e}")
        return None
    except ValueError as e:
        context.log.error(f"数据值错误: {e}")
        return None
    except Exception as e:
        context.log.error(f"处理资产数据时发生意外错误: {e}")
        # Swallowing is deliberate here: a bad payload should not crash the
        # sensor daemon; returning None simply skips this tick.
        return None
dagster 1 2 3 4 5 6 7 8 9 10 11 12 13 14 query { repositoriesOrError { ... on RepositoryConnection { nodes { name location { name } } } } } curl -X POST http://10.226.1.155/graphql \ -H "Content-Type: application/json" \ -d '{"query": "query { repositoriesOrError { ... on RepositoryConnection { nodes { name location { name } } } } }"}' 输出会告诉你正确的 repositoryName 和 repositoryLocationName
1 网页访问 http://10.226.1.155/graphql:浏览器默认使用 HTTP 协议的标准端口 80,所以 URL 中省略了 :80 也能正常工作。
1 对外提供服务的,可能通过 LoadBalancer 或 Ingress 暴露。
# DagsterGraphQLClient — check a run's status; prints "0" on success, "1" otherwise.
# `client` is assumed to be a DagsterGraphQLClient created earlier — TODO confirm.
try:
    status: DagsterRunStatus = client.get_run_status("6aff29ce-11b9-4ca4-a8d8-7b9851af3681")
    if status == DagsterRunStatus.SUCCESS:
        print("0")
    else:
        print("1")
except Exception as exc:
    # FIX: bare `raise` re-raises with the original traceback intact
    # (the original `raise exc` is redundant and muddies the trace).
    raise
# 提交任务: submit one dynamic_annotation_todo_job run per clip name listed
# in error_time_align.txt, tagged with the clip partition and today's date.
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError
from dagster import DagsterRunStatus
from datetime import datetime
import pytz

client = DagsterGraphQLClient("10.226.3.135", port_number=80)
shanghai_time = datetime.now(pytz.timezone('Asia/Shanghai'))
date = shanghai_time.strftime("%Y-%m-%d")
with open("error_time_align.txt", "r", encoding="utf-8") as f:
    # FIX: the original `clip = f.readline(); while clip:` loop stopped
    # silently at the first blank line, dropping every clip after it.
    # Iterating the file and skipping blanks processes the whole file.
    for line in f:
        clip = line.strip()
        if not clip:
            continue
        try:
            new_run_id: str = client.submit_job_execution(
                "dynamic_annotation_todo_job",
                repository_location_name="skdata-code",
                repository_name="__repository__",
                run_config={},
                tags={"dagster/partition": clip, "user": "shell", "date": date}
            )
        except DagsterGraphQLClientError as exc:
            # Bare `raise` keeps the full traceback (was `raise exc`).
            raise
# run_config: per-op config for dynamic_annotation_todo_job, expressed as YAML
# and submitted through the GraphQL client. "{}" placeholders are replaced with
# the task date before parsing; "[[...]]" placeholders are resolved downstream.
# NOTE(review): YAML indentation below reconstructed with standard 2-space
# nesting — confirm against the original file.
import yaml

base_config = r"""
ops:
  clip_task_behaviorlabel_todo:
    config:
      exe_args:
      - -u
      - "1"
      - -d
      - "1"
      - -v
      - v0.0.7
      - -c
      - "{}"
      - -r
      - "[[root_path]]base_static_dataset/"
      - --Tr
      - "[[root_path]]sub_dataset/4d_lable/track_data/"
      - --m1up
      - "[[root_path]]sub_dataset/lable/lable1/[[image_version]]/"
      - --m2up
      - "[[root_path]]sub_dataset/lable/lable2/[[image_version]]/"
      exe_command:
      - /bin/bash
      - ./run2.sh
      image_name: changan-map.tencentcloudcr.com/train/dat_lable
      image_pull_policy: Always
      image_version: v0.0.7
      namespace: dagster-app-test
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "6"
          memory: 10Gi
        requests:
          cpu: "4"
          memory: 8Gi
      timeout_seconds: 36000
  clip_task_det2d_todo:
    config:
      exe_args:
      - --task
      - "{}"
      - --in_dir
      - "[[dataset_path]]"
      - --out_dir
      - "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
      exe_command:
      - /bin/bash
      - run_det2d.sh
      image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
      image_pull_policy: Always
      image_version: v2.2
      namespace: dagster-app-test
      pod_spec_config:
        affinity:
          node_affinity:
            required_during_scheduling_ignored_during_execution:
              node_selector_terms:
              - match_expressions:
                - key: kubernetes.io/hostname
                  operator: In
                  values:
                  - 10.226.18.16
                  - 10.226.18.3
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "8"
          memory: 16Gi
          tke.cloud.tencent.com/qgpu-core: "20"
          tke.cloud.tencent.com/qgpu-memory: "6"
        requests:
          cpu: "4"
          memory: 8Gi
          tke.cloud.tencent.com/qgpu-core: "20"
          tke.cloud.tencent.com/qgpu-memory: "6"
      timeout_seconds: 3600
  clip_task_det3d_todo:
    config:
      exe_args:
      - --task
      - "{}"
      - --task_type
      - det3d
      - --infer_data_root
      - "[[dataset_path]]"
      - --pred_dir
      - "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
      exe_command:
      - /bin/bash
      - ./dagster_entrypoint.sh
      image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
      image_pull_policy: Always
      image_version: v2.2
      namespace: dagster-app-test
      pod_spec_config:
        affinity:
          node_affinity:
            required_during_scheduling_ignored_during_execution:
              node_selector_terms:
              - match_expressions:
                - key: kubernetes.io/hostname
                  operator: In
                  values:
                  - 10.226.18.16
                  - 10.226.18.3
        volumes:
        - empty_dir:
            medium: Memory
            size_limit: 32Gi
          name: dshm
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "8"
          memory: 32Gi
          tke.cloud.tencent.com/qgpu-core: "20"
          tke.cloud.tencent.com/qgpu-memory: "6"
        requests:
          cpu: "4"
          memory: 16Gi
          tke.cloud.tencent.com/qgpu-core: "20"
          tke.cloud.tencent.com/qgpu-memory: "6"
      timeout_seconds: 3600
      volume_mounts:
      - mount_path: /dev/shm
        name: dshm
  clip_task_labelstorage_todo:
    config:
      exe_args:
      - -d
      - "1"
      - -v
      - v0.0.7
      - -c
      - "{}"
      - --m2up
      - "[[root_path]]sub_dataset/lable/lable2/[[image_version]]/"
      exe_command:
      - /bin/bash
      - ./run3.sh
      image_name: changan-map.tencentcloudcr.com/train/dat_lable_mongodb
      image_pull_policy: Always
      image_version: v0.0.7
      namespace: dagster-app-test
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "4"
          memory: 8Gi
        requests:
          cpu: "2"
          memory: 4Gi
      timeout_seconds: 36000
  clip_task_mot3d_todo:
    config:
      exe_args:
      - --task
      - "{}"
      - --task_type
      - mot3d
      - --pkl_path
      - "[[root_path]]base_static_dataset/"
      - --box3d_path
      - "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
      - --out_path
      - "[[root_path]]sub_dataset/4d_lable/track_data/[[image_version]]/"
      exe_command:
      - /bin/bash
      - ./dagster_entrypoint.sh
      image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
      image_pull_policy: Always
      image_version: v2.2
      namespace: dagster-app-test
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "8"
          memory: 32Gi
        requests:
          cpu: "4"
          memory: 16Gi
      timeout_seconds: 3600
  clip_task_otherlabel_todo:
    config:
      exe_args:
      - -u
      - "1"
      - -d
      - "1"
      - -v
      - v0.0.7
      - -c
      - "{}"
      - -r
      - "[[root_path]]base_static_dataset/"
      - --m1up
      - "[[root_path]]sub_dataset/lable/lable1/[[image_version]]/"
      exe_command:
      - /bin/bash
      - ./run1.sh
      image_name: changan-map.tencentcloudcr.com/train/dat_lable
      image_pull_policy: Always
      image_version: v0.0.7
      namespace: dagster-app-test
      pull_secrets:
      - name: tenc-secret
      resources:
        limits:
          cpu: "6"
          memory: 10Gi
        requests:
          cpu: "4"
          memory: 8Gi
      timeout_seconds: 36000
""".replace("{}", "2026-01-28")

# FIX: yaml.load() without a Loader is a TypeError on PyYAML >= 6 and unsafe
# on older versions; safe_load parses plain config YAML correctly and safely.
dic_config = yaml.safe_load(base_config)
# `client` is assumed to be the DagsterGraphQLClient from the earlier snippet — TODO confirm.
try:
    new_run_id: str = client.submit_job_execution(
        "dynamic_annotation_todo_job",
        repository_location_name="skdata-code",
        repository_name="__repository__",
        run_config=dic_config
    )
except DagsterGraphQLClientError as exc:
    raise
# 查看任务打印的日志 — GraphQL query: fetch the event log (message, timestamp,
# level, step) and the asset materializations for a single run id.
queryer_run_info = '''
query GetRunEvents($runId: String!) {
  runsOrError(filter: {runIds: [$runId]}) {
    ... on Runs {
      results {
        runId
        status
        eventConnection {
          events {
            __typename
            ... on MessageEvent {
              message
              timestamp
              level
              stepKey
            }
          }
        }
        assetMaterializations {
          partition
        }
      }
    }
  }
}'''
variables = {
    # NOTE(review): runId must be bound by the caller before this snippet runs.
    "runId": runId
}
# Execute the run-info query above and stringify the response dict.
variables = {
    # NOTE(review): runId must be bound by the caller before this snippet runs.
    "runId": runId
}
# FIX: str(...) is the idiomatic spelling of .__str__().
result_str = str(client._execute(queryer_run_info, variables=variables))
# 查看时间范围内 — count runs of a job with given statuses inside a
# created-time window. Example arguments: "clip_task_todo_job", "FAILURE".
query = """
query DailyPartitionStats($pipelineName: String!, $statuses: [RunStatus!]!, $startDate: Float!, $endDate: Float!) {
  runsOrError(filter: {
    pipelineName: $pipelineName,
    statuses: $statuses,
    createdBefore: $endDate,
    createdAfter: $startDate
  }) {
    ... on Runs {
      results {
        runId
        status
      }
    }
  }
}
"""
# NOTE(review): pipelineName/statuses/startDate/endDate must be bound by the
# caller; startDate/endDate are float unix timestamps.
variables = {"pipelineName": pipelineName, "statuses": [statuses], "startDate": startDate, "endDate": endDate}
count = len(client._execute(query, variables=variables).get("runsOrError").get("results"))
项目project definition @definitions
create-dagster project dagster-tutorial cd dagster-tutorial python -m venv .venv source .venv/bin/activate
将你的项目安装为可编辑的包: pip install --editable . --group dev (若 pip 版本过旧不支持 --group,可去掉后面的 --group dev)
.
├── pyproject.toml
├── README.md
├── src
│   └── dagster_tutorial
│       ├── __init__.py
│       ├── definitions.py
│       └── defs
│           └── __init__.py
└── tests
    └── __init__.py
pyproject.toml: defines the metadata and Python dependencies for the project. src/definitions.py: defines the main Dagster project object.
###dg check defs In Dagster, all defined objects (such as assets) need to be associated with a top-level Definitions object in order to be deployed.
1 2 All component YAML validated successfully. All definitions loaded successfully.
dg dev 资产assets @dg.asset
dg scaffold defs dagster.asset assets.py
dg launch --assets "*"
dg launch --assets customers,orders,payments
# 资产间数据传输 / 在用户界面中配置资产 — run-configurable asset example.
import dagster as dg

# Define the config schema
class ForecastModelConfig(dg.Config):
    # lookback_window_days defaults to 30, but can be
    # overridden by the user. If you do not provide a
    # default, the user will need to provide a value.
    lookback_window_days: int = 30

# Access the config with the `config` parameter
@dg.asset
def forecast_model(config: ForecastModelConfig):
    # Values supplied in the launchpad UI (or run_config) arrive here.
    print("Forecasting over time window:", config.lookback_window_days)
    # ...more code here

defs = dg.Definitions(assets=[forecast_model])
资源 resource @dg.definitions
dg scaffold defs dagster.resources resources.py
DAG 资产依赖 @dg.asset(deps=[...]) @dg.asset_check(asset='xx') automation @dg.schedule(cron_schedule='xxx', target='xx') Scheduled jobs
Automate asset materialization:
eager automation condition
components partition sensors目录