Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 113 additions & 30 deletions tccli/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,24 +333,42 @@ def _filling_param_info(self, param_info, para, param_type, member):
param_info[para["name"]]["members"] = member
return param_info

def _get_param_info(self, param_model, object_model):
def _get_param_info(self, param_model, object_model, visited=None, depth=0, max_depth=20):
# visited 沿当前 DFS 路径记录已展开的复合类型名,用于检测自引用/相互引用导致的环
if visited is None:
visited = frozenset()
param_info = {}
for para in param_model:
member = para["member"]
# 命中环或超过最大深度:不再向下展开,作为占位 leaf 处理
recursive_hit = member not in BASE_TYPE and (member in visited or depth >= max_depth)
if para["type"] == "list":
if para["member"] not in BASE_TYPE:
self._filling_param_info(
param_info, para, "list",
[self._get_param_info(object_model[para["member"]]["members"], object_model)])
if member not in BASE_TYPE:
if recursive_hit:
self._filling_param_info(
param_info, para, "list", [member])
else:
self._filling_param_info(
param_info, para, "list",
[self._get_param_info(
object_model[member]["members"], object_model,
visited | {member}, depth + 1, max_depth)])
else:
self._filling_param_info(
param_info, para, "list", [para["member"]])
param_info, para, "list", [member])
else:
if para["member"] not in BASE_TYPE:
param_info = self._filling_param_info(
param_info, para, para["member"],
self._get_param_info(object_model[para["member"]]["members"], object_model))
if member not in BASE_TYPE:
if recursive_hit:
param_info = self._filling_param_info(
param_info, para, member, member)
else:
param_info = self._filling_param_info(
param_info, para, member,
self._get_param_info(
object_model[member]["members"], object_model,
visited | {member}, depth + 1, max_depth))
else:
self._filling_param_info(param_info, para, para["member"], para["member"])
self._filling_param_info(param_info, para, member, member)
return param_info

def get_param_info(self, service, version, action):
Expand All @@ -363,21 +381,37 @@ def get_output_param_info(self, service, version, action):
param_model = service_model["objects"]
return self._get_param_info(param_model[action + "Response"]["members"], param_model)

def _generate_param_skeleton(self, param_model, name):
def _generate_param_skeleton(self, param_model, name, visited=None, depth=0, max_depth=20):
# visited 沿路径记录已展开的复合类型名,用于检测自引用导致的无限递归
if visited is None:
visited = frozenset()
param_skeleton = {}
for para in param_model:
member = para["member"]
recursive_hit = member not in BASE_TYPE and (member in visited or depth >= max_depth)
if para["type"] == "list":
if para["member"] not in BASE_TYPE:
param_skeleton[para["name"]] = \
[self._generate_param_skeleton(name[para["member"]]["members"], name)]
if member not in BASE_TYPE:
if recursive_hit:
# 自引用:用占位字符串表示该处需要使用 JSON 整体传入
param_skeleton[para["name"]] = ["RecursiveRef<%s>" % member]
else:
param_skeleton[para["name"]] = \
[self._generate_param_skeleton(
name[member]["members"], name,
visited | {member}, depth + 1, max_depth)]
else:
param_skeleton[para["name"]] = [PARAM_TYPE_MAP[para["member"]]]
param_skeleton[para["name"]] = [PARAM_TYPE_MAP[member]]
else:
if para["member"] not in BASE_TYPE:
param_skeleton[para["name"]] = \
self._generate_param_skeleton(name[para["member"]]["members"], name)
if member not in BASE_TYPE:
if recursive_hit:
param_skeleton[para["name"]] = "RecursiveRef<%s>" % member
else:
param_skeleton[para["name"]] = \
self._generate_param_skeleton(
name[member]["members"], name,
visited | {member}, depth + 1, max_depth)
else:
param_skeleton[para["name"]] = PARAM_TYPE_MAP[para["member"]]
param_skeleton[para["name"]] = PARAM_TYPE_MAP[member]
return param_skeleton

