本申请属于数据流处理技术领域,特别涉及一种数据流连接优化方法、系统、终端以及存储介质。
背景技术:
随着大数据、云计算、物联网等技术的快速发展,其所产生的数据量呈现出爆发式增长的态势。面对如此大规模且复杂多样的数据,如何高效地对其进行处理加工,使之能为行业所用,实现数据增值,成为了大数据时代所面临的重大挑战。现今家喻户晓的网购、金融服务、交通数据等数据源产生的数据均为实时数据流,这是一种连续、大量、持续不断动态到达的数据序列。数据流可以随时间延续而无限增长,且随时间发展而变化,这都使得处理数据流相较于处理传统的批式历史数据变得更加复杂。但若能高效地对数据流进行分析筛选,找出关联,则可以将数据流的价值最大化。
theta-join(θ联接)是一种可用于处理数据流,从中挖掘数据之间的关系的操作。笛卡尔积(cartesianproduct)操作是theta-join操作的关键一步,但是笛卡尔积计算量大,比较耗时。如果在并行集群环境下进行theta-join操作,笛卡尔积还需要被传输到不同的计算节点上进行并行运算,这就意味着如果笛卡尔积的量很大,将会加重在并行环境下的通讯负载与计算负载。除此之外,如果没有将笛卡尔积操作均匀分配到并行计算节点上,还会导致负载失衡的问题,从而极大程度地影响theta-join的运行效率。
例如,文献[okcan,alper,andmirekriedewald."processingtheta-joinsusingmapreduce."inproceedingsofthe2011acmsigmodinternationalconferenceonmanagementofdata,pp.949-960.2011]提出了一种1-bucket-theta算法,该算法没有对输入的数据流进行任何形式的过滤,而是直接对数据流中的所有数据先进行耗时的排序操作,之后进行笛卡尔积操作,之后将笛卡尔积结果形成的矩形区域进行分割,然后将分割后的数据均匀分配到集群中的计算节点上从而实现负载均衡。该算法所进行的笛卡尔积数量巨大,运行代价高,效率低下。
文献[liu,wenjie,zhanhuaili,andyuntaozhou."anefficientfilterstrategyfortheta-joinqueryindistributedenvironment."in201746thinternationalconferenceonparallelprocessingworkshops(icppw),pp.77-84.ieee,2017]提出了一种crossfilterstrategy(cfs)算法,该算法在对数据流进行笛卡尔积之前先对数据流进行了较为粗糙的过滤,虽然一定程度上的避免了部分不必要的笛卡尔积计算,但是该算法所需的笛卡尔积运算量依旧很大,且依旧需要对数据进行耗时的排序处理。
文献[hu,ziyue,xiaopengfan,yangwang,andchengzhongxu."fastthetajoin:anoptimizationonmulti-waydatastream$$\theta$$-joinwithrangeconstraints."ininternationalconferenceonalgorithmsandarchitecturesforparallelprocessing,pp.174-189.springer,cham,2020]提出了一种fastthetajoin算法,该算法进一步优化了在进行笛卡尔积之前对数据流的过滤方式,通过在过滤时考虑theta运算符,从而实现对数据流进行更精细的过滤,提升算法性能。但该算法仍然存在以下不足:
1)该算法使用手工输入的固定的分区数量进行分区,分区数量是一个固定的参数,与数据流的特征相孤立,这使得其只能对数据流进行较粗略的分区,导致在过滤时仍存在大量不必要的笛卡尔积操作未能被过滤掉,从而影响到该算法进行theta-join的运行效率。
2)该算法在处理负载不均衡问题时依据每个节点的最大承载能力决定应该如何分配负载,这种方式使得如果其他节点并没有达到节点负载上限,在重新分配后负载不均衡的现象可能依然严重。
技术实现要素:
本申请提供了一种数据流连接优化方法、系统、终端以及存储介质,旨在至少在一定程度上解决现有技术中的上述技术问题之一。
为了解决上述问题,本申请提供了如下技术方案:
一种数据流连接优化方法,包括:
分别计算两个待连接数据流的标准差,根据所述标准差分别确定所述两个待连接数据流的第一分区数;
根据所述第一分区数分别对所述两个待连接数据流进行分区;
根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区;
根据theta条件对所述两个待连接数据流的所有分区进行过滤;
对所述过滤后的分区进行笛卡尔积操作,并从所述笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果。
本申请实施例采取的技术方案还包括:所述根据所述标准差分别确定所述两个待连接数据流的第一分区数具体为:
上式中,r和s分别代表两个待连接数据流,partitionr和partitions分别代表将数据流r和s进行分区的分区数,stdr和stds分别为数据流r和s的标准差,
本申请实施例采取的技术方案还包括:所述根据所述第一分区数分别对所述两个待连接数据流进行分区具体为:
上式中,minr和mins分别表示数据流r和s中最小的数,maxr和maxs分别表示数据流r和s中最大的数。
本申请实施例采取的技术方案还包括:所述根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区具体为:
分别计算所述两个待连接数据流中所有分区的数据量平均值,并找出数据量大于所述平均值的过载分区;
根据所述平均值计算所述过载分区的第二分区数,根据所述第二分区数对所述过载分区进行二次分区。
本申请实施例采取的技术方案还包括:所述找出数据量大于所述平均值的过载分区具体为:
分别设定所述两个待连接数据流的再分区阈值,并分别用所述两个待连接数据流的数据量平均值乘以各自的再分区阈值,得到两个待连接数据流的再分区数据量;
分别判断所述两个待连接数据流中各个分区的数据量是否大于对应的再分区数据量,如果大于所述再分区数据量,则判定该分区为过载分区。
本申请实施例采取的技术方案还包括:所述对所述过载分区进行二次分区具体为:
上式中,partitioni和partitionj分别表示数据流r的第i个分区和数据流s的第j个分区,minpartitioni和minpartitionj分别代表分区i和分区j中的最小值,maxpartitioni和maxpartitionj分别代表分区i和分区j中的最大值,repartition代表对所述过载分区进行二次分区的第二分区数。
本申请实施例采取的技术方案还包括:所述根据theta条件对所述两个待连接数据流的所有分区进行过滤具体为:
当theta条件为“>”时,如果数据流r中第i个分区ri的最大值大于数据流s中第i个分区si的最小值,则认为分区ri与分区si的笛卡尔积中有可能存在满足theta条件的数据,即分区ri与分区si满足theta条件;否则认为分区ri与分区si不满足theta条件,则将该分区过滤掉。
本申请实施例采取的另一技术方案为:一种数据流连接优化系统,包括:
标准差计算模块:用于分别计算两个待连接数据流的标准差,根据所述标准差分别确定所述两个待连接数据流的第一分区数;
第一分区模块:用于根据所述第一分区数分别对所述两个待连接数据流进行分区;
第二分区模块:用于根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区;
分区过滤模块:用于根据theta条件对所述两个待连接数据流的所有分区进行过滤;
笛卡尔积操作模块:用于对所述过滤后的分区进行笛卡尔积操作,并从所述笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果。
本申请实施例采取的又一技术方案为:一种终端,所述终端包括处理器、与所述处理器耦接的存储器,其中,
所述存储器存储有用于实现所述数据流连接优化方法的程序指令;
所述处理器用于执行所述存储器存储的所述程序指令以控制数据流连接优化。
本申请实施例采取的又一技术方案为:一种存储介质,存储有处理器可运行的程序指令,所述程序指令用于执行所述数据流连接优化方法。
相对于现有技术,本申请实施例产生的有益效果在于:本申请实施例的数据流连接优化方法、系统、终端以及存储介质通过借助数据流自身的波动特征进行动态分区,使分区包含对数据流特征的考量而变得更为细化,更细化的分区可以在过滤阶段更大程度地过滤掉不必要的笛卡尔积操作,从而提升行theta-join操作的性能;且分区过程无需排序,时间复杂度低,运行效率高;并且在分区后,根据所有分区的数据量平均值对过载分区进行二次分区,以避免并行环境下的负载失衡问题,提高数据流theta-join操作的性能。
附图说明
图1是本申请实施例的数据流连接优化方法的流程图;
图2为现有分区算法与本申请实施例的分区算法的负载均衡对比图;
图3为现有笛卡尔积操作算法与本申请实施例的笛卡尔积操作算法的效果对比图;
图4为本申请实施例的数据流连接优化系统结构示意图;
图5为本申请实施例的终端结构示意图;
图6为本申请实施例的存储介质的结构示意图。
具体实施方式
为了使本申请的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本申请进行进一步详细说明。应当理解,此处所描述的具体实施例仅用以解释本申请,并不用于限定本申请。
针对现有技术的不足,本申请实施例的数据流连接优化方法利用输入数据流的波动特征作为确定数据流分区数的参考,从而能够动态地根据输入数据流的特征进行更合理的数据分区,以此在筛选阶段去除掉更多不必要的笛卡尔积操作,减少theta-join操作所需进行的笛卡尔积数量,同时减少并行计算环境下的通讯传输等代价,提高theta-join操作的性能。另外,针对并行集群计算环境下可能产生的负载不均衡问题,对负载过大的节点上的数据分区进行二次分区,从而缓解负载不均衡的问题,提高算法在并行计算环境下的运行效率。
具体地,请参阅图1,是本申请实施例的数据流连接优化方法的流程图。本申请实施例的数据流连接优化方法包括以下步骤:
s1:接收输入的两个待连接数据流;
s2:分别计算两个待连接数据流的标准差,根据标准差分别确定各个数据流的第一分区数;
本步骤中,由于在线输入的实时数据流复杂多样,脱离数据特征的固定分区策略无法对数据流进行合理分区,而不合理的分区使得在依据theta进行过滤时,很多不必要的笛卡尔积操作无法被过滤掉。因此,本发明实施例借助当前窗口中数据流的标准差确定数据流的分区数。标准差可以反映数据流的分散程度,标准差越大,表示数据流中的数据分散程度越大,此时可以将数据流分为更多的区,以将分散程度较大的数据尽可能分到不同的组中,从而使得依据theta的过滤更加细化。而标准差越小,表示数据流中的数据分散程度越小,此时的数据分布较为集中,数据流分组数也应越小。
具体的,数据流的标准差计算公式为:
公式(1)中,n为数据流的长度,xi为数据流中的第i个数,
由于计算集群中的计算节点有限,数据流的分区数不能过大;同时,为了保证算法可以在并行计算环境下可以高效运行,即使标准差很小,分组数也不能过小,因此,本申请实施例针对输入的两个待连接数据流分别设置一个最大分区数和一个最小分区数,根据当前窗口中两个待连接数据流各自的标准差以及最大分区数和最小分区数来确定各个数据流的第一分区数,从而对两个待连接数据流的分区更加细化,使得后续依据theta的过滤过程中可以过滤掉更多不必要的笛卡尔积操作。具体的,第一分区数计算方式具体为:
式(2)中,r和s分别代表两个数据流,partitionr和partitions分别代表将数据流r和s进行分区的分区数,stdr和stds分别为数据流r和s的标准差,
s3:根据第一分区数分别对两个待连接数据流进行分区;
本步骤中,两个待连接数据流的分区算法如下:
式(3)、(4)中,minr和mins分别表示数据流r和s中最小的数,maxr和maxs分别表示数据流r和s中最大的数。
s4:分别计算两个待连接数据流中所有分区的数据量平均值,并找出数据量大于平均值的过载分区,根据数据量平均值计算过载分区的第二分区数,根据第二分区数对过载分区进行二次分区;
本步骤中,由于不同分区中的数据量可能存在较大差异,本申请实施例通过对数据量过大的分区进行二次分区操作,避免由于分区内数据量不均衡造成的负载不均衡。二次分区算法具体为:分别设定两个待连接数据流的再分区阈值,并分别用两个待连接数据流的数据量平均值乘以各自的再分区阈值,得到两个待连接数据流的再分区数据量;分别判断两个待连接数据流中各个分区的数据量是否大于对应的再分区数据量,如果大于再分区数据量,则判定该分区为过载分区。二次分区算法公式具体为:
式(5)、(6)中,partitioni和partitionj分别表示数据流r的第i个分区和数据流s的第j个分区。minpartitioni和minpartitionj分别代表分区i和分区j中的最小值,maxpartitioni和maxpartitionj分别代表分区i和分区j中的最大值,repartition代表对过载分区进行二次分区的第二分区数,即将过载分区分成repartition份,第二分区数的计算公式为:
式(7)、(8)中,average_sizer和average_sizes分别代表将数据流r和s进行分区后每个分区的平均大小。
请一并参阅图2,为现有分区算法与本申请实施例的分区算法的负载均衡对比图。相较于现有分区算法中通过节点的最大承载能力决定分区数,本申请实施例的分区决策方式更加合理,可以达到更好的负载均衡。
s5:根据theta条件对两个数据流的所有分区进行过滤,将不满足theta条件的分区过滤掉;
本步骤中,在对数据流分区结束之后,根据theta条件对所有分区进行过滤,以进一步减少笛卡尔积操作的数量。过滤方式具体为:例如,当theta条件为“>”时,如果数据流r中第i个分区ri的最大值大于数据流s中第i个分区si的最小值,则认为分区ri与分区si的笛卡尔积中有可能存在满足theta条件的数据,即分区ri与分区si满足theta条件;否则认为分区ri与分区si不满足theta条件,则将该分区过滤掉,从而既减少了笛卡尔积操作的数量,又能在并行计算环境下减少节点之间的通讯代价,提高theta-join操作的性能。
同样地,当θ条件为“<”、“<=”或“>=”时,与theta条件为“>”时的分区过滤方式同理,此处将不再赘述。
s6:对过滤后的分区进行笛卡尔积操作,并从笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果;
本步骤中,在对数据流进行完过滤之后,所保留下来的分区均为可能存在满足theta条件的数据组、需要对其进行笛卡尔积操作的分区。对这些分区进行笛卡尔积操作,并从笛卡尔积操作的结果中过滤出满足theta条件的数据组,作为theta-join操作的结果进行返回。
如图3所示,为现有笛卡尔积操作算法与本申请实施例的笛卡尔积操作算法的效果对比图。其中,(a)、(b)、(c)和(d)分别为1-bucket-theta算法、cfs算法、fastthetajoin算法和本算法的笛卡尔积操作所产生的结果矩阵示意图,(e)为四种算法的笛卡尔积数量柱状图。由于在过滤阶段对不必要的笛卡尔积进行过滤的效果不同,因此各种算法需要操作的笛卡尔积数量也不同。在(a)、(b)、(c)和(d)中,粗线边框代表需要进行笛卡尔积操作的区域,细线边框代表对区域进行的分区。如图所示,本发明所提算法需要进行笛卡尔积操作的区域最小,最为高效。另外,fastthetajoin算法与本发明所提算法不需要对分区内数据顺序进行排序,而1-bucket-theta与cfs算法则需要对数据进行整体排序,所以效率会更差。
基于上述,本申请实施例的数据流连接优化方法通过借助数据流自身的波动特征进行动态分区,使分区包含对数据流特征的考量而变得更为细化,更细化的分区可以在过滤阶段更大程度地过滤掉不必要的笛卡尔积操作,从而提升行theta-join操作的性能;且分区过程无需排序,时间复杂度低,运行效率高;并且在分区后,根据所有分区的数据量平均值对过载分区进行二次分区,以避免并行环境下的负载失衡问题,提高数据流theta-join操作的性能。
请参阅图4,为本申请实施例的数据流连接优化系统结构示意图。本申请实施例的数据流连接优化系统40包括:
标准差计算模块41:用于分别计算两个待连接数据流的标准差,根据所述标准差分别确定所述两个待连接数据流的第一分区数;
第一分区模块42:用于根据所述第一分区数分别对所述两个待连接数据流进行分区;
第二分区模块43:用于根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区;
分区过滤模块44:用于根据theta条件对所述两个待连接数据流的所有分区进行过滤;
笛卡尔积操作模块45:用于对所述过滤后的分区进行笛卡尔积操作,并从所述笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果。
请参阅图5,为本申请实施例的终端结构示意图。该终端50包括处理器51、与处理器51耦接的存储器52。
存储器52存储有用于实现上述数据流连接优化方法的程序指令。
处理器51用于执行存储器52存储的程序指令以控制数据流连接优化。
其中,处理器51还可以称为cpu(centralprocessingunit,中央处理单元)。处理器51可能是一种集成电路芯片,具有信号的处理能力。处理器51还可以是通用处理器、数字信号处理器(dsp)、专用集成电路(asic)、现成可编程门阵列(fpga)或者其他可编程逻辑器件、分立门或者晶体管逻辑器件、分立硬件组件。通用处理器可以是微处理器或者该处理器也可以是任何常规的处理器等。
请参阅图6,为本申请实施例的存储介质的结构示意图。本申请实施例的存储介质存储有能够实现上述所有方法的程序文件61,其中,该程序文件61可以以软件产品的形式存储在上述存储介质中,包括若干指令用以使得一台计算机设备(可以是个人计算机,服务器,或者网络设备等)或处理器(processor)执行本发明各个实施方式方法的全部或部分步骤。而前述的存储介质包括:u盘、移动硬盘、只读存储器(rom,read-onlymemory)、随机存取存储器(ram,randomaccessmemory)、磁碟或者光盘等各种可以存储程序代码的介质,或者是计算机、服务器、手机、平板等终端设备。
对所公开的实施例的上述说明,使本领域专业技术人员能够实现或使用本申请。对这些实施例的多种修改对本领域的专业技术人员来说将是显而易见的,本申请中所定义的一般原理可以在不脱离本申请的精神或范围的情况下,在其它实施例中实现。因此,本申请将不会被限制于本申请所示的这些实施例,而是要符合与本申请所公开的原理和新颖特点相一致的最宽的范围。
1.一种数据流连接优化方法,其特征在于,包括:
分别计算两个待连接数据流的标准差,根据所述标准差分别确定所述两个待连接数据流的第一分区数;
根据所述第一分区数分别对所述两个待连接数据流进行分区;
根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区;
根据theta条件对所述两个待连接数据流的所有分区进行过滤;
对所述过滤后的分区进行笛卡尔积操作,并从所述笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果。
2.根据权利要求1所述的数据流连接优化方法,其特征在于,所述根据所述标准差分别确定所述两个待连接数据流的第一分区数具体为:
上式中,r和s分别代表两个待连接数据流,partitionr和partitions分别代表将数据流r和s进行分区的分区数,stdr和stds分别为数据流r和s的标准差,
3.根据权利要求2所述的数据流连接优化方法,其特征在于,所述根据所述第一分区数分别对所述两个待连接数据流进行分区具体为:
上式中,minr和mins分别表示数据流r和s中最小的数,maxr和maxs分别表示数据流r和s中最大的数。
4.根据权利要求1至3任一项所述的数据流连接优化方法,其特征在于,所述根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区具体为:
分别计算所述两个待连接数据流中所有分区的数据量平均值,并找出数据量大于所述平均值的过载分区;
根据所述平均值计算所述过载分区的第二分区数,根据所述第二分区数对所述过载分区进行二次分区。
5.根据权利要求4所述的数据流连接优化方法,其特征在于,所述找出数据量大于所述平均值的过载分区具体为:
分别设定所述两个待连接数据流的再分区阈值,并分别用所述两个待连接数据流的数据量平均值乘以各自的再分区阈值,得到两个待连接数据流的再分区数据量;
分别判断所述两个待连接数据流中各个分区的数据量是否大于对应的再分区数据量,如果大于所述再分区数据量,则判定该分区为过载分区。
6.根据权利要求5所述的数据流连接优化方法,其特征在于,所述对所述过载分区进行二次分区具体为:
上式中,partitioni和partitionj分别表示数据流r的第i个分区和数据流s的第j个分区,minpartitioni和minpartitionj分别代表分区i和分区j中的最小值,maxpartitioni和maxpartitionj分别代表分区i和分区j中的最大值,repartition代表对所述过载分区进行二次分区的第二分区数。
7.根据权利要求6所述的数据流连接优化方法,其特征在于,所述根据theta条件对所述两个待连接数据流的所有分区进行过滤具体为:
当theta条件为“>”时,如果数据流r中第i个分区ri的最大值大于数据流s中第i个分区si的最小值,则认为分区ri与分区si的笛卡尔积中有可能存在满足theta条件的数据,即分区ri与分区si满足theta条件;否则认为分区ri与分区si不满足theta条件,则将该分区过滤掉。
8.一种数据流连接优化系统,其特征在于,包括:
标准差计算模块:用于分别计算两个待连接数据流的标准差,根据所述标准差分别确定所述两个待连接数据流的第一分区数;
第一分区模块:用于根据所述第一分区数分别对所述两个待连接数据流进行分区;
第二分区模块:用于根据所述两个待连接数据流中所有分区的数据量平均值对过载分区进行二次分区;
分区过滤模块:用于根据theta条件对所述两个待连接数据流的所有分区进行过滤;
笛卡尔积操作模块:用于对所述过滤后的分区进行笛卡尔积操作,并从所述笛卡尔积操作结果中过滤出满足theta条件的数据组,作为theta-join操作的结果。
9.一种终端,其特征在于,所述终端包括处理器、与所述处理器耦接的存储器,其中,
所述存储器存储有用于实现权利要求1-7任一项所述的数据流连接优化方法的程序指令;
所述处理器用于执行所述存储器存储的所述程序指令以控制数据流连接优化。
10.一种存储介质,其特征在于,存储有处理器可运行的程序指令,所述程序指令用于执行权利要求1至7任一项所述数据流连接优化方法。
技术总结