摘要:在构建复杂的业务系统时,消息中间件一般是较底层的基础件,离具体业务逻辑有较大距离。举个例子,在风控系统中,我们需要按照客户的设定对进入系统的事件执行不同的风控策略,而不同的风控策略有不同的规则集合,不同的规则又需要使用不同的特征,不同的特征又需要不同的特征子系
在构建复杂的业务系统时,消息中间件一般是较底层的基础件,离具体业务逻辑有较大距离。举个例子,在风控系统中,我们需要按照客户的设定对进入系统的事件执行不同的风控策略,而不同的风控策略有不同的规则集合,不同的规则又需要使用不同的特征,不同的特征又需要不同的特征子系统来进行计算。在这种场景下,事件进入系统后所走的路由完全由用户在UI上动态设置,我们在构建和部署系统时是不能预先知道的。另外,在系统不断的演进过程中,新的场景、策略、规则和特征还会不断添加进来。如果对底层消息路由的管理没有一个统一的规划和设计,全靠人工来创建和管理底层的消息中间件,如创建消息主题和队列等,那么我们的系统会变得越来越复杂,越来越难管理,直到最终系统的运维和升级都会变得举步维艰。
为了能够更好地管理和维护底层的消息中间件,让其能够平滑地跟随业务系统演进和升级,我们需要在消息中间件上添加一个消息服务层。消息服务层将底层消息中间件封装起来,隐藏消息中间件的具体操作细节,对上层提供统一风格的协议转换、消息路由和服务端点等功能,从而使数据传输系统成为独立完整的服务。我们将这种具有一致管理界面的消息管理平台定义为消息服务层中间件。
当定义好软件需求后,放眼开源软件界,我们发现Apache Camel正是这样一个对底层消息中间件进行有效管理的消息服务层中间件。
Apache Camel是一个为了灵活地进行企业集成而开发的企业集成模式开源框架。说得通俗些,企业集成就是将企业中各种“乱七八糟”的子系统,通过集成平台整合起来,让它们形成一个有机的整体,发挥出更大的数据价值。注意,这里一定要强调“乱七八糟”才能将Apache Camel的强大之处体现出来。这是因为,在一个复杂的企业系统中,其业务系统可能非常丰富,它们提供的访问界面也是多种多样的,如EJB、JSM、RMI、FTP、HTTP、HDFS等。面对这么多复杂的接口,作为程序员的你估计会疯掉。但先别急着疯,如果你此时有Apache Camel在手,你就会发现将这些系统整合起来真的是一件易如反掌的事。
像一般消息中间件那样,Apache Camel提供了不同系统之间消息传递的模式,即点对点模式和发布/订阅模式。但Apache Camel又绝对不是一个简单的消息中间件,它还提供了大量的组件,这些组件实现了不同协议的网关功能,如EJB、JSM、RMI、FTP、HTTP和HDFS等。
基本上,不管是传统的还是最近才出现的新式数据接口协议,ApacheCamel都对其支持。对于少数非常特别或暂时没有的数据接口协议,我们也可以通过自行开发相关网关程序来实现。除此以外,ApacheCamel的核心是一个路由引擎,可以支持丰富灵活的路由规则,甚至是动态路由。这些功能特性都让Apache Camel成为企业集成模式的优秀实践,让其在企业集成场景下大显身手。
下面我们通过一个例子来领略一下用Apach.Camel集成系统是一件多么简单且令人身心愉悦的事情。
from("quartz://timer001?trigger.repeatInterval=1000&trigger.repeatCount=-1")
.to("http://localhost:8080/hello")
.convertBodyTo(String.class)
.process(new Processor {
@Override public void process(Exchange exchange) throws Exception {
// 为消息设置一个用于分区的key
String key = UUID.randomUUID.toString;
System.out.println("key: " + key);
exchange.getIn.setheader(KafkaConstants.KEY, key);
}
})
.to("Kafka:localhost:9092?topic=kafka-example&requestRequiredAcks=-1");
上面的代码使用
“from("quartz://timer001trigger.repeatInterval=1000&trigge
r.repeatCount=-1")”配置了一个定时任务,每隔1秒发送一个HTT.请求到微服务,然后将请求返回的结果发送到消息中间件Kafka。
虽然Apache camel的系统集成功能非常强大,但是我们还是要聚焦于本书的主题——实时流计算系统。下面我们将重点讲述Camel在流计算系统中的数据路由功能。通过Camel的消息路由功能,我们可以对流数据的路由进行方便、灵活的管理。
Apache camel支持灵活的路由方式,当我们使用Camel管理数据流的路由时,经常会用到Camel的条件路由功能和动态路由功能。下面的示例代码演示了如何将不同类型的事件发送到不同子系统对应的Kafka主题中去。
from("kafka:localhost:9092?topic=input_events2&groupId=CamelStaticRouteExample&
autoOffsetReset=latest&serializerClass=kafka.serializer.StringEncoder")
.process(new Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(String.format("get event[%s]",
exchange.getIn.getBody(String.class)));
exchange.getIn.setHeader(KafkaConstants.KEY,
UUID.randomUUID.toString);
exchange.getIn.setHeader("event_type", JSONObject.parseObject(exchange.
getIn.getBody(String.class)).getString
("event_type"));
exchange.getIn.removeHeader(KafkaConstants.TOPIC);
// 必须删除KafkaConstants.TOPIC,否则Camel会根据这个值无限循环发送
}
})
.choice
.when(header("event_type").isEqualTo("click"))
.to("kafka:localhost:9092?topic=click&requestRequiredAcks=-1")
.when(header("event_type").isEqualTo("activate"))
.to("kafka:localhost:9092?topic=activate&requestRequiredAcks=-1")
.otherwise
.to("kafka:localhost:9092?topic=other&requestRequiredAcks=-1");
相比静态路由,Camel的动态路由稍微复杂一些,除了需要配置DSL外,还需要编写对应的Java代码来实现动态路由的逻辑。另外,在初次使用Camel的动态路由时,十有八九会掉进一个消息无限循环发送的“坑”里。所以,这里必须指出,Camel的官方文档特意强调,
dynamicRouter必须返回null来表示动态路由过程的结束,否则Camel将一直按照dynamicRouter返回的URI将消息循环投递下去。通过下面的动态路由代码实现,我们也能更清楚地理解这点。
from("kafka:localhost:9092?
topic=input_events2&groupId=CamelSwitchRouteExample&autoOffsetReset=latest&seriali
zerClass=kafka.serializer.StringEncoder")
.process(new Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(String.format("get event[%s]",
exchange.getIn.getBody(String.class)));
exchange.getIn.setHeader(KafkaConstants.KEY,
UUID.randomUUID.toString);
exchange.getIn.setHeader("event_type", JSONObject.parseObject
(exchange.getIn.getBody(String.class)).getString("event_type"));
// 必须删除KafkaConstants.TOPIC,否则Camel会根据这个值无限循环发送
exchange.getIn.removeHeader(KafkaConstants.TOPIC); }
})
.dynamicRouter(method(DynamicRouter.class, "slip"));
public class DynamicRouter {
private static final Logger logger =
LoggerFactory.getLogger(DynamicRouter.class);
public String slip(Exchange exchange) {
Integer currentEndpointIndex = exchange.getProperty("currentEndpointInd
ex", Integer.class);
if (currentEndpointIndex == null) {
currentEndpointIndex = 0;
}
exchange.setProperty("currentEndpointIndex", currentEndpointIndex + 1);
String eventType = exchange.getIn.getHeader("event_type", String.class);
if (StringUtils.isEmpty(eventType)) {
return null;
}
List endpoints = getEndpoints(eventType);
if (CollectionUtils.isEmpty(endpoints)) {
return null;
}
if (currentEndpointIndex >= endpoints.size) {
return null;
}
String endpoint = endpoints.get(currentEndpointIndex);
String topic = parseTopicFromEndpoint(endpoint);
exchange.getIn.setHeader(KafkaConstants.TOPIC, topic);
logger.info("send event[%s] to endpoint[%s]", exchange.getProperty
("eventId"), endpoint);
return endpoint;
}
private List getEndpoints(String eventType) {
Map> eventEndpoints = new HashMap;
eventEndpoints.put("click", Arrays.asList(
"kafka:localhost:9092?topic=subsystem1&requestRequiredAcks=-1",
"kafka:localhost:9092?topic=subsystem2&requestRequiredAcks=-1"));
eventEndpoints.put("activate", Arrays.asList(
"kafka:localhost:9092?topic=subsystem2&requestRequiredAcks=-1",
"kafka:localhost:9092?topic=subsystem3&requestRequiredAc-ks=-1"));
eventEndpoints.put("other", Arrays.asList(
return eventEndpoints.get(eventType);
}
private String parseTopicFromEndpoint(String endpoint) { String params = endpoint.split("\\?")[1].split("&");
for (String param : params) {
String splits = param.split("=");
if ("topic".equals(splits[0])) {
return splits[1];
}
}
return null;
}
}
在上面的代码中,我们定义了一个动态路由类DynamicRouter,其中的slip方法完成了具体动态路由的算法。在slip方法中,使用currentEndpointIndex记录当前路由端点的索引,当任何时候需要结束动态路由过程时,均通过返回null值来终结路由的继续执行。
来源:大数据架构师