找回密码
 立即注册
查看: 281|回复: 0

Python性能优化实践—曼肯德尔趋势检验计算效率提升

[复制链接]
发表于 2022-10-10 09:57 | 显示全部楼层 |阅读模式
曼—肯德尔算法可以进行序列趋势和突变点的检验,实践发现对趋势的检验有较高的准确性,对突变点的检验相对较差。本文参考相关资料通过Python实现了对序列的曼肯德尔检验并进行了运行优化,提升了生产中的应用价值。
本次实践主要使用了算法优化、函数缓存、循环遍历优化、多进程、numba优化。
由于整个过程以numpy数组的数值计算为主进一步还可以使用向量化进行加速,或者使用numexpr对Z值计算部分进行优化,但是未实践。
本次优化前将使用第三方库的算法编写函数,并通过apply函数将函数应用到每一列,在我的电脑上每10000行上的计算时间为70秒以上。
方法一:优化循环后为40多秒可以运算完毕,在使用常量法优化然后利用三核多进程计算可以18秒运算完毕。
方法二:利用函数缓存法优化正态分布的分位点部分计算,然后使用numba加速,预热后运算可以达到4.5秒。
主要参考资料:
趋势检验方法(二)MK趋势检验_aaakirito的博客-CSDN博客_mk检验

程序的优化的主要步骤为发现性能瓶颈和对主要瓶颈进行优化,主要方法有算法优化、循环优化、计算优化和编译优化。
一、程序性能分析

在jupyter中
粗分析:
使用%%timeit对cell进行分析,比较各种实现方法的效率
细分析:
使用line_profiler对函数逐行分析,找到耗时占比最大的行
%load_ext line_profiler 加载line_profiler功能
%lprun -f funcname  funcname(argu) 对函数funcname进行分析,获取每行的运行时间

二、程序性能优化

1、算法优化

将使用第三方库的算法,改为手动优化过的算法
进行曼—肯德尔趋势检验可以使用现成的pymannkendall进行计算,这个库中包含了曼—肯德尔趋势检验和他的多种扩展,计算量也较为全面。使用较为方便但是运行较慢。
import pymannkendall as mk
def mk_ku(data):
    result=mk.original_test(data)   
    return (result.trend,result.z,result.slope)参考CSDN上aaakirito的博客_CSDN博客-ACM算法题,ACM简单题,python领域博主的方法进行改进,提高算法的运行效率,但是功能缩减仅能计算最常使用的trend、z和solope三种结果。

def mk_(y):  
    alpha=0.1
    n = len(y)

    # 计算S的值
    v = np.array(y[1:]).reshape((-1, 1))
    v = np.tile(v, len(y) - 1)
    for i in range(len(y) - 1):
        v[:, i] -= y
        v[0:i, i] = 0
    v = np.sign(v)
    s = np.sum(v)

    # 判断x里面是否存在重复的数,输出唯一数队列unique_x,重复数数量队列tp
    unique_x, tp = np.unique(x, return_counts=True)
    g = len(unique_x)

    # 计算方差VAR(S)
    if n == g:  # 如果不存在重复点
        var_s = (n * (n - 1) * (2 * n + 5)) / 18
    else:
        var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18

    # 计算z_value
    if n <= 10:  # n<=10属于特例
        z = s / (n * (n - 1) / 2)
    else:
        if s > 0:
            z = (s - 1) / np.sqrt(var_s)
        elif s < 0:
            z = (s + 1) / np.sqrt(var_s)
        else:
            z = 0

    # 计算p_value,可以选择性先对p_value进行验证
    #p = 2 * (1 - norm.cdf(abs(z)))

    # 计算Z(1-alpha/2)
    q=normal_ppf(1 - alpha / 2)
    h = abs(z) >q
   

    # trend趋势判断
    if (z < 0) and h:
        trend ="decreasing"
    elif (z > 0) and h:
        trend ="increasing"
    else:
        trend = "no trend"
        
    # slope     
    n=len(y)   
    t=np.arange(0,n)
    slope, intercept = np.polyfit(y, t, deg=1)
   
    z=round(z,2)
    slope=round(slope,2)

    return trend,z,slope优化算法主要集中在两部分
