摘要:import pandas as pd; import numpy as np; @pd.api.extensions.register_dataframe_accessor("analysis") class AnalysisAccessor: def __init__ ……
Pandas的强大之处不仅在于其内置功能,还在于其可扩展性。本文将深入探讨如何通过自定义函数和扩展技术来增强Pandas的功能和性能。
使用@pd.api.extensions.register_dataframe_accessor创建DataFrame扩展
import pandas as pd
import numpy as np


@pd.api.extensions.register_dataframe_accessor("analysis")
class AnalysisAccessor:
    """DataFrame accessor exposed as ``df.analysis`` with extra statistics helpers."""

    def __init__(self, pandas_obj):
        # pandas hands the accessor the DataFrame it was reached from.
        self._obj = pandas_obj

    def describe_plus(self):
        """Return ``describe()`` with two extra rows: ``kurtosis`` and ``skew``.

        Returns:
            The frame produced by ``DataFrame.describe()`` plus one row holding
            ``DataFrame.kurt(numeric_only=True)`` and one holding
            ``DataFrame.skew(numeric_only=True)``.
        """
        desc = self._obj.describe()
        # Row assignment aligns on column labels, so only described columns
        # receive a value.
        desc.loc['kurtosis'] = self._obj.kurt(numeric_only=True)
        desc.loc['skew'] = self._obj.skew(numeric_only=True)
        return desc

    def zscore(self, columns=None):
        """Return the Z-score standardisation of the selected columns.

        Args:
            columns: Column labels to standardise. ``None`` selects every
                numeric column.

        Returns:
            ``(x - mean) / std`` computed column-wise with pandas' default
            ``std()``.
        """
        # `columns or ...` raises for array-like inputs (ambiguous truth
        # value), so test against None explicitly.
        if columns is None:
            cols = self._obj.select_dtypes(include=np.number).columns
        else:
            cols = columns
        selected = self._obj[cols]
        return (selected - selected.mean()) / selected.std()


# Usage example
df = pd.DataFrame(np.random.randn(100, 3), columns=['A', 'B', 'C'])
print("增强版描述统计:")
print(df.analysis.describe_plus())
print("\nZ-score标准化:")
print(df.analysis.zscore().head())

# Next section — 使用@pd.api.extensions.register_series_accessor创建Series扩展
# (creating a Series extension with @pd.api.extensions.register_series_accessor)
@pd.api.extensions.register_series_accessor("text")
class TextAccessor:
    """Series accessor exposed as ``s.text`` with small string helpers."""

    def __init__(self, pandas_obj):
        # pandas hands the accessor the Series it was reached from.
        self._obj = pandas_obj

    def count_words(self):
        """Return the number of whitespace-separated tokens in each string."""
        return self._obj.str.split().str.len()

    def remove_punctuation(self):
        """Return the strings with every non-word, non-space character removed."""
        return self._obj.str.replace(r'[^\w\s]', '', regex=True)


# Usage example
s = pd.Series(['Hello, world!', 'Pandas is great.', 'Data analysis.'])
print("\n单词计数:")
print(s.text.count_words())
print("\n移除标点符号:")
print(s.text.remove_punctuation())

# Next section — 自定义聚合函数 (custom aggregation functions)
def weighted_mean(x, weights):
    """Return the weighted mean ``sum(x * weights) / sum(weights)``.

    Args:
        x: Values to average.
        weights: Weights, multiplied element-wise with ``x``.
    """
    return np.sum(x * weights) / np.sum(weights)


# Usage example
df = pd.DataFrame({
    'value': [10, 20, 30, 40],
    'weight': [1, 2, 3, 4],
})
print("\n加权平均值:", weighted_mean(df['value'], df['weight']))

# Using it inside groupby
df_group = pd.DataFrame({
    'group': ['A', 'A', 'B', 'B'],
    'value': [10, 20, 30, 40],
    'weight': [1, 2, 3, 4],
})
# Select the value columns explicitly: applying over the grouping column as
# well is deprecated in recent pandas releases.
result = df_group.groupby('group')[['value', 'weight']].apply(
    lambda g: weighted_mean(g['value'], g['weight'])
)
print("\n分组加权平均:")
print(result)

# Next section — numexpr基础使用 (numexpr basics)
# Install first: pip install numexpr
import timeit

import numexpr as ne

# Build a large DataFrame
df = pd.DataFrame(np.random.rand(1000000, 4), columns=['A', 'B', 'C', 'D'])

# Plain pandas arithmetic (in IPython/Jupyter the %timeit magic can be used instead)
print(timeit.timeit(lambda: df['A'] + df['B'] * df['C'] / (df['D'] + 1), number=10))


# Accelerated with numexpr
def pandas_numexpr(df):
    """Evaluate ``A + B * C / (D + 1)`` over the columns of ``df`` with numexpr.

    Args:
        df: DataFrame providing columns ``A``, ``B``, ``C`` and ``D``.

    Returns:
        The array returned by ``ne.evaluate``.
    """
    # local_dict is documented as a dictionary, so pass a plain
    # name -> ndarray mapping rather than the DataFrame itself.
    operands = {name: df[name].to_numpy() for name in ('A', 'B', 'C', 'D')}
    return ne.evaluate("A + B * C / (D + 1)", local_dict=operands)


print(timeit.timeit(lambda: pandas_numexpr(df), number=10))

# Check both approaches agree
assert np.allclose(df['A'] + df['B'] * df['C'] / (df['D'] + 1), pandas_numexpr(df))

# Next section — 在Pandas查询中使用numexpr (using numexpr in pandas queries)
# pandas evaluates query expressions with numexpr by default when it is installed.
# NOTE(review): this snippet was truncated by HTML extraction (each "<"
# comparison and the text up to the following ">" was lost). The two statements
# below are a minimal reconstruction from the surviving fragments — confirm
# against the original article.
result_standard = df.query('A > 0.5 & B < 0.5')
# Equivalent boolean-mask form.
result_mask = df[(df['A'] > 0.5) & (df['B'] < 0.5)]

# Next section — 复杂表达式优化 (optimising complex expressions)
import timeit


# Complex expression in plain pandas/NumPy
def complex_calc(df):
    """Return ``A**2 + B**3 + sin(C) * log(|D| + 1)`` computed with pandas/NumPy."""
    return (df['A']**2 + df['B']**3 + np.sin(df['C']) * np.log(abs(df['D']) + 1))


# Same expression evaluated by numexpr
def complex_calc_numexpr(df):
    """Return the same expression as ``complex_calc`` evaluated by numexpr."""
    # local_dict is documented as a dictionary, so pass a plain
    # name -> ndarray mapping rather than the DataFrame itself.
    operands = {name: df[name].to_numpy() for name in ('A', 'B', 'C', 'D')}
    return ne.evaluate("A**2 + B**3 + sin(C) * log(abs(D) + 1)", local_dict=operands)


# Timing comparison (in IPython/Jupyter the %timeit magic can be used instead)
print(timeit.timeit(lambda: complex_calc(df), number=10))
print(timeit.timeit(lambda: complex_calc_numexpr(df), number=10))

# Check both approaches agree
assert np.allclose(complex_calc(df), complex_calc_numexpr(df))

# Next section — 简单的Cython函数 (a simple Cython function)
首先创建cython_extension.pyx文件:
# cython: language_level=3
import numpy as np
cimport numpy as cnp
from libc.math cimport sqrt

# Initialise the NumPy C-API before any cimported NumPy functionality is used.
cnp.import_array()


def cython_sum(cnp.ndarray[double] arr):
    """Return the sum of a 1-D float64 array using a typed C loop."""
    cdef double total = 0.0
    # Py_ssize_t is the index type that matches array extents on every platform.
    cdef Py_ssize_t n = arr.shape[0]
    cdef Py_ssize_t i
    for i in range(n):
        total += arr[i]
    return total


def cython_rolling_mean(cnp.ndarray[double] arr, int window):
    """Return the rolling mean over every complete window of ``arr``.

    Args:
        arr: 1-D float64 array.
        window: Window length; must be positive.

    Returns:
        float64 array of length ``len(arr) - window + 1`` (empty when the
        window is longer than the input).

    Raises:
        ValueError: If ``window`` is not positive.
    """
    cdef Py_ssize_t n = arr.shape[0]
    cdef Py_ssize_t i
    cdef double current_sum = 0.0
    cdef cnp.ndarray[double] out

    if window <= 0:
        raise ValueError("window must be a positive integer")
    if window > n:
        # No complete window exists; avoid a negative allocation size and
        # out-of-range reads.
        return np.zeros(0, dtype=np.float64)

    out = np.zeros(n - window + 1, dtype=np.float64)

    # Sum of the first window
    for i in range(window):
        current_sum += arr[i]
    out[0] = current_sum / window

    # Slide the window: drop the element leaving, add the element entering
    for i in range(1, n - window + 1):
        current_sum = current_sum - arr[i - 1] + arr[i + window - 1]
        out[i] = current_sum / window
    return out

# Next section — 编译Cython扩展 (compiling the Cython extension)
创建setup.py文件:
from setuptools import setup
from Cython.Build import cythonize
import numpy as np

