dagster

dagster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@asset(group_name="task_todo",
required_resource_keys={"path_resource"},
partitions_def=clip_partitions,
config_schema={
"image_name": Field(
str,
default_value="changan-map.tencentcloudcr.com/train/dat_parse_sys",
description="解析镜像"
),
"image_version": Field(
str,
default_value="v3.1.2",
description="容器镜像版本"
),
"exe_command": Field(list,
default_value=["/workspace/parse_data.sh"],
description='启动命令'),
"exe_args": Field(list,
default_value=[
"--task", "{}",
"--type", "image",
"--in_dir", "[[raw_data]]",
"--out_dir", "[[raw_image]]",
],
description='命令参数'),
1
2
[[raw_data]] 是从required_resource_keys中去获取值

sensor

@sensor

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def sensor(
job_name: Optional[str] = None,
*,
name: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[ExecutableDefinition] = None,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
asset_selection: Optional[CoercibleToAssetSelection] = None,
required_resource_keys: Optional[set[str]] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[RawMetadataMapping] = None,
target: Optional[
Union[
"CoercibleToAssetSelection",
"AssetsDefinition",
"JobDefinition",
"UnresolvedAssetJobDefinition",
]
] = None,
) -> Callable[[RawSensorEvaluationFunction], SensorDefinition]:

案例一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from datetime import datetime 
from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\
SensorEvaluationContext,\
EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient
@sensor(
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='lidar_extcalib_todo_job'
)
def auto_excute_lidar_extcalib_sensor(context: SensorEvaluationContext):
    """Request lidar-extcalib runs for partitions that are ready.

    Polls partition health over GraphQL and selects the partitions of
    ``clip_task_check_parse_todo`` that materialized successfully but have
    not yet been materialized (or failed) by ``clip_task_time_algin_todo``.
    Skips the tick entirely while any downstream partition is still
    materializing.

    Returns:
        A list of ``RunRequest`` (capped at 200 per tick) or ``None``.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # NOTE(review): _execute is a private API of DagsterGraphQLClient and may
    # change between dagster-graphql versions — confirm on upgrade.
    query = """
    query PartitionHealthQuery($assetKey: AssetKeyInput!) {
      assetNodeOrError(assetKey: $assetKey) {
        ... on AssetNode {
          id
          partitionKeysByDimension {
            name
            type
            partitionKeys
            __typename
          }
          assetPartitionStatuses {
            ... on TimePartitionStatuses {
              ranges {
                status
                startTime
                endTime
                startKey
                endKey
                __typename
              }
              __typename
            }
            ... on DefaultPartitionStatuses {
              materializedPartitions
              materializingPartitions
              failedPartitions
              __typename
            }
            ... on MultiPartitionStatuses {
              primaryDimensionName
              ranges {
                primaryDimStartKey
                primaryDimEndKey
                primaryDimStartTime
                primaryDimEndTime
                secondaryDim {
                  ... on TimePartitionStatuses {
                    ranges {
                      status
                      startTime
                      endTime
                      startKey
                      endKey
                      __typename
                    }
                    __typename
                  }
                  ... on DefaultPartitionStatuses {
                    materializedPartitions
                    materializingPartitions
                    failedPartitions
                    __typename
                  }
                  __typename
                }
                __typename
              }
              __typename
            }
            __typename
          }
          __typename
        }
        __typename
      }
    }
    """

    def _partition_statuses(asset_key: str):
        # Fetch the assetPartitionStatuses payload for one asset key.
        variables = {"assetKey": {"path": [asset_key]}}
        result = client._execute(query=query, variables=variables)
        return result.get("assetNodeOrError").get("assetPartitionStatuses")

    # Upstream: partitions of clip_task_check_parse_todo that materialized.
    check_parse_statuses = _partition_statuses("clip_task_check_parse_todo")
    check_parse_mdp = check_parse_statuses.get("materializedPartitions")
    context.log.info(f"check_parse_mdp 长度:{len(check_parse_mdp)} 值:{check_parse_mdp}")

    # Downstream: partition health of clip_task_time_algin_todo.
    time_algin_statuses = _partition_statuses("clip_task_time_algin_todo")
    time_algin_mgp = time_algin_statuses.get("materializingPartitions")
    context.log.info(f"time_algin_mgp 长度:{len(time_algin_mgp)} 值:{time_algin_mgp}")

    # Back off while any downstream partition is still materializing.
    if len(time_algin_mgp) > 0:
        context.log.info(f"lidar_extcalib_todo_job有正在物化的分区,不执行新的物化!")
        return None

    time_algin_mdp = time_algin_statuses.get("materializedPartitions")
    context.log.info(f"time_algin_mdp 长度:{len(time_algin_mdp)} 值:{time_algin_mdp}")
    time_algin_fdp = time_algin_statuses.get("failedPartitions")
    context.log.info(f"time_algin_fdp 长度:{len(time_algin_fdp)} 值:{time_algin_fdp}")

    # Failed partitions are handled separately, so exclude them too.
    result_partitions = set(check_parse_mdp).difference(time_algin_mdp).difference(time_algin_fdp)
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")

    if result_partitions:
        run_requests = []
        # BUGFIX: the original did `result_partitions[0:200] if
        # result_partitions > 200 else ...` — but result_partitions is a set:
        # slicing a set and comparing a set to an int both raise TypeError.
        # Sort for determinism and cap the batch at 200.
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests
    return None

案例二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from datetime import datetime 
from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\
SensorEvaluationContext,\
EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient
@sensor(
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='dynamic_annotation_todo_job'
)
def dynamic_annotation_sensor(context: SensorEvaluationContext):
    """Request dynamic-annotation runs for partitions that are ready.

    Polls partition health over GraphQL and selects the partitions of
    ``clip_find_result_file_todo`` that materialized successfully but have
    not yet been materialized (or failed) by ``clip_task_det3d_todo``.
    Skips the tick entirely while any det3d partition is still
    materializing.

    Returns:
        A list of ``RunRequest`` (capped at 200 per tick) or ``None``.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # NOTE(review): _execute is a private API of DagsterGraphQLClient and may
    # change between dagster-graphql versions — confirm on upgrade.
    query = """
    query PartitionHealthQuery($assetKey: AssetKeyInput!) {
      assetNodeOrError(assetKey: $assetKey) {
        ... on AssetNode {
          id
          partitionKeysByDimension {
            name
            type
            partitionKeys
            __typename
          }
          assetPartitionStatuses {
            ... on TimePartitionStatuses {
              ranges {
                status
                startTime
                endTime
                startKey
                endKey
                __typename
              }
              __typename
            }
            ... on DefaultPartitionStatuses {
              materializedPartitions
              materializingPartitions
              failedPartitions
              __typename
            }
            ... on MultiPartitionStatuses {
              primaryDimensionName
              ranges {
                primaryDimStartKey
                primaryDimEndKey
                primaryDimStartTime
                primaryDimEndTime
                secondaryDim {
                  ... on TimePartitionStatuses {
                    ranges {
                      status
                      startTime
                      endTime
                      startKey
                      endKey
                      __typename
                    }
                    __typename
                  }
                  ... on DefaultPartitionStatuses {
                    materializedPartitions
                    materializingPartitions
                    failedPartitions
                    __typename
                  }
                  __typename
                }
                __typename
              }
              __typename
            }
            __typename
          }
          __typename
        }
        __typename
      }
    }
    """

    def _partition_statuses(asset_key: str):
        # Fetch the assetPartitionStatuses payload for one asset key.
        variables = {"assetKey": {"path": [asset_key]}}
        result = client._execute(query=query, variables=variables)
        return result.get("assetNodeOrError").get("assetPartitionStatuses")

    # Upstream: partitions of clip_find_result_file_todo that materialized.
    find_result_statuses = _partition_statuses("clip_find_result_file_todo")
    find_result_file_mdp = find_result_statuses.get("materializedPartitions")
    context.log.info(f"find_result_file_mdp 长度:{len(find_result_file_mdp)} 值:{find_result_file_mdp}")

    # Downstream: partition health of clip_task_det3d_todo.
    det3d_statuses = _partition_statuses("clip_task_det3d_todo")
    det3d_mgp = det3d_statuses.get("materializingPartitions")
    context.log.info(f"det3d_mgp 长度:{len(det3d_mgp)} 值:{det3d_mgp}")

    # Back off while any det3d partition is still materializing.
    if len(det3d_mgp) > 0:
        context.log.info(f"clip_task_det3d_todo有正在物化的分区,不执行新的物化!")
        return None

    det3d_mdp = det3d_statuses.get("materializedPartitions")
    context.log.info(f"det3d_mdp 长度:{len(det3d_mdp)} 值:{det3d_mdp}")
    det3d_fdp = det3d_statuses.get("failedPartitions")
    context.log.info(f"det3d_fdp 长度:{len(det3d_fdp)} 值:{det3d_fdp}")

    # Failed partitions are handled separately, so exclude them too.
    result_partitions = set(find_result_file_mdp).difference(det3d_mdp).difference(det3d_fdp)
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")

    if result_partitions:
        run_requests = []
        # BUGFIX: the original did `result_partitions[0:200] if
        # result_partitions > 200 else ...` — but result_partitions is a set:
        # slicing a set and comparing a set to an int both raise TypeError.
        # Sort for determinism and cap the batch at 200.
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests
    return None

@run_status_sensor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from datetime import datetime 
from typing import List, Optional, Set
from dagster import DagsterEventType, DagsterRunStatus, EventRecordsFilter, RunRequest, RunStatusSensorContext, build_input_context, run_status_sensor, sensor,asset_sensor,AssetKey
from ..jobs.auto_task_jobs import *
from ..partitions.partition_defs import daily_task_partitions,clip_partitions



@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=60,
    monitored_jobs=[auto_get_task_job],
    request_job=clip_task_todo_job
)
def one_day_clip_auto_job_sensor(context: RunStatusSensorContext):
    """React to each successful run of ``auto_get_task_job``.

    Reads the run's partition tag and logs the materialization metadata of
    the ``oneday_clips_srouce_data`` asset for that partition. The
    RunRequest-emitting logic is currently commented out, so this sensor
    always returns ``None`` (it only logs).
    """
    run_id = context.dagster_run.run_id
    instance = context.instance

    # Partition key of the run that just succeeded, if it was partitioned.
    partition_tag = context.dagster_run.tags.get("dagster/partition")

    if partition_tag:
        context.log.info(f"检测到作业 {context.dagster_run.job_name} 成功执行")
        context.log.info(f"分区键: {partition_tag}")
        # Currently registered dynamic partitions of "auto_dynamic".
        registered_clips: Set[str] = set(instance.get_dynamic_partitions("auto_dynamic"))

        # NOTE(review): "srouce" looks like a typo for "source", but the key
        # must match the asset definition elsewhere — do not "fix" it here.
        asset_key = AssetKey("oneday_clips_srouce_data")

        # Most recent materialization event of the asset for this partition.
        materialization_records = instance.get_event_records(
            EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
                asset_partitions=[partition_tag]
            ),
            limit=1
        )

        if not materialization_records:
            context.log.warning(f"未找到分区 {partition_tag} 的物化记录")
            return None

        data_record = materialization_records[0]
        materialization = data_record.event_log_entry.dagster_event.step_materialization_data
        data_metadata = materialization.materialization.metadata

        # Log each metadata key for traceability/debugging.
        for metadata in data_metadata:
            context.log.info(metadata)

        # --- Commented-out draft of the downstream-triggering logic ---
        # 构建输入上下文并加载数据
        # input_context = build_input_context(
        #     asset_key=asset_key,
        #     partition_key=partition_tag,
        #     asset_partitions_def=daily_task_partitions,
        #     instance=instance
        # )

        # asset_data = context.resources.io_manager.load_input(input_context)

        # if 'clip_data' not in asset_data:
        #     context.log.warning(f"资产数据中缺少 'clip_data' 字段")
        #     return None

        # clip_data = asset_data['clip_data']

        # # 提取所有剪辑名称
        # all_clips: List[str] = []
        # for clip_names in clip_data.values():
        #     if isinstance(clip_names, list):
        #         for v in clip_names:
        #             all_clips.append(v["data_name"])

        # if not all_clips:
        #     context.log.info("未找到任何clip数据")
        #     return None

        # # 获取已注册的分区
        # registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions"))

        # new_clips = set(all_clips) - registered_clips

        # if not new_clips:
        #     context.log.info("没有新的剪辑需要添加")
        # else:
        #     # 只添加新的剪辑分区
        #     context.instance.add_dynamic_partitions("clip_partitions", new_clips)
        #     context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区")

        # # 只有在有新剪辑时才触发运行
        # if new_clips:
        #     run_requests = []
        #     for clip_name in new_clips:
        #         run_requests.append(RunRequest(partition_key=clip_name,tags={'task_group':partition_tag}))
        #         context.log.info(f"请求下游处理 {clip_name}")
        #     return run_requests
        # else:
    return None