第一部分:计算S值,趋势检验方法(二)MK趋势检验_aaakirito的博客-CSDN博客_mk检验的作者aaakirito和pymannkendall库的作者都给出了基于numpy数组的优化计算方法。其中pymannkendall库的计算方法更为优秀
以下是pymannkendall库的方法,利用了全1数组的特性进行了符号化计算和矩阵想减的方法加速了运算。
def mk_score(x):
    s = 0
    n=len(x)
    demo = np.ones(n)
    for k in range(n-1):
        s = s + np.sum(demo[k+1:n][x[k+1:n] > x[k]]) - np.sum(demo[k+1:n][x[k+1:n] < x[k]])        
    return s
aaakirito的思路为,由双循环改为通过复制np数组错位想减实现,大大提高了效率。作者也提出将循环缩减为log2(len(y))的方法,但是由于增加了开销,在短序列的时候表现较差,测试时在序列达到10000个样本时依然较差。
# 计算S的值
    v = np.array(y[1:]).reshape((-1, 1))
    v = np.tile(v, len(y) - 1)
    for i in range(len(y) - 1):
        v[:, i] -= y
        v[0:i, i] = 0
    v = np.sign(v)
    s = np.sum(v)
第二部分:优化scipy.stats 模块中标准正态分布的分位点函数norm.ppf()
经过测试该函数效率较低,耗时占总计算时间的40%多,通过在网上找了一个俄罗斯人的写的函数进行替代,效率大大提高。
但是这个函数中exit('R8_NORMAL_CDF_INVERSE - Fatal error!') 语句因为使用了exit()函数导致无法序列化进而不能开启多进程,用return 代替。(jupyter中有此问题,pycharm中没有。)
def r8_huge ( ):
    #返回理论最大值,当p小于等于0或者大于等于1时
    value = 1.79769313486231571E+308
    return value

def r8poly_value ( n, a, x ):
    value = 0.0
    for i in range ( n - 1, -1, -1 ):
        value = value * x + a
    return value

def normal_ppf(p):
    a = np.array([ \
        3.3871328727963666080, 1.3314166789178437745e+2, \
        1.9715909503065514427e+3, 1.3731693765509461125e+4, \
        4.5921953931549871457e+4, 6.7265770927008700853e+4, \
        3.3430575583588128105e+4, 2.5090809287301226727e+3])
    b = np.array([ \
        1.0, 4.2313330701600911252e+1, \
        6.8718700749205790830e+2, 5.3941960214247511077e+3, \
        2.1213794301586595867e+4, 3.9307895800092710610e+4, \
        2.8729085735721942674e+4, 5.2264952788528545610e+3])
    c = np.array([ \
        1.42343711074968357734, 4.63033784615654529590, \
        5.76949722146069140550, 3.64784832476320460504, \
        1.27045825245236838258, 2.41780725177450611770e-1, \
        2.27238449892691845833e-2, 7.74545014278341407640e-4])
    const1 = 0.180625
    const2 = 1.6
    d = np.array([ \
        1.0, 2.05319162663775882187, \
        1.67638483018380384940, 6.89767334985100004550e-1, \
        1.48103976427480074590e-1, 1.51986665636164571966e-2, \
        5.47593808499534494600e-4, 1.05075007164441684324e-9])
    e = np.array([ \
        6.65790464350110377720, 5.46378491116411436990, \
        1.78482653991729133580, 2.96560571828504891230e-1, \
        2.65321895265761230930e-2, 1.24266094738807843860e-3, \
        2.71155556874348757815e-5, 2.01033439929228813265e-7])
    f = np.array([ \
        1.0, 5.99832206555887937690e-1, \
        1.36929880922735805310e-1, 1.48753612908506148525e-2, \
        7.86869131145613259100e-4, 1.84631831751005468180e-5, \
        1.42151175831644588870e-7, 2.04426310338993978564e-15])
    split1 = 0.425
    split2 = 5.0

    if (p <= 0.0):
        value = - r8_huge()
        return value

    if (1.0 <= p):
        value = r8_huge()
        return value

    q = p - 0.5

    if (abs(q) <= split1):

        r = const1 - q * q
        value = q * r8poly_value(8, a, r) / r8poly_value(8, b, r)

    else:

        if (q < 0.0):
            r = p
        else:
            r = 1.0 - p

        if (r <= 0.0):
            value = -1.0
            exit('R8_NORMAL_CDF_INVERSE - Fatal error!')

        r = np.sqrt(- np.log(r))

        if (r <= split2):

            r = r - const2
            value = r8poly_value(8, c, r) / r8poly_value(8, d, r)

        else:

            r = r - split2
            value = r8poly_value(8, e, r) / r8poly_value(8, f, r)

        if (q < 0.0):
            value = - value

    return value
第三部分:计算sen斜率
经过上面两部分修改后,运行效率大大提高,然后进行斜率计算。以下斜率函数与pymannkendall库中算法相同但是减少了一系列的数据检查与清洗工作,健壮性降低运算效率提升。
def sen_slope(x):
    idx = 0
    n = len(x)
    d = np.ones(int(n*(n-1)/2))
    for i in range(n-1):
        j = np.arange(i+1,n)
        d[idx : idx + len(j)] = (x[j] - x) / (j - i)
        idx = idx + len(j)        
    return np.median(d)使用函数缓存进行优化:

可以看到 r8poly_value ( n, a, x ) 、normal_ppf(p)二个函数会被重复调用且参数值重复输入的情况,因此可以对这两个函数进行缓存,会极大的提升效率。
风暴之灵:Python 缓存函数提升性能—functools.cache

常量法:
可以看到计算Z值时,alpha 0.1时标准正态分布的分位点的值为 1.6448536269514715,如果我们只要最常用的alpha= 0.1取值。则不需要调用函数normal_ppf计算,可以节约时间。更重要的是该方法可以参与并行计算(函数缓存法由于闭包函数不能序列化,与多进程冲突)。

2、循环优化

请参考风暴之灵:pandas  使用迭代器高效遍历行和列 一文,使用numpy循环最快,单行numpy耗时为itertuples的20%-30%,但是itertuples已经优化到耗时很低,单行耗时us级,总节约时间不多。考虑到使用便利性使用itertuples。为了能够启用

def mk_pal(df):
    for row in df.itertuples():
        try:
            sr_1=row[10:35]
            n=len(sr_1)
            if n>1:
                idx=row[0]
                sr=np.array(sr)
                m=mk_fast(sr)      
                df.at[idx,"趋势"]=m[0]
                df.at[idx,"强度"]=m[1]
                df.at[idx,"斜率"]=m[2]
            else:
                df.at[id,"趋势"]="无数据"
                df.at[id,"趋势"]="无数据"
                df.at[id,"趋势"]="无数据"   
        except:
            pass
                        
    return df
3、计算优化

由于过程是计算密集型,采取多进行程来实现计算优化。
首先使用分块函数将数据分块
风暴之灵:使用numpy实现DataFrame高效分割分块
def df_dflist(pf_sp,n):
    col=df.columns
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)   
    ls_df=[pd.DataFrame(i,columns=col) for i in ls_np]
    return ls_df  
然后通过五步法实现并行计算。将以下代码放置与于 if __name__ == '__main__':之下运行
风暴之灵:使用joblib库,通过并发提升pandas计算效率,加速apply函数?
list_np =df_dflist(df,9)
tasks = [delayed(mk_pal)(block) for block in list_np]
works_2 = Parallel(n_jobs=3)
res = works_2(tasks)
data = pd.concat(res)
4、编译优化

通过JIT可以进一步优化,但是numba对涉及的函数不支持无法进入nopython模式,不能带来加速。
pypy目前已经到了3.9版本但是hpy还没有完全应用,当前的对C扩展库性能支持程度未知,没有试验。
numba优化
可以看到mk_score、sen_slope两个函数以及计算Z值(封装为z_score函数)的部分均是纯numpy数组运算,可以通过numba加速,通过使用@njit装饰器启用jit加以优化。由于使用装饰器的方法及numba的并行性,numba不能与多进程共同使用。

三、整体代码

多进程常量版本,代码较为简洁:
import math
from scipy.stats import norm, mstats
import numpy as np
import pandas as pd
import pymannkendall as mk
from joblib import Parallel, delayed
import time


#计算s值
def mk_score(x):
    s = 0
    n=len(x)
    demo = np.ones(n)
    for k in range(n-1):
        s = s + np.sum(demo[k+1:n][x[k+1:n] > x[k]]) - np.sum(demo[k+1:n][x[k+1:n] < x[k]])
    return s

#计算slope值
def sen_slope(x):
    idx = 0
    n = len(x)
    d = np.ones(int(n*(n-1)/2))
    for i in range(n-1):
        j = np.arange(i+1,n)
        d[idx : idx + len(j)] = (x[j] - x) / (j - i)
        idx = idx + len(j)
    return np.median(d)


def mk_fast_c(y):
    alpha = 0.1
    s = mk_score(y)
    n = len(y)

    if n > 1:

        # 判断x里面是否存在重复的数,输出唯一数队列unique_x,重复数数量队列tp
        unique_y, tp = np.unique(y, return_counts=True)
        g = len(unique_y)

        # 计算方差VAR(S)
        if n == g:  # 如果不存在重复点
            var_s = (n * (n - 1) * (2 * n + 5)) / 18
        else:
            var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18

        # 计算z_value
        if n <= 10:  # n<=10属于特例
            z = s / (n * (n - 1) / 2)
        else:
            if s > 0:
                z = (s - 1) / np.sqrt(var_s)
            elif s < 0:
                z = (s + 1) / np.sqrt(var_s)
            else:
                z = 0

        # 计算Z(1-alpha/2) 当alpha0.1时q=1.6448536269514715
        #         q=normal_ppf(1 - alpha / 2)
        #         h = abs(z) >q
        h = abs(z) > 1.6448536269514715

        # trend趋势判断
        if (z < 0) and h:
            trend = "decreasing"
        elif (z > 0) and h:
            trend = "increasing"
        else:
            trend = "no trend"

        # slope
        n = len(y)
        t = np.arange(0, n)
        slope = sen_slope(y)
        z = round(z, 2)
        slope = round(slope, 2)

    return trend, z, slope


def df_dflist(pf_sp,n):
    col=df.columns
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)
    ls_df=[pd.DataFrame(i,columns=col) for i in ls_np]
    return ls_df


def mk_pal(df):
    for row in df.itertuples():

        sr_1 = row[10:35]
        n = len(sr_1)
        if n > 1:
            idx = row[0]
            sr = np.array(sr_1)
            m = mk_fast_c(sr)
            df.at[idx, "趋势"] = m[0]
            df.at[idx, "强度"] = m[1]
            df.at[idx, "斜率"] = m[2]
        else:
            df.at[idx, "趋势"] = "无数据"
    return df

if __name__ == '__main__':

    path = r"D:\data\MK0925\925.csv"
    path1 = r"D:\data\MK0925\100301.csv"
    df = pd.read_csv(path)

    df["趋势"] = ""
    df["强度"] = 0.0
    df["斜率"] = 0.0

    # 五步法实现并行计算
    t1 = time.time()
    list_np = df_dflist(df, 9)
    tasks = [delayed(mk_pal)(block) for block in list_np]
    works_2 = Parallel(n_jobs=3)
    res = works_2(tasks)
    data = pd.concat(res)
    t2 = time.time()
    print(t2-t1)
    print(data.head())
    data.to_csv(path1)
numba+函数缓存版本,运行最快
import math
from scipy.stats import norm, mstats
import numpy as np
import pandas as pd
import pymannkendall as mk
from joblib import Parallel, delayed
import time
from numba import njit
from functools import lru_cache

#计算s值
@njit
def mk_score(x):
    s = 0
    n=len(x)
    demo = np.ones(n)
    for k in range(n-1):
        s = s + np.sum(demo[k+1:n][x[k+1:n] > x[k]]) - np.sum(demo[k+1:n][x[k+1:n] < x[k]])
    return s
#计算slope值
@njit
def sen_slope(x):
    idx = 0
    n = len(x)
    d = np.ones(int(n*(n-1)/2))
    for i in range(n-1):
        j = np.arange(i+1,n)
        d[idx : idx + len(j)] = (x[j] - x) / (j - i)
        idx = idx + len(j)
    return np.median(d)
#计算Z值
@njit
def z_score(g,n,s,tp):

    # 计算方差VAR(S)
    if n == g:  # 如果不存在重复点
        var_s = (n * (n - 1) * (2 * n + 5)) / 18
    else:
        var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18

    # 计算z_value
    if n <= 10:  # n<=10属于特例
        z = s / (n * (n - 1) / 2)
    else:
        if s > 0:
            z = (s - 1) / np.sqrt(var_s)
        elif s < 0:
            z = (s + 1) / np.sqrt(var_s)
        else:
            z = 0
    return z


