@@ -12,134 +12,105 @@ import (
1212 "github.com/coze-dev/coze-loop/backend/pkg/logs"
1313)
1414
15+ const noQueryParsed = "no_query_parsed"
16+
1517type ChatProcessor struct {}
1618
1719func (c * ChatProcessor ) Transform (ctx context.Context , spans loop_span.SpanList ) (loop_span.SpanList , error ) {
1820 for _ , span := range spans {
19- if span == nil || ! span .IsChatSpan () {
20- continue
21- }
22- if span .Input == "" {
21+ if span == nil {
2322 continue
2423 }
25- processedInput := c .extractLastUserMessage (ctx , span .Input )
26- if processedInput != "" {
27- span .Input = processedInput
24+ if span .IsModelSpan () {
25+ span .Input = c .processModelInput (ctx , span .Input )
2826 }
2927 }
3028 return spans , nil
3129}
3230
33- func (c * ChatProcessor ) extractLastUserMessage (ctx context.Context , input string ) string {
31+ func (c * ChatProcessor ) processModelInput (ctx context.Context , input string ) string {
32+ if input == "" {
33+ return noQueryParsed
34+ }
35+
3436 var inputMap map [string ]interface {}
3537 if err := sonic .UnmarshalString (input , & inputMap ); err != nil {
3638 logs .CtxDebug (ctx , "chat processor: input is not a valid JSON object" )
37- return ""
39+ return noQueryParsed
3840 }
3941
40- if messages := c .tryExtractFromStandardChat ( inputMap ); messages != nil {
41- return c . buildUserInput ( ctx , messages )
42+ if result , ok := c .tryProcessStandardChat ( ctx , inputMap ); ok {
43+ return result
4244 }
4345
44- if messages := c .tryExtractFromResponsesAPI ( inputMap ); messages != nil {
45- return c . buildUserInput ( ctx , messages )
46+ if result , ok := c .tryProcessResponsesAPI ( ctx , input , inputMap ); ok {
47+ return result
4648 }
4749
48- return ""
50+ return noQueryParsed
4951}
5052
51- func (c * ChatProcessor ) tryExtractFromStandardChat ( inputMap map [string ]interface {}) [] interface {} {
53+ func (c * ChatProcessor ) tryProcessStandardChat ( ctx context. Context , inputMap map [string ]interface {}) ( string , bool ) {
5254 messages , ok := inputMap ["messages" ].([]interface {})
5355 if ! ok || len (messages ) == 0 {
54- return nil
56+ return "" , false
57+ }
58+
59+ lastMsg , ok := messages [len (messages )- 1 ].(map [string ]interface {})
60+ if ! ok {
61+ return "" , false
62+ }
63+ role , _ := lastMsg ["role" ].(string )
64+ if role != "user" {
65+ return "" , true
66+ }
67+ // TODO: more compaction
68+ inputMap ["messages" ] = []interface {}{lastMsg }
69+ result , err := sonic .MarshalString (inputMap )
70+ if err != nil {
71+ logs .CtxWarn (ctx , "chat processor: failed to marshal input: %v" , err )
72+ return "" , true
5573 }
56- return c . filterLastUserMessage ( messages )
74+ return result , true
5775}
5876
59- func (c * ChatProcessor ) tryExtractFromResponsesAPI ( inputMap map [string ]interface {}) [] interface {} {
77+ func (c * ChatProcessor ) tryProcessResponsesAPI ( ctx context. Context , input string , inputMap map [string ]interface {}) ( string , bool ) {
6078 inputField , ok := inputMap ["input" ]
6179 if ! ok {
62- return nil
80+ return "" , false
6381 }
6482
6583 switch v := inputField .(type ) {
6684 case string :
67- if v != "" {
68- return []interface {}{
69- map [string ]interface {}{
70- "role" : "user" ,
71- "content" : v ,
72- },
73- }
74- }
85+ return input , true
7586 case []interface {}:
76- messages := c .convertResponsesAPIToStandard (v )
77- return c .filterLastUserMessage (messages )
87+ return c .processResponsesAPIMessages (ctx , inputMap , v )
7888 }
79- return nil
89+ return "" , false
8090}
8191
82- func (c * ChatProcessor ) convertResponsesAPIToStandard (input []interface {}) []interface {} {
83- var messages []interface {}
84- for _ , item := range input {
85- itemMap , ok := item .(map [string ]interface {})
86- if ! ok {
87- continue
88- }
89-
90- itemType , _ := itemMap ["type" ].(string )
91- switch itemType {
92- case "message" , "" :
93- role , _ := itemMap ["role" ].(string )
94- if role == "" {
95- continue
96- }
97- content := itemMap ["content" ]
98- messages = append (messages , map [string ]interface {}{
99- "role" : role ,
100- "content" : content ,
101- })
102- }
92+ func (c * ChatProcessor ) processResponsesAPIMessages (ctx context.Context , inputMap map [string ]interface {}, items []interface {}) (string , bool ) {
93+ if len (items ) == 0 {
94+ return "" , true
10395 }
104- return messages
105- }
10696
107- func (c * ChatProcessor ) filterLastUserMessage (messages []interface {}) []interface {} {
108- lastUserIndex := - 1
109- for i := len (messages ) - 1 ; i >= 0 ; i -- {
110- msg , ok := messages [i ].(map [string ]interface {})
111- if ! ok {
112- continue
113- }
114- role , _ := msg ["role" ].(string )
115- if role == "user" {
116- lastUserIndex = i
117- break
118- }
119- }
120-
121- if lastUserIndex == - 1 {
122- return nil
123- }
124-
125- return []interface {}{messages [lastUserIndex ]}
126- }
127-
128- func (c * ChatProcessor ) buildUserInput (ctx context.Context , messages []interface {}) string {
129- if len (messages ) == 0 {
130- return ""
97+ lastItem , ok := items [len (items )- 1 ].(map [string ]interface {})
98+ if ! ok {
99+ return "" , true
131100 }
132101
133- result := map [string ]interface {}{
134- "messages" : messages ,
102+ role , _ := lastItem ["role" ].(string )
103+ if role != "user" {
104+ return "" , true
135105 }
136106
137- output , err := sonic .MarshalString (result )
107+ inputMap ["input" ] = []interface {}{lastItem }
108+ result , err := sonic .MarshalString (inputMap )
138109 if err != nil {
139- logs .CtxWarn (ctx , "chat processor: failed to marshal user input: %v" , err )
140- return ""
110+ logs .CtxWarn (ctx , "chat processor: failed to marshal input: %v" , err )
111+ return "" , true
141112 }
142- return output
113+ return result , true
143114}
144115
145116type ChatProcessorFactory struct {}
0 commit comments