TuGraph安装与启动

在docker中拉取镜像

docker pull tugraph/tugraph-runtime-arm64v8-centos7

使用Docker Desktop启动Tugraph

在浏览器中输入https://localhost:7070

输入数据库地址:127.0.0.1:7687,默认用户名:admin,默认密码:73@TuGraph

图:TuGraph服务成功启动后的Web界面

数据导入与图谱构建

创建图谱

数据导入支持多种格式,包括CSV和JSON。通过neo4j 的GraphDatabase工具进行批量导入。

步骤 1:生成图谱结构配置
  • 工具脚本:write_conf.py

  • 核心操作:读取disease3.csv列结构,生成example.json配置文件。

    • 定义 14 类顶点(实体):Disease(疾病)、Alias(别名)、Symptom(症状)等,统一以name为唯一主键(非空、带索引)。

    • 定义 13 类边(关系):HAS_ALIAS(疾病 - 别名)、HAS_SYMPTOM(疾病 - 症状)等,明确边的连接约束(如 Disease→Symptom)。

  • 关键作用:避免硬编码节点 / 边结构,让图谱构建更灵活可维护。

步骤 2:拆分节点与边数据
  • 工具脚本:write_V_E.py

  • 核心操作:解析disease3.csv,按example.json的结构,生成两类文件:

    • 节点 CSV:每个顶点类型对应 1 个 CSV(如Disease.csvSymptom.csv),存储实体的唯一值(去重后)。

    • 边 CSV:每个边类型对应 1 个 CSV(如HAS_SYMPTOM.csv),存储 “源实体 - 目标实体” 的关联关系(如 “乙肝 - 转氨酶增高”)。

  • 关键处理:拆分多值字段(如将 “转氨酶增高 肝肿大” 拆分为两条独立边数据)。

步骤 3:图谱导入与结构创建
  • 工具脚本:tugraph_build_graph.py

  • 核心操作:

    1. 连接 TuGraph:通过 Neo4j 驱动连接数据库,验证连通性。

    2. 初始化数据库:删除旧库(避免冲突),创建目标数据库(如disease_graph)。

    3. 创建约束:基于example.json,为所有顶点的name属性创建唯一约束(确保实体不重复)。

    4. 批量导入:读取节点 / 边 CSV 文件,通过 Cypher 的MERGE语句(避免重复)批量导入节点和边。

        以下为tugraph_build_graph.py的核心代码片段:

from neo4j import GraphDatabase

def build_tu_graph():
    try:
        # 1. 读取配置文件
        conf = read_tu_conf(TU_CONFIG["conf_path"])
        vertex_conf = conf["vertex"]  # 节点配置
        edge_conf = conf["edge"]      # 边配置
        
        # 提取所有节点标签(如["Disease", "Alias", "Part"...])
        vertex_labels = [v["label"] for v in vertex_conf]
        # 提取所有边标签(如["HAS_ALIAS", "IS_OF_PART"...])
        edge_labels = [e["label"] for e in edge_conf]
        
        # 2. 验证CSV文件(节点+边)
        check_csv_exist(TU_CONFIG["csv_dir"], vertex_labels)
        # 验证边CSV(每个边标签对应一个CSV)
        edge_csvs = [f"{label}.csv" for label in edge_labels]
        missing_edge_csvs = []
        for csv_name in edge_csvs:
            csv_path = os.path.join(TU_CONFIG["csv_dir"], csv_name)
            if not os.path.exists(csv_path):
                missing_edge_csvs.append(csv_path)
        if missing_edge_csvs:
            raise FileNotFoundError(f"❌ 以下边CSV文件缺失:\n{', '.join(missing_edge_csvs)}")
        print("✅ 所有边CSV文件均存在")
        
        # 3. 连接TuGraph
        driver = GraphDatabase.driver(TU_CONFIG["uri"], auth=TU_CONFIG["auth"])
        driver.verify_connectivity()
        print(f"✅ 成功连接TuGraph(URI: {TU_CONFIG['uri']})")
        
        with driver.session(database=TU_CONFIG["db_name"]) as session:
            # 4. 初始化数据库:删除旧库(避免schema冲突)
            # db.dropDB() 不接受参数,会清空当前连接的数据库
            try:
                session.run("CALL db.dropDB()")
                print(f"✅ 已清空数据库:{TU_CONFIG['db_name']}")
            except Exception as e:
                # 如果数据库不存在或清空失败,继续执行(可能是新数据库)
                print(f"⚠️  数据库清空操作:{str(e)},继续执行...")
            print(f"✅ 数据库初始化完成(数据库:{TU_CONFIG['db_name']})")
            
            # 5. 基于配置创建节点标签(严格遵循example.json的属性定义)
            print("\n===== 开始创建节点标签 =====")
            for v in vertex_conf:
                label = v["label"]
                properties = v["properties"]  # 从配置获取属性(如[{"name":"name","type":"STRING"...}])
                primary_key = v["primary"]    # 从配置获取主键(如"name")
                
                # 找到主键属性
                primary_prop = [p for p in properties if p['name'] == primary_key][0]
                primary_type = primary_prop['type']
                primary_optional = str(primary_prop['optional']).lower()
                
                # 构建参数列表:标签名, 主键字段名, 主键属性名, 主键类型, 主键是否可选, [其他属性...]
                # 格式参考:CALL db.createVertexLabel('person', 'id', 'id', 'INT32', false, 'name', 'STRING', false)
                params = [
                    f"'{label}'",
                    f"'{primary_key}'",
                    f"'{primary_key}'",
                    f"'{primary_type}'",
                    primary_optional
                ]
                
                # 添加其他非主键属性
                for prop in properties:
                    if prop['name'] != primary_key:
                        params.append(f"'{prop['name']}'")
                        params.append(f"'{prop['type']}'")
                        params.append(str(prop['optional']).lower())
                
                # 构建单行 Cypher 查询(TuGraph 要求单行格式)
                cypher_query = f"CALL db.createVertexLabel({', '.join(params)})"
                session.run(cypher_query)
                print(f"✅ 节点标签创建完成:{label}(主键:{primary_key})")
            
            # 6. 基于配置创建边标签(严格遵循example.json的约束)
            print("\n===== 开始创建边标签 =====")
            for e in edge_conf:
                label = e["label"]
                constraints = e["constraints"]  # 从配置获取连接约束(如[["Disease","Alias"]])
                # 构建约束JSON字符串:格式为 [["源节点","目标节点"]]
                # 示例:'[["person","person"]]'
                constraint_json = json.dumps(constraints)
                
                # 构建单行 Cypher 查询(TuGraph 要求单行格式)
                # 格式参考:CALL db.createEdgeLabel('is_friend','[["person","person"]]')
                cypher_query = f"CALL db.createEdgeLabel('{label}', '{constraint_json}')"
                session.run(cypher_query)
                print(f"✅ 边标签创建完成:{label}(约束:{constraints[0][0]}→{constraints[0][1]})")
            
            # 7. 导入节点数据(从write_V_E生成的CSV读取,去重导入)
            print("\n===== 开始导入节点数据 =====")
            for v in vertex_conf:
                label = v["label"]
                csv_path = os.path.join(TU_CONFIG["csv_dir"], f"{label}.csv")
                # 读取CSV:每行一个节点值(write_V_E生成的格式)
                df = pd.read_csv(csv_path, header=None, names=["name"], encoding="utf-8")
                # 去重:避免write_V_E生成重复节点
                df_unique = df.drop_duplicates(subset=["name"]).dropna()
                
                # 批量导入节点(MERGE避免重复)
                for _, row in df_unique.iterrows():
                    node_name = row["name"].strip()
                    if node_name in ["nan", "", "None"]:
                        continue
                    # 使用单行 Cypher 查询(TuGraph 要求)
                    cypher_query = f"MERGE (n:{label} {{name: $name}}) ON CREATE SET n.name = $name"
                    session.run(cypher_query, name=node_name)
                print(f"✅ 节点数据导入完成:{label}(共{len(df_unique)}个唯一节点)")
            
            # 8. 导入边数据(从write_V_E生成的CSV读取,关联节点)
            print("\n===== 开始导入边数据 =====")
            for e in edge_conf:
                edge_label = e["label"]
                constraints = e["constraints"][0]  # 源节点标签、目标节点标签(如["Disease","Alias"])
                src_label = constraints[0]
                dst_label = constraints[1]
                csv_path = os.path.join(TU_CONFIG["csv_dir"], f"{edge_label}.csv")
                
                # 读取边CSV:每行格式为"源节点名 目标节点名"(write_V_E生成的格式)
                df = pd.read_csv(csv_path, header=None, names=["src_name", "dst_name"], 
                                 sep="\s+", encoding="utf-8")  # 空格分隔
                df_unique = df.drop_duplicates().dropna()
                
                # 批量导入边(MATCH源/目标节点,MERGE边避免重复)
                for _, row in df_unique.iterrows():
                    src_name = row["src_name"].strip()
                    dst_name = row["dst_name"].strip()
                    if src_name in ["nan", ""] or dst_name in ["nan", ""]:
                        continue
                    # 使用单行 Cypher 查询(TuGraph 要求)
                    cypher_query = f"MATCH (src:{src_label} {{name: $src_name}}) MATCH (dst:{dst_label} {{name: $dst_name}}) MERGE (src)-[r:{edge_label}]->(dst)"
                    session.run(cypher_query, src_name=src_name, dst_name=dst_name)
                print(f"✅ 边数据导入完成:{edge_label}(共{len(df_unique)}个唯一关系)")
        
        print("\n🎉 TuGraph疾病图谱构建完成!")
        driver.close()
    
    except Exception as e:
        print(f"\n❌ 图谱构建失败:{str(e)}")
        raise

运行结果:

===== 开始导入节点数据 =====
✅ 节点数据导入完成:Disease(共28个唯一节点)
✅ 节点数据导入完成:Alias(共46个唯一节点)
✅ 节点数据导入完成:Part(共22个唯一节点)
✅ 节点数据导入完成:Age(共48个唯一节点)
✅ 节点数据导入完成:Infection(共2个唯一节点)
✅ 节点数据导入完成:Insurance(共2个唯一节点)
✅ 节点数据导入完成:Department(共26个唯一节点)
✅ 节点数据导入完成:Checklist(共118个唯一节点)
✅ 节点数据导入完成:Symptom(共122个唯一节点)
✅ 节点数据导入完成:Complication(共79个唯一节点)
✅ 节点数据导入完成:Treatment(共21个唯一节点)
✅ 节点数据导入完成:Drug(共77个唯一节点)
✅ 节点数据导入完成:Period(共23个唯一节点)
✅ 节点数据导入完成:Rate(共21个唯一节点)
✅ 节点数据导入完成:Money(共19个唯一节点)

===== 开始导入边数据 =====
✅ 边数据导入完成:HAS_ALIAS(共46个唯一关系)
✅ 边数据导入完成:IS_OF_PART(共31个唯一关系)
✅ 边数据导入完成:IS_OF_AGE(共57个唯一关系)
✅ 边数据导入完成:IS_INFECTIOUS(共28个唯一关系)
✅ 边数据导入完成:In_Insurance(共28个唯一关系)
✅ 边数据导入完成:IS_OF_Department(共48个唯一关系)
✅ 边数据导入完成:HAS_Checklist(共127个唯一关系)
✅ 边数据导入完成:HAS_SYMPTOM(共134个唯一关系)
✅ 边数据导入完成:HAS_Complication(共91个唯一关系)
✅ 边数据导入完成:HAS_Treatment(共52个唯一关系)
✅ 边数据导入完成:HAS_Drug(共79个唯一关系)
✅ 边数据导入完成:Cure_Period(共27个唯一关系)
✅ 边数据导入完成:Cure_Rate(共29个唯一关系)
✅ 边数据导入完成:NEED_Money(共37个唯一关系)

导入完成后,Web网页会显示图谱的实体关系和属性信息。


图:图谱数据导入后的可视化展示

Cypher查询与Python交互

TuGraph支持通过Cypher语句进行复杂查询,参考 cypher_python_demo.py:

from neo4j import GraphDatabase  # TuGraph兼容Neo4j驱动,无需额外安装

# -------------------------- 1. 基础配置(适配TuGraph) --------------------------
TU_CONFIG = {
    "uri": "bolt://localhost:7687",  # TuGraph默认Bolt端口
    "auth": ("admin", "73@TuGraph"), # TuGraph默认账号/密码
    "db_name": "disease_graph"       # 图谱数据库名
}

# -------------------------- 2. 核心交互类(简化版) --------------------------
class TuGraphCypherClient:
    def __init__(self):
        # 初始化连接
        self.driver = GraphDatabase.driver(
            TU_CONFIG["uri"], 
            auth=TU_CONFIG["auth"]
        )
    
    def close(self):
        # 关闭连接
        self.driver.close()
    
    def run_cypher(self, cypher, params=None):
        """执行Cypher查询,返回结果列表"""
        with self.driver.session(database=TU_CONFIG["db_name"]) as session:
            result = session.run(cypher, params or {})
            # 转换结果为列表(方便处理)
            return [record.data() for record in result]

# -------------------------- 3. 示例:执行核心验证查询 --------------------------
if __name__ == "__main__":
    # 初始化客户端
    client = TuGraphCypherClient()
    print("✅ 连接TuGraph成功!\n")

    # ========== 示例1:统计图谱总顶点/边数 ==========
    print("【1. 统计图谱总数】")
    # 统计总顶点数
    vertex_count = client.run_cypher("MATCH (n) RETURN count(n) AS total_vertices")[0]
    # 统计总边数
    edge_count = client.run_cypher("MATCH ()-[r]->() RETURN count(r) AS total_edges")[0]
    print(f"总实体数:{vertex_count['total_vertices']}")
    print(f"总关系数:{edge_count['total_edges']}\n")

    # ========== 示例2:查询单个实体的属性 ==========
    print("【2. 查询乙肝的实体属性】")
    disease_prop = client.run_cypher(
        "MATCH (d:Disease {name: $name}) RETURN properties(d) AS prop",
        params={"name": "乙肝"}  # 参数化查询(避免注入)
    )
    if disease_prop:
        print(f"乙肝属性:{disease_prop[0]['prop']}\n")
    else:
        print("未找到乙肝实体\n")

    # ========== 示例3:查询实体的关联关系 ==========
    print("【3. 查询乙肝的症状关系】")
    symptom_rels = client.run_cypher("""
        MATCH (d:Disease {name: $name})-[r:HAS_SYMPTOM]->(s:Symptom)
        RETURN s.name AS symptom_name
    """, params={"name": "乙肝"})
    print("乙肝的症状:")
    for idx, rel in enumerate(symptom_rels, 1):
        print(f"  {idx}. {rel['symptom_name']}")

    # 关闭连接
    client.close()
    print("\n✅ 所有查询执行完成!")