def r8_huge ( ):
#返回理论最大值
    value = 1.79769313486231571E+308
    return value
#@lru_cache(maxsize=128, typed=False) 本行加缓存会报错 unhashable type: 'numpy.ndarray'
def r8poly_value ( n, a, x ):
    value = 0.0
    for i in range ( n - 1, -1, -1 ):
        value = value * x + a
    return value

@lru_cache(maxsize=128, typed=False)
def normal_ppf(p):
    a = np.array([ \
        3.3871328727963666080, 1.3314166789178437745e+2, \
        1.9715909503065514427e+3, 1.3731693765509461125e+4, \
        4.5921953931549871457e+4, 6.7265770927008700853e+4, \
        3.3430575583588128105e+4, 2.5090809287301226727e+3])
    b = np.array([ \
        1.0, 4.2313330701600911252e+1, \
        6.8718700749205790830e+2, 5.3941960214247511077e+3, \
        2.1213794301586595867e+4, 3.9307895800092710610e+4, \
        2.8729085735721942674e+4, 5.2264952788528545610e+3])
    c = np.array([ \
        1.42343711074968357734, 4.63033784615654529590, \
        5.76949722146069140550, 3.64784832476320460504, \
        1.27045825245236838258, 2.41780725177450611770e-1, \
        2.27238449892691845833e-2, 7.74545014278341407640e-4])
    const1 = 0.180625
    const2 = 1.6
    d = np.array([ \
        1.0, 2.05319162663775882187, \
        1.67638483018380384940, 6.89767334985100004550e-1, \
        1.48103976427480074590e-1, 1.51986665636164571966e-2, \
        5.47593808499534494600e-4, 1.05075007164441684324e-9])
    e = np.array([ \
        6.65790464350110377720, 5.46378491116411436990, \
        1.78482653991729133580, 2.96560571828504891230e-1, \
        2.65321895265761230930e-2, 1.24266094738807843860e-3, \
        2.71155556874348757815e-5, 2.01033439929228813265e-7])
    f = np.array([ \
        1.0, 5.99832206555887937690e-1, \
        1.36929880922735805310e-1, 1.48753612908506148525e-2, \
        7.86869131145613259100e-4, 1.84631831751005468180e-5, \
        1.42151175831644588870e-7, 2.04426310338993978564e-15])
    split1 = 0.425
    split2 = 5.0

    if (p <= 0.0):
        value = - r8_huge()
        return value

    if (1.0 <= p):
        value = r8_huge()
        return value

    q = p - 0.5

    if (abs(q) <= split1):

        r = const1 - q * q
        value = q * r8poly_value(8, a, r) / r8poly_value(8, b, r)

    else:

        if (q < 0.0):
            r = p
        else:
            r = 1.0 - p

        if (r <= 0.0):
            value = -1.0
#            exit('R8_NORMAL_CDF_INVERSE - Fatal error!')
#            exit()函数会因为无法序列化导致不能多进程,用return 代替
            return value

        r = np.sqrt(- np.log(r))

        if (r <= split2):

            r = r - const2
            value = r8poly_value(8, c, r) / r8poly_value(8, d, r)

        else:

            r = r - split2
            value = r8poly_value(8, e, r) / r8poly_value(8, f, r)

        if (q < 0.0):
            value = - value

    return value


def mk_fast_i(y):
    alpha = 0.1
    s = mk_score(y)
    n = len(y)

    if n > 1:

        # 判断x里面是否存在重复的数,输出唯一数队列unique_x,重复数数量队列tp
        unique_y, tp = np.unique(y, return_counts=True)
        g = len(unique_y)

        z = z_score(g, n, s, tp)

        # 计算Z(1-alpha/2)
        q = normal_ppf(1 - alpha / 2)
        h = abs(z) > q

        # trend趋势判断
        if (z < 0) and h:
            trend = "decreasing"
        elif (z > 0) and h:
            trend = "increasing"
        else:
            trend = "no trend"

        # slope
        n = len(y)
        t = np.arange(0, n)
        slope = sen_slope(y)
        z = round(z, 2)
        slope = round(slope, 2)

    return trend, z, slope