@asset_sensor

定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def asset_sensor(
asset_key: AssetKey,
*,
job_name: Optional[str] = None,
name: Optional[str] = None,
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[ExecutableDefinition] = None,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
required_resource_keys: Optional[set[str]] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[RawMetadataMapping] = None,
) -> Callable[
[
AssetMaterializationFunction,
],
AssetSensorDefinition,
]:

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@asset_sensor(
    asset_key=AssetKey("oneday_clips_srouce_data"),
    required_resource_keys={'io_manager'},
    job_name='clip_task_todo_job'
)
def auto_update_clips_parts_sensor(context: SensorEvaluationContext, event_log_entry: EventLogEntry):
    """Register new clip partitions on each source-asset materialization.

    Fires whenever ``oneday_clips_srouce_data`` materializes: loads the
    materialized partition through the IO manager, adds any clip names not
    yet present in the ``clip_partitions`` dynamic partition set, and emits
    one ``RunRequest`` per newly registered clip (tagged with the source
    partition as ``task_group``). Returns ``None`` when there is nothing
    new to process.
    """
    dagster_event = event_log_entry.dagster_event

    # Guard: only react to actual ASSET_MATERIALIZATION events.
    if not (dagster_event and
            dagster_event.event_type_value == DagsterEventType.ASSET_MATERIALIZATION.value):
        return None

    asset_materialization = dagster_event.event_specific_data.materialization
    asset_key = asset_materialization.asset_key
    partition = asset_materialization.partition

    context.log.info(f"检测到资产物化: {asset_key}, 分区: {partition}")

    try:
        context.log.info(f"尝试读取资产数据: {asset_key}, 分区: {partition}")

        # Build an input context and load the asset's data via the IO manager.
        input_context = build_input_context(
            asset_key=asset_key,
            partition_key=partition,
            asset_partitions_def=daily_task_partitions,
            instance=context.instance
        )

        asset_data = context.resources.io_manager.load_input(input_context)

        if 'clip_data' not in asset_data:
            context.log.warning(f"资产数据中缺少 'clip_data' 字段")
            return None

        clip_data = asset_data['clip_data']

        # Collect every clip name across all groups.
        # NOTE(review): assumes each list element is a mapping with a
        # "data_name" key — confirm against the producing asset.
        all_clips: List[str] = []
        for clip_names in clip_data.values():
            if isinstance(clip_names, list):
                for v in clip_names:
                    all_clips.append(v["data_name"])

        if not all_clips:
            context.log.info("未找到任何剪辑数据")
            return None

        # Dynamic partitions already registered.
        registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions"))

        # Clips that still need to be registered.
        new_clips = [clip for clip in all_clips if clip not in registered_clips]

        if not new_clips:
            context.log.info("没有新的剪辑需要添加")
        else:
            # Register only the new clip partitions.
            context.instance.add_dynamic_partitions("clip_partitions", new_clips)
            context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区")

        # Log the materialization's metadata for traceability.
        if asset_materialization.metadata:
            metadata = {
                key: metadata_value.value
                for key, metadata_value in asset_materialization.metadata.items()
            }
            context.log.info(f"资产元数据: {metadata}")

        # Only trigger downstream runs when there are new clips.
        if new_clips:
            run_requests = []
            for clip_name in new_clips:
                run_requests.append(RunRequest(partition_key=clip_name,tags={'task_group':partition}))
                context.log.info(f"请求下游处理 {clip_name}")
            return run_requests
        else:
            return None

    except KeyError as e:
        context.log.error(f"数据格式错误,缺少必要的键: {e}")
        return None
    except ValueError as e:
        context.log.error(f"数据值错误: {e}")
        return None
    except Exception as e:
        context.log.error(f"处理资产数据时发生意外错误: {e}")
        # Whether to re-raise or return None depends on business needs;
        # a sensor tick should not crash the daemon, so we swallow and log.
        return None

