因为要把spark从es读出来的json数据转换为对象,开始想用case class定义类型,通过fastjson做转换。如下
case class Book (author: String, content: String, id: String, time: Long, title: String)
val json = "{\"author\":\"hll\",\"content\":\"ES即etamsports\",\"id\":\"693\",\"time\":1490165237200,\"title\":\"百度百科\"}"
val mapper: ObjectMapper = new ObjectMapper()
val book: Book = mapper.readValue(json, classOf[Book])
结果抛出了异常:com.fasterxml.jackson.databind.JsonMappingException: No suitable constructor found for type [simple type, class JsonTest$Book]
换成fastjson也会有相似的异常。
恍然大悟,case class没有空参构造函数,跟fastjson这些库不太兼容。
//隐式转换必须要导入
import org.json4s._
import org.json4s.jackson.JsonMethods._
class Book(val author: String,val content: String,val id: String, val time: Long, val title: String)
object JsonTest {
def main(args: Array[String]) {
val json = "{\"author\":\"hll\",\"content\":\"ES即etamsports\",\"id\":\"693\",\"time\":1490165237200,\"title\":\"百度百科\"}"
//导入隐式值
implicit val formats = DefaultFormats
val book: Book = parse(json).extract[Book]
println(book.content)
}
}
spark程序中的应用:
implicit val formats = DefaultFormats
esRDD.map(_._2).map(parse(_).extract[Book]).sortBy(_.time, false).take(10).foreach(println)
spark里面解析json数据有一个经典的问题,ObjectMapper对象的创建很重。一般使用mapPartition来对一个分区复用ObjectMapper对象。
我们来看一下parse方法的源码:
private[this] lazy val _defaultMapper = {
val m = new ObjectMapper()
m.registerModule(new Json4sScalaModule)
m
}
def mapper = _defaultMapper
def parse(in: JsonInput, useBigDecimalForDouble: Boolean = false): JValue = {
mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, useBigDecimalForDouble)
in match {
case StringInput(s) => mapper.readValue(s, classOf[JValue])
case ReaderInput(rdr) => mapper.readValue(rdr, classOf[JValue])
case StreamInput(stream) => mapper.readValue(stream, classOf[JValue])
case FileInput(file) => mapper.readValue(file, classOf[JValue])
}
}
实际使用的ObjectMapper对象是lazy初始化的而且是复用的,避免了ObjectMapper对象的重复创建,很nice。