本篇博客介绍如何在spark程序中使用thrift接口反序列化对象。
代码如下,其中args(0)表示输入的hdfs路径,args(1)表示输出路径。
val hdfsPath = args(0) sparkContext.sequenceFile[BytesWritable, BytesWritable](hdfsPath) .values .map{ case value => try { val deserializer: TDeserializer = new TDeserializer(new TCompactProtocol.Factory) val someClass = new SomeClass deserializer.deserialize(someClass, value.copyBytes()) someClass } catch { case e: Exception => logError("Failed to deserialize", e); null } } .filter(_ != null) .saveAsTextFile(args(1))