def generate_param_skeleton(self, service, version, action):
Expand All @@ -396,7 +430,7 @@ def get_unfold_param_info(self, service, version, action, profile="default", par
if param_array:
all_param_list = self._add_array_item(all_param_list, profile)

return self._filling_unfold_param_info(all_param_list, service, version, action)
return self._filling_unfold_param_info(all_param_list, service, version, action, object_model)

def _add_array_item(self, param_list, profile):
is_conf_exist, conf_path = Utils.file_existed(os.path.join(os.path.expanduser("~"), ".tccli"),
Expand All @@ -415,27 +449,42 @@ def _add_array_item(self, param_list, profile):
all_param_list.append(tmp)
return all_param_list

def _recur_get_unfold_param_info(self, param_model, object_model, return_param_list, param_list):
def _recur_get_unfold_param_info(self, param_model, object_model, return_param_list, param_list,
visited=None, depth=0, max_depth=20):
for para in param_model:
self._get_unfold_param_info(object_model, return_param_list, param_list, para)
self._get_unfold_param_info(object_model, return_param_list, param_list, para,
visited, depth, max_depth)
if param_list.pop().isdigit():
param_list.pop()

def _get_unfold_param_info(self, object_model, return_param_list, param_list, para):
def _get_unfold_param_info(self, object_model, return_param_list, param_list, para,
visited=None, depth=0, max_depth=20):
# visited 沿路径维护,识别自引用类型(如 AllocationRuleExpression.Children)
if visited is None:
visited = frozenset()
param_list.append(para["name"])
if para["type"] == "list" and para["member"] not in BASE_TYPE:
param_list.append('0')
if para["member"] not in BASE_TYPE:
self._recur_get_unfold_param_info(object_model[para["member"]]["members"],
object_model, return_param_list, param_list)
member = para["member"]
if member not in BASE_TYPE:
# 命中环或超出最大深度:把当前路径作为占位 leaf 登记,不再继续展开
if member in visited or depth >= max_depth:
tmp = copy.deepcopy(param_list)
return_param_list.append(tmp)
if param_list.pop().isdigit():
param_list.pop()
return
self._recur_get_unfold_param_info(object_model[member]["members"],
object_model, return_param_list, param_list,
visited | {member}, depth + 1, max_depth)
else:
tmp = copy.deepcopy(param_list)
return_param_list.append(tmp)

if param_list.pop().isdigit():
param_list.pop()

def _filling_unfold_param_info(self, param_list, service, version, action):
def _filling_unfold_param_info(self, param_list, service, version, action, object_model=None):
unfold_param = {}
param_info = self.get_param_info(service, version, action)
for param in param_list:
Expand All @@ -448,12 +497,22 @@ def _filling_unfold_param_info(self, param_list, service, version, action):
type_name = res["type_name"]
required = res.get("required")
document = res["document"]
recursive_truncated = False
recursive_type = None

for idx, item in enumerate(tmp_param[1:]):
# 命中自引用截断:当前 res 的 members 是占位字符串(类型名)而非 dict
if res["type"] == "Array":
res = res["members"][0][item]
members_container = res["members"][0]
else:
res = res["members"][item]
members_container = res["members"]
if not isinstance(members_container, dict) or item not in members_container:
# 该 leaf 是被环检测截断的占位项,不再向下钻取
recursive_truncated = True
recursive_type = members_container if isinstance(members_container, str) \
else (res.get("type_name") or "")
break
res = members_container[item]

# ?? seriously ??
if required == "Required" and res["required"] == "Optional":
Expand All @@ -466,9 +525,33 @@ def _filling_unfold_param_info(self, param_list, service, version, action):
document = res["document"]
break

# 二次判定:路径走完后,若该 leaf 自身是被环检测截断的复合类型(members 为占位)
if not recursive_truncated:
final_members = res.get("members")
if isinstance(final_members, list) and len(final_members) == 1 \
and isinstance(final_members[0], str) \
and final_members[0] not in BASE_TYPE \
and final_members[0] not in CLI_BASE_TYPE:
recursive_truncated = True
recursive_type = final_members[0]
elif isinstance(final_members, str) \
and final_members not in BASE_TYPE \
and final_members not in CLI_BASE_TYPE:
recursive_truncated = True
recursive_type = final_members

if len([item for item in param if item.isdigit() and int(item) > 0]) > 0:
required = "Optional"

if recursive_truncated:
# 自引用截断点统一标记为 Object,提示用户用 JSON 整体传入
param_type = "Object"
type_name = recursive_type or "Object"
required = "Optional"
document = (document or "") + \
("\n注意:该字段为自引用类型 %s,更深层嵌套请使用 --cli-input-json 整体传入 JSON,"
"或将本字段值作为 JSON 字符串整体传入。" % (recursive_type or ""))

unfold_param[".".join(param)]["type"] = param_type
unfold_param[".".join(param)]["type_name"] = type_name
unfold_param[".".join(param)]["required"] = required
Expand Down