Kafka 内部架构

Kafka是一款基于发布于订阅的信息系统。文件系统或数据库提交日志用来提供所有事务的持久记录。通过重放这些日志可以重建系统的状态。同样地,Kafka的数据是按照一定顺序持久化保存的,可以按需读取。此外,Kafka的数据分布在整个系统里,具备数据故障保护和性能伸缩能力。

基本概念

Kafka的数据单元被称为消息,可以吧消息看成是数据库的一个“数据行”或一条“记录”。当消息以一种可控的方式写入不同的分区时,就会用到键,保证具有相同键的消息总是被写到相同的分区上。

批次就是一组消息,这些消息属于同一个主题和分区。批次数据会被压缩,这样可以提升数据的传输能力和存储能力。

Kafka的消息通过topic进行分类。topic可以被分为若干个partition。消息以追加的方式写入分区,然后以先进先出的顺序读取。由于一个主题一般包含多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证在当个分区内的顺序。

生产者创建消息。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。

消费者读取消息。消费者订阅一个或多个topic,并按照消息生成的顺序读取它们。消费者通过检查小的偏移量来区分已经度过的消息。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka上,如果消费者关闭或重启,可以从上次中断的地方继续处理消息。

一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

每个topic可以设置单独的保留规则。消息的保留策略:

  1. 保留一段时间
  2. 保留到消息达到一定大小的字节数

要提高集群的容错能力,需要配置较高的复制系数。

生产者

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer<String, String>(kafkaProps);

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record);
} catch(Exception e) {
    e.printStackTrace();
}

生产者的send()方法需要目标主题的名字和要发送的键和值对象。如果不关心发送结果,那么可以使用这种发送方式。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch(Exception e) {
    e.printStackTrace();
}

send()方法先返回一个Future对象,然后调用Future对象的get()方法等待Kafka响应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null)
            e.printStackTrace();
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback);

为了在异步发送消息的同时能哦股对异常情况进行处理,生产者提供了回调支持。

acks参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。

  • 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。所以可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  • 如果acks=1,只要集群的首领节点收到消息,生产者就会收到一盒来自服务器的成功响应。
  • 如果acks=all,只有当所有参与复制的节点全部接收到消息时,生产者才会收到来自服务器的响应。

消费者

如果只使用单个消费者处理消息,应用程序会远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的topic写入消息一样,我们也可以使用多个消费者从同一topic读取数据,对消息进行分流。Kafka消费者从属于group。一个group的消费者订阅的是同一个topic,每个消费者接收topic一部分分区的消息。

可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。只要保证每个应用程序有自己的group,就可以让它们获取到topic所有的消息。

一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或者发生崩溃时,它就离开集群,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如添加了新的分区,会发生分区重分配。分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为Rebalance。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。

消费者通过向被指派为群组协调器的broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器就会认为它已经死亡,就会触发一次rebalance。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次rebalance,尽量降低处理停顿。

当消费者要加入群组时,向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为群主。群主从协调器哪里获得群组的成员列表,并负责给每一个消费者分配分区。群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.[ut("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer.subsribe(Collections.singletonList("customerCountries"));

subscribe()方法接受一个主题列表作为参数。在调用subscribe()方法时可以传入一个正则表达式。

consumer.subscribe("test.*");

正则表达式可以匹配多个主题,如果创建了新的主题,那么会立即出发一次rebalance,消费者就可以读取新添加的主题。

Kafka如何进行复制

Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker上,每个broker可以保存成败上千个属于不同主题呵分区的副本。

每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领哪里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者就会被提升为新首领。

如果一个副本无法与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟他没有包含全部的消息。相反,持续请求得到最新消息副本的被称为同步副本。在首领发生失效时,只有同步副本才有可能被选举为新首领。

Kafka如何处理请求

Kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上。客户端使用另一种请求类型,也就是元数据请求。客户端会把这些信息缓存起来,并直接往目标broker上发送生产请求和获取请求。

消费者请求需要闲到达指定的分区首领上,然后客户端通过查询元数据来确保请求的路由是正确的。首领在收到请求时,它会检查请求是否有效。如果请求的偏移量存在,broker将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。Kafka使用零复制向客户端发送消息,Kafka直接把消息文件(Linux文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。其他数据库再将数据发送给客户端之前会先把它们保存在本地缓存里。

大部分客户端只能读取已经被写入所有同步副本的消息。在消息还没被写入所有同步副本之前,是不会发送消费者的。

文件管理

因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,每个片段包含1GB或一周的数据,在broker往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。当前正在写入数据的片段叫做活跃片段,在片段被关闭之前这些数据无法被删除。

kafka为每个分区维护了一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。如果索引出现损坏,Kafka会通过重新读取消息并录制偏移量和位置来重新生成索引。

Sprak Streaming 消费 Kafka

Spark-Streaming获取kafka数据的两种方式:Receiver和Direct,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。

Receiver

使用Kafka的高层次Consumer API来实现。receiver从Kafka中获取的数据都存储在Spark Executor的内存中,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

Direct

Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二种使用方式中 kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据。本文将用代码说明如何将 kafka 中的 offset 保存到 zookeeper 中,以及如何从 zookeeper 中读取已存在的 offset。

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Consumer extends App {

  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(5))

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "spark",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val topics = Array("streaming")
  val kafkaStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  kafkaStream.foreachRDD(rdd => {
    // ConsumerRecord(topic = maxwell, partition = 0, offset = 336754, CreateTime = 1553679302113, checksum = 807646051, serialized key size = 66, serialized value size = 265, key = {"database":"spero_apppush_service","table":"getui","pk.id":36490}, value = {"database":"spero_apppush_service","table":"getui","type":"delete","ts":1553679302,"xid":313982176,"commit":true,"data":{"id":36490,"createdAt":"2019-03-27 09:20:10","updatedAt":"2019-03-27 09:20:10","userId":"spero_3029","cid":"f3d3fe74e1a89788a8b8dd12fc99a142"}})
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    if (!rdd.isEmpty()){
      rdd.foreachPartition(p => {
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId())
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
//        val mapper = new ObjectMapper() with ScalaObjectMapper
//        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
//        mapper.registerModule(DefaultScalaModule)
        p.foreach(row => {
          println("FUCK: " + row)
//          val keyMapper: Map[String, Object] = mapper.readValue(row.key(), classOf[Map[String, Object]])
//          val valueMapper: Map[String, Object] = mapper.readValue(row.value(), classOf[Map[String, Object]])
        })
      })
    } else {
      // RDD is empty!
    }
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

  ssc.start()
  ssc.awaitTermination()
}

results matching ""

    No results matching ""