dagster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
query {
repositoriesOrError {
... on RepositoryConnection {
nodes {
name
location { name }
}
}
}
}
curl -X POST http://10.226.1.155/graphql \
-H "Content-Type: application/json" \
-d '{"query": "query { repositoriesOrError { ... on RepositoryConnection { nodes { name location { name } } } } }"}'
输出会告诉你正确的 repositoryName 和 repositoryLocationName
1
网页访问 http://10.226.1.155/graphql:浏览器默认使用 HTTP 协议的标准端口 80,所以 URL 中省略了 :80 也能正常工作。
1
对外提供服务的,可能通过 LoadBalancer 或 Ingress 暴露。

DagsterGraphQLClient

查看任务执行状态

1
2
3
4
5
6
7
8
# Query the status of a single run by its run id and print an exit-style
# flag: "0" for success, "1" for anything else.
try:
    status: DagsterRunStatus = client.get_run_status("6aff29ce-11b9-4ca4-a8d8-7b9851af3681")
    if status == DagsterRunStatus.SUCCESS:
        print("0")
    else:
        print("1")
except Exception:
    # Bare `raise` re-raises with the original traceback intact;
    # the original `raise exc` resets the traceback start point.
    raise

提交任务

tags
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from dagster_graphql import DagsterGraphQLClient,DagsterGraphQLClientError
from dagster import DagsterRunStatus
from datetime import datetime
import pytz

