fix(memory): make PgSQL the single source of truth for user entity al… #896

Merged: keeees merged 2 commits into release/v0.3.0 from fix/extract-aliases on Apr 14, 2026

@lanceyq lanceyq commented Apr 14, 2026

…iases

  • Skip alias merging for user entities during dedup (_merge_attribute and _merge_entities_with_aliases) to prevent dirty data from overwriting PgSQL authoritative aliases
  • Add PgSQL→Neo4j alias sync after Neo4j write in write_tools to ensure Neo4j user entities always reflect the PgSQL source
  • Remove deduped_aliases (Neo4j history) from alias sync in extraction_orchestrator, only append newly extracted aliases to PgSQL
  • Guard Neo4j MERGE cypher to preserve existing aliases for user entities (name IN ['用户','我','User','I'])
  • Fix emotion_analytics_service query to use ExtractedEntity label and entity_type property

Summary by Sourcery

Make PostgreSQL the single source of truth for user aliases and ensure Neo4j and analytics respect that model.

Bug Fixes:

  • Prevent alias merging logic from overwriting authoritative user aliases for placeholder user entities.
  • Fix emotion analytics user profile query to use the correct ExtractedEntity label and entity_type property in Neo4j.

Enhancements:

  • Change end user alias synchronization to append only newly extracted aliases from conversations into PostgreSQL without feeding back Neo4j history.
  • After successful Neo4j writes, sync user entity aliases from PostgreSQL into Neo4j to keep user entities aligned with the authoritative alias source.
  • Guard Neo4j MERGE cypher so that write operations do not modify aliases for user placeholder entities in Neo4j.

@lanceyq lanceyq requested a review from keeees April 14, 2026 09:29

sourcery-ai bot commented Apr 14, 2026

Reviewer's Guide

This PR makes PostgreSQL the authoritative source of user aliases and adjusts the extraction, deduplication, Neo4j write path, and analytics query so that Neo4j no longer feeds aliases back into PgSQL, and user entities in Neo4j are synchronized from PgSQL instead, while fixing an entity query label/property mismatch.

Sequence diagram for PgSQL→Neo4j user alias synchronization in write path

sequenceDiagram
    actor Client
    participant WriteService as write_tools_write
    participant PgSQL as PgSQL_end_user_info
    participant Neo4j as Neo4j_ExtractedEntity
    participant Celery as Celery_clustering_task

    Client->>WriteService: write(all_entity_nodes, memory_config, ...)
    WriteService->>Neo4j: execute_query(save ExtractedEntity nodes)
    Neo4j-->>WriteService: success

    alt all_entity_nodes not empty
        WriteService->>WriteService: end_user_id = all_entity_nodes[0].end_user_id
        opt end_user_id exists
            WriteService->>PgSQL: EndUserInfoRepository.get_by_end_user_id(end_user_id)
            PgSQL-->>WriteService: info.aliases as pg_aliases
            alt pg_aliases not empty
                WriteService->>Neo4j: MATCH ExtractedEntity WHERE end_user_id = end_user_id AND name in 用户/我/User/I SET aliases = pg_aliases
                Neo4j-->>WriteService: aliases updated
            else pg_aliases empty
                WriteService->>WriteService: skip alias sync
            end
        end

        WriteService->>Celery: run_incremental_clustering.apply_async(...)
        Celery-->>WriteService: task_id
    else no entities
        WriteService->>WriteService: skip alias sync and clustering
    end

    WriteService-->>Client: write result
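
The branching in the diagram above can be sketched as a small planning function. This is only an illustration under stated assumptions: `EntityNode`, `plan_alias_sync`, and `load_pg_aliases` are hypothetical names, the PgSQL lookup is replaced by a callable, and passing the placeholder names as a query parameter (rather than inlining them) is a choice made for the sketch, not something the diagram confirms.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Sequence

# Placeholder names as quoted in the PR description; supplying them as a
# query parameter instead of inlining them is an assumption of this sketch.
USER_PLACEHOLDER_NAMES: List[str] = ["用户", "我", "User", "I"]

ALIAS_SYNC_QUERY = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND e.name IN $user_placeholder_names
SET e.aliases = $aliases
"""


@dataclass
class EntityNode:
    """Hypothetical stand-in for the repository's entity node type."""
    end_user_id: Optional[str]


def plan_alias_sync(
    entity_nodes: Sequence[EntityNode],
    load_pg_aliases: Callable[[str], Optional[List[str]]],
) -> Optional[Dict[str, object]]:
    """Return the Cypher parameters for the sync, or None when it is skipped."""
    if not entity_nodes:
        return None  # no entities: skip alias sync (and clustering)
    end_user_id = entity_nodes[0].end_user_id
    if not end_user_id:
        return None  # nothing to key the PgSQL lookup on
    pg_aliases = load_pg_aliases(end_user_id) or []
    if not pg_aliases:
        return None  # diagram branch: "pg_aliases empty" skips the sync
    return {
        "end_user_id": end_user_id,
        "aliases": list(pg_aliases),
        "user_placeholder_names": USER_PLACEHOLDER_NAMES,
    }
```

A caller would run `ALIAS_SYNC_QUERY` with the returned parameters only when the result is not `None`, and would keep that call inside its own error handling so that a failed sync does not abort the write.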

Sequence diagram for updating PgSQL user aliases from current dialog extraction

sequenceDiagram
    participant Engine as ExtractionEngine
    participant Orchestrator as extraction_orchestrator
    participant PgSQL as PgSQL_end_user_info
    participant Neo4j as Neo4j_ExtractedEntity

    Engine->>Orchestrator: _update_end_user_other_name(entity_nodes, dialog_data_list, end_user_id)

    Orchestrator->>Orchestrator: current_aliases = _extract_current_aliases(entity_nodes, dialog_data_list)
    Orchestrator->>Neo4j: _fetch_neo4j_assistant_aliases(end_user_id)
    Neo4j-->>Orchestrator: neo4j_assistant_aliases

    Orchestrator->>Orchestrator: filter current_aliases by assistant aliases
    alt current_aliases empty
        Orchestrator-->>Engine: return (no update)
    else
        Orchestrator->>PgSQL: EndUserRepository.get(end_user_id)
        PgSQL-->>Orchestrator: end_user
        alt end_user not found
            Orchestrator-->>Engine: return
        else
            Orchestrator->>PgSQL: EndUserInfoRepository.get_by_end_user_id(end_user_id)
            PgSQL-->>Orchestrator: info.aliases as db_aliases
            Orchestrator->>Orchestrator: filter placeholder names in db_aliases
            Orchestrator->>Orchestrator: merged_aliases = db_aliases + current_aliases (dedup, keep order)

            alt info exists
                Orchestrator->>PgSQL: update EndUserInfo.aliases = merged_aliases
            else
                Orchestrator->>PgSQL: insert EndUserInfo with aliases = merged_aliases
            end

            Orchestrator-->>Engine: done
        end
    end
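
The merge step in the diagram above (`db_aliases + current_aliases`, de-duplicated with order kept) might look roughly like the following. It is a sketch, not the repository code: `merge_aliases` is a hypothetical helper, the placeholder set is assumed to be compared in lowercase, and de-duplication is assumed to be an exact string match.

```python
from typing import Iterable, List

# Assumed placeholder set, lowercased for comparison; the real constant
# lives in the repository and may differ.
USER_PLACEHOLDER_NAMES_LOWER = {"用户", "我", "user", "i"}


def merge_aliases(db_aliases: Iterable[str], current_aliases: Iterable[str]) -> List[str]:
    """Append newly extracted aliases to the stored ones, keeping first-seen order."""
    # Drop placeholder names that may have leaked into the stored aliases.
    stored = [
        alias for alias in db_aliases
        if alias.strip().lower() not in USER_PLACEHOLDER_NAMES_LOWER
    ]
    merged: List[str] = []
    seen = set()
    for alias in [*stored, *current_aliases]:
        if alias not in seen:  # exact-match de-duplication (assumption)
            seen.add(alias)
            merged.append(alias)
    return merged
```

Stored aliases come first, so an alias already present in PgSQL keeps its position and a repeated extraction of the same alias is a no-op.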

File-Level Changes

| Change | Details | Files |
| --- | --- | --- |
| Stop using deduplication logic to modify aliases for user entities; their aliases are no longer merged from extracted data or Neo4j history. | • Guard alias merging in `_merge_attribute` so that entities whose canonical name matches known user placeholder names skip alias normalization and updates, preserving their aliases<br>• Short-circuit `_merge_entities_with_aliases` for user placeholder entities so their aliases are not modified during fuzzy-merge; for other entities, keep the normalized merge behavior with a simplified code path | `api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py` |
| Change end-user alias synchronization to treat PgSQL as the single source of truth and only append newly extracted aliases from the current dialog. | • Update `_update_end_user_other_name` to rely on aliases extracted from the current dialog only, removing `deduped_aliases` that previously included Neo4j history<br>• Remove filtering and logging related to `deduped_aliases` and use only `current_aliases` when checking for emptiness and when computing the `first_alias`<br>• Merge aliases as PgSQL existing aliases plus `current_aliases` with de-duplication, avoiding any Neo4j-derived alias data | `api/app/core/memory/storage_services/extraction_engine/extraction_orchestrator.py` |
| After a successful Neo4j write, push PgSQL user aliases back into Neo4j so that Neo4j user entities reflect the PgSQL authority. | • After `write_to_neo4j` succeeds and entity nodes exist, load `end_user_info.aliases` from PgSQL using `EndUserInfoRepository`<br>• If PgSQL has aliases, run a Neo4j query to find `ExtractedEntity` nodes for the end user whose name is one of the user placeholder names and overwrite their aliases with the PgSQL aliases<br>• Wrap the PgSQL→Neo4j sync in a try/except and log failures without impacting the main flow, then proceed to schedule the clustering Celery task as before | `api/app/core/memory/agent/utils/write_tools.py` |
| Fix the emotion analytics query to use the correct Neo4j label and type property for entities. | • Change the Cypher in `_get_simple_user_profile` to match on `:ExtractedEntity` instead of `:Entity`<br>• Return `e.entity_type` as `type` instead of `e.type` to align with the current schema | `api/app/services/emotion_analytics_service.py` |
| Ensure Neo4j MERGE logic does not overwrite user aliases during entity upserts. | • Update the aliases `SET` expression in the Neo4j Cypher so that when the entity name is a user placeholder, `e.aliases` is left unchanged regardless of incoming `entity.aliases`<br>• Keep the existing behavior for non-user entities, which sets or merges aliases when incoming aliases are present | `api/app/repositories/neo4j/cypher_queries.py` |
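
The guard described for the dedup path (and mirrored by the MERGE change) amounts to "leave placeholder entities' aliases alone, merge everything else". A minimal sketch follows, assuming a hypothetical `merge_entity_aliases` helper and a simple strip-and-dedup step standing in for the repository's `normalize_aliases`, whose exact behavior is not shown in this PR.

```python
from typing import List, Optional, Sequence

# Assumed placeholder set, lowercased for comparison.
USER_PLACEHOLDER_NAMES_LOWER = {"用户", "我", "user", "i"}


def merge_entity_aliases(
    canonical_name: str,
    canonical_aliases: Sequence[str],
    incoming_name: Optional[str],
    incoming_aliases: Sequence[str],
) -> List[str]:
    """Return the aliases the canonical entity should hold after a merge."""
    # User placeholder entities keep their aliases untouched: PgSQL owns them.
    if canonical_name.strip().lower() in USER_PLACEHOLDER_NAMES_LOWER:
        return list(canonical_aliases)

    candidates = list(canonical_aliases)
    if incoming_name and incoming_name != canonical_name:
        candidates.append(incoming_name)
    candidates.extend(incoming_aliases)

    # Stand-in normalization (assumption): strip, drop empties, de-duplicate.
    merged: List[str] = []
    seen = set()
    for alias in candidates:
        cleaned = alias.strip()
        if cleaned and cleaned not in seen:
            seen.add(cleaned)
            merged.append(cleaned)
    return merged
```

With this shape, dirty aliases extracted for "用户" or "我" never reach the user entity through dedup, while ordinary entities still accumulate aliases as before.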

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 3 issues, and left some high-level feedback:

  • The special-casing of user placeholder names is duplicated across modules (e.g., _USER_PLACEHOLDER_NAMES, hardcoded ['用户','我','User','I'] in Cypher and write_tools); consider centralizing this list in a shared constant to avoid divergence.
  • In write_tools.write, end_user_id is taken from all_entity_nodes[0] without checking that all nodes share the same end_user_id; you may want to assert or validate homogeneity to avoid syncing aliases to the wrong user in edge cases.
  • The PgSQL→Neo4j alias sync only runs when pg_aliases is non-empty, which means Neo4j aliases can never be cleared once set; if clearing aliases is a valid scenario, consider explicitly handling the empty-list case instead of skipping.
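
The second point above (validating that all nodes share one `end_user_id`) could be handled with a small check before the sync. This is a sketch only: `resolve_single_end_user_id` is a hypothetical helper, nodes are modeled with `SimpleNamespace`, and raising `ValueError` on a mix of users is one possible policy (logging and skipping the sync would be another).

```python
from types import SimpleNamespace
from typing import Iterable, Optional


def resolve_single_end_user_id(entity_nodes: Iterable[object]) -> Optional[str]:
    """Return the one end_user_id shared by all nodes, or None if none is set.

    Raises ValueError when nodes belong to more than one end user, so aliases
    are never synced to a user picked arbitrarily from the first node.
    """
    ids = {getattr(node, "end_user_id", None) for node in entity_nodes}
    ids.discard(None)  # nodes without an id do not count as a conflict here
    if len(ids) > 1:
        raise ValueError(f"entity nodes span multiple end users: {sorted(ids)}")
    return next(iter(ids), None)


# Example usage with hypothetical nodes.
nodes = [SimpleNamespace(end_user_id="u1"), SimpleNamespace(end_user_id="u1")]
end_user_id = resolve_single_end_user_id(nodes)
```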
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The special-casing of user placeholder names is duplicated across modules (e.g., `_USER_PLACEHOLDER_NAMES`, hardcoded `['用户','我','User','I']` in Cypher and write_tools); consider centralizing this list in a shared constant to avoid divergence.
- In `write_tools.write`, `end_user_id` is taken from `all_entity_nodes[0]` without checking that all nodes share the same `end_user_id`; you may want to assert or validate homogeneity to avoid syncing aliases to the wrong user in edge cases.
- The PgSQL→Neo4j alias sync only runs when `pg_aliases` is non-empty, which means Neo4j aliases can never be cleared once set; if clearing aliases is a valid scenario, consider explicitly handling the empty-list case instead of skipping.

## Individual Comments

### Comment 1
<location path="api/app/core/memory/agent/utils/write_tools.py" line_range="204-209" />
<code_context>
+                            with get_db_context() as db_session:
+                                info = EndUserInfoRepository(db_session).get_by_end_user_id(uuid.UUID(end_user_id))
+                                pg_aliases = info.aliases if info and info.aliases else []
+                            if pg_aliases:
+                                await neo4j_connector.execute_query(
+                                    """
+                                    MATCH (e:ExtractedEntity)
+                                    WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I']
+                                    SET e.aliases = $aliases
+                                    """,
+                                    end_user_id=end_user_id, aliases=pg_aliases,
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider handling the case where PgSQL has an empty alias list by also clearing Neo4j aliases for user entities.

Because the query only runs when `pg_aliases` is non-empty, any intentional clearing of aliases in PgSQL (e.g., user removes data, cleanup job) will not be reflected in Neo4j, leaving stale aliases. To ensure Neo4j reflects PgSQL as the source of truth, consider always executing the query when `info` exists and setting `e.aliases` to an empty list (or `NULL`, per your conventions) when `pg_aliases` is empty.
</issue_to_address>

### Comment 2
<location path="api/app/core/memory/agent/utils/write_tools.py" line_range="207-208" />
<code_context>
+                            if pg_aliases:
+                                await neo4j_connector.execute_query(
+                                    """
+                                    MATCH (e:ExtractedEntity)
+                                    WHERE e.end_user_id = $end_user_id AND e.name IN ['用户', '我', 'User', 'I']
+                                    SET e.aliases = $aliases
+                                    """,
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid hardcoding the user placeholder names in Cypher to reduce drift with the Python-side placeholder set.

This `IN ['用户', '我', 'User', 'I']` filter duplicates the placeholder-name set defined in Python (`_USER_PLACEHOLDER_NAMES` / `USER_PLACEHOLDER_NAMES`). If these fall out of sync, some user entities may stop receiving alias updates. Please either pass the Python placeholder list into this query as a parameter or move the list to shared config used by both Python and Cypher.

Suggested implementation:

```python
                            if pg_aliases:
                                await neo4j_connector.execute_query(
                                    """
                                    MATCH (e:ExtractedEntity)
                                    WHERE e.end_user_id = $end_user_id AND e.name IN $user_placeholder_names
                                    SET e.aliases = $aliases
                                    """,
                                    end_user_id=end_user_id,
                                    aliases=pg_aliases,
                                    user_placeholder_names=USER_PLACEHOLDER_NAMES,
                                )
                                logger.info(f"[AliasSync] Neo4j 用户实体 aliases 已用 PgSQL 权威源覆盖: {pg_aliases}")

```

1. Ensure that `USER_PLACEHOLDER_NAMES` (or the appropriate constant) is imported or available in this module:
   - If defined in this file, make sure it is named `USER_PLACEHOLDER_NAMES`.
   - If defined elsewhere (e.g. `app.core.memory.agent.constants`), add an import at the top of this file, such as:
   `from app.core.memory.agent.constants import USER_PLACEHOLDER_NAMES`.
2. If the canonical constant is named `_USER_PLACEHOLDER_NAMES` instead, either:
   - Import it and alias it to `USER_PLACEHOLDER_NAMES`, or
   - Change the parameter value to use the actual name (e.g. `user_placeholder_names=_USER_PLACEHOLDER_NAMES`) and keep the rest the same.
</issue_to_address>

### Comment 3
<location path="api/app/core/memory/storage_services/extraction_engine/deduplication/deduped_and_disamb.py" line_range="91-98" />
<code_context>
-                    unique_aliases.append(alias_stripped)
+            # Collect all aliases that need to be merged
+            all_aliases = list(getattr(canonical, "aliases", []) or [])
+            if incoming_name and incoming_name != canonical_name:
+                all_aliases.append(incoming_name)
+            all_aliases.extend(getattr(ent, "aliases", []) or [])

-            # Sort and assign
-            canonical.aliases = sorted(unique_aliases)
+            try:
+                from app.core.memory.utils.alias_utils import normalize_aliases
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider filtering out user placeholder names from incoming aliases to avoid polluting non-user entities.

In this merge path, aliases from `ent` (including `incoming_name`) are added to `canonical` whenever `canonical_name` is not in `_USER_PLACEHOLDER_NAMES`. If an entity with a user placeholder name (e.g., “我”/“User”) is merged into a non-user canonical entity, that placeholder will be stored as an alias, which conflicts with the intent that user aliases are handled only via PgSQL. To prevent this, skip adding aliases that are in `_USER_PLACEHOLDER_NAMES` (case-insensitive) for both `incoming_name` and `ent.aliases`.

```suggestion
            # Collect all aliases that need to be merged
            all_aliases = list(getattr(canonical, "aliases", []) or [])

            # Filter out user placeholder names so they do not pollute non-user entities' aliases (for incoming_name)
            if incoming_name and incoming_name != canonical_name:
                incoming_name_stripped = incoming_name.strip()
                if incoming_name_stripped and incoming_name_stripped.lower() not in _USER_PLACEHOLDER_NAMES:
                    all_aliases.append(incoming_name_stripped)

            # Filter out user placeholder names so they do not pollute non-user entities' aliases (for ent.aliases)
            ent_aliases = getattr(ent, "aliases", []) or []
            filtered_ent_aliases = []
            for alias in ent_aliases:
                if not isinstance(alias, str):
                    continue
                alias_stripped = alias.strip()
                if not alias_stripped:
                    continue
                if alias_stripped.lower() in _USER_PLACEHOLDER_NAMES:
                    continue
                filtered_ent_aliases.append(alias_stripped)

            all_aliases.extend(filtered_ent_aliases)

            try:
                from app.core.memory.utils.alias_utils import normalize_aliases
```
</issue_to_address>

…logic

- Replace hardcoded user placeholder name lists in write_tools and
  user_memory_service with shared _USER_PLACEHOLDER_NAMES constant
- Filter user placeholder names during alias merging in _merge_attribute
  to prevent cross-role alias contamination on non-user entities
- Use toLower() in Cypher query for case-insensitive name matching
- Change PgSQL->Neo4j alias sync condition from 'if pg_aliases' to
  'if info is not None' so empty aliases correctly clear stale data
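
The last two points of this commit message can be illustrated together. The sketch below is not the merged code: `build_alias_sync_params` and the parameter name `placeholder_names_lower` are hypothetical, and the `info` object is modeled with `SimpleNamespace`. It shows the sync being gated on `info is not None`, so an empty alias list still overwrites stale Neo4j data, and the name match using Cypher's `toLower()` against a lowercased list.

```python
from types import SimpleNamespace
from typing import Dict, Iterable, Optional

ALIAS_SYNC_QUERY = """
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
  AND toLower(e.name) IN $placeholder_names_lower
SET e.aliases = $aliases
"""


def build_alias_sync_params(
    end_user_id: str,
    info: Optional[object],
    placeholder_names: Iterable[str],
) -> Optional[Dict[str, object]]:
    """Return query parameters, or None when PgSQL has no record for the user."""
    if info is None:
        return None  # no PgSQL row: nothing authoritative to sync
    # An empty list is a valid value and clears stale aliases in Neo4j.
    aliases = list(getattr(info, "aliases", None) or [])
    return {
        "end_user_id": end_user_id,
        "aliases": aliases,
        "placeholder_names_lower": sorted({name.lower() for name in placeholder_names}),
    }


# Example: a user whose aliases were cleared in PgSQL still triggers a sync.
params = build_alias_sync_params("u1", SimpleNamespace(aliases=[]), ["用户", "我", "User", "I"])
```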
@keeees keeees merged commit 29aef45 into release/v0.3.0 Apr 14, 2026
1 check passed