
    Serializing Objects to HDFS in Spark, Explained in Detail

    This article shows how to serialize Java objects in a Spark application and store them in HDFS.


    Abstract: A common requirement in Spark applications is to serialize a Java object and store it in HDFS, especially for models computed with MLlib, so that a model can be reused later. The example below reads data from HBase in a Spark environment, trains a word2vec model, and stores it in HDFS.
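
    Stripped to its essentials, the pattern is just an ObjectOutputStream layered on top of an HDFS stream. Here is a minimal, generic sketch of that pattern (the HdfsSerde object and its save/load helpers are illustrative names, not part of the example below; the Configuration is assumed to point at your cluster):

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object HdfsSerde {
     // Write any Serializable object to an HDFS path via Java serialization.
     def save(conf: Configuration, path: String, obj: Serializable): Unit = {
      val oos = new ObjectOutputStream(FileSystem.get(conf).create(new Path(path)))
      try oos.writeObject(obj) finally oos.close()
     }

     // Read the object back and cast it to the expected type.
     def load[T](conf: Configuration, path: String): T = {
      val ois = new ObjectInputStream(FileSystem.get(conf).open(new Path(path)))
      try ois.readObject.asInstanceOf[T] finally ois.close()
     }
    }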

    Without further ado, here is the full listing. Environment: Spark 1.4 + HBase 0.98.


    import scala.collection.JavaConverters._
    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.ansj.splitWord.analysis.ToAnalysis
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.filter.FilterList
    import org.apache.hadoop.hbase.filter.PageFilter
    import org.apache.hadoop.hbase.filter.RegexStringComparator
    import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
    
    object Word2VecDemo {
    
     // TableInputFormat expects the Scan to be handed over through the job
     // configuration as a Base64-encoded serialized protobuf, hence this helper.
     def convertScanToString(scan: Scan) = {
      val proto = ProtobufUtil.toScan(scan)
      Base64.encodeBytes(proto.toByteArray)
     }
    
     def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
      // Kryo serialization with enlarged buffers, so large objects
      // can be shipped between nodes without overflowing the serializer.
      sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      sparkConf.set("spark.kryoserializer.buffer", "256m")
      sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
      sparkConf.set("spark.akka.frameSize", "500")
      sparkConf.set("spark.rpc.askTimeout", "30")
      
    
      val sc = new SparkContext(sparkConf)
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    
      hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")
    
      val scan = new Scan()
      val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)

      // Keep only rows whose article body is at least 1500 characters long.
      val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")

      val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
       "data".getBytes,
       "article".getBytes,
       CompareOp.EQUAL,
       comp
      )

      filterList.addFilter(articleFilter)
      // Cap the number of rows returned per region.
      filterList.addFilter(new PageFilter(100))

      scan.setFilter(filterList)
      scan.setCaching(50)
      scan.setCacheBlocks(false)
      hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    
      val crawledRDD = sc.newAPIHadoopRDD(
       hbaseConf,
       classOf[TableInputFormat],
       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
       classOf[org.apache.hadoop.hbase.client.Result]
      )
     
      // Keep only rows that actually have an article body.
      val articlesRDD = crawledRDD.filter {
       case (_, result) => {
         val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
         content != null
       }
      }

      // Segment each article into words with ansj's ToAnalysis tokenizer.
      val wordsInDoc = articlesRDD.map {
       case (_, result) => {
         val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
         if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
         else Seq.empty[String] // an empty Seq, so the nonEmpty filter below can drop it
       }
      }

      val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

      val word2vec = new Word2Vec()
      val model = word2vec.fit(filteredWordsInDoc)
      
      //---------------------------------------The key part-------------------------------------------------------------
      // Persist the model above to HDFS via plain Java serialization.
      val hadoopConf = sc.hadoopConfiguration
      hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
      val fileSystem = FileSystem.get(hadoopConf)
      val path = new Path("/user/hadoop/data/mllib/word2vec-object")
      // fileSystem.create(path) already returns an FSDataOutputStream,
      // so there is no need to wrap it in another stream.
      val oos = new ObjectOutputStream(fileSystem.create(path))
      oos.writeObject(model)
      oos.close()

      // A separate program can read the serialized object straight back from HDFS and use the model:
      val ois = new ObjectInputStream(fileSystem.open(path))
      val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
      ois.close()
      
      /*
      * // You can also copy the serialized file from HDFS to the local
      * // filesystem and use the model from a plain Scala program:
      * import java.io._
      * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
      * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
      * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
      * ois.close
      */
      //--------------------------------------------------------------------------------------------------------------
     }
    }
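
    Worth noting: plain Java serialization ties the stored bytes to the exact class version that wrote them. Since Spark 1.4, MLlib's Word2VecModel also exposes its own save/load persistence (via the Saveable trait), which writes metadata plus Parquet and is the more robust choice across Spark upgrades. A minimal sketch, reusing the sc and model from the listing above (the output path is illustrative):

      // Write the model with MLlib's built-in persistence (metadata + Parquet).
      model.save(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")
      // ...later, possibly in a different job, load it back
      // without relying on Java serialization:
      val reloaded = Word2VecModel.load(sc, "hdfs://myhadoop:9000/user/hadoop/data/mllib/word2vec-native")

    Either way, the model lands in HDFS once and can be reused by any job that can reach the cluster.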