# Submit dynamic_annotation_todo_job once per clip name listed (one per
# line) in error_time_align.txt, tagging each run with its partition key.
client = DagsterGraphQLClient("10.226.3.135", port_number=80)
shanghai_time = datetime.now(pytz.timezone('Asia/Shanghai'))
date = shanghai_time.strftime("%Y-%m-%d")
with open("error_time_align.txt", "r", encoding="utf-8") as f:
    clip = f.readline().strip()
    # NOTE: the loop stops at the first blank line as well as at EOF
    # (both strip to ""), preserving the original behaviour.
    while clip:
        try:
            new_run_id: str = client.submit_job_execution(
                "dynamic_annotation_todo_job",
                repository_location_name="skdata-code",
                repository_name="__repository__",
                run_config={},
                tags={"dagster/partition": clip, "user": "shell", "date": date}
            )
            clip = f.readline().strip()
        except DagsterGraphQLClientError:
            # Bare `raise` keeps the original traceback (`raise exc` resets it).
            # Removed the unused `partition = "CD701_..."` constant from the
            # original — it was never read.
            raise
run_config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import yaml
base_config = r"""
ops:
clip_task_behaviorlabel_todo:
config:
exe_args:
- -u
- "1"
- -d
- "1"
- -v
- v0.0.7
- -c
- "{}"
- -r
- "[[root_path]]base_static_dataset/"
- --Tr
- "[[root_path]]sub_dataset/4d_lable/track_data/"
- --m1up
- "[[root_path]]sub_dataset/lable/lable1/[[image_version]]/"
- --m2up
- "[[root_path]]sub_dataset/lable/lable2/[[image_version]]/"
exe_command:
- /bin/bash
- ./run2.sh
image_name: changan-map.tencentcloudcr.com/train/dat_lable
image_pull_policy: Always
image_version: v0.0.7
namespace: dagster-app-test
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "6"
memory: 10Gi
requests:
cpu: "4"
memory: 8Gi
timeout_seconds: 36000
clip_task_det2d_todo:
config:
exe_args:
- --task
- "{}"
- --in_dir
- "[[dataset_path]]"
- --out_dir
- "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
exe_command:
- /bin/bash
- run_det2d.sh
image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
image_pull_policy: Always
image_version: v2.2
namespace: dagster-app-test
pod_spec_config:
affinity:
node_affinity:
required_during_scheduling_ignored_during_execution:
node_selector_terms:
- match_expressions:
- key: kubernetes.io/hostname
operator: In
values:
- 10.226.18.16
- 10.226.18.3
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "8"
memory: 16Gi
tke.cloud.tencent.com/qgpu-core: "20"
tke.cloud.tencent.com/qgpu-memory: "6"
requests:
cpu: "4"
memory: 8Gi
tke.cloud.tencent.com/qgpu-core: "20"
tke.cloud.tencent.com/qgpu-memory: "6"
timeout_seconds: 3600
clip_task_det3d_todo:
config:
exe_args:
- --task
- "{}"
- --task_type
- det3d
- --infer_data_root
- "[[dataset_path]]"
- --pred_dir
- "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
exe_command:
- /bin/bash
- ./dagster_entrypoint.sh
image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
image_pull_policy: Always
image_version: v2.2
namespace: dagster-app-test
pod_spec_config:
affinity:
node_affinity:
required_during_scheduling_ignored_during_execution:
node_selector_terms:
- match_expressions:
- key: kubernetes.io/hostname
operator: In
values:
- 10.226.18.16
- 10.226.18.3
volumes:
- empty_dir:
medium: Memory
size_limit: 32Gi
name: dshm
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "8"
memory: 32Gi
tke.cloud.tencent.com/qgpu-core: "20"
tke.cloud.tencent.com/qgpu-memory: "6"
requests:
cpu: "4"
memory: 16Gi
tke.cloud.tencent.com/qgpu-core: "20"
tke.cloud.tencent.com/qgpu-memory: "6"
timeout_seconds: 3600
volume_mounts:
- mount_path: /dev/shm
name: dshm
clip_task_labelstorage_todo:
config:
exe_args:
- -d
- "1"
- -v
- v0.0.7
- -c
- "{}"
- --m2up
- "[[root_path]]sub_dataset/lable/lable2/[[image_version]]/"
exe_command:
- /bin/bash
- ./run3.sh
image_name: changan-map.tencentcloudcr.com/train/dat_lable_mongodb
image_pull_policy: Always
image_version: v0.0.7
namespace: dagster-app-test
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "4"
memory: 8Gi
requests:
cpu: "2"
memory: 4Gi
timeout_seconds: 36000
clip_task_mot3d_todo:
config:
exe_args:
- --task
- "{}"
- --task_type
- mot3d
- --pkl_path
- "[[root_path]]base_static_dataset/"
- --box3d_path
- "[[root_path]]sub_dataset/4d_lable/mo_data/[[image_version]]/"
- --out_path
- "[[root_path]]sub_dataset/4d_lable/track_data/[[image_version]]/"
exe_command:
- /bin/bash
- ./dagster_entrypoint.sh
image_name: changan-map.tencentcloudcr.com/train/static_dynamic_annotation
image_pull_policy: Always
image_version: v2.2
namespace: dagster-app-test
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "8"
memory: 32Gi
requests:
cpu: "4"
memory: 16Gi
timeout_seconds: 3600
clip_task_otherlabel_todo:
config:
exe_args:
- -u
- "1"
- -d
- "1"
- -v
- v0.0.7
- -c
- "{}"
- -r
- "[[root_path]]base_static_dataset/"
- --m1up
- "[[root_path]]sub_dataset/lable/lable1/[[image_version]]/"
exe_command:
- /bin/bash
- ./run1.sh
image_name: changan-map.tencentcloudcr.com/train/dat_lable
image_pull_policy: Always
image_version: v0.0.7
namespace: dagster-app-test
pull_secrets:
- name: tenc-secret
resources:
limits:
cpu: "6"
memory: 10Gi
requests:
cpu: "4"
memory: 8Gi
timeout_seconds: 36000
""".replace("{}","2026-01-28")
# BUGFIX: yaml.load without an explicit Loader is deprecated since
# PyYAML 5.1 and raises TypeError on PyYAML 6; safe_load is the correct
# (and safe) choice for plain configuration data.
dic_config = yaml.safe_load(base_config)
try:
    # Submit the job with the per-op run_config parsed above.
    new_run_id: str = client.submit_job_execution(
        "dynamic_annotation_todo_job",
        repository_location_name="skdata-code",
        repository_name="__repository__",
        run_config=dic_config
    )