# Build configuration for the cython_extension module.
setup(
    ext_modules=cythonize("cython_extension.pyx"),
    # get_include must be called: the build needs the header directory path,
    # not the function object.
    include_dirs=[np.get_include()],
)

# Build command (编译命令):
python setup.py build_ext --inplace
在Pandas中使用Cython扩展
import timeit

from cython_extension import cython_sum, cython_rolling_mean

# Build a large Series
s = pd.Series(np.random.rand(1000000))

# Timing comparison (in IPython/Jupyter the %timeit magic can be used instead)
print(timeit.timeit(lambda: s.sum(), number=10))
print(timeit.timeit(lambda: cython_sum(s.values), number=10))

# Rolling-mean comparison
window = 100
print(timeit.timeit(lambda: s.rolling(window).mean().dropna(), number=10))
print(timeit.timeit(
    lambda: pd.Series(cython_rolling_mean(s.values, window), index=s.index[window-1:]),
    number=10,
))

# Check both approaches agree
assert np.allclose(
    s.rolling(window).mean().dropna().values,
    cython_rolling_mean(s.values, window),
)

# Next section — 更复杂的Cython-Pandas集成 (a more involved Cython/pandas integration)
# Add to cython_extension.pyx
import pandas as pd


def cython_group_mean(group_labels, values):
    """Return the mean of ``values`` for each distinct label in ``group_labels``.

    The arguments are untyped because a pandas Series cannot be cimported as
    a C-level type.

    Args:
        group_labels: Series of hashable group labels.
        values: Series of numbers, the same length as ``group_labels``.

    Returns:
        Series mapping each label to its mean, in order of first appearance.

    Raises:
        ValueError: If the two inputs differ in length.
    """
    cdef:
        dict sums = {}
        dict counts = {}
        Py_ssize_t i, n = len(group_labels)
        object group
        double value
        list labels
        const double[:] data

    if len(values) != n:
        raise ValueError("group_labels and values must have the same length")

    # Convert once up front instead of calling .iloc for every element.
    labels = group_labels.tolist()
    data = np.asarray(values, dtype=np.float64)

    for i in range(n):
        group = labels[i]
        value = data[i]
        if group in sums:
            sums[group] += value
            counts[group] += 1
        else:
            sums[group] = value
            counts[group] = 1

    # Explicit dtype keeps an empty result as float64.
    return pd.Series(
        {group: sums[group] / counts[group] for group in sums},
        dtype=np.float64,
    )

# Usage example (使用示例):
import timeit

# Build test data
groups = pd.Series(np.random.choice(['A', 'B', 'C', 'D'], 1000000))
values = pd.Series(np.random.randn(1000000))

# Timing comparison (in IPython/Jupyter the %timeit magic can be used instead)
print(timeit.timeit(lambda: values.groupby(groups).mean(), number=10))
print(timeit.timeit(lambda: cython_group_mean(groups, values), number=10))

# Check both approaches agree. Series.equals would also demand identical
# index order and bit-identical floats, so compare sorted results with a
# tolerance instead.
expected = values.groupby(groups).mean().sort_index()
actual = cython_group_mean(groups, values).sort_index()
assert list(expected.index) == list(actual.index)
assert np.allclose(expected.to_numpy(), actual.to_numpy())

# Next section — 定义Cython加速的清洗函数 (defining a Cython-accelerated cleaning function)
# Add to cython_extension.pyx
def cython_clean_data(cnp.ndarray[double] values, double lower, double upper):
    """Return a copy of ``values`` with out-of-range entries replaced by the bounds.

    Args:
        values: 1-D float64 array.
        lower: Entries below this are replaced by ``lower``.
        upper: Entries above this are replaced by ``upper``.

    Returns:
        New float64 array of the same length.
    """
    cdef:
        Py_ssize_t i, n = values.shape[0]
        cnp.ndarray[double] out = np.empty(n, dtype=np.float64)

    for i in range(n):
        # NOTE(review): the lower-bound branch was lost in extraction
        # ("if values[i] upper:"); restored from the docstring's stated
        # behaviour — confirm against the original article.
        if values[i] < lower:
            out[i] = lower
        elif values[i] > upper:
            out[i] = upper
        else:
            out[i] = values[i]
    return out

