""" 机器人用户档案 MCP Server 存储并维护用户基本信息、偏好习惯。 """ import json import logging import sqlite3 from pathlib import Path # 压制 mcp 库的 INFO 日志,只保留 WARNING 及以上 logging.basicConfig(level=logging.WARNING) from mcp.server.fastmcp import FastMCP DB_PATH = Path(__file__).parent / "users.db" mcp = FastMCP("robot-user-db") def _db_connect() -> sqlite3.Connection: """统一数据库连接入口,确保启用外键约束。""" conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA foreign_keys = ON") return conn def _create_preferences_table(conn: sqlite3.Connection) -> None: conn.execute(""" CREATE TABLE preferences ( user_name TEXT NOT NULL, category TEXT NOT NULL, content TEXT NOT NULL, updated_at TEXT DEFAULT (datetime('now')), PRIMARY KEY (user_name, category), FOREIGN KEY (user_name) REFERENCES users(name) ON UPDATE CASCADE ON DELETE CASCADE ) """) # --- 初始化数据库 --- def _init_db(): with _db_connect() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS users ( name TEXT PRIMARY KEY, age INTEGER, created_at TEXT DEFAULT (datetime('now')), last_seen TEXT ) """) preferences_exists = conn.execute( "SELECT 1 FROM sqlite_master WHERE type='table' AND name='preferences'" ).fetchone() is not None if not preferences_exists: _create_preferences_table(conn) _init_db() # --- MCP 工具定义 --- @mcp.tool() def get_user_profile(user_name: str) -> str: """ 获取用户档案(基本信息、所有偏好)。 在每次对话的第一轮调用,用于了解用户背景。 """ with _db_connect() as conn: conn.row_factory = sqlite3.Row conn.execute( "UPDATE users SET last_seen = datetime('now') WHERE name = ?", (user_name,) ) user = conn.execute( "SELECT * FROM users WHERE name = ?", (user_name,) ).fetchone() if not user: return json.dumps( {"found": False, "message": f"用户 {user_name} 尚无档案,这是第一次见面。"}, ensure_ascii=False ) prefs = conn.execute( "SELECT category, content FROM preferences WHERE user_name = ?", (user_name,) ).fetchall() return json.dumps({ "found": True, "basic": { "name": user["name"], "age": user["age"], "last_seen": user["last_seen"], }, "preferences": {p["category"]: p["content"] for p in prefs}, }, ensure_ascii=False, indent=2) @mcp.tool() def upsert_user(user_name: str, age: int = None) -> str: """ 创建或更新用户基本信息。 当得知用户姓名、年龄等基本信息时调用。 """ with _db_connect() as conn: existing = conn.execute( "SELECT 1 FROM users WHERE name = ?", (user_name,), ).fetchone() is not None conn.execute( """INSERT INTO users (name, age, last_seen) VALUES (?, ?, datetime('now')) ON CONFLICT(name) DO UPDATE SET age = COALESCE(excluded.age, users.age), last_seen = datetime('now')""", (user_name, age), ) if existing: return f"已更新用户 {user_name} 的档案。" return f"已为 {user_name} 创建新档案。" @mcp.tool() def set_preference(user_name: str, category: str, content: str) -> str: """ 更新用户的某项偏好或习惯(同一 category 只保留最新值)。 category 示例:'话题喜好'、'沟通风格'、'工作习惯'、'忌讳'、'饮食偏好'。 在对话中发现新偏好时调用。 """ with _db_connect() as conn: # 保证 users 中存在主档,满足 preferences 外键约束。 conn.execute( """INSERT INTO users (name, last_seen) VALUES (?, datetime('now')) ON CONFLICT(name) DO UPDATE SET last_seen = datetime('now')""", (user_name,), ) conn.execute( """INSERT INTO preferences (user_name, category, content) VALUES (?, ?, ?) ON CONFLICT(user_name, category) DO UPDATE SET content = excluded.content, updated_at = datetime('now')""", (user_name, category, content) ) return f"已更新 {user_name} 的偏好 [{category}]:{content}。" # ============================================================ # 联网工具:定位 / 天气 / 搜索 # ============================================================ import requests # WMO 天气代码 → 中文描述 _WMO = { 0: "晴天", 1: "基本晴朗", 2: "局部多云", 3: "阴天", 45: "雾", 48: "冻雾", 51: "轻微毛毛雨", 53: "毛毛雨", 55: "密集毛毛雨", 61: "小雨", 63: "中雨", 65: "大雨", 71: "小雪", 73: "中雪", 75: "大雪", 77: "冰粒", 80: "阵雨", 81: "中等阵雨", 82: "强阵雨", 85: "阵雪", 86: "强阵雪", 95: "雷阵雨", 96: "伴有冰雹的雷阵雨", 99: "强雷阵雨", } def _lookup_location_cn(ip: str = None) -> dict | None: """ 使用国内 IP 归属地接口查询地理信息。 优先返回城市/省份;该接口不提供经纬度。 """ params = {"json": "true"} if ip: params["ip"] = ip resp = requests.get( "https://whois.pconline.com.cn/ipJson.jsp", params=params, timeout=8, ) # 该接口常见返回为 gbk 编码 resp.encoding = resp.apparent_encoding or "gbk" raw = resp.text.strip() if raw.startswith("var returnCitySN"): raw = raw.split("=", 1)[-1].strip().rstrip(";") data = json.loads(raw) city = (data.get("city") or "").strip() region = (data.get("pro") or "").strip() ip_value = (data.get("ip") or ip or "").strip() if not (city or region): return None return { "city": city or region, "region": region, "country": "中国", "lat": None, "lon": None, "ip": ip_value, } def _lookup_location_ipapi() -> dict | None: """回退定位:使用 ip-api。""" data = requests.get( "http://ip-api.com/json/", params={"lang": "zh-CN", "fields": "status,city,regionName,country,lat,lon,query"}, timeout=8, ).json() if data.get("status") != "success": return None return { "city": data.get("city") or "", "region": data.get("regionName") or "", "country": data.get("country") or "", "lat": data.get("lat"), "lon": data.get("lon"), "ip": data.get("query") or "", } def _lookup_location() -> dict | None: """统一定位入口:中国 IP 接口优先,失败回退 ip-api。""" return _lookup_location_cn() or _lookup_location_ipapi() def _geocode_city(city: str) -> tuple[float | None, float | None, str]: """根据城市名查经纬度,供天气查询使用。""" geo = requests.get( "https://geocoding-api.open-meteo.com/v1/search", params={"name": city, "count": 1, "language": "zh"}, timeout=8, ).json() results = geo.get("results") if not results: return None, None, city return ( results[0].get("latitude"), results[0].get("longitude"), results[0].get("name", city), ) def _resolve_weather_target( city: str | None, lat: float | None, lon: float | None ) -> tuple[float | None, float | None, str | None, str | None]: """统一解析天气查询目标,减少重复分支。""" auto_locate_error = "自动定位失败,请手动传入城市名。" if lat is not None and lon is not None: return lat, lon, city, None if city: lat, lon, city = _geocode_city(city) if lat is None or lon is None: return None, None, city, f"找不到城市:{city}" return lat, lon, city, None loc = _lookup_location() if not loc: return None, None, None, auto_locate_error city = loc["city"] or city lat, lon = loc.get("lat"), loc.get("lon") if lat is None or lon is None: if not city: return None, None, None, auto_locate_error lat, lon, city = _geocode_city(city) if lat is None or lon is None: return None, None, None, auto_locate_error return lat, lon, city, None @mcp.tool() def get_location() -> str: """ 通过 IP 地址获取当前地理位置(城市、省份、国家、经纬度)。 在查询天气前,或需要了解用户所在城市时调用。 """ try: loc = _lookup_location() if not loc: return "定位失败,请稍后再试。" return json.dumps({ "城市": loc["city"], "省份": loc["region"], "国家": loc["country"], "纬度": loc["lat"], "经度": loc["lon"], "IP": loc["ip"], }, ensure_ascii=False) except Exception as e: return f"定位失败:{e}" @mcp.tool() def get_weather(city: str = None, lat: float = None, lon: float = None) -> str: """ 获取实时天气信息。 可以传入城市名(city),或经纬度(lat/lon);若都不传则自动定位。 返回温度、天气状况、风速。 """ try: lat, lon, city, err = _resolve_weather_target(city, lat, lon) if err: return err # 查询天气 resp = requests.get( "https://api.open-meteo.com/v1/forecast", params={ "latitude": lat, "longitude": lon, "current": "temperature_2m,apparent_temperature,weather_code,wind_speed_10m,relative_humidity_2m", "timezone": "auto", }, timeout=10, ).json() cur = resp.get("current", {}) code = cur.get("weather_code", -1) return json.dumps({ "城市": city or f"{lat},{lon}", "天气": _WMO.get(code, f"未知(code={code})"), "温度": f"{cur.get('temperature_2m', '?')}°C", "体感温度": f"{cur.get('apparent_temperature', '?')}°C", "湿度": f"{cur.get('relative_humidity_2m', '?')}%", "风速": f"{cur.get('wind_speed_10m', '?')} km/h", }, ensure_ascii=False) except Exception as e: return f"天气查询失败:{e}" @mcp.tool() def web_search(query: str, max_results: int = 5) -> str: """ 联网搜索,获取实时信息(新闻、百科、价格等)。 返回最多 max_results 条结果(标题 + 摘要 + 链接)。 """ try: query = query.strip() if not query: return "搜索关键词不能为空。" max_results = max(1, min(max_results, 10)) from ddgs import DDGS results = DDGS().text(query, max_results=max_results) if not results: return "搜索无结果。" output = [] for i, r in enumerate(results, 1): output.append(f"{i}. {r['title']}\n {r['body'][:150]}\n {r['href']}") return "\n\n".join(output) except Exception as e: return f"搜索失败:{e}" if __name__ == "__main__": mcp.run()