feat(components):for node support break

This commit is contained in:
rulego-team
2025-11-27 18:09:41 +08:00
parent f16f4543b8
commit b03d404b18
2 changed files with 54 additions and 5 deletions

View File

@@ -312,6 +312,25 @@ func (md *Metadata) PutValue(key, value string) {
md.data[key] = value
}
// Delete removes the specified key and its value with Copy-on-Write semantics
func (md *Metadata) Delete(key string) {
md.mu.Lock()
defer md.mu.Unlock()
if md.shared {
// Create a new copy of the data
newData := make(map[string]string, len(md.data))
for k, v := range md.data {
newData[k] = v
}
md.data = newData
md.shared = false
}
delete(md.data, key)
}
// Values returns all key-value pairs in the metadata.
func (md *Metadata) Values() map[string]string {
md.mu.RLock()

View File

@@ -304,7 +304,7 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
msg.Metadata.PutValue(KeyLoopItem, str.ToString(item))
}
// 执行并检查是否有取消请求
// 执行并检查是否有取消请求
if lastMsg, itemDataList, err = x.executeItem(ctxWithCancel, ctx, msg, x.Config.Mode); err != nil {
break
} else if x.Config.Mode == MergeValues {
@@ -312,12 +312,18 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else if x.Config.Mode == ReplaceValues {
msg = lastMsg
}
// 检测是否触发中断
if msg.Metadata.GetValue(MdKeyBreak) == MdValueBreak {
msg.Metadata.Delete(MdKeyBreak)
break
}
}
case []int:
for index, item := range v {
msg.Metadata.PutValue(KeyLoopIndex, strconv.Itoa(index))
msg.Metadata.PutValue(KeyLoopItem, str.ToString(item))
// 执行并检查是否有取消请求
// 执行并检查是否有取消请求
if lastMsg, itemDataList, err = x.executeItem(ctxWithCancel, ctx, msg, x.Config.Mode); err != nil {
break
} else if x.Config.Mode == MergeValues {
@@ -325,12 +331,18 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else if x.Config.Mode == ReplaceValues {
msg = lastMsg
}
// 检测是否触发中断
if msg.Metadata.GetValue(MdKeyBreak) == MdValueBreak {
msg.Metadata.Delete(MdKeyBreak)
break
}
}
case []int64:
for index, item := range v {
msg.Metadata.PutValue(KeyLoopIndex, strconv.Itoa(index))
msg.Metadata.PutValue(KeyLoopItem, str.ToString(item))
// 执行并检查是否有取消请求
// 执行并检查是否有取消请求
if lastMsg, itemDataList, err = x.executeItem(ctxWithCancel, ctx, msg, x.Config.Mode); err != nil {
break
} else if x.Config.Mode == MergeValues {
@@ -338,12 +350,18 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else if x.Config.Mode == ReplaceValues {
msg = lastMsg
}
// 检测是否触发中断
if msg.Metadata.GetValue(MdKeyBreak) == MdValueBreak {
msg.Metadata.Delete(MdKeyBreak)
break
}
}
case []float64:
for index, item := range v {
msg.Metadata.PutValue(KeyLoopIndex, strconv.Itoa(index))
msg.Metadata.PutValue(KeyLoopItem, str.ToString(item))
// 执行并检查是否有取消请求
// 执行并检查是否有取消请求
if lastMsg, itemDataList, err = x.executeItem(ctxWithCancel, ctx, msg, x.Config.Mode); err != nil {
break
} else if x.Config.Mode == MergeValues {
@@ -351,6 +369,12 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else if x.Config.Mode == ReplaceValues {
msg = lastMsg
}
// 检测是否触发中断
if msg.Metadata.GetValue(MdKeyBreak) == MdValueBreak {
msg.Metadata.Delete(MdKeyBreak)
break
}
}
case map[string]interface{}:
index := 0
@@ -363,7 +387,7 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else {
msg.Metadata.PutValue(KeyLoopItem, str.ToString(item))
}
// 执行并检查是否有取消请求
// 执行并检查是否有取消请求
if lastMsg, itemDataList, err = x.executeItem(ctxWithCancel, ctx, msg, x.Config.Mode); err != nil {
break
} else if x.Config.Mode == MergeValues {
@@ -371,6 +395,12 @@ func (x *ForNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg) {
} else if x.Config.Mode == ReplaceValues {
msg = lastMsg
}
// 检测是否触发中断
if msg.Metadata.GetValue(MdKeyBreak) == MdValueBreak {
msg.Metadata.Delete(MdKeyBreak)
break
}
index++
}
default: