Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGLOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## [0.1.25] - 2026-01-22
### Added
- add to_runnable decorator
- lcc tool message add name and tool_call_id
- support set finish time of span

## [0.1.24] - 2026-01-16
### Added
- client init set default client if not exist
Expand Down
1 change: 1 addition & 0 deletions cozeloop/decorator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

coze_loop_decorator= CozeLoopDecorator()
observe = coze_loop_decorator.observe
to_runnable = coze_loop_decorator.to_runnable
178 changes: 178 additions & 0 deletions cozeloop/decorator/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Optional, Callable, Any, overload, Dict, Generic, Iterator, TypeVar, List, cast, AsyncIterator
from functools import wraps

from langchain_core.runnables import RunnableLambda, RunnableConfig
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是不是应该放intergration/langchain里面,不用langchain的,照理都不需要import langchain_core?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉不用挪包,只需要把import放进to_runnable注解就可以了,因为有通用方法,而且都是注解,放在一个注解类里也更合理些。


from cozeloop import Client, Span, start_span
from cozeloop.decorator.utils import is_async_func, is_gen_func, is_async_gen_func, is_class_func

Expand Down Expand Up @@ -311,6 +313,182 @@ async def async_stream_wrapper(*args: Any, **kwargs: Any):
else:
return decorator(func)

def to_runnable(
self,
func: Callable = None,
) -> Callable:
"""
Decorator to be RunnableLambda.

:param func: The function to be decorated, Requirements are as follows:
1. When the func is called, parameter config(RunnableConfig) is required, you must use the config containing cozeloop callback handler of 'current request', otherwise, the trace may be lost!

Examples:
@to_runnable
def runnable_func(my_input: dict) -> str:
return input

async def scorer_leader(state: MyState) -> dict | str:
await runnable_func({"a": "111", "b": 222, "c": "333"}, config=state.config) # config is required
"""

def decorator(func: Callable):

@wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
res = None
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
res = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, **extra)
if hasattr(res, "__iter__"):
return res
except StopIteration:
pass
except Exception as e:
raise e
finally:
if res is not None:
return res

@wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
res = None
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
res = await RunnableLambda(_param_wrapped_func_async).ainvoke(input=inp, config=config, **extra)
if hasattr(res, "__aiter__"):
return res
except StopIteration:
pass
except StopAsyncIteration:
pass
except Exception as e:
if e.args and e.args[0] == 'coroutine raised StopIteration': # coroutine StopIteration
pass
else:
raise e
finally:
if res is not None:
return res

@wraps(func)
def gen_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
gen = RunnableLambda(_param_wrapped_func).invoke(input=inp, config=config, *extra)
try:
for item in gen:
yield item
except StopIteration:
pass
except Exception as e:
raise e

@wraps(func)
async def async_gen_wrapper(*args: Any, **kwargs: Any):
config = kwargs.pop("config", None)
config = _convert_config(config)
try:
extra = {}
if len(args) > 0 and is_class_func(func):
extra = {"_inner_class_self": args[0]}
args = args[1:]
inp = {}
if len(args) > 0:
inp['args'] = args
if len(kwargs) > 0:
inp['kwargs'] = kwargs
gen = RunnableLambda(_param_wrapped_func_async).invoke(input=inp, config=config, **extra)
items = []
try:
async for item in gen:
items.append(item)
yield item
finally:
pass
except StopIteration:
pass
except StopAsyncIteration:
pass
except Exception as e:
if e.args and e.args[0] == 'coroutine raised StopIteration':
pass
else:
raise e

# for convert parameter
def _param_wrapped_func(input_dict: dict, **kwargs) -> Any:
real_args = input_dict.get("args", ())
real_kwargs = input_dict.get("kwargs", {})

inner_class_self = kwargs.get("_inner_class_self", None)
if inner_class_self is not None:
real_args = (inner_class_self, *real_args)

return func(*real_args, **real_kwargs)

async def _param_wrapped_func_async(input_dict: dict, **kwargs) -> Any:
real_args = input_dict.get("args", ())
real_kwargs = input_dict.get("kwargs", {})

inner_class_self = kwargs.get("_inner_class_self", None)
if inner_class_self is not None:
real_args = (inner_class_self, *real_args)

return await func(*real_args, **real_kwargs)

def _convert_config(config: RunnableConfig = None) -> RunnableConfig | None:
if config is None:
config = RunnableConfig(run_name=func.__name__)
config['run_name'] = func.__name__
elif isinstance(config, dict):
config['run_name'] = func.__name__
return config

if is_async_gen_func(func):
return async_gen_wrapper
if is_gen_func(func):
return gen_wrapper
elif is_async_func(func):
return async_wrapper
else:
return sync_wrapper

if func is None:
return decorator
else:
return decorator(func)


