dagster

dagster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@asset(group_name="task_todo",
required_resource_keys={"path_resource"},
partitions_def=clip_partitions,
config_schema={
"image_name": Field(
str,
default_value="changan-map.tencentcloudcr.com/train/dat_parse_sys",
description="解析镜像"
),
"image_version": Field(
str,
default_value="v3.1.2",
description="容器镜像版本"
),
"exe_command": Field(list,
default_value=["/workspace/parse_data.sh"],
description='启动命令'),
"exe_args": Field(list,
default_value=[
"--task", "{}",
"--type", "image",
"--in_dir", "[[raw_data]]",
"--out_dir", "[[raw_image]]",
],
description='命令参数'),
1
2
[[raw_data]] 这类占位符的值是从 required_resource_keys 中配置的资源里获取的

批量处理sensor备份

lidar_extcalib_sensor.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from datetime import datetime 
from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\
SensorEvaluationContext,\
EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient
@sensor(
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='lidar_extcalib_todo_job'
)
def auto_excute_lidar_extcalib_sensor(context: SensorEvaluationContext):
    """Kick off ``lidar_extcalib_todo_job`` for every partition that the
    upstream asset ``clip_task_check_parse_todo`` has materialized but the
    downstream asset ``clip_task_time_algin_todo`` has not yet handled.

    Partition health is read through the Dagster GraphQL API. The tick is
    skipped while any ``clip_task_time_algin_todo`` partition is still
    materializing; failed partitions are excluded (handled separately); at
    most 200 RunRequests are emitted per tick.

    Returns:
        A list of RunRequest (one per pending partition key), or None when
        there is nothing to launch.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # GraphQL ignores whitespace; this query fetches partition-status data
    # for a single asset key.
    query = """
query PartitionHealthQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
... on AssetNode {
id
partitionKeysByDimension {
name
type
partitionKeys
__typename
}
assetPartitionStatuses {
... on TimePartitionStatuses {
ranges {
status
startTime
endTime
startKey
endKey
__typename
}
__typename
}
... on DefaultPartitionStatuses {
materializedPartitions
materializingPartitions
failedPartitions
__typename
}
... on MultiPartitionStatuses {
primaryDimensionName
ranges {
primaryDimStartKey
primaryDimEndKey
primaryDimStartTime
primaryDimEndTime
secondaryDim {
... on TimePartitionStatuses {
ranges {
status
startTime
endTime
startKey
endKey
__typename
}
__typename
}
... on DefaultPartitionStatuses {
materializedPartitions
materializingPartitions
failedPartitions
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
}
"""

    def _partition_statuses(asset_name: str) -> dict:
        """Return the assetPartitionStatuses payload for one asset key."""
        variables = {"assetKey": {"path": [asset_name]}}
        # NOTE(review): _execute is a private DagsterGraphQLClient API and may
        # break across dagster upgrades — confirm no public equivalent exists.
        result = client._execute(query=query, variables=variables)
        return result.get("assetNodeOrError").get("assetPartitionStatuses")

    # Successfully materialized partitions of clip_task_check_parse_todo.
    statuses = _partition_statuses("clip_task_check_parse_todo")
    check_parse_mdp = statuses.get("materializedPartitions")
    context.log.info(f"check_parse_mdp 长度:{len(check_parse_mdp)} 值:{check_parse_mdp}")

    # Partition health of the downstream clip_task_time_algin_todo asset.
    statuses = _partition_statuses("clip_task_time_algin_todo")
    time_algin_mgp = statuses.get("materializingPartitions")
    context.log.info(f"time_algin_mgp 长度:{len(time_algin_mgp)} 值:{time_algin_mgp}")

    # Back off while any downstream partition is still materializing.
    if len(time_algin_mgp) > 0:
        context.log.info(f"lidar_extcalib_todo_job有正在物化的分区,不执行新的物化!")
        return None

    time_algin_mdp = statuses.get("materializedPartitions")
    context.log.info(f"time_algin_mdp 长度:{len(time_algin_mdp)} 值:{time_algin_mdp}")
    time_algin_fdp = statuses.get("failedPartitions")
    context.log.info(f"time_algin_fdp 长度:{len(time_algin_fdp)} 值:{time_algin_fdp}")

    # Failed partitions are retried through a separate path, so exclude them.
    result_partitions = set(check_parse_mdp) - set(time_algin_mdp) - set(time_algin_fdp)
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")

    if result_partitions:
        run_requests = []
        # BUG FIX: the original sliced the set (`result_partitions[0:200]`)
        # and compared it to an int (`result_partitions > 200`) — both raise
        # TypeError. Sort for a deterministic order, then cap at 200 per tick.
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests

dynamic_annotation_sensor.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from datetime import datetime 
from dagster import DagsterEventType,asset_sensor,AssetKey,RunRequest,sensor,\
SensorEvaluationContext,\
EventLogEntry
from ..assets import clip_task_check_parse_todo
from typing import cast
from dagster_graphql import DagsterGraphQLClient
@sensor(
    # asset_key=AssetKey("clip_task_check_parse_todo"),
    required_resource_keys={'graphql_client'},
    minimum_interval_seconds=60,
    job_name='dynamic_annotation_todo_job'
)
def dynamic_annotation_sensor(context: SensorEvaluationContext):
    """Kick off ``dynamic_annotation_todo_job`` for every partition that the
    upstream asset ``clip_find_result_file_todo`` has materialized but the
    downstream asset ``clip_task_det3d_todo`` has not yet handled.

    Partition health is read through the Dagster GraphQL API. The tick is
    skipped while any ``clip_task_det3d_todo`` partition is still
    materializing; failed partitions are excluded (handled separately); at
    most 200 RunRequests are emitted per tick.

    Returns:
        A list of RunRequest (one per pending partition key), or None when
        there is nothing to launch.
    """
    client = cast(DagsterGraphQLClient, context.resources.graphql_client)
    # GraphQL ignores whitespace; this query fetches partition-status data
    # for a single asset key.
    query = """
query PartitionHealthQuery($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
... on AssetNode {
id
partitionKeysByDimension {
name
type
partitionKeys
__typename
}
assetPartitionStatuses {
... on TimePartitionStatuses {
ranges {
status
startTime
endTime
startKey
endKey
__typename
}
__typename
}
... on DefaultPartitionStatuses {
materializedPartitions
materializingPartitions
failedPartitions
__typename
}
... on MultiPartitionStatuses {
primaryDimensionName
ranges {
primaryDimStartKey
primaryDimEndKey
primaryDimStartTime
primaryDimEndTime
secondaryDim {
... on TimePartitionStatuses {
ranges {
status
startTime
endTime
startKey
endKey
__typename
}
__typename
}
... on DefaultPartitionStatuses {
materializedPartitions
materializingPartitions
failedPartitions
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
}
"""

    def _partition_statuses(asset_name: str) -> dict:
        """Return the assetPartitionStatuses payload for one asset key."""
        variables = {"assetKey": {"path": [asset_name]}}
        # NOTE(review): _execute is a private DagsterGraphQLClient API and may
        # break across dagster upgrades — confirm no public equivalent exists.
        result = client._execute(query=query, variables=variables)
        return result.get("assetNodeOrError").get("assetPartitionStatuses")

    # Successfully materialized partitions of clip_find_result_file_todo.
    statuses = _partition_statuses("clip_find_result_file_todo")
    find_result_file_mdp = statuses.get("materializedPartitions")
    context.log.info(f"find_result_file_mdp 长度:{len(find_result_file_mdp)} 值:{find_result_file_mdp}")

    # Partition health of the downstream clip_task_det3d_todo asset.
    statuses = _partition_statuses("clip_task_det3d_todo")
    det3d_mgp = statuses.get("materializingPartitions")
    context.log.info(f"det3d_mgp 长度:{len(det3d_mgp)} 值:{det3d_mgp}")

    # Back off while any downstream partition is still materializing.
    if len(det3d_mgp) > 0:
        context.log.info(f"clip_task_det3d_todo有正在物化的分区,不执行新的物化!")
        return None

    det3d_mdp = statuses.get("materializedPartitions")
    context.log.info(f"det3d_mdp 长度:{len(det3d_mdp)} 值:{det3d_mdp}")
    det3d_fdp = statuses.get("failedPartitions")
    context.log.info(f"det3d_fdp 长度:{len(det3d_fdp)} 值:{det3d_fdp}")

    # Failed partitions are retried through a separate path, so exclude them.
    result_partitions = set(find_result_file_mdp) - set(det3d_mdp) - set(det3d_fdp)
    context.log.info(f"result_partitions 长度:{len(result_partitions)} 值:{result_partitions}")

    if result_partitions:
        run_requests = []
        # BUG FIX: the original sliced the set (`result_partitions[0:200]`)
        # and compared it to an int (`result_partitions > 200`) — both raise
        # TypeError. Sort for a deterministic order, then cap at 200 per tick.
        for clip_name in sorted(result_partitions)[:200]:
            run_requests.append(RunRequest(partition_key=clip_name))
            context.log.info(f"请求下游处理 {clip_name}")
        return run_requests

sensor写法备份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from datetime import datetime 
from typing import List, Optional, Set
from dagster import DagsterEventType, DagsterRunStatus, EventRecordsFilter, RunRequest, RunStatusSensorContext, build_input_context, run_status_sensor, sensor,asset_sensor,AssetKey
from ..jobs.auto_task_jobs import *
from ..partitions.partition_defs import daily_task_partitions,clip_partitions



@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    minimum_interval_seconds=60,
    monitored_jobs=[auto_get_task_job],
    request_job=clip_task_todo_job
)
def one_day_clip_auto_job_sensor(context: RunStatusSensorContext):
    """React to successful runs of ``auto_get_task_job``.

    Reads the run's ``dagster/partition`` tag, fetches the latest
    materialization record of the ``oneday_clips_srouce_data`` asset for that
    partition, and logs its metadata. The dynamic-partition registration and
    RunRequest fan-out logic is kept commented out below (this file is a
    backup), so the sensor currently always returns None.
    """
    run_id = context.dagster_run.run_id  # NOTE(review): unused — kept from the backup
    instance = context.instance

    # Partition key the monitored run executed for (None for unpartitioned runs).
    partition_tag = context.dagster_run.tags.get("dagster/partition")

    if partition_tag:
        context.log.info(f"检测到作业 {context.dagster_run.job_name} 成功执行")
        context.log.info(f"分区键: {partition_tag}")
        # Already-registered dynamic partitions (unused below — kept from the backup).
        registered_clips: Set[str] = set(instance.get_dynamic_partitions("auto_dynamic"))

        # NOTE(review): "srouce" looks like a typo for "source" — confirm it
        # matches the asset's actual key before renaming anything.
        asset_key=AssetKey("oneday_clips_srouce_data")

        # Latest materialization event of that asset for this partition.
        materialization_records = instance.get_event_records(
            EventRecordsFilter(
                event_type= DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=asset_key,
                asset_partitions=[partition_tag]
            ),
            limit=1
        )

        if not materialization_records:
            context.log.warning(f"未找到分区 {partition_tag} 的物化记录")
            return None

        data_record= materialization_records[0]
        materialization= data_record.event_log_entry.dagster_event.step_materialization_data
        data_metadata= materialization.materialization.metadata

        # Log every metadata entry of the materialization.
        for metadata in data_metadata:
            context.log.info(metadata)

        # ---- backup: build an input context and load the asset data ----
        # input_context = build_input_context(
        #     asset_key=asset_key,
        #     partition_key=partition_tag,
        #     asset_partitions_def=daily_task_partitions,
        #     instance=instance
        # )

        # asset_data = context.resources.io_manager.load_input(input_context)

        # if 'clip_data' not in asset_data:
        #     context.log.warning(f"资产数据中缺少 'clip_data' 字段")
        #     return None

        # clip_data = asset_data['clip_data']

        # # extract all clip names
        # all_clips: List[str] = []
        # for clip_names in clip_data.values():
        #     if isinstance(clip_names, list):
        #         for v in clip_names:
        #             all_clips.append(v["data_name"])

        # if not all_clips:
        #     context.log.info("未找到任何clip数据")
        #     return None

        # # fetch the already-registered partitions
        # registered_clips: Set[str] = set(context.instance.get_dynamic_partitions("clip_partitions"))

        # new_clips= set(all_clips)-registered_clips

        # if not new_clips:
        #     context.log.info("没有新的剪辑需要添加")
        # else:
        #     # only add the brand-new clip partitions
        #     context.instance.add_dynamic_partitions("clip_partitions", new_clips)
        #     context.log.info(f"成功添加 {len(new_clips)} 个新Clip分区")

        # # only trigger runs when there are new clips
        # if new_clips:
        #     run_requests = []
        #     for clip_name in new_clips:
        #         run_requests.append(RunRequest(partition_key=clip_name,tags={'task_group':partition_tag}))
        #         context.log.info(f"请求下游处理 {clip_name}")
        #     return run_requests
        # else:
    return None