小红书数据采集终极指南:如何高效构建Python自动化工具

发布时间:2026/6/13 1:27:36
小红书数据采集终极指南:如何高效构建Python自动化工具
小红书数据采集终极指南如何高效构建Python自动化工具【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs小红书作为国内领先的生活方式分享平台汇聚了海量用户生成内容对于数据分析师、营销从业者和开发者来说小红书数据采集已成为获取市场洞察的重要途径。xhs 是一个基于小红书 Web 端请求封装的 Python SDK提供了完整的小红书数据采集解决方案让开发者能够高效、稳定地获取平台公开数据。本文将从项目概述、快速入门、核心功能到实战应用全面解析如何使用 xhs 构建专业的数据采集系统。项目概述与价值定位xhs 项目是一个专门为小红书平台设计的 Python 数据采集工具包它封装了复杂的网络请求和签名逻辑提供了简洁易用的 API 接口。通过这个工具开发者可以轻松获取小红书笔记内容、用户信息、搜索数据等公开信息为内容分析、竞品研究和市场趋势预测提供数据支持。核心优势对比特性xhs SDK传统爬虫官方API易用性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐稳定性⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐功能完整性⭐⭐⭐⭐⭐⭐⭐⭐维护成本低高低合规性中等低高快速入门指南安装与配置安装 xhs 非常简单可以通过 pip 直接安装# 安装最新版本 pip install xhs # 或者从源码安装 git clone https://gitcode.com/gh_mirrors/xh/xhs cd xhs pip install -e .基础使用示例让我们从最简单的示例开始展示如何快速获取小红书笔记数据from xhs import XhsClient import datetime # 初始化客户端 cookie your_cookie_string xhs_client XhsClient(cookie) # 获取指定笔记详情 note_id 6505318c000000001f03c5a6 note xhs_client.get_note_by_id(note_id) print(f笔记标题{note[title]}) print(f作者{note[user][nickname]}) print(f点赞数{note[likes]}) print(f收藏数{note[collects]}) print(f发布时间{datetime.datetime.fromtimestamp(note[time]/1000)})核心功能详解1. 多维度搜索功能xhs 提供了强大的搜索功能支持多种搜索条件和排序方式from xhs import SearchSortType, SearchNoteType # 初始化客户端 xhs_client XhsClient(cookie) # 综合搜索示例 search_results xhs_client.search( keywordPython编程, sortSearchSortType.GENERAL, note_typeSearchNoteType.VIDEO, page1 ) # 分析搜索结果 for result in search_results[items][:5]: # 前5条结果 print(f笔记ID{result[id]}) print(f标题{result[title]}) print(f互动数据 - 点赞{result[likes]}, 收藏{result[collects]}) print(f用户{result[user][nickname]}) print(- * 50)2. 内容分类浏览系统xhs 支持按内容分类获取推荐流覆盖小红书主要内容领域from xhs import FeedType # 获取不同分类的内容 feed_types { 美食: FeedType.FOOD, 穿搭: FeedType.FASION, 旅行: FeedType.TRAVEL, 健身: FeedType.FITNESS, 游戏: FeedType.GAME } for category_name, feed_type in feed_types.items(): print(f\n获取{category_name}分类内容...) 
notes xhs_client.get_home_feed(feed_typefeed_type) # 分析热门内容 top_notes sorted(notes[items], keylambda x: x[likes], reverseTrue)[:3] for note in top_notes: print(f - {note[title][:30]}... (点赞: {note[likes]}))3. 用户信息获取除了笔记内容xhs 还支持获取用户相关信息# 获取用户信息 user_info xhs_client.get_user_info(user_id用户ID) print(f用户昵称{user_info[nickname]}) print(f粉丝数{user_info[fans_count]}) print(f笔记数{user_info[notes_count]}) print(f个人简介{user_info[desc]})核心源码xhs/core.py 包含了所有核心功能的实现实战应用场景场景一内容趋势分析平台构建一个自动化内容趋势分析系统帮助品牌发现热门话题import pandas as pd from collections import Counter from datetime import datetime, timedelta class ContentTrendAnalyzer: def __init__(self, xhs_client): self.xhs_client xhs_client self.trend_data [] def collect_trending_data(self, keyword, days7): 收集指定关键词的趋势数据 for day in range(days): target_date datetime.now() - timedelta(daysday) # 搜索相关笔记 results self.xhs_client.search( keywordkeyword, sortSearchSortType.GENERAL, page1 ) for note in results[items]: self.trend_data.append({ date: target_date.date(), keyword: keyword, title: note[title], likes: note[likes], collects: note[collects], comments: note[comments], user: note[user][nickname] }) return pd.DataFrame(self.trend_data) def analyze_trend_patterns(self, df): 分析趋势模式 # 按日期统计 daily_stats df.groupby(date).agg({ likes: mean, collects: mean, comments: mean }).reset_index() # 热门关键词提取 all_titles .join(df[title].tolist()) word_counts Counter(all_titles.split()) trending_keywords word_counts.most_common(10) return { daily_stats: daily_stats, trending_keywords: trending_keywords, total_notes: len(df), avg_engagement: df[likes].mean() }场景二竞品监控系统为市场营销团队构建竞品内容监控系统import schedule import time from datetime import datetime class CompetitorMonitor: def __init__(self, competitors, xhs_client): self.competitors competitors self.xhs_client xhs_client self.monitoring_data {} def setup_monitoring_schedule(self): 设置监控计划 # 每小时监控一次 schedule.every(1).hours.do(self.monitor_all_competitors) # 每天生成报告 
schedule.every().day.at(09:00).do(self.generate_daily_report) def monitor_all_competitors(self): 监控所有竞品 print(f开始竞品监控 - {datetime.now()}) for competitor in self.competitors: try: self.monitor_competitor(competitor) time.sleep(2) # 避免请求过快 except Exception as e: print(f监控 {competitor} 时出错: {e}) def monitor_competitor(self, competitor_name): 监控单个竞品 # 搜索竞品相关内容 search_results self.xhs_client.search( keywordcompetitor_name, sortSearchSortType.TIME_DESC ) # 分析最新内容 latest_posts search_results[items][:10] for post in latest_posts: post_id post[id] # 获取详细数据 note_detail self.xhs_client.get_note_by_id(post_id) # 计算互动率 engagement_rate self.calculate_engagement_rate(note_detail) # 存储数据 if competitor_name not in self.monitoring_data: self.monitoring_data[competitor_name] [] self.monitoring_data[competitor_name].append({ post_id: post_id, title: post[title], engagement_rate: engagement_rate, timestamp: datetime.now(), likes: note_detail[likes], collects: note_detail[collects], comments: note_detail[comments] }) print(f✓ 监控到 {competitor_name} 的新内容: {post[title][:50]}...)场景三内容质量评估系统构建内容质量评估系统帮助内容创作者优化发布策略class ContentQualityAnalyzer: def __init__(self, xhs_client): self.xhs_client xhs_client def analyze_content_quality(self, note_id): 分析内容质量 note self.xhs_client.get_note_by_id(note_id) quality_score 0 quality_factors [] # 1. 互动率评估 engagement_score self.calculate_engagement_score(note) quality_score engagement_score quality_factors.append(f互动得分: {engagement_score}) # 2. 内容完整性评估 completeness_score self.evaluate_content_completeness(note) quality_score completeness_score quality_factors.append(f完整度得分: {completeness_score}) # 3. 时效性评估 timeliness_score self.evaluate_timeliness(note) quality_score timeliness_score quality_factors.append(f时效性得分: {timeliness_score}) # 4. 
多媒体质量评估 media_score self.evaluate_media_quality(note) quality_score media_score quality_factors.append(f媒体质量得分: {media_score}) return { total_score: quality_score, factors: quality_factors, recommendations: self.generate_recommendations(quality_score, quality_factors) } def calculate_engagement_score(self, note): 计算互动得分 likes note.get(likes, 0) collects note.get(collects, 0) comments note.get(comments, 0) # 加权计算互动得分 engagement_score (likes * 0.4 collects * 0.3 comments * 0.3) / 100 return min(engagement_score, 25) # 最高25分 def generate_recommendations(self, score, factors): 生成优化建议 recommendations [] if score 60: recommendations.append(建议增加互动引导如提问或投票) recommendations.append(优化标题和封面图提高点击率) recommendations.append(增加内容深度提供更多实用信息) elif score 80: recommendations.append(继续保持内容质量) recommendations.append(尝试不同内容形式如视频或图文结合) recommendations.append(增加发布时间频率) else: recommendations.append(内容质量优秀继续保持) recommendations.append(可以考虑系列化内容创作) recommendations.append(与粉丝互动建立社区) return recommendations性能优化与最佳实践1. 请求频率控制策略为了避免被平台限制需要合理控制请求频率import time from functools import wraps import random class RateLimiter: def __init__(self, max_calls5, period60): self.max_calls max_calls self.period period self.calls [] def __call__(self, func): wraps(func) def wrapper(*args, **kwargs): now time.time() # 清理过期记录 self.calls[:] [t for t in self.calls if t now - self.period] # 检查是否超过限制 if len(self.calls) self.max_calls: sleep_time self.period - (now - self.calls[0]) print(f⚠️ 达到频率限制等待 {sleep_time:.1f} 秒) time.sleep(sleep_time random.uniform(0.5, 1.5)) self.calls.clear() # 记录本次调用 self.calls.append(time.time()) # 添加随机延迟模拟人类行为 time.sleep(random.uniform(0.5, 2)) return func(*args, **kwargs) return wrapper # 使用装饰器 RateLimiter(max_calls3, period60) def safe_search(keyword, page1): 安全的搜索函数 return xhs_client.search(keywordkeyword, pagepage)2. 
错误处理与重试机制