except DagsterGraphQLClientError:
    # Bare `raise` keeps the original traceback (`raise exc` resets it).
    raise

查看任务打印的日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
queryer_run_info='''
query GetRunEvents($runId: String!) {
runsOrError(filter: {runIds: [$runId]}) {
... on Runs {
results {
runId
status
eventConnection {
events {
__typename
... on MessageEvent {
message
timestamp
level
stepKey
}
}
}
assetMaterializations {
partition
}
}
}
}
}'''
variables = {
"runId":runId
}
1
2
3
4
5
# Bind the run id and execute the query; keep the raw payload as a string.
variables = {
    "runId": runId
}

# `str(...)` is the idiomatic spelling of the original `.__str__()` call.
result_str = str(client._execute(queryer_run_info, variables=variables))

查看时间范围内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# "clip_task_todo_job","FAILURE"
query = """
query DailyPartitionStats($pipelineName: String!, $statuses: [RunStatus!]!, $startDate: Float!, $endDate: Float!) {
runsOrError(filter: {
pipelineName: $pipelineName,
statuses:$statuses,
createdBefore: $endDate,
createdAfter: $startDate
}) {
... on Runs {
results {
runId
status
}
}
}
}
"""
variables = {"pipelineName":pipelineName,"statuses":[statuses],"startDate":startDate,"endDate": endDate}
count = len(client._execute(query, variables=variables).get("runsOrError").get("results"))

