本篇博客介绍如何在spark程序中使用thrift接口反序列化对象。


代码如下,其中args(0)表示输入的hdfs路径,args(1)表示输出路径。

val hdfsPath = args(0)

sparkContext.sequenceFile[BytesWritable, BytesWritable](hdfsPath)
  .values
  .map{ case value =>
    try {
      val deserializer: TDeserializer = new TDeserializer(new TCompactProtocol.Factory)
      val someClass = new SomeClass
      deserializer.deserialize(someClass, value.copyBytes())
      someClass
    } catch { case e: Exception =>
      logError("Failed to deserialize", e);
      null
    }
  }
  .filter(_ != null)
  .saveAsTextFile(args(1))