# Next section — 创建Pandas扩展方法 (creating the pandas extension method)
@pd.api.extensions.register_series_accessor("clean")
class CleanAccessor:
    """Series accessor exposed as ``s.clean`` with accelerated cleaning helpers."""

    def __init__(self, pandas_obj):
        # pandas hands the accessor the Series it was reached from.
        self._obj = pandas_obj

    def clip_fast(self, lower, upper):
        """Clip the Series to ``[lower, upper]`` using the Cython helper.

        Returns:
            A new Series with the original index and name.
        """
        from cython_extension import cython_clean_data
        # The Cython function is typed for float64 buffers, so coerce first.
        raw = self._obj.to_numpy(dtype=np.float64)
        cleaned = cython_clean_data(raw, lower, upper)
        return pd.Series(cleaned, index=self._obj.index, name=self._obj.name)

    def remove_outliers(self, n_std=3):
        """Drop values further than ``n_std`` standard deviations from the mean.

        Mean and standard deviation come from the underlying NumPy array
        (``ndarray.std()``); the mask is evaluated by numexpr.
        """
        import numexpr as ne
        values = self._obj.values
        mean = values.mean()
        std = values.std()
        # numexpr resolves `values`, `mean`, `std` and `n_std` from this
        # function's local variables.
        # NOTE(review): the upper-bound half of the expression and the return
        # statement were lost in extraction; restored by symmetry with the
        # surviving lower-bound half — confirm against the original article.
        mask = ne.evaluate('(values > mean - n_std*std) & (values < mean + n_std*std)')
        return self._obj[mask]


# NOTE(review): the usage example that followed was lost in extraction. The
# surviving fragment suggests it compared remove_outliers() against the plain
# pandas filter
#     s[(s > s.mean() - 3*s.std()) & (s < s.mean() + 3*s.std())]
# — confirm against the original article.

# Technique selection table (column headers: 场景 / 推荐方法 / 性能 / 实现难度).
# NOTE(review): the cell boundaries were lost; the split of stars between the
# last two columns is inferred from the performance ratings listed in the
# "performance optimisation pyramid" that follows — confirm.
#
# | Scenario (场景)                       | Recommended method (推荐方法) | Performance (性能) | Difficulty (实现难度) |
# |---------------------------------------|-------------------------------|--------------------|-----------------------|
# | 简单向量化操作 (simple vectorised ops) | Pandas内置方法                | ★★★                | ★                     |
# | 复杂行级操作 (complex row-wise ops)    | apply/自定义函数              | ★★                 | ★                     |
# | 数学表达式优化 (math expressions)      | numexpr                       | ★★★★               | ★★                    |
# | 性能关键循环 (performance-critical loops) | Cython                     | ★★★★★              | ★★★★                  |

# Next section — 性能优化金字塔 (performance optimisation pyramid)
★★★★★ Cython/Numba
★★★★ numexpr
★★★ Pandas向量化操作
★★ apply/自定义函数
★ Python循环
调试与验证技巧
# Check that two implementations produce the same result
def verify_implementation(func1, func2, data, *args, **kwargs):
    """Run two implementations on the same input and check that they agree.

    Args:
        func1: Reference implementation, called as ``func1(data, *args, **kwargs)``.
        func2: Implementation under test, called the same way.
        data: Input passed to both functions.

    Raises:
        AssertionError: If the results differ. pandas objects are compared
            with ``.equals``; anything else with ``np.allclose``.
    """
    result1 = func1(data, *args, **kwargs)
    result2 = func2(data, *args, **kwargs)
    if isinstance(result1, (pd.DataFrame, pd.Series)):
        matches = result1.equals(result2)
    else:
        matches = np.allclose(result1, result2)
    # Raise explicitly: a bare `assert` is stripped when Python runs with -O.
    if not matches:
        raise AssertionError("implementations produced different results")
    print("验证通过!")


# Example verification
s = pd.Series(np.random.randn(1000))
verify_implementation(
    lambda x: x.clip(-1, 1),
    lambda x: x.clean.clip_fast(-1, 1),
    s,
)

# Summary and next steps (总结与进阶方向)
# - Pandas扩展 (pandas extensions): create domain-specific methods with
#   register_dataframe_accessor and register_series_accessor.
# - numexpr: optimises complex mathematical expressions; particularly suited
#   to large datasets.
# - Cython: low-level optimisation of performance-critical parts, reaching
#   near-C performance.
# Further learning directions (进阶学习方向)
# 1. 使用Numba加速Python函数
# from numba import jit
# @jit(nopython=True)
# def numba_func(x): ...

# 2. 使用Dask处理超大规模数据
# import dask.dataframe as dd
# ddf = dd.from_pandas(df, npartitions=4)

# 3. 创建完整的Pandas扩展包
# 参考:https://pandas.pydata.org/pandas-docs/stable/development/extending.html

通过本文介绍的高级扩展技术,我们可以将Pandas的性能和功能提升到新的水平。请大家记住优化原则:先确保正确性,再考虑性能;先使用简单方法,再考虑高级优化。
来源:爱生活的程序旺一点号