健壮的错误处理是数据采集系统的关键：

```python
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RobustXhsClient:
    def __init__(self, cookie):
        self.xhs_client = XhsClient(cookie)
        self.session = self._create_session()

    def _create_session(self):
        """创建带重试机制的会话"""
        session = requests.Session()

        # 配置重试策略
        retry_strategy = requests.adapters.Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )

        adapter = requests.adapters.HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=10
        )

        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.exceptions.RequestException, DataFetchError))
    )
    def get_note_with_retry(self, note_id):
        """带重试的笔记获取"""
        try:
            logger.info(f"尝试获取笔记: {note_id}")
            note = self.xhs_client.get_note_by_id(note_id)
            logger.info(f"成功获取笔记: {note_id}")
            return note
        except Exception as e:
            logger.error(f"获取笔记失败: {note_id}, 错误: {e}")
            raise
```

3. 
数据存储优化方案import sqlite3 import json from datetime import datetime from contextlib import contextmanager class XhsDataStorage: def __init__(self, db_pathxhs_data.db): self.db_path db_path self._init_database() contextmanager def get_connection(self): 获取数据库连接上下文管理器 conn sqlite3.connect(self.db_path) try: yield conn finally: conn.close() def _init_database(self): 初始化数据库结构 with self.get_connection() as conn: cursor conn.cursor() # 创建笔记表 cursor.execute( CREATE TABLE IF NOT EXISTS notes ( id TEXT PRIMARY KEY, title TEXT, content TEXT, user_id TEXT, likes INTEGER DEFAULT 0, collects INTEGER DEFAULT 0, comments INTEGER DEFAULT 0, publish_time DATETIME, category TEXT, tags TEXT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 创建用户表 cursor.execute( CREATE TABLE IF NOT EXISTS users ( user_id TEXT PRIMARY KEY, nickname TEXT, avatar TEXT, notes_count INTEGER DEFAULT 0, fans_count INTEGER DEFAULT 0, following_count INTEGER DEFAULT 0, desc TEXT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 创建索引 cursor.execute(CREATE INDEX IF NOT EXISTS idx_notes_publish_time ON notes(publish_time)) cursor.execute(CREATE INDEX IF NOT EXISTS idx_notes_likes ON notes(likes)) cursor.execute(CREATE INDEX IF NOT EXISTS idx_notes_category ON notes(category)) conn.commit() def save_note(self, note_data): 保存笔记数据 with self.get_connection() as conn: cursor conn.cursor() # 提取标签 tags note_data.get(tags, []) tags_str ,.join(tags) if tags else cursor.execute( INSERT OR REPLACE INTO notes (id, title, content, user_id, likes, collects, comments, publish_time, category, tags, raw_data, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) , ( note_data[id], note_data.get(title, ), note_data.get(desc, ), note_data[user][user_id], note_data.get(likes, 0), note_data.get(collects, 0), note_data.get(comments, 0), datetime.fromtimestamp(note_data[time]/1000) if time 
in note_data else None, note_data.get(category, ), tags_str, json.dumps(note_data, ensure_asciiFalse) )) conn.commit()常见问题排错问题一签名验证失败症状请求返回签名错误或验证失败解决方案检查 cookie 是否过期或无效验证签名函数逻辑是否正确适当增加延迟避免频繁请求def enhanced_sign(uri, dataNone, a1, web_session): 增强版签名函数 import time from playwright.sync_api import sync_playwright for retry in range(3): try: with sync_playwright() as playwright: browser playwright.chromium.launch(headlessTrue) context browser.new_context() page context.new_page() # 设置更长的页面加载时间 page.set_default_timeout(10000) # 添加必要的初始化脚本 page.goto(https://www.xiaohongshu.com, wait_untilnetworkidle) # 设置cookie context.add_cookies([ {name: a1, value: a1, domain: .xiaohongshu.com, path: /} ]) page.reload(wait_untilnetworkidle) time.sleep(3) # 增加等待时间确保页面完全加载 # 检查签名函数是否存在 signature_available page.evaluate(typeof window._webmsxyw function) if not signature_available: raise Exception(签名函数未加载成功) # 执行签名 encrypt_params page.evaluate( ([url, data]) window._webmsxyw(url, data), [uri, data] ) browser.close() return { x-s: encrypt_params[X-s], x-t: str(encrypt_params[X-t]) } except Exception as e: if retry 2: raise Exception(f签名失败重试3次后仍然失败: {str(e)}) # 指数退避重试 wait_time (retry 1) * 2 print(f第{retry1}次签名失败等待{wait_time}秒后重试) time.sleep(wait_time)问题二IP被封禁处理症状请求返回403错误或连接被拒绝解决方案使用代理IP池轮换降低请求频率实现智能重试机制class ProxyManager: def __init__(self, proxy_list): self.proxy_list proxy_list self.current_index 0 self.failed_proxies set() def get_current_proxy(self): 获取当前可用的代理 if not self.proxy_list: return None # 跳过失效的代理 while (self.proxy_list[self.current_index] in self.failed_proxies and len(self.failed_proxies) len(self.proxy_list)): self.current_index (self.current_index 1) % len(self.proxy_list) return self.proxy_list[self.current_index] def rotate_proxy(self): 切换到下一个代理 self.current_index (self.current_index 1) % len(self.proxy_list) print(f切换到代理: {self.get_current_proxy()}) def mark_proxy_failed(self, proxy): 标记代理失效 self.failed_proxies.add(proxy) print(f标记代理失效: {proxy}) # 如果所有代理都失效清空失效列表重新开始 if 
len(self.failed_proxies) len(self.proxy_list): print(所有代理都失效清空失效列表重新开始) self.failed_proxies.clear() def is_proxy_available(self, proxy): 检查代理是否可用 return proxy not in self.failed_proxies问题三数据格式变化处理症状API返回数据格式变化导致解析失败解决方案实现兼容性解析添加数据验证建立格式监控class AdaptiveDataParser: def __init__(self): self.field_mappings { title: [title, note_title, desc, content], likes: [likes, like_count, likeCount], collects: [collects, collect_count, collectCount], comments: [comments, comment_count, commentCount], user_id: [user.user_id, author.user_id, user_id], nickname: [user.nickname, author.nickname, nickname] } def parse_note_data(self, raw_data): 自适应解析笔记数据 result {} # 尝试多种可能的字段路径 for target_field, possible_paths in self.field_mappings.items(): value self._extract_field(raw_data, possible_paths) if value is not None: result[target_field] value # 设置默认值 result.setdefault(likes, 0) result.setdefault(collects, 0) result.setdefault(comments, 0) result.setdefault(title, ) result.setdefault(user, {user_id: , nickname: 未知用户}) return result def _extract_field(self, data, paths): 从多个可能路径中提取字段 for path in paths: try: if . in path: # 处理嵌套路径 parts path.split(.) value data for part in parts: value value.get(part, {}) if value ! {}: return value else: # 处理直接路径 value data.get(path) if value is not None: return value except (KeyError, AttributeError, TypeError): continue return None def validate_note_data(self, note_data): 验证笔记数据完整性 required_fields [id, title, user] for field in required_fields: if field not in note_data or not note_data[field]: return False, f缺少必要字段: {field} # 验证用户信息 if user_id not in note_data.get(user, {}): return False, 用户信息不完整 # 验证数值字段 numeric_fields [likes, collects, comments] for field in numeric_fields: if field in note_data and not isinstance(note_data[field], (int, float)): return False, f字段 {field} 类型错误 return True, 数据验证通过扩展与进阶1. 
异步请求支持

