本发明涉及信息处理技术领域,特别涉及一种消息处理方法及系统。
背景技术:
随着大数据技术在各行各业得到了广泛应用,对海量数据进行实时处理的需求越来越多。现有技术中实现消息实时处理的方法,一种方式是本地服务直接同步调用其他业务的接口,等待接口返回结果后再进行后续业务处理,这种方案仅适于流量不大的场景,当消息量增加时本地系统吞吐量也会变高,严重影响消息处理的服务质量。另一种方式是本地服务发送消息到消息总线,由消息总线进行统一的处理和转发,消息的格式、转发规则和错误或超时处理机制,由于消息总线采用同步调用的方法,同步调用是一种阻塞式调用,调用方要等待对方执行完毕才返回,当某个远程业务出错或超时,会影响消息系统当前线程的服务质量。
技术实现要素:
为此,本发明提供了一种网络请求处理方法、计算设备及存储介质,以力图解决或者至少缓解上面存在的至少一个问题。
根据本发明的一个方面,提供了一种消息处理方法,包括步骤:从发布订阅系统获取消息数据流,按照时间将获取的消息数据流分组后聚合为多个弹性分布式数据集,其中,消息数据流是通过封装并序列化存储至所述发布订阅系统的;对于每个弹性分布式数据集,获取弹性分布式数据集包括的多个消息,根据多个消息的消息头部的标识信息,分别获取预定义规则对所述多个消息进行处理;当消息类型为计算类型时,对处理后的弹性分布式数据集,每次拉取每个弹性分布式数据集中的多个处理后的消息,根据预定义规则对所拉取的消息进行进一步处理;将处理后的消息分别转发给对应的业务系统,根据多个业务系统返回的信息创建消息重发任务,定时向对应的业务系统发送消息重发任务中的消息。
可选地,在根据本发明的消息处理方法中,按照时间将获取的消息数据分组后聚合为多个弹性分布式数据集,包括步骤:将序列化后的消息数据流以预定时间片为单位划分为多个分组,对分组后的消息数据流的头部进行反序列化处理,将处理后的数据聚合为弹性分布式数据集。
可选地,在根据本发明的消息处理方法中,根据多个消息的消息头部的标识信息,分别获取预定义的规则对多个消息进行处理,包括步骤:若消息头部的标识信息包括规则信息,则将该消息的消息体反序列化,并根据标识信息获取对应的预定义规则,对反序列化后的消息进行处理。
可选地,在根据本发明的消息处理方法中,业务系统返回的信息的类型包括:处理成功信息、处理失败信息、超时信息。
可选地,在根据本发明的消息处理方法中,将最终处理后的消息转发给对应的业务系统之前,还包括步骤:将最终处理后的消息存储在本地缓存中。
可选地,在根据本发明的消息处理方法中,根据业务系统返回的信息创建重发任务,定时向对应的业务系统发送消息重发任务中的消息,包括步骤:若收到处理成功信息,删除对应的本地缓存中的消息。
可选地,在根据本发明的消息处理方法中,根据业务系统返回的信息创建重发任务,定时向对应的业务系统发送消息重发任务中的消息,包括步骤:若收到处理失败和超时消息,建立失败和超时的消息重发任务,获取本地缓存中对应的消息进行重发。
可选地,在根据本发明的消息处理方法中,预定义规则包括:数据映射规则,所述数据映射规则适于将反序列化后的消息体中的键值对根据配置信息中的映射表进行映射;消息行为规则,所述消息行为规则适于根据反序列化后的消息头部和消息体中的信息将消息发送到指定地址;消息合并规则,所述消息合并规则适于对消息进行合并。
可选地,在根据本发明的消息处理方法中,根据业务系统返回的信息创建重发任务之后,还包括步骤:若接收到业务系统返回的处理失败信息的数量达到阈值,根据处理失败信息获取对应消息,并将处理失败信息转发给该消息对应的负责人。
根据本发明的另一个方面,提供了一种消息处理与分发系统,包括:发布订阅系统,适于存储通过封装和序列化的消息数据流,并将消息数据流实时发送给消息处理系统;消息处理系统,适于执行如前所述的消息处理方法对接收的消息数据流进行处理,并将处理后的消息分别转发给对应的业务系统;业务系统,业务系统有多个,适于分别接收来自消息处理系统的消息,并记录消息在该业务系统的处理成功信息、处理失败信息、超时信息,返回给消息处理系统。
可选地,在根据本发明的消息处理与分发系统中,业务系统利用sdk实现记录消息在该业务系统的处理成功信息、处理失败信息、超时信息。
根据本发明的又一个方面,提供了一种计算设备,包括至少一个处理器;以及存储器,存储有程序指令,其中程序指令被配置为适于由至少一个处理器执行,程序指令包括用于执行根据本发明的消息处理方法的指令。
根据本发明的又一个方面,提供了一种存储有程序指令的可读存储介质,当该程序指令被计算设备读取并执行时,使得计算设备执行本发明的网络请求处理方法。
根据本发明的技术方案,消息数据封装并序列化存储到发布订阅系统形成消息数据流,按时间将消息数据流分成多批,对每批消息根据预定义规则进行处理后,将非计算类型的消息直接转发到对应的业务方,当消息类型为计算类型例如需要合并时,每次一共拉取每批处理后的消息中固定大小的消息进行进一步处理,将处理后的消息分别转发给对应的业务系统,根据多个业务系统返回的信息创建消息重发任务,定时向对应的业务系统发送消息重发任务中的消息。利用发布订阅系统避免流量高峰的产生,使后续系统实现平稳地处理消息数据,对于所获取的每批消息,分别根据所定义的规则进行处理,再根据消息类型判断是否需要进行与其他批消息的进一步合并处理等,在考虑系统处理能力的同时进行汇总后再分发,而非消息类型的直接转发到对应的业务系统,提高消息处理的并行度以最小化地利用计算资源。
进一步地,发送消息后当前线程立即结束,无需等待业务系统执行完毕,由业务系统自身记录消息处理成功信息、处理失败信息、超时信息,而后根据业务系统的返回数据建立重发任务以避免消息出错,将性能消耗分散到每个业务系统来处理,具有处理数据量大、高并发、易弹性扩展、高容错的特性。
附图说明
为了实现上述以及相关目的,本文结合下面的描述和附图来描述某些说明性方面,这些方面指示了可以实践本文所公开的原理的各种方式,并且所有方面及其等效方面旨在落入所要求保护的主题的范围内。通过结合附图阅读下面的详细描述,本公开的上述以及其它目的、特征和优势将变得更加明显。遍及本公开,相同的附图标记通常指代相同的部件或元素。
图1示出了根据本发明的一个实施例的消息处理与分发系统100的示意图;
图2示出了根据本发明的一个实施例的计算设备200的示意图;
图3示出了根据本发明的一个实施例的消息处理方法300的流程图。
具体实施方式
下面将参照附图更详细地描述本公开的示例性实施例。虽然附图中显示了本公开的示例性实施例,然而应当理解,可以以各种形式实现本公开而不应被这里阐述的实施例所限制。相反,提供这些实施例是为了能够更透彻地理解本公开,并且能够将本公开的范围完整的传达给本领域的技术人员。
图1示出了根据本发明的一个实施例的消息处理与分发系统100的示意图。如图1所示,消息处理系统100包括:发布订阅系统110、消息处理系统120、业务系统130。随着网络技术的不断发展,如网上购物、网络游戏、金融交易等网上行为发生时会产生大量的消息数据。其中,客户端作为消息数据的生产者一端,需要将生产的消息数据存入发布订阅系统110,而业务系统120作为消费者一端,需要利用消息处理系统130从发布订阅系统读取业务数据并分析数据分析和监控。
具体来说,发布订阅系统110适于存储通过封装和序列化的消息数据流,并将消息数据流实时发送给消息处理系统120,利用发布订阅系统避免流量高峰的产生,使后续系统实现平稳地处理消息数据;消息处理系统120适于执行如下所述的消息处理方法对接收的消息数据流进行处理,并将处理后的消息分别转发给对应的业务系统130,根据多个业务系统130返回的信息创建消息重发任务,定时向对应的业务系统130发送消息重发任务中的消息;业务系统130有多个,适于分别接收来自消息处理系统120的消息,并记录消息在该业务系统130的处理成功信息、处理失败信息、超时信息,返回给消息处理系统120。根据本发明的一个实施例,发布订阅系统实现为kafka,kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据,本方案中用于对生产者产生的消息进行保存。消息处理系统实现为spark,是专为大规模数据处理而设计的快速通用的计算引擎。sparkstreaming是构建在spark上处理流数据的框架,基本的原理是将消息流数据分成小的时间片段(几秒),以批量处理的方式来处理这小部分数据。sparkingstreaming的处理响应时间一般以分钟为单位,而在处理实时数据时的延迟时间是秒级别的,因此,目前可采用基于sparkingstreaming框架实现的流处理系统能够用于实现对消息数据的实时处理。由流式数据的后续处理时实现实时跟踪客户兴趣、推荐合理内容、刷新客户榜单等功能。
在根据本发明的技术方案中,对于所获取的每批消息,消息处理系统120分别根据所定义的规则进行处理,再根据消息类型判断是否需要进行与其他批消息的进一步合并处理等,在考虑系统处理能力的同时进行汇总后再分发,而非消息类型的直接转发到对应的业务系统,提高消息处理的并行度以最小化地利用计算资源。进一步地,发送消息后当前线程立即结束,无需等待业务系统130执行完毕,由业务系统130自身记录消息处理成功信息、处理失败信息、超时信息,而后消息处理系统120根据业务系统130的返回数据建立重发任务以避免消息出错,将性能消耗分散到每个业务系统130来处理,具有处理数据量大、高并发、易弹性扩展、高容错的特性。
多个业务系统130作为消息的消费者一端,适于分别集成于多个计算设备200中,图2示出了根据本发明一个实施例的计算设备200的结构框图。在基本配置202中,计算设备200典型地包括系统存储器206和一个或者多个处理器204。存储器总线208可以用于在处理器204和系统存储器206之间的通信。
取决于期望的配置,处理器204可以是任何类型的处理,包括但不限于:微处理器(μp)、微控制器(μc)、数字信息处理器(dsp)或者它们的任何组合。处理器204可以包括诸如一级高速缓存210和二级高速缓存212之类的一个或者多个级别的高速缓存、处理器核心214和寄存器216。示例的处理器核心214可以包括运算逻辑单元(alu)、浮点数单元(fpu)、数字信号处理核心(dsp核心)或者它们的任何组合。示例的存储器控制器218可以与处理器204一起使用,或者在一些实现中,存储器控制器218可以是处理器204的一个内部部分。
取决于期望的配置,系统存储器206可以是任意类型的存储器,包括但不限于:易失性存储器(诸如ram)、非易失性存储器(诸如rom、闪存等)或者它们的任何组合。系统存储器206可以包括操作系统220、一个或者多个应用222以及程序数据226。在一些实施方式中,应用222可以布置为在操作系统上利用程序数据226进行操作。根据本发明的一个实施例,程序数据226包括sdk(softwaredevelopmentkit,软件开发工具包),sdk一般都是一些软件工程师为特定的软件包、软件框架、硬件平台、操作系统等建立应用软件时的开发工具的集合。集成了一个业务系统的计算设备200利用sdk实现处理来自消息处理系统120的消息的后续处理,并记录消息在该业务系统的处理成功信息、处理失败信息、超时信息,处理后把结果通知给如前所述的消息处理系统120。在向配置了sdk的业务系统130分发任务后,消息处理系统120根据业务系统130的返回数据建立重发任务以避免消息出错,将性能消耗分散到每个业务系统130来处理,使本方案具有处理数据量大、高并发、易弹性扩展、高容错的特性。
计算设备200还可以包括有助于从各种接口设备(例如,输出设备242、外设接口244和通信设备246)到基本配置102经由总线/接口控制器230的通信的接口总线240。示例的输出设备242包括图形处理单元248和音频处理单元250。它们可以被配置为有助于经由一个或者多个a/v端口252与诸如显示器或者扬声器之类的各种外部设备进行通信。示例外设接口244可以包括串行接口控制器254和并行接口控制器256,它们可以被配置为有助于经由一个或者多个i/o端口258和诸如输入设备(例如,键盘、鼠标、笔、语音输入设备、触摸输入设备)或者其他外设(例如打印机、扫描仪等)之类的外部设备进行通信。示例的通信设备246可以包括网络控制器260,其可以被布置为便于经由一个或者多个通信端口264与一个或者多个其他计算设备262通过网络通信链路的通信。
网络通信链路可以是通信介质的一个示例。通信介质通常可以体现为在诸如载波或者其他传输机制之类的调制数据信号中的计算机可读指令、数据结构、程序模块,并且可以包括任何信息递送介质。“调制数据信号”可以这样的信号,它的数据集中的一个或者多个或者它的改变可以在信号中编码信息的方式进行。作为非限制性的示例,通信介质可以包括诸如有线网络或者专线网络之类的有线介质,以及诸如声音、射频(rf)、微波、红外(ir)或者其它无线介质在内的各种无线介质。这里使用的术语计算机可读介质可以包括存储介质和通信介质二者。在一些实施例中,计算机可读介质中存储一个或多个程序,这一个或多个程序中包括执行某些方法的指令。
计算设备200上安装有支持网络文件传输和存储的移动app或者客户端应用,包括原生应用或者诸如包括ie、chrome和firefox等浏览器,或者微信、qq等通讯软件,并且本地存储有各种文件,如照片、音频、视频、文档(如word、pdf等格式的文档)。应用客户端可以运行在诸如windows、macos、安卓等操作系统上。计算设备200可以实现为小尺寸便携(或者移动)电子设备的一部分,这些电子设备可以是诸如蜂窝电话、数码照相机、个人数字助理(pda)、个人媒体播放器设备、无线网络浏览设备、个人头戴设备、应用专用设备、或者可以包括上面任何功能的混合设备。
图3示出了根据本发明的一个实施例的消息处理方法300的流程图,方法300适于在消息处理系统(例如前述消息处理系统120中)执行。如图3所示,消息处理方法始于步骤s310。
在步骤s310中,从发布订阅系统获取消息数据流,按照时间将获取的消息数据流分组后聚合为多个弹性分布式数据集。其中,消息数据流是通过封装并序列化存储至发布订阅系统的。发布订阅系统中,发布者(消息生产者)以某种方式对消息进行分类,订阅者(消息消费者)订阅它们,以便接收特定类型的消息。其中,客户端作为消息数据的生产者一端,把消息按照指定格式,封装成消息头和消息体,按照指定协议发送到发布订阅系统。
具体地,在封装过程中,生产者需要利用序列化器将消息对象的状态信息转换为可以存储或传输的形式,才能通过网络发送给发布订阅系统。通过发布订阅系统获取消息数据流后,还需要对消息进行处理例如拆解、计算和重新组装成新的消息等,才能转发到指定的业务系统。发布订阅系统可以采用kafka,消息生产者其利用protobuf工具对封装后消息数据进行序列化并存储至kafka中,主题是kafka数据写入操作的基本单元,一个主题包括多个分区,每条消息仅属于一个分区,消息生产者在发布数据时,需要指定将该消息发布到哪个主题,消息消费者即业务系统需要指定订阅哪个主题的消息,以实现接收此特定类型的消息。方法300实现的即是从发布的到订阅每个主题的中间的消息处理的过程,消息生产者和发布者由发布订阅系统指定。
把消息分成批次可以减少网络开销,根据本发明的一个实施例,为了提高效率,按照时间将获取的消息数据分组后聚合为多个弹性分布式数据集,包括步骤:将序列化后的消息数据流以预定时间片为单位划分为多个分组,对分组后的消息数据流的头部进行反序列化处理,将处理后的数据聚合为弹性分布式数据集。即按自定义的时间片将这些大量的消息数据流聚合为rdd(弹性分布式数据集),得到了一个包含消息数据的rdd对象。分组的步骤可以在sparkstreaming上进行,其以direct方式从kafka获取消息流数据分成小的时间片段(几秒),并以批量处理的方式来处理这小部分数据,且处理实时数据时的延迟时间是秒级别的。在序列化期间,对象将其当前状态写入到临时或持久性存储区,反序列化为从存储区中读取数据,重新创建对象的过程,由于消息头部和消息体分开序列化,此处仅需对消息头部进行反序列化。进一步地,通常在消息处理过程是分区的数量和处理节点的数量相当,而本方案中处理节点的数量是发布订阅系统每个主题中分区数的数十倍,以使资源利用最大化,提高消息吞吐量。
接着在步骤s320中,对于每个弹性分布式数据集,获取弹性分布式数据集包括的多个消息,根据多个消息的消息头部的标识信息,分别获取预定义规则对多个消息进行处理。对于每个rdd,需要对其中的消息进行处理例如拆解、计算和重新组装成新的消息等,才能转发到指定的业务系统。
根据本发明的一个实施例,根据多个消息的消息头部的标识信息,分别获取预定义的规则对多个消息进行处理,包括步骤:若消息头部的标识信息包括规则信息,则将该消息的消息体反序列化,并根据标识信息获取对应的预定义规则,对反序列化后的消息进行处理。无需处理的消息其头部标志中不存在对应的预定义规则,此时可以根据反序列化后的消息头部直接转发到对应的业务系统,消息体的反序列化由真正的消息消费者业务系统进行处理,提高分发速率。进一步地,预定义规则包括:
数据映射规则,数据映射规则适于将反序列化后的消息体中的键值对根据配置信息中的映射表进行映射,例如对消息体进行反序列化,把得到的结构化数据{“username”:”nickname”},,key为username替换为nickname,得到{“username”:”kevin”}->{“nickname”:”kevin”},其中,结构数据通常为json,也可以为xml或者protobuf等;
消息行为规则,消息行为规则适于根据反序列化后的消息头部和消息体中的信息将消息发送到指定地址,具体地,消息头中会有每条消息的key,反序列化后的结构化消息体中也有key,两个key中的地址信息匹配后发送消息到指定的地址(url或者ip端口);
消息合并规则,消息合并规则适于对消息进行合并,例如输入消息有{“username”:”kevin”,”gender”:”male”,”age”:”37”}{“username”:”yangkai”,”gender”:”male”,”age”:”37”}{“username”:”yangkai”,”gender”:”male”,”age”:”37”},利用规则{distinct(data.username,data.gender,data.age)}将消息进行合并,得到{“username”:”kevin”,”gender”:”male”,”age”:”37”}{“username”:”yangkai”,”gender”:”male”,”age”:”37”},防止业务系统收到重复的消息。还有其他自定义的规则本发明中不做限制,例如消息计数、求和、拼接等,依据具体的业务场景决定。
随后,在步骤s330中,当消息类型为计算类型时,对处理后的弹性分布式数据集,每次拉取每个弹性分布式数据集中的多个处理后的消息,根据预定义规则对所拉取的消息进行进一步处理。
分批对消息进行处理后,部分消息数据,例如其头部标识中仅对应映射规则,其消息类型则为非计算类型,对消息进行映射转换后可以直接进行转发到对应的业务系统。而对于消息头部中的标识还对应消息合并规则的消息,其消息类型则为计算类型,还需要和其他弹性分布式数据集中的消息再进行一次合并,以去除该主题下的冗余消息。这样,将非计算类型的消息直接转发到对应的业务方,当消息类型为计算类型例如需要合并时,每次一共拉取每批处理后的消息中固定大小的消息进行进一步处理,无需单独提高合并时处理节点的性能,即可实现消息流整体包含的大量消息数据的合并处理。需要指出的,消息类型为计算类型不仅指消息头部标识对应消息合并规则的消息,还包括对应需要和其他批次的消息进行聚合的规则,例如消息计数、求和、拼接等。在考虑系统处理能力的同时进行汇总后再分发,而非消息类型的直接转发到对应的业务系统,提高消息处理的并行度以最小化地利用计算资源。
进一步地,在利用spark对于计算类型的消息进行进一步处理时,每个处理节点的线程会有一个buffer缓冲,每次从处理后的弹性分布式数据集拉取与buffer缓冲相同大小的数据,然后通过内存中的一个map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,以此类推,直到最后将所有数据到拉取完,并得到最终的结果。一方面文件数量显著减少,另一方面减少写入缓存所占用的内存大小,提高消息数据流实时处理的速度。
最后,在步骤s340中,将处理后的消息分别转发给对应的业务系统,根据多个业务系统返回的信息创建消息重发任务,定时向对应的业务系统发送消息重发任务中的消息。进一步地,将处理后的消息存储在本地缓存中,以配合后续在消息出错时进行消息重发。当消息类型为非计算类型时,处理后的消息是指步骤s320中处理得到的消息,当消息类型为计算类型时,处理后的消息为步骤s330中进一步处理得到的消息。
发送消息后当前线程立即结束,无需等待业务系统执行完毕,而由业务系统在对收到的消息进行处理后返回消息是否成功处理的信息,根据本发明的一个实施例,业务系统返回的信息的类型包括:处理成功信息、处理失败信息、超时信息。进一步地,业务系统利用sdk实现记录消息在该业务系统的处理成功信息、处理失败信息、超时信息。sdk一般都是一些软件工程师为特定的软件包、软件框架、硬件平台、操作系统等建立应用软件时的开发工具的集合。在向配置了sdk的业务系统分发任务后,再根据业务系统的返回数据建立重发任务以避免消息出错,将性能消耗分散到每个业务系统来处理。
具体地,在业务系统端,sdk收到消息,进入本地业务系统处理。业务系统端采用自动异常捕获机制,如果没有异常,业务系统端业务正确处理完成。正常处理完业务后,sdk回调返回该消息处理成功的信息。若收到处理成功信息,删除对应的本地缓存中的消息。
如果出现错误,业务系统端sdk返回该消息的处理错误信息。若收到处理失败和超时消息,建立失败和超时的消息重发任务,获取本地缓存中对应的消息进行重发,转发消息失败后会启动消息总线重发任务。这样可以进一步确保由于网络或者其他故障,导致的消息不可用时,在故障恢复后可以收到消息,从而保证了消息的完整性。
根据本发明的又一个实施例,若接收到业务系统返回的处理失败信息的数量达到阈值,根据处理失败信息获取对应消息,并将处理失败信息转发给该消息对应的负责人。整个消息系统消息处理和转发的过程中,每个环节的操作都会被记录下来,为负责人查询、监控和报警提供数据源。
根据本发明的技术方案,消息数据封装并序列化存储到发布订阅系统形成消息数据流,按时间将消息数据流分成多批,对每批消息根据预定义规则进行处理后,将非计算类型的消息直接转发到对应的业务方,当消息类型为计算类型例如需要合并时,每次一共拉取每批处理后的消息中固定大小的消息进行进一步处理,将处理后的消息分别转发给对应的业务系统,根据多个业务系统返回的信息创建消息重发任务,定时向对应的业务系统发送消息重发任务中的消息。利用发布订阅系统避免流量高峰的产生,使后续系统实现平稳地处理消息数据,对于所获取的每批消息,分别根据所定义的规则进行处理,再根据消息类型判断是否需要进行与其他批消息的进一步合并处理等,在考虑系统处理能力的同时进行汇总后再分发,而非消息类型的直接转发到对应的业务系统,提高消息处理的并行度以最小化地利用计算资源。
进一步地,发送消息后当前线程立即结束,无需等待业务系统执行完毕,由业务系统自身记录消息处理成功信息、处理失败信息、超时信息,而后根据业务系统的返回数据建立重发任务以避免消息出错,将性能消耗分散到每个业务系统来处理,具有处理数据量大、高并发、易弹性扩展、高容错的特性。
在此处所提供的说明书中,说明了大量具体细节。然而,能够理解,本发明的实施例可以在没有这些具体细节的情况下被实践。在一些实例中,并未详细示出公知的方法、结构和技术,以便不模糊对本说明书的理解。
a9、如a8所述的方法,所述根据业务系统返回的信息创建重发任务之后,还包括步骤:
若接收到业务系统返回的处理失败信息的数量达到阈值,根据处理失败信息获取对应消息,并将处理失败信息转发给该消息对应的负责人。
类似地,应当理解,为了精简本公开并帮助理解各个发明方面中的一个或多个,在上面对本发明的示例性实施例的描述中,本发明的各个特征有时被一起分组到单个实施例、图、或者对其的描述中。然而,并不应将该公开的方法解释成反映如下意图:即所要求保护的本发明要求比在每个权利要求中所明确记载的特征更多特征。更确切地说,如下面的权利要求书所反映的那样,发明方面在于少于前面公开的单个实施例的所有特征。因此,遵循具体实施方式的权利要求书由此明确地并入该具体实施方式,其中每个权利要求本身都作为本发明的单独实施例。
本领域那些技术人员应当理解在本文所公开的示例中的设备的模块或单元或组件可以布置在如该实施例中所描述的设备中,或者可替换地可以定位在与该示例中的设备不同的一个或多个设备中。前述示例中的模块可以组合为一个模块或者此外可以分成多个子模块。
本领域那些技术人员可以理解,可以对实施例中的设备中的模块进行自适应性地改变并且把它们设置在与该实施例不同的一个或多个设备中。可以把实施例中的模块或单元或组件组合成一个模块或单元或组件,以及此外可以把它们分成多个子模块或子单元或子组件。除了这样的特征和/或过程或者单元中的至少一些是相互排斥之外,可以采用任何组合对本说明书(包括伴随的权利要求、摘要和附图)中公开的所有特征以及如此公开的任何方法或者设备的所有过程或单元进行组合。除非另外明确陈述,本说明书(包括伴随的权利要求、摘要和附图)中公开的每个特征可以由提供相同、等同或相似目的的替代特征来代替。
此外,本领域的技术人员能够理解,尽管在此所述的一些实施例包括其它实施例中所包括的某些特征而不是其它特征,但是不同实施例的特征的组合意味着处于本发明的范围之内并且形成不同的实施例。例如,在下面的权利要求书中,所要求保护的实施例的任意之一都可以以任意的组合方式来使用。
此外,所述实施例中的一些在此被描述成可以由计算机系统的处理器或者由执行所述功能的其它装置实施的方法或方法元素的组合。因此,具有用于实施所述方法或方法元素的必要指令的处理器形成用于实施该方法或方法元素的装置。此外,装置实施例的在此所述的元素是如下装置的例子:该装置用于实施由为了实施该发明的目的的元素所执行的功能。
如在此所使用的那样,除非另行规定,使用序数词“第一”、“第二”、“第三”等等来描述普通对象仅仅表示涉及类似对象的不同实例,并且并不意图暗示这样被描述的对象必须具有时间上、空间上、排序方面或者以任意其它方式的给定顺序。
尽管根据有限数量的实施例描述了本发明,但是受益于上面的描述,本技术领域内的技术人员明白,在由此描述的本发明的范围内,可以设想其它实施例。此外,应当注意,本说明书中使用的语言主要是为了可读性和教导的目的而选择的,而不是为了解释或者限定本发明的主题而选择的。因此,在不偏离所附权利要求书的范围和精神的情况下,对于本技术领域的普通技术人员来说许多修改和变更都是显而易见的。对于本发明的范围,对本发明所做的公开是说明性的,而非限制性的,本发明的范围由所附权利要求书限定。
1.一种消息处理方法,包括步骤:
从发布订阅系统获取消息数据流,按照时间将获取的消息数据流分组后聚合为多个弹性分布式数据集,其中,所述消息数据流是通过封装并序列化存储至所述发布订阅系统的;
对于每个弹性分布式数据集,获取弹性分布式数据集包括的多个消息,根据所述多个消息的消息头部的标识信息,分别获取预定义规则对所述多个消息进行处理;
当消息类型为计算类型时,对处理后的弹性分布式数据集,每次拉取每个弹性分布式数据集中的多个处理后的消息,根据预定义规则对所拉取的消息进行进一步处理;
将处理后的消息分别转发给对应的业务系统,根据多个业务系统返回的信息创建消息重发任务,定时向对应的业务系统发送消息重发任务中的消息。
2.如权利要求1所述的方法,所述按照时间将获取的消息数据分组后聚合为多个弹性分布式数据集,包括步骤:
将序列化后的消息数据流以预定时间片为单位划分为多个分组,对分组后的消息数据流的头部进行反序列化处理,将处理后的数据聚合为弹性分布式数据集。
3.如权利要求1或2所述的方法,所述根据所述多个消息的消息头部的标识信息,分别获取预定义的规则对所述多个消息进行处理,包括步骤:
若消息头部的标识信息包括规则信息,则将该消息的消息体反序列化,并根据标识信息获取对应的预定义规则,对反序列化后的消息进行处理。
4.如权利要求1-3中任一项所述的方法,所述业务系统返回的信息的类型包括:处理成功信息、处理失败信息、超时信息。
5.如权利要求1-4中任一项所述的方法,所述将最终处理后的消息转发给对应的业务系统之前,还包括步骤:
将最终处理后的消息存储在本地缓存中。
6.如权利要求5所述的方法,所述根据业务系统返回的信息创建重发任务,定时向对应的业务系统发送消息重发任务中的消息,包括步骤:
若收到处理成功信息,删除对应的本地缓存中的消息。
7.如权利6所述的方法,所述根据业务系统返回的信息创建重发任务,定时向对应的业务系统发送消息重发任务中的消息,包括步骤:
若收到处理失败和超时消息,建立失败和超时的消息重发任务,获取本地缓存中对应的消息进行重发。
8.如权利要求1-7中任一项所述的方法,所述预定义规则包括:
数据映射规则,所述数据映射规则适于将反序列化后的消息体中的键值对根据配置信息中的映射表进行映射;
消息行为规则,所述消息行为规则适于根据反序列化后的消息头部和消息体中的信息将消息发送到指定地址;
消息合并规则,所述消息合并规则适于对消息进行合并。
9.一种消息处理与分发系统,包括:
发布订阅系统,所述发布订阅系统适于存储通过封装和序列化的消息数据流,并将消息数据流实时发送给消息处理系统;
消息处理系统,所述消息处理系统适于执行如权利要求1-8中任一项所述的消息处理方法对接收的消息数据流进行处理,并将处理后的消息分别转发给对应的业务系统;
业务系统,所述业务系统有多个,适于分别接收来自消息处理系统的消息,并记录消息在该业务系统的处理成功信息、处理失败信息、超时信息,返回给消息处理系统。
10.如权利要求9所述的消息处理与分发系统,所述业务系统利用sdk实现记录消息在该业务系统的处理成功信息、处理失败信息、超时信息。
技术总结