
How to Run a Spark Application in Java

黄舟
Release: 2017-09-30 10:14:05

This article explains in detail how to write and run a Spark application in Java. It works through a concrete problem from the author's own work and should serve as a useful reference.

Let's start with a simple requirement:

We want to analyze a website's access logs, count the number of visits from each distinct IP address, and then use Geo information to work out how visitors are distributed across countries and regions. Here are some sample log lines from my own site:


121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
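The only field we need from each of these lines is the first whitespace-delimited token, the client IP address. As a minimal standalone sketch (the class name below is purely illustrative, not part of the job), the extraction boils down to:

import java.util.regex.Pattern;

// Minimal sketch: pull the client IP (the first whitespace-delimited field) out of one log line.
public class LogLineDemo {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        String line = "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11465";
        String ip = SPACE.split(line)[0];  // everything before the first space
        System.out.println(ip);            // prints 121.205.198.92
    }
}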

Implementing the Spark Application in Java

The statistics program we are going to implement has the following functional requirements:

Read the log data file from HDFS

Extract the first field (the IP address) from each line

Count how many times each IP address appears

Sort the IP addresses by their counts in descending order

For each IP address, call the GeoIP library to look up the country it belongs to

Print the results, one line per IP, in the format: [country code] IP address count

Below is the Java code of our statistics application:


package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;
    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
    private static final Pattern SPACE = Pattern.compile(" ");

    private transient LookupService lookupService;
    private transient final String geoIPFile;

    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get country code from an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // splits and extracts the ip address field
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
                return Arrays.asList(SPACE.split(s)[0]); // ip address
            }
        });

        // map
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        List<Tuple2<String, Integer>> output = counts.collect();

        // sort statistics result by value
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                if (t1._2 < t2._2) {
                    return 1;
                } else if (t1._2 > t2._2) {
                    return -1;
                }
                return 0;
            }
        });

        writeTo(args, output);
    }

    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<String, Integer> tuple : output) {
            Country country = lookupService.getCountry((String) tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    public static void main(String[] args) {
        // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
            System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }

        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);
        System.exit(0);
    }
}

The implementation details are explained by the comments in the code. We use Maven to manage and build the Java program; first, here are the dependencies declared in my pom configuration:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.0-incubating</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
</dependency>
<dependency>
    <groupId>dnsjava</groupId>
    <artifactId>dnsjava</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>1.2.1</version>
</dependency>

One thing to note: when the program runs on a Spark cluster, the Job we write must be serializable. If a field does not need to be, or cannot be, serialized, simply mark it transient. In the code above, the lookupService field does not implement the Serializable interface, so it is declared transient to exclude it from serialization; otherwise you may see an error like the following:


14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:740)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:740)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
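As described above, the cure is to make the job class serializable and to exclude anything that cannot (or need not) travel to the executors by marking it transient, re-creating it lazily where it is actually used. The sketch below illustrates that general pattern with a made-up helper class; it is not the article's LookupService, just an illustrative stand-in:

import java.io.Serializable;

// Sketch of the transient pattern: the job object itself is serializable,
// while the non-serializable helper is excluded from serialization and
// rebuilt lazily wherever it is actually needed.
public class SerializableJobSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Not serializable -> transient, so Spark never tries to ship it with a task.
    private transient ExpensiveHelper helper;

    private ExpensiveHelper helper() {
        if (helper == null) {              // rebuild after deserialization if needed
            helper = new ExpensiveHelper();
        }
        return helper;
    }

    public String lookup(String key) {
        return helper().resolve(key);
    }

    // Hypothetical stand-in for something like the GeoIP LookupService.
    static class ExpensiveHelper {
        String resolve(String key) {
            return "value-for-" + key;
        }
    }
}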

Running the Java Program on a Spark Cluster

Here I use Maven to manage and build the Java program. After implementing the code above, I use Maven's maven-assembly-plugin, configured as follows:


<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>org.shirdrn.spark.job.UserAgentStats</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <excludes>
            <exclude>*.properties</exclude>
            <exclude>*.xml</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

This packages all of the dependency libraries into the program's JAR. Finally, copy the JAR file to a Linux machine (it does not have to be the Master node of the Spark cluster) and make sure the Spark environment variables are configured correctly on that node. After unpacking the Spark distribution you will find the script bin/run-example; we can modify that script directly so that it points to our own Java program package (change the EXAMPLES_DIR variable and the parts related to where our JAR file is stored), and then use it to run the program. The script looks like this:


cygwin=false
case "`uname`" in
    CYGWIN*) cygwin=true;;
esac

SCALA_VERSION=2.10

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
EXAMPLES_DIR="$FWDIR"/java-examples
SPARK_EXAMPLES_JAR=""
if [ -e "$EXAMPLES_DIR"/*.jar ]; then
  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
fi
if [[ -z $SPARK_EXAMPLES_JAR ]]; then
  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
  echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
  exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

if $cygwin; then
  CLASSPATH=`cygpath -wp $CLASSPATH`
  export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
fi

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Set JAVA_OPTS to be able to load native libraries and to set heap size
JAVA_OPTS="$SPARK_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
  JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
fi
export JAVA_OPTS

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"

To run the Java program we developed on Spark, execute the following commands:


cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat

My program class org.shirdrn.spark.job.IPAddressStats takes three arguments to run:

The Spark cluster master URL: in my case spark://m1:7077

The input file path: business-specific; here I read the file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log from HDFS

The GeoIP database file: business-specific; an external file used to determine which country an IP address belongs to

If the program has no errors and runs normally, the console prints the program's run log, for example:


14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/03/10 22:17:25 INFO Remoting: Starting remoting
14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218 312
14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77 300
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16 212
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254 207
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134 185
14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181 181
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212 180
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83 176
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205 169
14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19 165
...

We can also check the status of the running application through the web console: point a browser at port 8080 on the Master node (for example http://m1:8080/) to see the status information for applications on the cluster.

In addition, it is worth noting that if you develop Spark applications in Java with Eclipse in a Unix environment, you can also connect to the Spark cluster directly from Eclipse, submit the application you developed, and let the cluster process it.
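What makes this possible is that, in this Spark version, the driver is just an ordinary Java process: as long as the JavaSparkContext is constructed with the cluster's master URL and the assembled JAR, it can be launched from anywhere, including an IDE. A hedged sketch of what such a driver setup might look like (the class name and JAR path below are illustrative assumptions, not from the article):

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: launching the job from an IDE simply means constructing
// the JavaSparkContext against the cluster master and shipping our assembled JAR.
public class RunFromIDE {
    public static void main(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(
                "spark://m1:7077",                 // cluster master URL (same as on the command line)
                "IPAddressStats",                  // application name
                System.getenv("SPARK_HOME"),       // Spark home on the driver machine
                new String[] { "/path/to/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar" }); // assumed JAR path
        // ... build RDDs and run the job exactly as in IPAddressStats.stat() ...
        ctx.stop();
    }
}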

Summary

The above walked through implementing a log-analysis Spark job in Java, packaging it together with its dependencies using Maven, and submitting and running it on a Spark cluster.
