import java.io._
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils;

/**
 * Small helpers for working with files and directories on HDFS.
 *
 * Every operation opens its own `FileSystem` handle, logs failures to stdout
 * instead of propagating them, and closes the handle before returning.
 */
object HdfsFileOperate {
  import java.nio.charset.StandardCharsets
  import scala.util.control.NonFatal

  /**
   * Manual smoke test: appends the text "test" to a placeholder HDFS path.
   *
   * @param args unused
   */
  def Test(args: Array[String]): Unit = {
    appendHdfs("hdfs://somepath", "test")
  }

  /**
   * Runs `body` against a `FileSystem` opened for `uri` and always closes it.
   *
   * `FileSystem.newInstance` is used rather than `FileSystem.get`: `get` hands
   * out a JVM-wide cached instance, so closing it would also close it for any
   * other code in the process that obtained the same instance.
   *
   * @param uri       URI used to select and open the file system
   * @param label     prefix for the messages printed on failure
   * @param conf      Hadoop configuration used to open the file system
   * @param onFailure value returned when opening the file system or `body` fails
   * @param body      work to perform with the opened file system
   * @return the result of `body`, or `onFailure` if a non-fatal error occurred
   */
  private def withFileSystem[A](uri: String, label: String, conf: Configuration, onFailure: A)(
      body: FileSystem => A): A = {
    // Stays None when opening fails, so the finally block has nothing to close
    // (and cannot raise a NullPointerException of its own).
    var opened: Option[FileSystem] = None
    try {
      val fs = FileSystem.newInstance(URI.create(uri), conf)
      opened = Some(fs)
      body(fs)
    } catch {
      case NonFatal(e) =>
        println(label + ":" + e)
        onFailure
    } finally {
      opened.foreach { fs =>
        try {
          fs.close()
        } catch {
          case NonFatal(e) => println(label + "[关闭FileSystem]:" + e)
        }
      }
    }
  }

  /**
   * Appends text to a file on HDFS, creating the file first if it does not exist.
   *
   * The text is written as UTF-8 so the bytes do not depend on the platform
   * default charset. Failures are printed to stdout, not thrown.
   *
   * @param hdfs_path full URI of the target file
   * @param context   text to append
   */
  def appendHdfs(hdfs_path: String, context: String): Unit = {
    val conf = new Configuration()
    conf.setBoolean("dfs.support.append", true)
    withFileSystem[Unit](hdfs_path, "HDFS文件追加", conf, ()) { fs =>
      val target = new Path(hdfs_path)
      if (!fs.exists(target)) {
        createFile(hdfs_path)
      }
      val in = new ByteArrayInputStream(context.getBytes(StandardCharsets.UTF_8))
      val out = fs.append(target)
      // The final `true` makes copyBytes close both streams when it finishes.
      IOUtils.copyBytes(in, out, 4096, true)
    }
  }

  /**
   * Creates an empty file on HDFS if nothing exists at that path yet.
   *
   * Failures are printed to stdout, not thrown.
   *
   * @param hdfs_file full URI of the file to create
   */
  def createFile(hdfs_file: String): Unit = {
    withFileSystem[Unit](hdfs_file, "创建HDFS文件", new Configuration(), ()) { fs =>
      val target = new Path(hdfs_file)
      if (!fs.exists(target)) {
        // Close the returned stream right away so the new file is not left open
        // for writing when a later append targets it.
        fs.create(target).close()
      }
    }
  }

  /**
   * Creates a directory (including missing parents) on HDFS if it does not exist.
   *
   * Failures are printed to stdout, not thrown.
   *
   * @param hdfs_path full URI of the directory to create
   */
  def createPath(hdfs_path: String): Unit = {
    withFileSystem[Unit](hdfs_path, "创建HDFS目录", new Configuration(), ()) { fs =>
      val target = new Path(hdfs_path)
      if (!fs.exists(target)) {
        println("create hdfs path .......")
        fs.mkdirs(target)
      }
    }
  }

  /**
   * Checks whether a file or directory exists on HDFS.
   *
   * @param path full URI of the file or directory to check
   * @return `true` if the path exists; `false` if it does not or if the check
   *         failed (the failure is printed to stdout)
   */
  def exists(path: String): Boolean = {
    withFileSystem(path, "判断HDFS上【文件】或【文件夹】是否存在", new Configuration(), false) { fs =>
      fs.exists(new Path(path))
    }
  }
}