对于需要处理大量请求的场景，可以扩展异步支持：

```python
import asyncio
import aiohttp
from typing import List, Dict, Any


class AsyncXhsClient:
    def __init__(self, cookie, max_concurrent=10):
        self.cookie = cookie
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_multiple_notes(self, note_ids: List[str]) -> Dict[str, Any]:
        """异步获取多个笔记"""
        tasks = []
        results = {}

        async with aiohttp.ClientSession() as session:
            for note_id in note_ids:
                task = asyncio.create_task(
                    self._fetch_note_with_semaphore(session, note_id)
                )
                tasks.append((note_id, task))

            for note_id, task in tasks:
                try:
                    results[note_id] = await task
                except Exception as e:
                    results[note_id] = {"error": str(e)}

        return results

    async def _fetch_note_with_semaphore(self, session, note_id):
        """使用信号量控制并发"""
        async with self.semaphore:
            return await self._fetch_note(session, note_id)

    async def _fetch_note(self, session, note_id):
        """获取单个笔记"""
        url = f"https://www.xiaohongshu.com/explore/{note_id}"
        headers = {
            "Cookie": self.cookie,
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }

        async with session.get(url, headers=headers) as response:
            if response.status == 200:
                return await response.json()
            else:
                raise Exception(f"请求失败: {response.status}")
```