项目project definition

@definitions

create-dagster project dagster-tutorial
cd dagster-tutorial
python -m venv .venv
source .venv/bin/activate

将你的项目安装为可编辑的包:
pip install --editable . --group dev（若 pip 版本不支持 --group，可删除 "--group dev"）

.
├── pyproject.toml
├── README.md
├── src
│   └── dagster_tutorial
│       ├── __init__.py
│       ├── definitions.py
│       └── defs
│           └── __init__.py
└── tests
    └── __init__.py

pyproject.toml: defines the metadata and Python dependencies for the project.
src/dagster_tutorial/definitions.py: defines the main Dagster project object.

###dg check defs
In Dagster, all defined objects (such as assets) need to be associated with a top-level Definitions object in order to be deployed.

1
2
All component YAML validated successfully.
All definitions loaded successfully.

dg dev

资产assets

@dg.asset

dg scaffold defs dagster.asset assets.py

dg launch --assets "*"
dg launch --assets customers,orders,payments

资产间数据传输

在用户界面中配置资产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import dagster as dg


# Define the config schema
class ForecastModelConfig(dg.Config):
    # lookback_window_days defaults to 30, but can be
    # overridden by the user. If you do not provide a
    # default, the user will need to provide a value.
    lookback_window_days: int = 30


# Access the config with the `config` parameter
@dg.asset
def forecast_model(config: ForecastModelConfig):
    """Toy asset showing how run-time configuration reaches an asset body."""
    print("Forecasting over time window:", config.lookback_window_days)
    # ...more code here


defs = dg.Definitions(assets=[forecast_model])

资源 resource

@dg.definitions

dg scaffold defs dagster.resources resources.py

DAG

资产依赖@dg.asset([deps=[….]])

@dg.asset_check(asset='xx')

automation @dg.schedule(cron_schedule='xxx', target='xx')

Scheduled jobs

Automate asset materialization:

eager automation condition

components

partition

sensors目录