Heim > Datenbank > MySQL-Tutorial > 新版api mapreduce reduce结果写入mysql_MySQL

使用新版 MapReduce API 将 reduce 结果写入 MySQL

WBOY
Freigeben: 2016-06-01 13:12:30
Original
1703 Personen haben dies angesehen
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * 将mapreduce的结果数据写入mysql中 *  * @author asheng */public class WriteDataToMysql {	/**	 * 重写DBWritable	 * 	 * @author asheng TblsWritable需要向mysql中写入数据	 */	public static class TblsWritable implements Writable, DBWritable {		String tbl_name;		String tbl_type;		public TblsWritable() {		}		public TblsWritable(String tbl_name, String tab_type) {			this.tbl_name = tbl_name;			this.tbl_type = tab_type;		}		@Override		public void write(PreparedStatement statement) throws SQLException {			statement.setString(1, this.tbl_name);			statement.setString(2, this.tbl_type);		}		@Override		public void readFields(ResultSet resultSet) throws SQLException {			this.tbl_name = resultSet.getString(1);			this.tbl_type = resultSet.getString(2);		}		@Override		public void write(DataOutput out) throws IOException {			out.writeUTF(this.tbl_name);			out.writeUTF(this.tbl_type);		}		@Override		public void readFields(DataInput in) throws IOException {			this.tbl_name = in.readUTF();			this.tbl_type = in.readUTF();		}		public String toString() {			return new String(this.tbl_name + " " + 
this.tbl_type);		}	}	public static class ConnMysqlMapper extends			Mapper<longwritable text intwritable>	// TblsRecord是自定义的类型,也就是上面重写的DBWritable类	{		enum Counter {			LINESKIP,		}		private final static IntWritable one = new IntWritable(1);		public void map(LongWritable key, Text value, Context context)				throws IOException, InterruptedException {			try {				String line = value.toString();				String[] strings = line.split("/t");				String initTime = strings[1];				String devType = strings[4];				if (initTime.length() == 19) {					SimpleDateFormat sdf = new SimpleDateFormat(							"yyyy-MM-dd HH:mm:ss");					Date date = sdf.parse(initTime);					context.write(new Text(initTime.substring(0, 10)),one);				} else {					// System.err.println(initTime);					context.getCounter(Counter.LINESKIP).increment(1);				}				// } catch (ArrayIndexOutOfBoundsException e) {			} catch (ArrayIndexOutOfBoundsException e) {				context.getCounter(Counter.LINESKIP).increment(1);				return;			} catch (ParseException e) {				context.getCounter(Counter.LINESKIP).increment(1);				return;			}		}	}	public static class ConnMysqlReducer extends			Reducer<text text tblswritable> {		public void reduce(Text key, Iterable<text> values, Context context)				throws IOException, InterruptedException {			int count = 0;			for (Iterator<text> itr = values.iterator(); itr.hasNext(); itr					.next()) {				count++;			}			context.write(					new TblsWritable(key.toString(), String.valueOf(count)),					null);		}	}	public static void main(String args[]) throws IOException,			InterruptedException, ClassNotFoundException {		Configuration conf = new Configuration();		DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",				"jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456");		Job job = new Job(conf, "test mysql connection");		job.setJarByClass(WriteDataToMysql.class);		job.setMapperClass(ConnMysqlMapper.class);		job.setReducerClass(ConnMysqlReducer.class);		job.setOutputKeyClass(Text.class);		
job.setOutputValueClass(IntWritable.class);		job.setInputFormatClass(TextInputFormat.class);		job.setOutputFormatClass(DBOutputFormat.class);		FileInputFormat.addInputPath(job, new Path(args[0]));		DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total");		System.exit(job.waitForCompletion(true) ? 0 : 1);	}}</text></text></text></longwritable>
Nach dem Login kopieren

之所以将结果写入 MySQL,是因为我们平时处理的日志文件虽然达到 TB 级,处理后的结果却很小;将其写入关系数据库后,查询和使用都非常便利。


Quelle:php.cn
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage