Apache Kafka的Apache Trafodion消费者
本文介绍了如何实现Apache Trafodion与Apache Kafka的无缝结合。我们展示了Trafodion如何轻松地获取数据,如何结合不同的开源组件,从而使用 Apache Kafka、 Trafodion、 HBase 和Hadoop创建近实时的流式处理工作流。
如何实现各组件的结合?
什么是Kafka?Kafka是一个分布式、分区、多复本的日志提交服务。Kafka维护按类区分的消息,称为主题(topic)。生产者(producer)向Kafka的主题发布消息。消费者(consumer)订阅主题,接收发布到这些主题的消息。一个主题就是一个类别或者一个可订阅的条目名称。对每个主题来说,Kafka维护的是一个分区日志(partitioned log)。客户端控制将消息发布到哪个分区。
Kafka集群包含一个或多个服务器,每台服务器被称为broker。消费者向分区的leader broker发出fetch请求。在每个请求中,消费者指定偏移量(offset),从该位置返回日志块。如果有需要,消费者可以将偏移量倒回,重新消费数据。同时,Kafka保留消费者在日志中的位置,即偏移量(offset)。消费者在读取消息时,会提高其偏移量。消费者也可以按照任意的顺序消费消息。分区允许日志扩展到超过单个服务器,但是每个分区的大小必须适应于其服务器。我们可以对给定主题的数据进行分区,以便处理大量数据。
接下来,关于Trafodion SQL引擎的工作原理。Trafodion SQL编译器为所有的关系操作使用运算符模型,包括进程间的消息传递、用于横向扩展处理的可扩展分区功能。编译器根据region的边界或统计数据信息,生成使用表的分区布局的并行查询计划。
在同一进程中以及在跨多个节点重新分区或收集数据时,查询引擎在操作符间使用数据流模型。Trafodion工作负载在运行时使用分区并行,从而并行处理多个数据分区。
在分区并行计划中,多个运算符为相同的计划工作。使用多队列或管道合并结果,再保存输入分区的sort顺序。由于数据被划分成多个独立执行的单元,所以分区也被称为“数据并行”。表映射UDF允许在Trafodion使用MapReduce模型编程。这些UDF可以使用可选的表值参数并生成表值输出。
如同Trafodion中的其他运算符,表映射UDF可以并行执行。可选的优化器接口允许表映射UDF的编写人员实现多态性,以便在编译时确定结果列(名称和类型),将谓词下推到UDF或其输入表,从而影响UDF的并行度并执行各种其他优化。
Trafodion优化器采用从上而下的方法,非常适合通过运算符优化查询(例如,表映射UDF);它不会假设查询树的运算符是硬编码的。除了MapReduce模型编程的常规应用程序,表映射UDF也是一个简便而强大的机制,可以将其他数据源整合到Trafodion。
结合Trafodion和Kafka的并行架构是非常有益的。例如,可以直接使用示例的Trafodion Kafka UDF,也可以进行一些修改,用于消费Kafka生产者发出的数据。一旦执行了查询,Kafka UDF就将启动等待中的Trafodion执行程序。
下图展示了完整的流程:
示例:将数据导入Trafodion
以下步骤将连续数据加载到8节点集群的Trafodion表:
1.创建用于加载数据的表。
CREATE TABLE employee (id int not null,name varchar(20),email varchar(20),primary key(id))SALT USING 4 PARTITIONS ON (id);
上表有4个分区;每个节点一个分区。
2.创建一个主题。
创建一个名为employee的主题,有4个分区和2个副本。> bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 4 –topic employee
3.开启进程。
创建一个生产者。示例代码:Producer producer = new Producer(config);String topic = “employee” ;// Produce strings in delimited form “id|name|email”for (int i = 1 ; i < = 1000 ; i++) String msg = ; System.out.println(msg); data = new KeyedMessage(topic, String.valueOf(i), msg);producer.send(data);}
使用易鲸捷github资料库中的示例UDF(),消费Trafodion中的数据。注意:该简单的示例UDF不是并行执行的。
SELECT *FROM udf(kafka(‘localhost:2181′, — zookeeper connection0, — Kafka group id’employee’, — Kafka topic‘IC20C20’, — int, and two char output cols‘|’, — field delimiter100, — max. rows to read10000)) — timeout 10 secondsKafkaResult(id, name, email);– name the output columns