2. 
数据可视化扩展

结合数据可视化库，创建直观的数据分析报告：

```python
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud


class XhsDataVisualizer:
    def __init__(self, data_storage):
        self.storage = data_storage

    def create_engagement_heatmap(self, category=None):
        """创建互动热力图"""
        with self.storage.get_connection() as conn:
            query = """
                SELECT
                    strftime('%H', publish_time) as hour,
                    strftime('%w', publish_time) as weekday,
                    AVG(likes) as avg_likes,
                    COUNT(*) as post_count
                FROM notes
                WHERE publish_time IS NOT NULL
            """

            if category:
                query += f" AND category = '{category}'"

            query += " GROUP BY hour, weekday ORDER BY hour, weekday"

            df = pd.read_sql_query(query, conn)

        # 创建热力图
        pivot_table = df.pivot(index="weekday", columns="hour", values="avg_likes")

        plt.figure(figsize=(12, 8))
        sns.heatmap(pivot_table, cmap="YlOrRd", annot=True, fmt=".0f")
        plt.title("小红书笔记互动热力图")
        plt.xlabel("发布时间（小时）")
        plt.ylabel("星期几")
        plt.tight_layout()
        return plt

    def create_word_cloud(self, category=None, max_words=100):
        """创建词云图"""
        with self.storage.get_connection() as conn:
            query = "SELECT title FROM notes WHERE title != ''"
            if category:
                query += f" AND category = '{category}'"

            df = pd.read_sql_query(query, conn)

        # 合并所有标题
        text = " ".join(df["title"].tolist())

        # 生成词云
        wordcloud = WordCloud(
            width=800,
            height=400,
            background_color="white",
            max_words=max_words,
            font_path="simhei.ttf"  # 中文字体
        ).generate(text)

        plt.figure(figsize=(12, 8))
        plt.imshow(wordcloud, interpolation="bilinear")
        plt.axis("off")
        plt.title("小红书笔记标题词云")
        return plt
```

