feat: extend indexer with prefix matching and db persistence#1422
feat: extend indexer with prefix matching and db persistence#1422thunguo wants to merge 3 commits intoapache:developfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Extends the core resource indexing/query mechanism to support operator-based index conditions (including prefix matching) and introduces DB persistence for indexes to support consistent querying in multi-replica deployments (Issue #1424).
Changes:
- Refactors
ListByIndexes/PageListByIndexesAPIs frommap[string]stringto[]index.IndexCondition(addsEqualsandHasPrefixoperators). - Adds in-memory prefix matching via a Radix tree structure and introduces DB-backed persisted index storage via a new
resource_indicestable. - Updates manager/helpers, discovery/engine subscribers, and console services to use the new index condition API; adds tests for prefix matching and DB persistence.
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/store/memory/store.go | Adds Radix-tree-backed prefix matching and updates index query APIs to use IndexCondition. |
| pkg/store/memory/store_test.go | Updates existing tests for new index API and adds HasPrefix test coverage. |
| pkg/store/dbcommon/model.go | Introduces ResourceIndexModel mapping persisted index entries (resource_indices). |
| pkg/store/dbcommon/index.go | Adds Index.AddEntry to support rebuilding in-memory indices from persisted entries. |
| pkg/store/dbcommon/gorm_store.go | Migrates resource_indices, persists index entries on CRUD/replace, adds DB prefix query path, rebuilds indices from persisted table. |
| pkg/store/dbcommon/gorm_store_test.go | Updates tests for new index API and adds tests for persistence, prefix matching, and rebuild. |
| pkg/core/store/store.go | Updates ResourceStore interface to accept []IndexCondition. |
| pkg/core/store/index/condition.go | Adds IndexCondition and IndexOperator (Equals, HasPrefix). |
| pkg/core/manager/manager_helper.go | Updates helper methods to accept []IndexCondition; removes unused fuzzy-search helper. |
| pkg/core/manager/manager.go | Updates ReadOnlyResourceManager/implementation signatures for condition-based index queries; removes unimplemented fuzzy search method. |
| pkg/core/engine/subscriber/runtime_instance.go | Updates instance lookups to use IndexCondition with Equals. |
| pkg/core/discovery/subscriber/rpc_instance.go | Updates runtime instance lookup to use condition-based indexing. |
| pkg/core/discovery/subscriber/nacos_service.go | Updates multi-index lookups to use []IndexCondition with Equals. |
| pkg/core/discovery/subscriber/instance.go | Updates instance listing by mesh/app to use []IndexCondition. |
| pkg/console/service/tag_rule.go | Updates paging by mesh index to use []IndexCondition. |
| pkg/console/service/service.go | Updates service/provider/consumer queries to use condition-based indexing. |
| pkg/console/service/instance.go | Updates instance search queries to use []IndexCondition. |
| pkg/console/service/configurator_rule.go | Updates paging by mesh index to use []IndexCondition. |
| pkg/console/service/condition_rule.go | Updates paging by mesh index to use []IndexCondition. |
| pkg/console/service/application.go | Updates multiple application-related list/page queries to use condition-based indexing. |
| go.mod | Updates dependencies for Radix tree and other module tidy changes. |
| go.sum | Updates module checksums to match go.mod dependency changes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| func (rs *resourceStore) Delete(obj interface{}) error { | ||
| r, ok := obj.(coremodel.Resource) | ||
| if ok { | ||
| rs.removeFromTrees(r) |
There was a problem hiding this comment.
Delete removes entries from the prefix trees using the object passed in. If callers delete using a minimal object (or a tombstone) that doesn't contain the original indexed fields, removeFromTrees won't compute the same index values and stale RadixTree entries will remain. To make deletes reliable, fetch the existing stored resource by key (similar to Update) and remove based on that stored object before deleting.
| rs.removeFromTrees(r) | |
| // Fetch the existing stored resource by key to reliably remove from prefix trees. | |
| if existingObj, exists, err := rs.storeProxy.GetByKey(r.ResourceKey()); err == nil && exists { | |
| if existingRes, ok := existingObj.(coremodel.Resource); ok { | |
| rs.removeFromTrees(existingRes) | |
| } | |
| } |
| if _, exists := rs.prefixTrees[indexName]; !exists { | ||
| rs.prefixTrees[indexName] = radix.New() | ||
| } | ||
| } |
There was a problem hiding this comment.
AddIndexers creates new RadixTrees for newly added indexers, but it never backfills existing stored resources into those new trees. After adding an indexer to a non-empty store, HasPrefix queries on that index will return no results until resources are re-added/updated. Consider iterating over existing items (e.g., rs.storeProxy.List()) and populating the new tree(s) immediately.
| } | |
| } | |
| // Backfill newly added radix trees with existing items in the store so that | |
| // prefix-based queries on the new indexes work immediately. | |
| existingItems := rs.storeProxy.List() | |
| for indexName, indexFunc := range newIndexers { | |
| tree, exists := rs.prefixTrees[indexName] | |
| if !exists { | |
| continue | |
| } | |
| for _, obj := range existingItems { | |
| indexedValues, err := indexFunc(obj) | |
| if err != nil { | |
| return err | |
| } | |
| for _, v := range indexedValues { | |
| if v == "" { | |
| continue | |
| } | |
| // We only need the key in the radix tree for prefix lookups, so the value | |
| // can be a trivial placeholder. | |
| tree.Insert(v, struct{}{}) | |
| } | |
| } | |
| } |
|
|
||
| // Delete old entries if updating | ||
| if oldResource != nil { | ||
| if err := db.Where("resource_key = ?", oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil { |
There was a problem hiding this comment.
When updating, old index entries are deleted with WHERE resource_key = ? only. Since ResourceKey() is just mesh/name (not globally unique across resource kinds), this can delete index rows belonging to other kinds that share the same key. Include resource_kind = ? (and ideally also index_name if you want narrower deletes) in the delete filter.
| if err := db.Where("resource_key = ?", oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil { | |
| if err := db.Where("resource_key = ? AND resource_kind = ?", oldResource.ResourceKey(), gs.kind.ToString()).Delete(&ResourceIndexModel{}).Error; err != nil { |
| // deleteIndexEntries removes all index entries for a resource key | ||
| func (gs *GormStore) deleteIndexEntries(resourceKey string) error { | ||
| db := gs.pool.GetDB() | ||
| return db.Where("resource_key = ?", resourceKey).Delete(&ResourceIndexModel{}).Error | ||
| } |
There was a problem hiding this comment.
deleteIndexEntries deletes by resource_key only. Because resource keys are not unique across kinds, this can remove index entries for other resource kinds. Filter by both resource_kind and resource_key to avoid cross-kind data loss.
| err := db.Where("resource_kind = ? AND index_name = ? AND index_value LIKE ?", | ||
| gs.kind.ToString(), indexName, prefix+"%"). | ||
| Find(&entries).Error |
There was a problem hiding this comment.
Prefix matching uses index_value LIKE prefix + '%'. If prefix can contain SQL wildcard characters (% / _), the query will match more than a literal prefix. If the intent is literal prefix semantics, escape wildcards in prefix (and add an ESCAPE clause) before building the LIKE pattern.
| tree.WalkPrefix(prefix, func(k string, v interface{}) bool { | ||
| // Key format: "indexValue/resourceKey" | ||
| // Extract the resourceKey part (after the last "/") | ||
| idx := strings.LastIndex(k, "/") | ||
| if idx >= 0 && idx < len(k)-1 { | ||
| keys = append(keys, k[idx+1:]) | ||
| } |
There was a problem hiding this comment.
getKeysByPrefix extracts the resource key using strings.LastIndex(k, "/"), which will truncate keys like mesh/name to just name. Since GetByKey expects the full resource key, HasPrefix queries can return keys that will never resolve to stored resources. The tree should store the full resource key without needing to parse it back out of the string key.
|
|
||
| if len(indexEntries) > 0 { | ||
| if err := tx.CreateInBatches(&indexEntries, 100).Error; err != nil { | ||
| logger.Warnf("failed to persist index entries during replace: %v", err) |
There was a problem hiding this comment.
During Replace, failure to persist resource_indices is only logged and the transaction still commits. That can leave follower replicas without usable persisted indexes until a full rebuild occurs. If persisted indexes are required for correctness in multi-replica mode, consider returning the error so the transaction rolls back (or at least make the behavior configurable).
| logger.Warnf("failed to persist index entries during replace: %v", err) | |
| logger.Warnf("failed to persist index entries during replace: %v", err) | |
| return err |
pkg/store/dbcommon/gorm_store.go
Outdated
| // Try to load from resource_indices table (persisted indices) | ||
| var entries []ResourceIndexModel | ||
| db := gs.pool.GetDB() | ||
| if err := db.Where("resource_kind = ?", gs.kind.ToString()).Find(&entries).Error; err != nil { | ||
| logger.Warnf("failed to load from resource_indices for %s: %v", gs.kind.ToString(), err) | ||
| return gs.rebuildIndicesFromResources() | ||
| } | ||
|
|
||
| // If resource_indices table has entries, rebuild from there | ||
| if len(entries) > 0 { | ||
| for _, entry := range entries { | ||
| gs.indices.AddEntry(entry.IndexName, entry.IndexValue, entry.ResourceKey) | ||
| } | ||
| logger.Infof("Rebuilt indices for %s from resource_indices table: %d entries", gs.kind.ToString(), len(entries)) | ||
| return nil | ||
| } | ||
|
|
||
| // Fallback: rebuild from resources table |
There was a problem hiding this comment.
rebuildIndices rebuilds from resource_indices as soon as it finds any rows for the kind. If indexers have changed (new index added) or the table is only partially populated, this will skip rebuilding missing indexes from the resource table, leaving queries incomplete. Consider validating that required index names are present (or versioning the index schema) and falling back to rebuildIndicesFromResources when coverage is incomplete.
| // Try to load from resource_indices table (persisted indices) | |
| var entries []ResourceIndexModel | |
| db := gs.pool.GetDB() | |
| if err := db.Where("resource_kind = ?", gs.kind.ToString()).Find(&entries).Error; err != nil { | |
| logger.Warnf("failed to load from resource_indices for %s: %v", gs.kind.ToString(), err) | |
| return gs.rebuildIndicesFromResources() | |
| } | |
| // If resource_indices table has entries, rebuild from there | |
| if len(entries) > 0 { | |
| for _, entry := range entries { | |
| gs.indices.AddEntry(entry.IndexName, entry.IndexValue, entry.ResourceKey) | |
| } | |
| logger.Infof("Rebuilt indices for %s from resource_indices table: %d entries", gs.kind.ToString(), len(entries)) | |
| return nil | |
| } | |
| // Fallback: rebuild from resources table | |
| // Always rebuild from the resources table to ensure indices are complete | |
| // and compatible with the current indexers. This will also repopulate | |
| // the resource_indices table via persistIndexEntries. |
| // Create mock resources with IP-like values for prefix matching | ||
| mockRes1 := &mockResource{ | ||
| kind: "TestInstance", | ||
| key: "instance-1", | ||
| mesh: "default", | ||
| meta: metav1.ObjectMeta{ | ||
| Name: "instance-1", | ||
| Labels: map[string]string{ | ||
| "ip": "192.168.1.10", | ||
| }, | ||
| }, |
There was a problem hiding this comment.
These HasPrefix tests use resource keys like "instance-1" that don't contain the path separator. In production ResourceKey() is typically mesh/name, so keys include /; using a realistic key format here would catch issues where prefix indexing/parsing mishandles / in resource keys.
| require ( | ||
| cel.dev/expr v0.23.0 // indirect | ||
| filippo.io/edwards25519 v1.1.0 // indirect | ||
| github.com/BurntSushi/toml v1.3.2 // indirect | ||
| github.com/armon/go-radix v1.0.0 // indirect |
There was a problem hiding this comment.
github.com/armon/go-radix is imported directly from non-test code (pkg/store/memory/store.go), but in go.mod it is listed with // indirect. Running go mod tidy (or moving it to the direct require block) should make the dependency classification accurate.
robocanic
left a comment
There was a problem hiding this comment.
Great Work!I left some comments and hope you can discuss them with me.
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Init initializes the GORM store by migrating the schema and registering indexers | ||
| func (gs *GormStore) Init(_ runtime.BuilderContext) error { | ||
| // Perform table migration | ||
| db := gs.pool.GetDB() | ||
| // Use Scopes to set the table name dynamically for migration | ||
| if err := db.Scopes(TableScope(gs.kind.ToString())).AutoMigrate(&ResourceModel{}); err != nil { | ||
| return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) | ||
| } | ||
|
|
||
| // Migrate resource_indices table (shared across all resource kinds) | ||
| if err := db.AutoMigrate(&ResourceIndexModel{}); err != nil { | ||
| return fmt.Errorf("failed to migrate resource_indices: %w", err) | ||
| } | ||
|
|
||
| // Register indexers for the resource kind | ||
| indexers := index.IndexersRegistry().Indexers(gs.kind) | ||
| if err := gs.AddIndexers(indexers); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Rebuild indices from existing data in the database | ||
| if err := gs.rebuildIndices(); err != nil { | ||
| return fmt.Errorf("failed to rebuild indices for %s: %w", gs.kind.ToString(), err) | ||
| } | ||
|
|
||
| logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
GormStore.Init no longer rebuilds/backfills the new resource_indices table from existing ResourceModel rows. On restart or upgrade from a DB that already contains resources, ListByIndexes/HasPrefix will return empty results until each resource is re-written. Consider adding an init-time backfill (e.g., delete existing resource_kind entries and bulk-insert computed index entries) after indexers are registered.
|
|
||
| // Delete old entries if updating | ||
| if oldResource != nil { | ||
| if err := db.Where("resource_key = ?", oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil { |
There was a problem hiding this comment.
persistIndexEntries deletes old index rows using only resource_key. Since ResourceKey() is not globally unique across resource kinds, this can delete index entries belonging to other kinds that share the same key. The delete should be scoped by resource_kind (and ideally index_name too if needed).
| if err := db.Where("resource_key = ?", oldResource.ResourceKey()).Delete(&ResourceIndexModel{}).Error; err != nil { | |
| if err := db.Where("resource_kind = ? AND resource_key = ?", gs.kind.ToString(), oldResource.ResourceKey()). | |
| Delete(&ResourceIndexModel{}).Error; err != nil { |
There was a problem hiding this comment.
copilot这里说的有点道理,index现在是所有kinds的resource都在一个表里,可以再加一个and条件过滤一下以防万一 @thunguo
| // deleteIndexEntries removes all index entries for a resource key | ||
| func (gs *GormStore) deleteIndexEntries(resourceKey string) error { | ||
| db := gs.pool.GetDB() | ||
| return db.Where("resource_key = ?", resourceKey).Delete(&ResourceIndexModel{}).Error | ||
| } |
There was a problem hiding this comment.
deleteIndexEntries deletes from resource_indices using only resource_key, which can remove index rows for other resource kinds that happen to share the same key. This should be scoped by resource_kind (and possibly index_name) to avoid cross-kind data loss.
| if err := db.Scopes(TableScope(gs.kind.ToString())).Create(m).Error; err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Update indices after successful DB operation | ||
| gs.indices.UpdateResource(resource, nil) | ||
| // Persist index entries to DB | ||
| if err := gs.persistIndexEntries(resource, nil); err != nil { | ||
| logger.Warnf("failed to persist index entries for %s: %v", resource.ResourceKey(), err) | ||
| } | ||
|
|
||
| return nil |
There was a problem hiding this comment.
Add persists index entries after inserting the resource, but if index persistence fails it only logs a warning and still returns success. Since all indexed reads now depend on resource_indices, this can cause permanently inconsistent query results. Consider making resource + index writes atomic (transaction) and returning the persistence error to the caller (same applies to Update/Delete).
There was a problem hiding this comment.
从整体架构上来说,index和resource双写确实需要作为一个原子行为来执行,只要有一个失败就都失败,但这里不太确定上游的重试行为。可以评估一下这里变更范围和有效性@thunguo
|
|
||
| if len(indexEntries) > 0 { | ||
| if err := tx.CreateInBatches(&indexEntries, 100).Error; err != nil { | ||
| logger.Warnf("failed to persist index entries during replace: %v", err) |
There was a problem hiding this comment.
During Replace, failures persisting resource_indices are logged but not returned, allowing the transaction to commit resources without corresponding index rows. This will break all index-based queries after a replace. Consider returning the error so Replace remains atomic for both resources and indices.
| logger.Warnf("failed to persist index entries during replace: %v", err) | |
| logger.Warnf("failed to persist index entries during replace: %v", err) | |
| return err |
| r, ok := obj.(coremodel.Resource) | ||
| if ok { | ||
| rs.removeFromTrees(r) | ||
| } | ||
| return rs.storeProxy.Delete(obj) |
There was a problem hiding this comment.
Delete removes the resource from prefix trees before calling storeProxy.Delete. If storeProxy.Delete fails, the prefix trees become inconsistent with the underlying indexer contents. Consider performing the tree removal only after a successful delete, or re-adding on failure.
| r, ok := obj.(coremodel.Resource) | |
| if ok { | |
| rs.removeFromTrees(r) | |
| } | |
| return rs.storeProxy.Delete(obj) | |
| if err := rs.storeProxy.Delete(obj); err != nil { | |
| return err | |
| } | |
| if r, ok := obj.(coremodel.Resource); ok { | |
| rs.removeFromTrees(r) | |
| } | |
| return nil |
| // Extract the resourceKey part (after the last "/") | ||
| idx := strings.LastIndex(k, "/") |
There was a problem hiding this comment.
Prefix-tree key encoding uses v + "/" + resource.ResourceKey() and later extracts the resource key using strings.LastIndex(k, "/"). Since ResourceKey() is typically mesh/name and contains /, this parsing will truncate the key (only the part after the last /) and return incorrect/missing resources for HasPrefix queries.
| // Extract the resourceKey part (after the last "/") | |
| idx := strings.LastIndex(k, "/") | |
| // Extract the resourceKey part (after the first "/") | |
| idx := strings.Index(k, "/") |
| func (rs *resourceStore) Update(obj interface{}) error { | ||
| return rs.storeProxy.Update(obj) | ||
| r, ok := obj.(coremodel.Resource) | ||
| if ok { | ||
| // Get the old resource from the store to properly remove it from trees | ||
| oldObj, exists, err := rs.storeProxy.Get(r) | ||
| if exists && err == nil { | ||
| if oldRes, ok := oldObj.(coremodel.Resource); ok { | ||
| rs.removeFromTrees(oldRes) | ||
| } | ||
| } | ||
| } | ||
| if err := rs.storeProxy.Update(obj); err != nil { | ||
| return err | ||
| } | ||
| if ok { | ||
| // Add new entry with updated values | ||
| rs.addToTrees(r) | ||
| } |
There was a problem hiding this comment.
Update removes the old resource from prefix trees before calling storeProxy.Update. If storeProxy.Update returns an error, the underlying cache remains unchanged but the prefix tree state has already been mutated, leading to incorrect HasPrefix results. Consider only mutating prefix trees after a successful storeProxy.Update (and similarly ensuring rollback on errors).



Please provide a description of this PR:
Extend indexer with prefix matching and db persistence.
Issue #1424
To help us figure out who should review this PR, please put an X in all the areas that this PR affects.
Please check any characteristics that apply to this pull request.