class _CozeLoopTraceStream(Generic[S]):
def __init__(
Expand Down
11 changes: 10 additions & 1 deletion cozeloop/integration/langchain/trace_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from langchain_core.outputs import LLMResult, ChatGeneration
from langchain_core.agents import AgentFinish, AgentAction
from langchain_core.prompt_values import PromptValue, ChatPromptValue
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage
from langchain_core.messages import BaseMessage, AIMessageChunk, AIMessage, ToolMessage
from langchain_core.prompts import AIMessagePromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk

Expand Down Expand Up @@ -581,6 +581,15 @@ def _convert_inputs(inputs: Any) -> Any:
if inputs.content != '':
format_inputs['content'] = inputs.content
return format_inputs
if isinstance(inputs, ToolMessage):
"""
Must be before BaseMessage.
"""
content = {"content": inputs.content}
if inputs.artifact is not None:
content['artifact'] = _convert_inputs(inputs.artifact) # artifact is existed when response_format="content_and_artifact".
message = Message(role=inputs.type, content=content)
return message
if isinstance(inputs, BaseMessage):
message = Message(role=inputs.type, content=inputs.content,
tool_calls=inputs.additional_kwargs.get('tool_calls', []))
Expand Down
4 changes: 3 additions & 1 deletion cozeloop/integration/langchain/trace_model/llm_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class Message:
tool_calls: List[ToolCall] = None
metadata: Optional[dict] = None
reasoning_content: Optional[str] = None
name: Optional[str] = None
tool_call_id: Optional[str] = None

def __post_init__(self):
if self.role is not None and (self.role == 'AIMessageChunk' or self.role == 'ai'):
Expand Down Expand Up @@ -155,7 +157,7 @@ def __init__(self, messages: List[Union[BaseMessage, List[BaseMessage]]], invoca
if message.additional_kwargs is not None and message.additional_kwargs.get('name', ''):
name = message.additional_kwargs.get('name', '')
tool_call = ToolCall(id=message.tool_call_id, type=message.type, function=ToolFunction(name=name))
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call]))
self._messages.append(Message(role=message.type, content=message.content, tool_calls=[tool_call], name=name, tool_call_id=message.tool_call_id))
else:
self._messages.append(Message(role=message.type, content=message.content))

Expand Down
3 changes: 3 additions & 0 deletions cozeloop/internal/trace/noop_span.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
def set_deployment_env(self, deployment_env: str) -> None:
pass

def set_finish_time(self, finish_time: datetime) -> None:
pass

def __enter__(self):
return self

Expand Down
10 changes: 9 additions & 1 deletion cozeloop/internal/trace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, span_type: str = '', name: str = '', space_id: str = '', trac
self.space_id = space_id
self.parent_span_id = parent_span_id
self.start_time = start_time if start_time else datetime.now()
self.finish_time: datetime = None
self.duration = duration
self.tag_map = tag_map if tag_map else {}
self.system_tag_map = system_tag_map if system_tag_map else {}
Expand Down Expand Up @@ -396,6 +397,10 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
def set_deployment_env(self, deployment_env: str) -> None:
self.set_tags({DEPLOYMENT_ENV: deployment_env})

def set_finish_time(self, finish_time: datetime) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是不是注释下,一般不需要调用

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self.finish_time = finish_time


def get_rectified_map(self, input_map: Dict[str, Any]) -> (Dict[str, Any], List[str], int):
validate_map = {}
cut_off_keys = []
Expand Down Expand Up @@ -541,7 +546,10 @@ def set_stat_info(self):
if input_tokens > 0 or output_tokens > 0:
self.set_tags({TOKENS: int(input_tokens) + int(output_tokens)})

duration = int((datetime.now().timestamp() - self.start_time.timestamp()) * 1000000)
finish_time_stamp = datetime.now().timestamp()
if self.finish_time is not None:
finish_time_stamp = self.finish_time.timestamp()
duration = int((finish_time_stamp - self.start_time.timestamp()) * 1000000)
with self.lock:
self.duration = duration

Expand Down
7 changes: 7 additions & 0 deletions cozeloop/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,18 @@ def set_system_tags(self, system_tags: Dict[str, Any]) -> None:
Set system tags. DO NOT use this method unless you know what you are doing.
"""

@abstractmethod
def set_deployment_env(self, deployment_env: str) -> None:
"""
Set the deployment environment of the span, identify custom environments.
"""

@abstractmethod
def set_finish_time(self, finish_time: datetime) -> None:
"""
Set the finish time of the span.
"""


class Span(CommonSpanSetter, SpanContext):
"""
Expand Down
2 changes: 2 additions & 0 deletions examples/trace/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import time
from datetime import datetime, timedelta

import cozeloop

Expand Down Expand Up @@ -100,6 +101,7 @@ def do_simple_demo():
span.set_error(str(e))

# 3. span finish
span.set_finish_time(datetime.now() + timedelta(seconds=3)) # set finish time as your need to change duration
span.finish()

# 4. (optional) flush or close
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cozeloop"
version = "0.1.24"
version = "0.1.25"
description = "coze loop sdk"
authors = ["JiangQi715 <jiangqi.rrt@bytedance.com>"]
license = "MIT"
Expand Down