3. 
监控告警系统

构建监控告警系统，及时发现数据采集异常：

```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta


class MonitoringAlertSystem:
    def __init__(self, email_config):
        self.email_config = email_config
        self.alerts = []

    def check_data_quality(self, storage, hours=24):
        """检查数据质量"""
        with storage.get_connection() as conn:
            # 检查最近24小时的数据
            cutoff_time = datetime.now() - timedelta(hours=hours)

            # 检查数据完整性
            query = """
                SELECT
                    COUNT(*) as total_notes,
                    SUM(CASE WHEN title IS NULL OR title = '' THEN 1 ELSE 0 END) as missing_titles,
                    SUM(CASE WHEN likes = 0 AND collects = 0 AND comments = 0 THEN 1 ELSE 0 END) as zero_engagement,
                    AVG(likes) as avg_likes
                FROM notes
                WHERE created_at > ?
            """

            cursor = conn.cursor()
            cursor.execute(query, (cutoff_time,))
            result = cursor.fetchone()

        # 分析数据质量
        quality_issues = []

        if result["missing_titles"] > 0:
            quality_issues.append(f"缺失标题的笔记: {result['missing_titles']}条")

        if result["zero_engagement"] > result["total_notes"] * 0.1:  # 超过10%
            quality_issues.append(f"零互动笔记过多: {result['zero_engagement']}条")

        if result["avg_likes"] < 10:  # 平均点赞低于10
            quality_issues.append(f"平均点赞数过低: {result['avg_likes']:.1f}")

        return {
            "total_notes": result["total_notes"],
            "quality_issues": quality_issues,
            "avg_likes": result["avg_likes"]
        }

    def send_alert_email(self, subject, message):
        """发送告警邮件"""
        msg = MIMEMultipart()
        msg["From"] = self.email_config["sender"]
        msg["To"] = ", ".join(self.email_config["recipients"])
        msg["Subject"] = subject

        msg.attach(MIMEText(message, "plain"))

        try:
            with smtplib.SMTP(self.email_config["smtp_server"], self.email_config["smtp_port"]) as server:
                server.starttls()
                server.login(self.email_config["username"], self.email_config["password"])
                server.send_message(msg)
            print(f"告警邮件发送成功: {subject}")
        except Exception as e:
            print(f"发送告警邮件失败: {e}")

    def monitor_and_alert(self, storage):
        """监控并发送告警"""
        quality_report = self.check_data_quality(storage)

        if quality_report["quality_issues"]:
            subject = f"小红书数据采集质量告警 - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
            message = f"""
数据采集质量报告：

总笔记数：{quality_report['total_notes']}
平均点赞数：{quality_report['avg_likes']:.1f}

发现的问题：
"""
            for issue in quality_report["quality_issues"]:
                message += f"- {issue}\n"

            message += "\n建议检查采集配置和网络连接。"

            self.send_alert_email(subject, message)
            self.alerts.append({
                "timestamp": datetime.now(),
                "issues": quality_report["quality_issues"]
            })
```

通过本文的全面介绍，您已经掌握了使用 xhs 进行小红书数据采集的核心技术和最佳实践。从基础使用到高级功能，从性能优化到实战应用，xhs 为开发者提供了一个强大而灵活的数据采集解决方案。无论您是数据分析师、营销人员还是开发者，都可以基于 xhs 构建符合自己需求的自动化工具。

记住：技术是工具，合规使用是关键。在使用 xhs 进行数据采集时，请务必遵守相关法律法规和平台规则，合理控制请求频率，尊重用户隐私，仅将数据用于合法合规的用途。

- 示例代码：example/ 提供了丰富的使用示例
- 配置文件：setup.cfg 包含项目配置信息
- 测试用例：tests/ 确保代码质量

【免费下载链接】xhs：基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/
项目地址: https://gitcode.com/gh_mirrors/xh/xhs

创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考