运行结果:


✅ 连接TuGraph成功!

【1. 统计图谱总数】
总实体数:654
总关系数:814

【2. 查询乙肝的实体属性】
乙肝属性:{"_LABEL_":"Disease","_VID_":1,"name":"乙肝"}

【3. 查询乙肝的症状关系】
乙肝的症状:
  1. 转氨酶增高
  2. 肝肿大
  3. 乙肝表面抗原(HBsAg)阳性
  4. 乙肝e抗原(HBeAg)阳性
  5. 肝功能异常

✅ 所有查询执行完成!

常见操作包括创建节点、建立关系、属性过滤和路径查询。批量操作时建议使用事务处理。

对话系统交互实现

基于TuGraph的对话系统需要处理自然语言到Cypher的转换。典型交互流程包括:

  • 用户输入自然语言查询
  • 系统解析生成Cypher语句
  • 执行查询并返回结构化结果
  • 将结果转换为自然语言响应

以下为tugraph_interaction.py的核心代码:

def tu_graph_interaction():
    print("="*60)
    print("🎯 TuGraph疾病图谱交互系统")
    print("📌 支持查询场景:症状、药物、并发症、治疗方法、检查项目、治疗周期、治愈率、治疗费用")
    print("📌 支持操作:查询(输入示例:查询乙肝的症状)、更新描述(输入示例:更新乙肝描述 乙肝是...)、删除测试(输入示例:删除测试疾病X)")
    print("📌 退出系统:输入「退出」或「quit」")
    print("="*60)
    
    while True:
        user_input = input("\n请输入您的操作:").strip()
        if not user_input:
            continue
        
        # 处理退出
        if user_input.lower() in ["退出", "quit", "exit"]:
            print("👋 感谢使用,再见!")
            break
        
        # 处理“查询”操作
        if user_input.startswith("查询"):
            # 解析格式:查询[疾病名]的[场景]
            try:
                # 提取疾病名和场景(如“查询乙肝的症状”→疾病名“乙肝”,场景“症状”)
                parts = user_input.split("的")
                if len(parts) != 2:
                    raise ValueError("格式错误")
                disease_name = parts[0].replace("查询", "").strip()
                scene = parts[1].strip()
                # 执行查询
                query_disease_related(disease_name, scene)
            except Exception as e:
                print(f"❌ 查询格式错误,请输入示例:查询乙肝的症状")
            continue
        
        # 处理“更新描述”操作
        if user_input.startswith("更新") and "描述" in user_input:
            # 解析格式:更新[疾病名]描述[描述内容]
            try:
                parts = user_input.split("描述")
                if len(parts) != 2:
                    raise ValueError("格式错误")
                disease_name = parts[0].replace("更新", "").strip()
                desc = parts[1].strip()
                if not desc:
                    raise ValueError("描述不能为空")
                # 执行更新
                update_disease_desc(disease_name, desc)
            except Exception as e:
                print(f"❌ 更新格式错误,请输入示例:更新乙肝描述 乙肝是由乙肝病毒引起的肝脏炎症")
            continue
        
        # 处理“删除测试”操作
        if user_input.startswith("删除"):
            # 解析格式:删除[测试疾病名]
            disease_name = user_input.replace("删除", "").strip()
            # 二次确认(避免误删正式数据)
            confirm = input(f"⚠️  确认删除测试疾病「{disease_name}」吗?(输入y确认,其他取消):").strip()
            if confirm.lower() == "y":
                delete_test_data(disease_name)
            else:
                print("❌ 已取消删除")
            continue
        
        # 未识别操作
        print("❌ 未识别的操作,请参考示例格式输入")

示例交互输出:

图:对话系统成功执行的查询交互界面

Logo

火山引擎开发者社区是火山引擎打造的AI技术生态平台,聚焦Agent与大模型开发,提供豆包系列模型(图像/视频/视觉)、智能分析与会话工具,并配套评测集、动手实验室及行业案例库。社区通过技术沙龙、挑战赛等活动促进开发者成长,新用户可领50万Tokens权益,助力构建智能应用。

更多推荐