全网整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:400-708-3566

java 中Spark中将对象序列化存储到hdfs

java 中Spark中将对象序列化存储到hdfs

摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.

废话不多说, 直接贴代码了. spark1.4 + hbase0.98

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}


感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# java  # 中Spark中将对象序列化存储到hdfs  # 在IntelliJ IDEA中创建和运行java/scala/spark程序的方法  # java-spark中各种常用算子的写法示例  # Java和scala实现 Spark RDD转换成DataFrame的两种方法小结  # 详解Java编写并运行spark应用程序的方法  # 如何使用Java调用Spark集群  # 序列化  # 尤其是  # 你还  # 希望能  # 这样一个  # 另外一个  # 谢谢大家  # 多说  # 中经  # CompareOp  # SingleColumnValueFilter  # PageFilter  # RegexStringComparator  # CompareFilter  # ProtobufUtil  # Bytes  # mapreduce  # TableInputFormat  # protobuf  # Path 


相关文章: 网站按钮制作软件,如何实现网页中按钮的自动点击?  在线ppt制作网站有哪些,请推荐几个好的课件下载的网站?  如何实现建站之星域名转发设置?  建站VPS选购需注意哪些关键参数?  广平建站公司哪家专业可靠?如何选择?  建站之星各版本价格是多少?  如何快速搭建个人网站并优化SEO?  建站之星CMS五站合一模板配置与SEO优化指南  网站制作怎么样才能赚钱,用自己的电脑做服务器架设网站有什么利弊,能赚钱吗?  南京网站制作费用,南京远驱官方网站?  建站中国官网:模板定制+SEO优化+建站流程一站式指南  网站建设设计制作营销公司南阳,如何策划设计和建设网站?  宝塔Windows建站如何避免显示默认IIS页面?  盘锦网站制作公司,盘锦大洼有多少5G网站?  C#怎么创建控制台应用 C# Console App项目创建方法  平台云上自主建站:模板化设计与智能工具打造高效网站  创业网站制作流程,创业网站可靠吗?  制作证书网站有哪些,全国城建培训中心证书查询官网?  建站之星如何取消后台验证码生成?  如何处理“XML格式不正确”错误 常见XML well-formed问题解决方法  如何正确选择百度移动适配建站域名?  如何在IIS中配置站点IP、端口及主机头?  建站与域名管理如何高效结合?  电商网站制作价格怎么算,网上拍卖流程以及规则?  建站之星安装需要哪些步骤及注意事项?  重庆网站制作公司哪家好,重庆中考招生办官方网站?  建站主机如何选?高性价比方案全解析  东莞市网站制作公司有哪些,东莞找工作用什么网站好?  天津个人网站制作公司,天津网约车驾驶员从业资格证官网?  最好的网站制作公司,网购哪个网站口碑最好,推荐几个?谢谢?  ,购物网站怎么盈利呢?  Swift中循环语句中的转移语句 break 和 continue  网站制作大概多少钱一个,做一个平台网站大概多少钱?  建站之星CMS建站配置指南:模板选择与SEO优化技巧  建站之星多图banner生成与模板自定义指南  建站之星五站合一营销型网站搭建攻略,流量入口全覆盖优化指南  测试制作网站有哪些,测试性取向的权威测试或者网站?  如何通过虚拟主机空间快速建站?  贸易公司网站制作流程,出口贸易网站设计怎么做?  大连企业网站制作公司,大连2025企业社保缴费网上缴费流程?  网站制作壁纸教程视频,电脑壁纸网站?  专业型网站制作公司有哪些,我设计专业的,谁给推荐几个设计师兼职类的网站?  如何制作网站标识牌,动态网站如何制作(教程)?  C++如何编写函数模板?(泛型编程入门)  教育培训网站制作流程,请问edu教育网站的域名怎么申请?  山东网站制作公司有哪些,山东大源集团官网?  建站之星备案是否影响网站上线时间?  ,在苏州找工作,上哪个网站比较好?  家庭服务器如何搭建个人网站?  如何获取上海专业网站定制建站电话? 

您的项目需求

*请认真填写需求信息,我们会在24小时内与您取得联系。