零基础小白也能玩转!实现Python抓取股票数据-完结(完整代码)

360影视 动漫周边 2025-03-17 00:55 3

摘要:话不多说,接前两篇,今天来讲讲业务逻辑层(data_processor.py)和工具组件层(utils.py),并且今天全部收尾该程序,完整给大家放出。

喜欢的条友记得关注、点赞、转发、收藏,你们的支持就是我最大的动力源泉。

前期基础教程:

话不多说,接前两篇,今天来讲讲业务逻辑层(data_processor.py)和工具组件层(utils.py),并且今天全部收尾该程序,完整给大家放出。

股票数据采集系统├── 数据接入层(api_client.py)├── 业务逻辑层(data_processor.py)├── 工具组件层(utils.py)├── 配置中心(config.py)└── 调度入口(main.py)

开篇:数据是投资的燃料,但获取它太难了?

在股市中,数据就是财富!但手动抓取行情数据?那简直是煎熬!今天,我将手把手教你用Python打造一个全自动股票数据抓取系统,实时行情、历史K线、批量获取一网打尽!代码已通过实战验证,新手也能轻松上手!

1. 初始化:连接东方财富API(关键第一步)

from api_client import EastMoneyAPI # 初始化API客户端,连接东方财富数据接口from config import TRADE_DETAIL_MAP, MARKET_MAPPING # 加载预定义字段映射表from utils import parse_timestamp, convert_dtypes # 引入时间解析和数据类型转换工具import pandas as pdimport multitaskingfrom tqdm import tqdmfrom func_timeout import func_set_timeoutfrom datetime import datetimefrom jsonpath import jsonpath

代码亮点:

智能导入:一次性调用API、配置文件、工具函数,为后续操作铺路。超前准备:multitasking和func_set_timeout为异步请求和防超时设计埋下伏笔。

2. 实时行情闪电捕手:get_realtime方法

def get_realtime(self, symbols):secids = [self.api.resolve_code(s) for s in (symbols if isinstance(symbols, list) else [symbols])]params = {'fields': ','.join(TRADE_DETAIL_MAP.keys),'secids': ','.join(secids),'fltt': '2', # 数据格式优化参数'plat': 'Iphone', # 模拟移动端请求,提升API成功率'product': 'EFund'}try:data = self.api.fetch_data('realtime', params)return self._parse_realtime(data)except Exception as e:print(f"获取{symbols}历史数据失败: {str(e)}")return pd.DataFrame

核心功能:

智能证券编码:自动将股票代码转换为API所需的secid格式。移动端伪装:通过plat=Iphone参数,模拟手机端请求,突破API访问限制。异常兜底:捕获所有异常,确保程序稳定运行,失败时返回空DataFrame不中断流程。

3. 数据解析黑科技:_parse_realtime解密

def _parse_realtime(self, data):items = jsonpath(data, '$..diff[:]') or # 提取实时数据节点df = pd.DataFrame(items)[list(TRADE_DETAIL_MAP.keys)]df = df.rename(columns=TRADE_DETAIL_MAP) # 转换为人类可读的字段名df['市场'] = df['编号'].astype(str).map(MARKET_MAPPING) # 自动识别沪深市场df['时间'] = df['更新时间戳'].apply(parse_timestamp) # 时间戳转可读格式(如"2023-09-20 15:00")return convert_dtypes(df.drop(['编号', '更新时间戳'], axis=1), ['代码', '名称', '市场', '时间'])

神操作:

jsonPath魔法:用jsonpath精准定位数据节点,避免手动遍历的繁琐。字段翻译器:将API返回的原始字段名(如f1,f2)转换为直观的开盘价、收盘价等。数据清洗:自动过滤冗余字段,保留核心数据,类型转换确保后续分析无误。

️4. 历史数据考古专家:get_history深度挖掘

def get_history(self, symbol, start='19000101', end=None, freq='d'):code_id = self.api.resolve_code(symbol)params = {'secid': code_id,'beg': ''.join(str(start).split('-')), # 格式化日期为YYYYMMDD'end': ''.join(str(end or self.latest_date).split('-')),'klt': self._parse_freq(freq), # 转换频率参数(如'D'→日线)'fqt': '1', # 复权方式:1表示前复权'fields1': 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13','fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61'}try:data = self.api.fetch_data('kline', params)return self._parse_history(data)except Exception as e:print(f"获取{symbol}历史数据失败: {str(e)}")return pd.DataFrame

玩家级技巧:

时间穿越:默认从1900年抓取数据,实现历史K线全记录。复权黑科技:fqt=1自动处理复权问题,避免手动计算。参数魔术:fields1和fields2覆盖所有必要字段,确保数据完整性。

5. 异步加速:batch_fetch批量抓取不卡顿

def batch_fetch(self, symbols):results = with tqdm(total=len(symbols)) as pbar:for s in symbols:self._fetch_async(s, results, pbar)multitasking.wait_for_tasksreturn pd.concat(results) if results else pd.DataFrame

性能飞跃:

多线程并行:通过multitasking库实现异步请求,速度提升10倍!进度可视化:tqdm实时显示抓取进度,告别“黑屏等待”焦虑。结果整合:自动合并所有股票数据为一个DataFrame,直接可用!# utils.pyfrom datetime import datetimeimport pandas as pddef convert_dtypes(df, exclude_cols=):"""增强的类型转换函数"""numeric_cols = list(set(df.columns) - set(exclude_cols))df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')return dfdef parse_timestamp(ts):"""更健壮的时间戳解析"""try:return datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M:%S')except:return pd.NaTdef validate_columns(df, required_columns):missing = set(required_columns) - set(df.columns)if missing:print(f"警告:缺失必要列 {missing}")for col in missing:df[col] = pd.NAreturn df[required_columns]# main.pyfrom data_processor import DataFetcherif __name__ == '__main__':try:fetcher = DataFetcher# 实时数据示例print("获取实时行情...")realtime_df = fetcher.get_realtime(['上证综指', '深证成指', '创业板指', '纳斯达克', '新集能源', '德马科技', '易成新能', '龙佰集团', '贵州茅台'])print(realtime_df)# # 历史数据示例print("\n获取历史数据...")historical_df = fetcher.get_history('新集能源', start='2025-01-01', end='2025-12-31')print(historical_df)# # 批量数据示例print("\n批量数据...")batch_df = fetcher.batch_fetch(['新集能源'])print(batch_df)except Exception as e:print(f"操作失败: {str(e)}")from api_client import EastMoneyAPI # 添加API类导入from config import TRADE_DETAIL_MAP, MARKET_MAPPING # 添加config导入from utils import parse_timestamp, convert_dtypes # 添加utils导入import pandas as pdimport multitaskingfrom tqdm import tqdmfrom func_timeout import func_set_timeoutfrom datetime import datetime # 添加datetime导入from jsonpath import jsonpathclass DataFetcher:def __init__(self):self.api = EastMoneyAPIdef get_realtime(self, symbols):secids = [self.api.resolve_code(s) for s in (symbols if isinstance(symbols, list) else [symbols])]params = {'fields': ','.join(TRADE_DETAIL_MAP.keys),'secids': ','.join(secids),'fltt': '2','plat': 'Iphone','product': 'EFund'}try:data = self.api.fetch_data('realtime', params)return self._parse_realtime(data)except Exception as e:print(f"获取{symbols}历史数据失败: {str(e)}")return pd.DataFramedef _parse_realtime(self, data):items = jsonpath(data, '$..diff[:]') or df = pd.DataFrame(items)[list(TRADE_DETAIL_MAP.keys)]df = df.rename(columns=TRADE_DETAIL_MAP)df['市场'] = df['编号'].astype(str).map(MARKET_MAPPING)df['时间'] = df['更新时间戳'].apply(parse_timestamp)return convert_dtypes(df.drop(['编号', '更新时间戳'], axis=1), ['代码', '名称', '市场', '时间'])def get_history(self, symbol, start='19000101', end=None, freq='d'):code_id = self.api.resolve_code(symbol)params = {'secid': code_id,'beg': ''.join(str(start).split('-')),'end': ''.join(str(end or self.latest_date).split('-')),'klt': self._parse_freq(freq),'fqt': '1','fields1': 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13','fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61'}try:data = self.api.fetch_data('kline', params)return self._parse_history(data)except Exception as e:print(f"获取{symbol}历史数据失败: {str(e)}")return pd.DataFramedef _parse_history(self, data):klines = jsonpath(data, '$..klines[:]') or # 处理空数据情况if not klines:columns = ['date', 'name', 'code', 'open', 'high', 'low', 'close', 'volume', 'turnover', 'turnover_rate']return pd.DataFrame(columns=columns)# 确保数据完整性valid_klines = for k in klines:parts = k.split(',')if len(parts) == 11: # 验证字段数量valid_klines.append(parts)# 再次检查有效数据if not valid_klines:return pd.DataFrame(columns=['日期', '开盘', '收盘', '最高', '最低', '成交量', '成交额', '振幅', '涨跌幅', '涨跌额', '换手率'])df = pd.DataFrame(valid_klines)# 动态设置列名(根据实际字段数量)expected_columns = ['日期', '开盘', '收盘', '最高', '最低', '成交量', '成交额', '振幅', '涨跌幅', '涨跌额', '换手率']if len(df.columns) == len(expected_columns):df.columns = expected_columnselse:# 自动截断或填充列名df.columns = expected_columns[:len(df.columns)]for col in expected_columns[len(df.columns):]:df[col] = pd.NA# 添加额外字段df['代码'] = data.get('data', {}).get('code', '')df['名称'] = data.get('data', {}).get('name', '')# 字段排序ordered_cols = ['日期', '名称', '代码', '开盘', '最高', '最低', '收盘','成交量', '成交额', '换手率', '振幅', '涨跌幅', '涨跌额']df = df.reindex(columns=ordered_cols)# 类型转换return convert_dtypes(df, exclude_cols=['名称', '代码', '日期'])def latest_date(self):df = self.get_realtime('上证指数')return df['时间'].iloc[0].split[0] if not df.empty else datetime.now.strftime('%Y%m%d')@multitasking.task@func_set_timeout(15)def _fetch_async(self, symbol, results, pbar):try:results.append(self.get_history(symbol))finally:pbar.updatedef batch_fetch(self, symbols):results = with tqdm(total=len(symbols)) as pbar:for s in symbols:self._fetch_async(s, results, pbar)multitasking.wait_for_tasksreturn pd.concat(results) if results else pd.DataFrame@staticmethoddef _parse_freq(freq):freq_map = {'D': 101, 'W': 102, 'M': 103, '1M': 1, '5M': 5}return str(freq_map.get(freq.upper, 101))# config.pyTRADE_DETAIL_MAP = {'f12': '代码', # 证券代码(如:600000)——股票的身份证'f14': '名称', # 证券名称(如:贵州茅台)——股票的「真名」'f3': '涨幅', # 当日涨幅百分比(如:+5.2%)——主力资金动向的「体温计」'f2': '最新', # 实时价格(如:123.45元)——市场心跳的「脉搏」'f15': '最高', # 当日最高价(如:130.00元)——多头进攻的「冲锋号」'f16': '最低', # 当日最低价(如:118.50元)——空头防御的「警戒线」'f17': '今开', # 开盘价(如:120.00元)——市场情绪的「第一枪」'f8': '换手率', # 成交量/流通股(如:12.3%)——资金流动的「体检报告」'f10': '量比', # ⚡ 当前成交量/近期均量(如:1.5倍)——量能爆发的「加速器」'f9': '市盈率', # 市价/每股收益(如:30倍)——估值高低的「显微镜」'f5': '成交量', # 手(如:1000万手)——市场活跃度的「晴雨表」'f6': '成交额', # 万元(如:12.3亿元)——资金博弈的「战场规模」'f18': '昨收', # 前日收盘价(如:118.00元)——昨日行情的「终点站」'f20': '总市值', # 亿元(如:5000亿元)——公司价值的「天平」'f21': '流通市值', # 可交易市值(如:3000亿元)——筹码松紧的「压力计」'f13': '编号', # 内部编号(非公开)——东方财富的「暗号」'f124': '更新时间戳',# ⏳ 毫秒级时间戳(如:1715644690000)——数据新鲜度的「计时器」}MARKET_MAPPING = {'0': '深A', # 深圳证券交易所主板股票(如:000001)'1': '沪A', # 上海证券交易所主板股票(如:600000)'105': '美股', # 纽约/纳斯达克市场(如:AAPL)'106': '美股', # 纳斯达克市场(如:TSLA)'107': '美股', # 纽交所市场(如:IBM)'116': '港股', # 香港交易所(如:00700.HK)'128': '港股', # 恒生指数成分股'113': '上期所', # ️ 上海期货交易所(如:黄金期货)'114': '大商所', # 大连商品交易所(如:大豆期货)'115': '郑商所', # 郑州商品交易所(如:白糖期货)'8': '中金所', # 金融期货(如:沪深300股指期货)'142': '上海能源期货交易所', # 国际能源交易中心(如:原油期货)'155': '英股', # 标普500欧洲成分股'90': '板块' # 行业/主题指数(如:新能源车板块)}INDEX_MAPPING = {'上证综指': '1.000001', # 中国股市「晴雨表」,代码:SH000001'深证成指': '0.399001', # 深市标杆,代码:SZ399001'创业板指': '0.399006', # 新兴科技股代表,代码:SZ399006'沪深300': '1.000300', # 蓝筹股核心指数,代码:SH000300'上证50': '1.000016', # ️ 金融地产龙头,代码:SH000016'科创50': '1.000688', # 科技创新「风向标」,代码:SH000688'标普500': '100.SPX', # 美股核心指数(如:苹果、微软)'纳斯达克': '100.NDX', # 科技巨头聚集地(如:特斯拉、英伟达)'道琼斯': '100.DJIA', # 传统工业巨头指数(如:波音、可口可乐)}REQUEST_HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', # ♂️ 伪装成火狐浏览器,骗过反爬机制!'Accept': 'application/json', # 告诉服务器:我需要JSON格式数据!'Referer': 'https://data.eastmoney.com/', # 模拟访问来源,避免被识别为爬虫!}# api_client.pyimport requestsfrom jsonpath import jsonpathfrom config import REQUEST_HEADERS, INDEX_MAPPINGimport warningswarnings.filterwarnings("ignore") # 忽略SSL警告(用于跳过证书验证)class EastMoneyAPI:def __init__(self):self.session = requests.Session # ️ 创建持久化会话,提升请求效率self.session.headers.update(REQUEST_HEADERS) # ♂️ 注入浏览器伪装头(来自config.py)self.base_urls = {'realtime': 'https://push2.eastmoney.com/api/qt/ulist.np/get',# 实时行情接口(获取股票最新价、涨跌幅等)'kline': 'https://push2his.eastmoney.com/api/qt/stock/kline/get',# K线图接口(获取历史分时/日K数据)'suggest': 'https://searchapi.eastmoney.com/api/suggest/get'# 模糊搜索接口(将股票名称转为代码)}def resolve_code(self, symbol):if symbol in INDEX_MAPPING: # 优先匹配预设指数(如上证综指→1.000001)return INDEX_MAPPING[symbol]params = {'input': symbol, # 输入用户提供的股票名称/代码'type': '14', # 东方财富API的「类型密钥」(14=股票)# 'token': 'D43BF722C8E33BDC906FB84D85E326E8' # ️ API令牌(需定期更新!)}resp = self.session.get( # 发送GET请求到模糊搜索接口self.base_urls['suggest'], params=params, verify=False # 跳过证书验证(生产环境慎用!))if resp.status_code == 200:data = resp.json.get('QuotationCodeTable', {}).get('Data', )if data:return data[0]['QuoteID'] # 返回第一个匹配的「标准代码」raise ValueError(f"无效的证券代码/名称: {symbol}") # 异常处理:用户输入错误时直接报错def fetch_data(self, endpoint, params):resp = self.session.get( # 发送到指定接口(如实时行情/历史K线)self.base_urls[endpoint], params=params, verify=False # 跳过证书验证(生产环境慎用!))resp.raise_for_status # 异常处理:HTTP错误直接抛出(如404/500)return resp.json # 返回解析后的JSON数据(如实时行情包含「涨幅」「量比」等)运行结果:

实时行情

历史数据

批量数据

这不仅仅是一段代码,而是一把打开财富之门的钥匙!无论是短线交易还是长线投资,掌握数据抓取技术,你就能比别人快人一步!现在就动手实践,让Python成为你的智能投资助手吧!

来源:心静佛现

相关推荐