Files
face_agent/robot_mcp_server.py

359 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
机器人用户档案 MCP Server
存储并维护用户基本信息、偏好习惯。
"""
import json
import logging
import sqlite3
from pathlib import Path
# 压制 mcp 库的 INFO 日志,只保留 WARNING 及以上
logging.basicConfig(level=logging.WARNING)
from mcp.server.fastmcp import FastMCP
DB_PATH = Path(__file__).parent / "users.db"
mcp = FastMCP("robot-user-db")
def _db_connect() -> sqlite3.Connection:
"""统一数据库连接入口,确保启用外键约束。"""
conn = sqlite3.connect(DB_PATH)
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _create_preferences_table(conn: sqlite3.Connection) -> None:
conn.execute("""
CREATE TABLE preferences (
user_name TEXT NOT NULL,
category TEXT NOT NULL,
content TEXT NOT NULL,
updated_at TEXT DEFAULT (datetime('now')),
PRIMARY KEY (user_name, category),
FOREIGN KEY (user_name) REFERENCES users(name)
ON UPDATE CASCADE
ON DELETE CASCADE
)
""")
# --- 初始化数据库 ---
def _init_db():
with _db_connect() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
name TEXT PRIMARY KEY,
age INTEGER,
created_at TEXT DEFAULT (datetime('now')),
last_seen TEXT
)
""")
preferences_exists = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='preferences'"
).fetchone() is not None
if not preferences_exists:
_create_preferences_table(conn)
_init_db()
# --- MCP 工具定义 ---
@mcp.tool()
def get_user_profile(user_name: str) -> str:
"""
获取用户档案(基本信息、所有偏好)。
在每次对话的第一轮调用,用于了解用户背景。
"""
with _db_connect() as conn:
conn.row_factory = sqlite3.Row
conn.execute(
"UPDATE users SET last_seen = datetime('now') WHERE name = ?",
(user_name,)
)
user = conn.execute(
"SELECT * FROM users WHERE name = ?", (user_name,)
).fetchone()
if not user:
return json.dumps(
{"found": False, "message": f"用户 {user_name} 尚无档案,这是第一次见面。"},
ensure_ascii=False
)
prefs = conn.execute(
"SELECT category, content FROM preferences WHERE user_name = ?",
(user_name,)
).fetchall()
return json.dumps({
"found": True,
"basic": {
"name": user["name"],
"age": user["age"],
"last_seen": user["last_seen"],
},
"preferences": {p["category"]: p["content"] for p in prefs},
}, ensure_ascii=False, indent=2)
@mcp.tool()
def upsert_user(user_name: str, age: int = None) -> str:
"""
创建或更新用户基本信息。
当得知用户姓名、年龄等基本信息时调用。
"""
with _db_connect() as conn:
existing = conn.execute(
"SELECT 1 FROM users WHERE name = ?",
(user_name,),
).fetchone() is not None
conn.execute(
"""INSERT INTO users (name, age, last_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE SET
age = COALESCE(excluded.age, users.age),
last_seen = datetime('now')""",
(user_name, age),
)
if existing:
return f"已更新用户 {user_name} 的档案。"
return f"已为 {user_name} 创建新档案。"
@mcp.tool()
def set_preference(user_name: str, category: str, content: str) -> str:
"""
更新用户的某项偏好或习惯(同一 category 只保留最新值)。
category 示例:'话题喜好''沟通风格''工作习惯''忌讳''饮食偏好'
在对话中发现新偏好时调用。
"""
with _db_connect() as conn:
# 保证 users 中存在主档,满足 preferences 外键约束。
conn.execute(
"""INSERT INTO users (name, last_seen)
VALUES (?, datetime('now'))
ON CONFLICT(name) DO UPDATE SET last_seen = datetime('now')""",
(user_name,),
)
conn.execute(
"""INSERT INTO preferences (user_name, category, content)
VALUES (?, ?, ?)
ON CONFLICT(user_name, category)
DO UPDATE SET content = excluded.content, updated_at = datetime('now')""",
(user_name, category, content)
)
return f"已更新 {user_name} 的偏好 [{category}]{content}"
# ============================================================
# 联网工具:定位 / 天气 / 搜索
# ============================================================
import requests
# WMO 天气代码 → 中文描述
_WMO = {
0: "晴天", 1: "基本晴朗", 2: "局部多云", 3: "阴天",
45: "", 48: "冻雾",
51: "轻微毛毛雨", 53: "毛毛雨", 55: "密集毛毛雨",
61: "小雨", 63: "中雨", 65: "大雨",
71: "小雪", 73: "中雪", 75: "大雪", 77: "冰粒",
80: "阵雨", 81: "中等阵雨", 82: "强阵雨",
85: "阵雪", 86: "强阵雪",
95: "雷阵雨", 96: "伴有冰雹的雷阵雨", 99: "强雷阵雨",
}
def _lookup_location_cn(ip: str = None) -> dict | None:
"""
使用国内 IP 归属地接口查询地理信息。
优先返回城市/省份;该接口不提供经纬度。
"""
params = {"json": "true"}
if ip:
params["ip"] = ip
resp = requests.get(
"https://whois.pconline.com.cn/ipJson.jsp",
params=params,
timeout=8,
)
# 该接口常见返回为 gbk 编码
resp.encoding = resp.apparent_encoding or "gbk"
raw = resp.text.strip()
if raw.startswith("var returnCitySN"):
raw = raw.split("=", 1)[-1].strip().rstrip(";")
data = json.loads(raw)
city = (data.get("city") or "").strip()
region = (data.get("pro") or "").strip()
ip_value = (data.get("ip") or ip or "").strip()
if not (city or region):
return None
return {
"city": city or region,
"region": region,
"country": "中国",
"lat": None,
"lon": None,
"ip": ip_value,
}
def _lookup_location_ipapi() -> dict | None:
"""回退定位:使用 ip-api。"""
data = requests.get(
"http://ip-api.com/json/",
params={"lang": "zh-CN", "fields": "status,city,regionName,country,lat,lon,query"},
timeout=8,
).json()
if data.get("status") != "success":
return None
return {
"city": data.get("city") or "",
"region": data.get("regionName") or "",
"country": data.get("country") or "",
"lat": data.get("lat"),
"lon": data.get("lon"),
"ip": data.get("query") or "",
}
def _lookup_location() -> dict | None:
"""统一定位入口:中国 IP 接口优先,失败回退 ip-api。"""
return _lookup_location_cn() or _lookup_location_ipapi()
def _geocode_city(city: str) -> tuple[float | None, float | None, str]:
"""根据城市名查经纬度,供天气查询使用。"""
geo = requests.get(
"https://geocoding-api.open-meteo.com/v1/search",
params={"name": city, "count": 1, "language": "zh"},
timeout=8,
).json()
results = geo.get("results")
if not results:
return None, None, city
return (
results[0].get("latitude"),
results[0].get("longitude"),
results[0].get("name", city),
)
def _resolve_weather_target(
city: str | None, lat: float | None, lon: float | None
) -> tuple[float | None, float | None, str | None, str | None]:
"""统一解析天气查询目标,减少重复分支。"""
auto_locate_error = "自动定位失败,请手动传入城市名。"
if lat is not None and lon is not None:
return lat, lon, city, None
if city:
lat, lon, city = _geocode_city(city)
if lat is None or lon is None:
return None, None, city, f"找不到城市:{city}"
return lat, lon, city, None
loc = _lookup_location()
if not loc:
return None, None, None, auto_locate_error
city = loc["city"] or city
lat, lon = loc.get("lat"), loc.get("lon")
if lat is None or lon is None:
if not city:
return None, None, None, auto_locate_error
lat, lon, city = _geocode_city(city)
if lat is None or lon is None:
return None, None, None, auto_locate_error
return lat, lon, city, None
@mcp.tool()
def get_location() -> str:
"""
通过 IP 地址获取当前地理位置(城市、省份、国家、经纬度)。
在查询天气前,或需要了解用户所在城市时调用。
"""
try:
loc = _lookup_location()
if not loc:
return "定位失败,请稍后再试。"
return json.dumps({
"城市": loc["city"],
"省份": loc["region"],
"国家": loc["country"],
"纬度": loc["lat"],
"经度": loc["lon"],
"IP": loc["ip"],
}, ensure_ascii=False)
except Exception as e:
return f"定位失败:{e}"
@mcp.tool()
def get_weather(city: str = None, lat: float = None, lon: float = None) -> str:
"""
获取实时天气信息。
可以传入城市名city或经纬度lat/lon若都不传则自动定位。
返回温度、天气状况、风速。
"""
try:
lat, lon, city, err = _resolve_weather_target(city, lat, lon)
if err:
return err
# 查询天气
resp = requests.get(
"https://api.open-meteo.com/v1/forecast",
params={
"latitude": lat, "longitude": lon,
"current": "temperature_2m,apparent_temperature,weather_code,wind_speed_10m,relative_humidity_2m",
"timezone": "auto",
},
timeout=10,
).json()
cur = resp.get("current", {})
code = cur.get("weather_code", -1)
return json.dumps({
"城市": city or f"{lat},{lon}",
"天气": _WMO.get(code, f"未知(code={code})"),
"温度": f"{cur.get('temperature_2m', '?')}°C",
"体感温度": f"{cur.get('apparent_temperature', '?')}°C",
"湿度": f"{cur.get('relative_humidity_2m', '?')}%",
"风速": f"{cur.get('wind_speed_10m', '?')} km/h",
}, ensure_ascii=False)
except Exception as e:
return f"天气查询失败:{e}"
@mcp.tool()
def web_search(query: str, max_results: int = 5) -> str:
"""
联网搜索,获取实时信息(新闻、百科、价格等)。
返回最多 max_results 条结果(标题 + 摘要 + 链接)。
"""
try:
query = query.strip()
if not query:
return "搜索关键词不能为空。"
max_results = max(1, min(max_results, 10))
from ddgs import DDGS
results = DDGS().text(query, max_results=max_results)
if not results:
return "搜索无结果。"
output = []
for i, r in enumerate(results, 1):
output.append(f"{i}. {r['title']}\n {r['body'][:150]}\n {r['href']}")
return "\n\n".join(output)
except Exception as e:
return f"搜索失败:{e}"
if __name__ == "__main__":
mcp.run()