def mk_pal(df):
    for row in df.itertuples():
        sr_1 = row[10:35]
        n = len(sr_1)
        if n > 1:
            idx = row[0]
            sr = np.array(sr_1)
            m = mk_fast_i(sr)
            df.at[idx, "趋势"] = m[0]
            df.at[idx, "强度"] = m[1]
            df.at[idx, "斜率"] = m[2]
        else:
            df.at[id, "趋势"] = "无数据"

    return df


if __name__ == "__main__":

    path = r"D:\data\MK0925\925.csv"
    path1 = r"D:\data\MK0925\100302.csv"

    df_s = pd.read_csv(path)
    df = df_s[0:10000]
    df["趋势"] = ""
    df["强度"] = 0.0
    df["斜率"] = 0.0

    t1 = time.time()
    pf=mk_pal(df)
    t2 = time.time()
    print(t2 - t1)

#    print(df.head())
#     print(pf.head())
#
#     pf.to_csv(path1)

多进程版本
import math
from scipy.stats import norm, mstats
import numpy as np
import pandas as pd
import pymannkendall as mk
from joblib import Parallel, delayed
import time

#计算s值
def mk_score(x):
    s = 0
    n=len(x)
    demo = np.ones(n)
    for k in range(n-1):
        s = s + np.sum(demo[k+1:n][x[k+1:n] > x[k]]) - np.sum(demo[k+1:n][x[k+1:n] < x[k]])
    return s

#计算slope值
def sen_slope(x):
    idx = 0
    n = len(x)
    d = np.ones(int(n*(n-1)/2))
    for i in range(n-1):
        j = np.arange(i+1,n)
        d[idx : idx + len(j)] = (x[j] - x) / (j - i)
        idx = idx + len(j)
    return np.median(d)

#以下3个函数计算置信区间 标准正态分布的分位点
def r8_huge ( ):
#返回理论最大值
    value = 1.79769313486231571E+308
    return value
def r8poly_value ( n, a, x ):
    value = 0.0
    for i in range ( n - 1, -1, -1):
        value = value * x + a
    return value
def normal_ppf(p):
    a = np.array([ \
        3.3871328727963666080, 1.3314166789178437745e+2, \
        1.9715909503065514427e+3, 1.3731693765509461125e+4, \
        4.5921953931549871457e+4, 6.7265770927008700853e+4, \
        3.3430575583588128105e+4, 2.5090809287301226727e+3])
    b = np.array([ \
        1.0, 4.2313330701600911252e+1, \
        6.8718700749205790830e+2, 5.3941960214247511077e+3, \
        2.1213794301586595867e+4, 3.9307895800092710610e+4, \
        2.8729085735721942674e+4, 5.2264952788528545610e+3])
    c = np.array([ \
        1.42343711074968357734, 4.63033784615654529590, \
        5.76949722146069140550, 3.64784832476320460504, \
        1.27045825245236838258, 2.41780725177450611770e-1, \
        2.27238449892691845833e-2, 7.74545014278341407640e-4])
    const1 = 0.180625
    const2 = 1.6
    d = np.array([ \
        1.0, 2.05319162663775882187, \
        1.67638483018380384940, 6.89767334985100004550e-1, \
        1.48103976427480074590e-1, 1.51986665636164571966e-2, \
        5.47593808499534494600e-4, 1.05075007164441684324e-9])
    e = np.array([ \
        6.65790464350110377720, 5.46378491116411436990, \
        1.78482653991729133580, 2.96560571828504891230e-1, \
        2.65321895265761230930e-2, 1.24266094738807843860e-3, \
        2.71155556874348757815e-5, 2.01033439929228813265e-7])
    f = np.array([ \
        1.0, 5.99832206555887937690e-1, \
        1.36929880922735805310e-1, 1.48753612908506148525e-2, \
        7.86869131145613259100e-4, 1.84631831751005468180e-5, \
        1.42151175831644588870e-7, 2.04426310338993978564e-15])
    split1 = 0.425
    split2 = 5.0

    if (p <= 0.0):
        value = - r8_huge()
        return value

    if (1.0 <= p):
        value = r8_huge()
        return value

    q = p - 0.5

    if (abs(q) <= split1):

        r = const1 - q * q
        value = q * r8poly_value(8, a, r) / r8poly_value(8, b, r)

    else:
        if (q < 0.0):
            r = p
        else:
            r = 1.0 - p
        if (r <= 0.0):
            value = -1.0
            exit('R8_NORMAL_CDF_INVERSE - Fatal error!')
            #在jupyter中exit()函数会因为无法序列化导致不能多进程,用return 代替,但是pycharm中无此问题
            #return value

        r = np.sqrt(- np.log(r))

        if (r <= split2):
            r = r - const2
            value = r8poly_value(8, c, r) / r8poly_value(8, d, r)
        else:
            r = r - split2
            value = r8poly_value(8, e, r) / r8poly_value(8, f, r)
        if (q < 0.0):
            value = - value

    return value

