mongodb - spark叢集中每個節點都有一個獨立資料庫,可以實現分散式統計計算嗎?
天蓬老师
天蓬老师 2017-04-28 09:04:12
0
1
544

我將spark搭建在兩台機器上,其中一台即是master又是slave,另一台是slave,兩台機器上均裝有獨立的mongodb資料庫。我的主程式讓它們統計自身資料庫的內容,然後將結果匯總到一台伺服器上的資料庫。目前程式碼是在master節點上提交的。但是我spark-submit之後,好像只統計master節點上的mongodb裡的資料了,另一個worker節點沒有統計。請問這是什麼原因?程式碼如下:

val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)

val config = new Configuration()

//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test")
//统计结果输出到服务器上
config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result")

val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD.flatMap(arg => {
  var str = arg._2.get("type").toString
  str = str.toLowerCase().replaceAll("[.,!?\n]", " ")
  str.split(" ")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)

// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map((tuple) => {
  var bson = new BasicBSONObject()
  bson.put("word", tuple._1)
  bson.put("count", tuple._2.toString() )
  (null, bson)
})

// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
天蓬老师
天蓬老师

欢迎选择我的课程,让我们一起见证您的进步~~

全部回覆(1)
Ty80

自问自答。原因可能是这样:

val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

这行代码表示这是由driver读取数据库,然后将符合条件的数据载入RDD,由于之前设置了是将127.0.0.1作为输入,也就是从driver的mongodb上读取数据。由于driver就在master上,所以读取的数据也自然就是master上的数据了。

熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!