feat: 添加联网搜索、地理位置查询、天气查询
This commit is contained in:
358
robot_mcp_server.py
Normal file
358
robot_mcp_server.py
Normal file
@@ -0,0 +1,358 @@
|
||||
"""
|
||||
机器人用户档案 MCP Server
|
||||
存储并维护用户基本信息、偏好习惯。
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
# 压制 mcp 库的 INFO 日志,只保留 WARNING 及以上
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
DB_PATH = Path(__file__).parent / "users.db"
|
||||
mcp = FastMCP("robot-user-db")
|
||||
|
||||
|
||||
def _db_connect() -> sqlite3.Connection:
|
||||
"""统一数据库连接入口,确保启用外键约束。"""
|
||||
conn = sqlite3.connect(DB_PATH)
|
||||
conn.execute("PRAGMA foreign_keys = ON")
|
||||
return conn
|
||||
|
||||
|
||||
def _create_preferences_table(conn: sqlite3.Connection) -> None:
|
||||
conn.execute("""
|
||||
CREATE TABLE preferences (
|
||||
user_name TEXT NOT NULL,
|
||||
category TEXT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
updated_at TEXT DEFAULT (datetime('now')),
|
||||
PRIMARY KEY (user_name, category),
|
||||
FOREIGN KEY (user_name) REFERENCES users(name)
|
||||
ON UPDATE CASCADE
|
||||
ON DELETE CASCADE
|
||||
)
|
||||
""")
|
||||
|
||||
|
||||
# --- 初始化数据库 ---
|
||||
def _init_db():
|
||||
with _db_connect() as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
name TEXT PRIMARY KEY,
|
||||
age INTEGER,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
last_seen TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
preferences_exists = conn.execute(
|
||||
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='preferences'"
|
||||
).fetchone() is not None
|
||||
|
||||
if not preferences_exists:
|
||||
_create_preferences_table(conn)
|
||||
|
||||
|
||||
_init_db()
|
||||
|
||||
|
||||
# --- MCP 工具定义 ---
|
||||
|
||||
@mcp.tool()
|
||||
def get_user_profile(user_name: str) -> str:
|
||||
"""
|
||||
获取用户档案(基本信息、所有偏好)。
|
||||
在每次对话的第一轮调用,用于了解用户背景。
|
||||
"""
|
||||
with _db_connect() as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
conn.execute(
|
||||
"UPDATE users SET last_seen = datetime('now') WHERE name = ?",
|
||||
(user_name,)
|
||||
)
|
||||
user = conn.execute(
|
||||
"SELECT * FROM users WHERE name = ?", (user_name,)
|
||||
).fetchone()
|
||||
if not user:
|
||||
return json.dumps(
|
||||
{"found": False, "message": f"用户 {user_name} 尚无档案,这是第一次见面。"},
|
||||
ensure_ascii=False
|
||||
)
|
||||
|
||||
prefs = conn.execute(
|
||||
"SELECT category, content FROM preferences WHERE user_name = ?",
|
||||
(user_name,)
|
||||
).fetchall()
|
||||
|
||||
return json.dumps({
|
||||
"found": True,
|
||||
"basic": {
|
||||
"name": user["name"],
|
||||
"age": user["age"],
|
||||
"last_seen": user["last_seen"],
|
||||
},
|
||||
"preferences": {p["category"]: p["content"] for p in prefs},
|
||||
}, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def upsert_user(user_name: str, age: int = None) -> str:
|
||||
"""
|
||||
创建或更新用户基本信息。
|
||||
当得知用户姓名、年龄等基本信息时调用。
|
||||
"""
|
||||
with _db_connect() as conn:
|
||||
existing = conn.execute(
|
||||
"SELECT 1 FROM users WHERE name = ?",
|
||||
(user_name,),
|
||||
).fetchone() is not None
|
||||
conn.execute(
|
||||
"""INSERT INTO users (name, age, last_seen)
|
||||
VALUES (?, ?, datetime('now'))
|
||||
ON CONFLICT(name) DO UPDATE SET
|
||||
age = COALESCE(excluded.age, users.age),
|
||||
last_seen = datetime('now')""",
|
||||
(user_name, age),
|
||||
)
|
||||
if existing:
|
||||
return f"已更新用户 {user_name} 的档案。"
|
||||
return f"已为 {user_name} 创建新档案。"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def set_preference(user_name: str, category: str, content: str) -> str:
|
||||
"""
|
||||
更新用户的某项偏好或习惯(同一 category 只保留最新值)。
|
||||
category 示例:'话题喜好'、'沟通风格'、'工作习惯'、'忌讳'、'饮食偏好'。
|
||||
在对话中发现新偏好时调用。
|
||||
"""
|
||||
with _db_connect() as conn:
|
||||
# 保证 users 中存在主档,满足 preferences 外键约束。
|
||||
conn.execute(
|
||||
"""INSERT INTO users (name, last_seen)
|
||||
VALUES (?, datetime('now'))
|
||||
ON CONFLICT(name) DO UPDATE SET last_seen = datetime('now')""",
|
||||
(user_name,),
|
||||
)
|
||||
conn.execute(
|
||||
"""INSERT INTO preferences (user_name, category, content)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT(user_name, category)
|
||||
DO UPDATE SET content = excluded.content, updated_at = datetime('now')""",
|
||||
(user_name, category, content)
|
||||
)
|
||||
return f"已更新 {user_name} 的偏好 [{category}]:{content}。"
|
||||
|
||||
# ============================================================
|
||||
# 联网工具:定位 / 天气 / 搜索
|
||||
# ============================================================
|
||||
|
||||
import requests
|
||||
|
||||
# WMO 天气代码 → 中文描述
|
||||
_WMO = {
|
||||
0: "晴天", 1: "基本晴朗", 2: "局部多云", 3: "阴天",
|
||||
45: "雾", 48: "冻雾",
|
||||
51: "轻微毛毛雨", 53: "毛毛雨", 55: "密集毛毛雨",
|
||||
61: "小雨", 63: "中雨", 65: "大雨",
|
||||
71: "小雪", 73: "中雪", 75: "大雪", 77: "冰粒",
|
||||
80: "阵雨", 81: "中等阵雨", 82: "强阵雨",
|
||||
85: "阵雪", 86: "强阵雪",
|
||||
95: "雷阵雨", 96: "伴有冰雹的雷阵雨", 99: "强雷阵雨",
|
||||
}
|
||||
|
||||
|
||||
def _lookup_location_cn(ip: str = None) -> dict | None:
|
||||
"""
|
||||
使用国内 IP 归属地接口查询地理信息。
|
||||
优先返回城市/省份;该接口不提供经纬度。
|
||||
"""
|
||||
params = {"json": "true"}
|
||||
if ip:
|
||||
params["ip"] = ip
|
||||
resp = requests.get(
|
||||
"https://whois.pconline.com.cn/ipJson.jsp",
|
||||
params=params,
|
||||
timeout=8,
|
||||
)
|
||||
# 该接口常见返回为 gbk 编码
|
||||
resp.encoding = resp.apparent_encoding or "gbk"
|
||||
raw = resp.text.strip()
|
||||
if raw.startswith("var returnCitySN"):
|
||||
raw = raw.split("=", 1)[-1].strip().rstrip(";")
|
||||
data = json.loads(raw)
|
||||
|
||||
city = (data.get("city") or "").strip()
|
||||
region = (data.get("pro") or "").strip()
|
||||
ip_value = (data.get("ip") or ip or "").strip()
|
||||
if not (city or region):
|
||||
return None
|
||||
return {
|
||||
"city": city or region,
|
||||
"region": region,
|
||||
"country": "中国",
|
||||
"lat": None,
|
||||
"lon": None,
|
||||
"ip": ip_value,
|
||||
}
|
||||
|
||||
|
||||
def _lookup_location_ipapi() -> dict | None:
|
||||
"""回退定位:使用 ip-api。"""
|
||||
data = requests.get(
|
||||
"http://ip-api.com/json/",
|
||||
params={"lang": "zh-CN", "fields": "status,city,regionName,country,lat,lon,query"},
|
||||
timeout=8,
|
||||
).json()
|
||||
if data.get("status") != "success":
|
||||
return None
|
||||
return {
|
||||
"city": data.get("city") or "",
|
||||
"region": data.get("regionName") or "",
|
||||
"country": data.get("country") or "",
|
||||
"lat": data.get("lat"),
|
||||
"lon": data.get("lon"),
|
||||
"ip": data.get("query") or "",
|
||||
}
|
||||
|
||||
|
||||
def _lookup_location() -> dict | None:
|
||||
"""统一定位入口:中国 IP 接口优先,失败回退 ip-api。"""
|
||||
return _lookup_location_cn() or _lookup_location_ipapi()
|
||||
|
||||
|
||||
def _geocode_city(city: str) -> tuple[float | None, float | None, str]:
|
||||
"""根据城市名查经纬度,供天气查询使用。"""
|
||||
geo = requests.get(
|
||||
"https://geocoding-api.open-meteo.com/v1/search",
|
||||
params={"name": city, "count": 1, "language": "zh"},
|
||||
timeout=8,
|
||||
).json()
|
||||
results = geo.get("results")
|
||||
if not results:
|
||||
return None, None, city
|
||||
return (
|
||||
results[0].get("latitude"),
|
||||
results[0].get("longitude"),
|
||||
results[0].get("name", city),
|
||||
)
|
||||
|
||||
|
||||
def _resolve_weather_target(
|
||||
city: str | None, lat: float | None, lon: float | None
|
||||
) -> tuple[float | None, float | None, str | None, str | None]:
|
||||
"""统一解析天气查询目标,减少重复分支。"""
|
||||
auto_locate_error = "自动定位失败,请手动传入城市名。"
|
||||
if lat is not None and lon is not None:
|
||||
return lat, lon, city, None
|
||||
|
||||
if city:
|
||||
lat, lon, city = _geocode_city(city)
|
||||
if lat is None or lon is None:
|
||||
return None, None, city, f"找不到城市:{city}"
|
||||
return lat, lon, city, None
|
||||
|
||||
loc = _lookup_location()
|
||||
if not loc:
|
||||
return None, None, None, auto_locate_error
|
||||
city = loc["city"] or city
|
||||
lat, lon = loc.get("lat"), loc.get("lon")
|
||||
if lat is None or lon is None:
|
||||
if not city:
|
||||
return None, None, None, auto_locate_error
|
||||
lat, lon, city = _geocode_city(city)
|
||||
if lat is None or lon is None:
|
||||
return None, None, None, auto_locate_error
|
||||
return lat, lon, city, None
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_location() -> str:
|
||||
"""
|
||||
通过 IP 地址获取当前地理位置(城市、省份、国家、经纬度)。
|
||||
在查询天气前,或需要了解用户所在城市时调用。
|
||||
"""
|
||||
try:
|
||||
loc = _lookup_location()
|
||||
if not loc:
|
||||
return "定位失败,请稍后再试。"
|
||||
return json.dumps({
|
||||
"城市": loc["city"],
|
||||
"省份": loc["region"],
|
||||
"国家": loc["country"],
|
||||
"纬度": loc["lat"],
|
||||
"经度": loc["lon"],
|
||||
"IP": loc["ip"],
|
||||
}, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return f"定位失败:{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def get_weather(city: str = None, lat: float = None, lon: float = None) -> str:
|
||||
"""
|
||||
获取实时天气信息。
|
||||
可以传入城市名(city),或经纬度(lat/lon);若都不传则自动定位。
|
||||
返回温度、天气状况、风速。
|
||||
"""
|
||||
try:
|
||||
lat, lon, city, err = _resolve_weather_target(city, lat, lon)
|
||||
if err:
|
||||
return err
|
||||
|
||||
# 查询天气
|
||||
resp = requests.get(
|
||||
"https://api.open-meteo.com/v1/forecast",
|
||||
params={
|
||||
"latitude": lat, "longitude": lon,
|
||||
"current": "temperature_2m,apparent_temperature,weather_code,wind_speed_10m,relative_humidity_2m",
|
||||
"timezone": "auto",
|
||||
},
|
||||
timeout=10,
|
||||
).json()
|
||||
cur = resp.get("current", {})
|
||||
code = cur.get("weather_code", -1)
|
||||
return json.dumps({
|
||||
"城市": city or f"{lat},{lon}",
|
||||
"天气": _WMO.get(code, f"未知(code={code})"),
|
||||
"温度": f"{cur.get('temperature_2m', '?')}°C",
|
||||
"体感温度": f"{cur.get('apparent_temperature', '?')}°C",
|
||||
"湿度": f"{cur.get('relative_humidity_2m', '?')}%",
|
||||
"风速": f"{cur.get('wind_speed_10m', '?')} km/h",
|
||||
}, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
return f"天气查询失败:{e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def web_search(query: str, max_results: int = 5) -> str:
|
||||
"""
|
||||
联网搜索,获取实时信息(新闻、百科、价格等)。
|
||||
返回最多 max_results 条结果(标题 + 摘要 + 链接)。
|
||||
"""
|
||||
try:
|
||||
query = query.strip()
|
||||
if not query:
|
||||
return "搜索关键词不能为空。"
|
||||
max_results = max(1, min(max_results, 10))
|
||||
|
||||
from ddgs import DDGS
|
||||
results = DDGS().text(query, max_results=max_results)
|
||||
if not results:
|
||||
return "搜索无结果。"
|
||||
output = []
|
||||
for i, r in enumerate(results, 1):
|
||||
output.append(f"{i}. {r['title']}\n {r['body'][:150]}\n {r['href']}")
|
||||
return "\n\n".join(output)
|
||||
except Exception as e:
|
||||
return f"搜索失败:{e}"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mcp.run()
|
||||
Reference in New Issue
Block a user