##计算mk
def mk_fast(y):
    alpha = 0.1
    s = mk_score(y)
    n = len(y)

    if n > 1:

        # 判断x里面是否存在重复的数,输出唯一数队列unique_x,重复数数量队列tp
        unique_y, tp = np.unique(y, return_counts=True)
        g = len(unique_y)

        # 计算方差VAR(S)
        if n == g:  # 如果不存在重复点
            var_s = (n * (n - 1) * (2 * n + 5)) / 18
        else:
            var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18

        # 计算z_value
        if n <= 10:  # n<=10属于特例
            z = s / (n * (n - 1) / 2)
        else:
            if s > 0:
                z = (s - 1) / np.sqrt(var_s)
            elif s < 0:
                z = (s + 1) / np.sqrt(var_s)
            else:
                z = 0

        # 计算p_value,可以选择性先对p_value进行验证
        # p = 2 * (1 - norm.cdf(abs(z)))

        # 计算Z(1-alpha/2)
        q = normal_ppf(1 - alpha / 2)
        h = abs(z) > q

        # trend趋势判断
        if (z < 0) and h:
            trend = "decreasing"
        elif (z > 0) and h:
            trend = "increasing"
        else:
            trend = "no trend"

        # slope
        n = len(y)
        slope = sen_slope(y)
        z = round(z, 2)
        slope = round(slope, 2)

    return trend, z, slope

##为多进程准备数据分割
def df_dflist(pf_sp,n):
    col=df.columns
    Z_array=pf_sp.values
    ls_np=np.array_split(Z_array,n,axis=0)
    ls_df=[pd.DataFrame(i,columns=col) for i in ls_np]
    return ls_df

#逐行进行mk计算,封装为函数方便进行并行计算
def mk_pal(df):
    for row in df.itertuples():
        try:
            sr_1 = row[10:35]
            n = len(sr_1)
            if n > 1:
                idx = row[0]
                sr = np.array(sr_1)
                m = mk_fast(sr)
                df.at[idx, "趋势"] = m[0]
                df.at[idx, "强度"] = m[1]
                df.at[idx, "斜率"] = m[2]
            else:
                df.at[id, "趋势"] = "无数据"
                df.at[id, "趋势"] = "无数据"
                df.at[id, "趋势"] = "无数据"
        except:
            pass

    return df

def mk_pal1(df):
    for row in df.itertuples():
        sr_1 = row[10:35]
        n = len(sr_1)
        if n > 1:
            idx = row[0]
            sr = np.array(sr_1)
            m = mk_fast(sr)
            df.at[idx, "趋势"] = m[0]
            df.at[idx, "强度"] = m[1]
            df.at[idx, "斜率"] = m[2]
        else:
            df.at[id, "趋势"] = "无数据"
            df.at[id, "趋势"] = "无数据"
            df.at[id, "趋势"] = "无数据"


    return df




if __name__ == '__main__':

    path = r"D:\data\MK0925\925.csv"
    path1 = r"D:\data\MK0925\1004.csv"
    df = pd.read_csv(path)

    df["趋势"] = ""
    df["强度"] = 0.0
    df["斜率"] = 0.0

    # 五步法实现并行计算
    t1 = time.time()
    list_np = df_dflist(df, 9)
    tasks = [delayed(mk_pal1)(block) for block in list_np]
    works_2 = Parallel(n_jobs=3)
    res = works_2(tasks)
    data = pd.concat(res)
    t2 = time.time()
    print(t2-t1)
    print(data.head())
    data.to_csv(path1)
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-11-15 10:41 , Processed in 0.114155 second(s), 25 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表