Home > Database > Mysql Tutorial > body text

新版api mapreduce reduce结果写入mysql_MySQL

WBOY
Release: 2016-06-01 13:12:30
Original
1577 people have browsed it
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * 将mapreduce的结果数据写入mysql中 *  * @author asheng */public class WriteDataToMysql {	/**	 * 重写DBWritable	 * 	 * @author asheng TblsWritable需要向mysql中写入数据	 */	public static class TblsWritable implements Writable, DBWritable {		String tbl_name;		String tbl_type;		public TblsWritable() {		}		public TblsWritable(String tbl_name, String tab_type) {			this.tbl_name = tbl_name;			this.tbl_type = tab_type;		}		@Override		public void write(PreparedStatement statement) throws SQLException {			statement.setString(1, this.tbl_name);			statement.setString(2, this.tbl_type);		}		@Override		public void readFields(ResultSet resultSet) throws SQLException {			this.tbl_name = resultSet.getString(1);			this.tbl_type = resultSet.getString(2);		}		@Override		public void write(DataOutput out) throws IOException {			out.writeUTF(this.tbl_name);			out.writeUTF(this.tbl_type);		}		@Override		public void readFields(DataInput in) throws IOException {			this.tbl_name = in.readUTF();			this.tbl_type = in.readUTF();		}		public String toString() {			return new String(this.tbl_name + " " + this.tbl_type);		}	}	public static class ConnMysqlMapper extends			Mapper	// TblsRecord是自定义的类型,也就是上面重写的DBWritable类	{		enum Counter {			LINESKIP,		}		private final static IntWritable one = new IntWritable(1);		public void map(LongWritable key, Text value, Context context)				throws IOException, InterruptedException {			try {				String line = value.toString();				String[] strings = line.split("/t");				String initTime = strings[1];				String devType = strings[4];				if (initTime.length() == 19) {					SimpleDateFormat sdf = new SimpleDateFormat(							"yyyy-MM-dd HH:mm:ss");					Date date = sdf.parse(initTime);					context.write(new Text(initTime.substring(0, 10)),one);				} else {					// System.err.println(initTime);					context.getCounter(Counter.LINESKIP).increment(1);				}				// } catch (ArrayIndexOutOfBoundsException e) {			} catch (ArrayIndexOutOfBoundsException e) {				context.getCounter(Counter.LINESKIP).increment(1);				return;			} catch (ParseException e) {				context.getCounter(Counter.LINESKIP).increment(1);				return;			}		}	}	public static class ConnMysqlReducer extends			Reducer {		public void reduce(Text key, Iterable values, Context context)				throws IOException, InterruptedException {			int count = 0;			for (Iterator itr = values.iterator(); itr.hasNext(); itr					.next()) {				count++;			}			context.write(					new TblsWritable(key.toString(), String.valueOf(count)),					null);		}	}	public static void main(String args[]) throws IOException,			InterruptedException, ClassNotFoundException {		Configuration conf = new Configuration();		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",				"jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456");		Job job = new Job(conf, "test mysql connection");		job.setJarByClass(WriteDataToMysql.class);		job.setMapperClass(ConnMysqlMapper.class);		job.setReducerClass(ConnMysqlReducer.class);		job.setOutputKeyClass(Text.class);		job.setOutputValueClass(IntWritable.class);		job.setInputFormatClass(TextInputFormat.class);		job.setOutputFormatClass(DBOutputFormat.class);		FileInputFormat.addInputPath(job, new Path(args[0]));		DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total");		System.exit(job.waitForCompletion(true) ? 0 : 1);	}}
Copy after login

之所以写入mysql是因为我们平时处理的Tb级log文件处理结果却很小,写入关系数据库使查询和使用非常便利


source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact [email protected]
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!