java 中 自定义OutputFormat的实例详解

实例代码:
package com.ccse.hadoop.outputformat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
public class MySelfOutputFormatApp {
public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
public final static String OUTPUT_FILENAME = "/abc";
public static void main(String[] args) throws IOException, URISyntaxException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
fileSystem.delete(new Path(OUTPUT_PATH), true);
Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
job.setJarByClass(MySelfOutputFormatApp.class);
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setOutputFormatClass(MyselfOutputFormat.class);
job.waitForCompletion(true);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text word = new Text();
private LongWritable writable = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
if (value != null) {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, writable);
}
}
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
@Override
public RecordWriter<Text, LongWritable> getRecordWriter(
TaskAttemptContext context) throws IOException,
InterruptedException {
try {
FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
//指定文件的输出路径
final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH
+ MySelfOutputFormatApp.OUTPUT_FILENAME);
this.outputStream = fileSystem.create(path, false);
} catch (URISyntaxException e) {
e.printStackTrace();
}
return new MySelfRecordWriter(outputStream);
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException,
InterruptedException {
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
}
}
public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {
private FSDataOutputStream outputStream = null;
public MySelfRecordWriter(FSDataOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public void write(Text key, LongWritable value) throws IOException,
InterruptedException {
this.outputStream.writeBytes(key.toString());
this.outputStream.writeBytes("\t");
this.outputStream.writeLong(value.get());
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
this.outputStream.close();
}
}
}
2.OutputFormat是用于处理各种输出目的地的。
2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。
2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。
2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。
以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
# java
# OutputFormat
# OutputFormat的实例详解
# OutputFormat自定义的实现
# java中FileOutputStream中文乱码问题解决办法
# java DataInputStream和DataOutputStream详解及实例代码
# java IO流 之 输出流 OutputString()的使用
# Java的DataInputStream和DataOutputStream数据输入输出流
# Java中的BufferedInputStream与BufferedOutputStream使用示例
# Java中FilterInputStream和FilterOutputStream的用法详解
# 自定义
# 如有
# 希望能
# 来自于
# 写到
# 哪里去
# 谢谢大家
# 键值
# 这要
# 疑问请
# MySelfOutputFormatApp
# public
# FileOutputCommitter
# final
# String
# static
# INPUT_PATH
# output
# OutputCommitter
# Reducer
相关文章:
沈阳制作网站公司排名,沈阳装饰协会官方网站?
如何零基础开发自助建站系统?完整教程解析
建站之星如何通过成品分离优化网站效率?
如何破解联通资金短缺导致的基站建设难题?
如何在腾讯云服务器快速搭建个人网站?
公司网站建设制作费用,想建设一个属于自己的企业网站,该如何去做?
,在苏州找工作,上哪个网站比较好?
建站之星CMS建站配置指南:模板选择与SEO优化技巧
Swift中switch语句区间和元组模式匹配
如何快速查询网址的建站时间与历史轨迹?
建站主机核心功能解析:服务器选择与网站搭建流程指南
购物网站制作费用多少,开办网上购物网站,需要办理哪些手续?
青浦网站制作公司有哪些,苹果官网发货地是哪里?
如何基于云服务器快速搭建网站及云盘系统?
如何快速重置建站主机并恢复默认配置?
深圳网站制作费用多少钱,读秀,深圳文献港这样的网站很多只提供网上试读,但有些人只要提供试读的文章就能全篇下载,这个是怎么弄的?
成都网站制作公司哪家好,四川省职工服务网是做什么用?
移动端手机网站制作软件,掌上时代,移动端网站的谷歌SEO该如何做?
如何快速搭建FTP站点实现文件共享?
制作销售网站教学视频,销售网站有哪些?
网站制作专业公司有哪些,如何制作一个企业网站,建设网站的基本步骤有哪些?
如何通过NAT技术实现内网高效建站?
建站之星体验版:智能建站系统+响应式设计,多端适配快速建站
公司网站的制作公司,企业网站制作基本流程有哪些?
建站之星ASP如何实现CMS高效搭建与安全管理?
如何快速生成凡客建站的专业级图册?
专业网站制作服务公司,有哪些网站可以免费发布招聘信息?
为什么Go需要go mod文件_Go go mod文件作用说明
公司网站设计制作厂家,怎么创建自己的一个网站?
网站制作软件有哪些,制图软件有哪些?
网站制作报价单模板图片,小松挖机官方网站报价?
在线流程图制作网站手机版,谁能推荐几个好的CG原画资源网站么?
网站建设设计制作营销公司南阳,如何策划设计和建设网站?
建站ABC备案流程中有哪些关键注意事项?
如何在Golang中处理模块冲突_解决依赖版本不兼容问题
建站之星如何优化SEO以实现高效排名?
建设网站制作价格,怎样建立自己的公司网站?
简单实现Android验证码
如何安全更换建站之星模板并保留数据?
b2c电商网站制作流程,b2c水平综合的电商平台?
香港服务器WordPress建站指南:SEO优化与高效部署策略
建站之星×万网:智能建站系统+自助建站平台一键生成
如何在万网自助建站中设置域名及备案?
ui设计制作网站有哪些,手机UI设计网址吗?
美食网站链接制作教程视频,哪个教做美食的网站比较专业点?
如何生成腾讯云建站专用兑换码?
北京网页设计制作网站有哪些,继续教育自动播放怎么设置?
广州顶尖建站服务:企业官网建设与SEO优化一体化方案
图片制作网站免费软件,有没有免费的网站或软件可以将图片批量转为A4大小的pdf?
香港服务器选型指南:免备案配置与高效建站方案解析
*请认真填写需求信息,我们会在24小时内与您取得联系。