全网整合营销服务商

电脑端+手机端+微信端=数据同步管理

免费咨询热线:400-708-3566

java 中自定义OutputFormat的实例详解

java 中 自定义OutputFormat的实例详解

实例代码:

package com.ccse.hadoop.outputformat; 
 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.StringTokenizer; 
 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.OutputCommitter; 
import org.apache.hadoop.mapreduce.OutputFormat; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 
 
 
public class MySelfOutputFormatApp { 
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput"; 
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput"; 
  public final static String OUTPUT_FILENAME = "/abc"; 
   
  public static void main(String[] args) throws IOException, URISyntaxException,  
    ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); 
    fileSystem.delete(new Path(OUTPUT_PATH), true); 
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName()); 
    job.setJarByClass(MySelfOutputFormatApp.class); 
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); 
    job.setMapperClass(MyMapper.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(LongWritable.class); 
     
    job.setReducerClass(MyReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(LongWritable.class); 
    job.setOutputFormatClass(MyselfOutputFormat.class); 
     
    job.waitForCompletion(true); 
  } 
   
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 
 
    private Text word = new Text(); 
    private LongWritable writable = new LongWritable(1); 
     
    @Override 
    protected void map(LongWritable key, Text value, 
        Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      if (value != null) { 
        String line = value.toString(); 
        StringTokenizer tokenizer = new StringTokenizer(line); 
        while (tokenizer.hasMoreTokens()) { 
          word.set(tokenizer.nextToken()); 
          context.write(word, writable); 
        } 
      } 
    } 
     
  } 
   
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 
 
    @Override 
    protected void reduce(Text key, Iterable<LongWritable> values, 
        Reducer<Text, LongWritable, Text, LongWritable>.Context context) 
        throws IOException, InterruptedException { 
      long sum = 0;  
      for (LongWritable value : values) { 
        sum += value.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
    } 
  } 
 
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    @Override 
    public RecordWriter<Text, LongWritable> getRecordWriter( 
        TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      try { 
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration()); 
        //指定文件的输出路径 
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH  
                     + MySelfOutputFormatApp.OUTPUT_FILENAME); 
        this.outputStream = fileSystem.create(path, false); 
      } catch (URISyntaxException e) { 
        e.printStackTrace(); 
      } 
      return new MySelfRecordWriter(outputStream); 
    } 
 
    @Override 
    public void checkOutputSpecs(JobContext context) throws IOException, 
        InterruptedException { 
    } 
 
    @Override 
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
        throws IOException, InterruptedException { 
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context); 
    } 
     
  } 
   
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { 
 
    private FSDataOutputStream outputStream = null; 
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) { 
      this.outputStream = outputStream; 
    } 
     
    @Override 
    public void write(Text key, LongWritable value) throws IOException, 
        InterruptedException { 
      this.outputStream.writeBytes(key.toString()); 
      this.outputStream.writeBytes("\t"); 
      this.outputStream.writeLong(value.get()); 
    } 
 
    @Override 
    public void close(TaskAttemptContext context) throws IOException, 
        InterruptedException { 
      this.outputStream.close(); 
    } 
     
  } 
   
} 

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。

2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。

2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


# java  # OutputFormat  # OutputFormat的实例详解  # OutputFormat自定义的实现  # java中FileOutputStream中文乱码问题解决办法  # java DataInputStream和DataOutputStream详解及实例代码  # java IO流 之 输出流 OutputString()的使用  # Java的DataInputStream和DataOutputStream数据输入输出流  # Java中的BufferedInputStream与BufferedOutputStream使用示例  # Java中FilterInputStream和FilterOutputStream的用法详解  # 自定义  # 如有  # 希望能  # 来自于  # 写到  # 哪里去  # 谢谢大家  # 键值  # 这要  # 疑问请  # MySelfOutputFormatApp  # public  # FileOutputCommitter  # final  # String  # static  # INPUT_PATH  # output  # OutputCommitter  # Reducer 


相关文章: 沈阳制作网站公司排名,沈阳装饰协会官方网站?  如何零基础开发自助建站系统?完整教程解析  建站之星如何通过成品分离优化网站效率?  如何破解联通资金短缺导致的基站建设难题?  如何在腾讯云服务器快速搭建个人网站?  公司网站建设制作费用,想建设一个属于自己的企业网站,该如何去做?  ,在苏州找工作,上哪个网站比较好?  建站之星CMS建站配置指南:模板选择与SEO优化技巧  Swift中switch语句区间和元组模式匹配  如何快速查询网址的建站时间与历史轨迹?  建站主机核心功能解析:服务器选择与网站搭建流程指南  购物网站制作费用多少,开办网上购物网站,需要办理哪些手续?  青浦网站制作公司有哪些,苹果官网发货地是哪里?  如何基于云服务器快速搭建网站及云盘系统?  如何快速重置建站主机并恢复默认配置?  深圳网站制作费用多少钱,读秀,深圳文献港这样的网站很多只提供网上试读,但有些人只要提供试读的文章就能全篇下载,这个是怎么弄的?  成都网站制作公司哪家好,四川省职工服务网是做什么用?  移动端手机网站制作软件,掌上时代,移动端网站的谷歌SEO该如何做?  如何快速搭建FTP站点实现文件共享?  制作销售网站教学视频,销售网站有哪些?  网站制作专业公司有哪些,如何制作一个企业网站,建设网站的基本步骤有哪些?  如何通过NAT技术实现内网高效建站?  建站之星体验版:智能建站系统+响应式设计,多端适配快速建站  公司网站的制作公司,企业网站制作基本流程有哪些?  建站之星ASP如何实现CMS高效搭建与安全管理?  如何快速生成凡客建站的专业级图册?  专业网站制作服务公司,有哪些网站可以免费发布招聘信息?  为什么Go需要go mod文件_Go go mod文件作用说明  公司网站设计制作厂家,怎么创建自己的一个网站?  网站制作软件有哪些,制图软件有哪些?  网站制作报价单模板图片,小松挖机官方网站报价?  在线流程图制作网站手机版,谁能推荐几个好的CG原画资源网站么?  网站建设设计制作营销公司南阳,如何策划设计和建设网站?  建站ABC备案流程中有哪些关键注意事项?  如何在Golang中处理模块冲突_解决依赖版本不兼容问题  建站之星如何优化SEO以实现高效排名?  建设网站制作价格,怎样建立自己的公司网站?  简单实现Android验证码  如何安全更换建站之星模板并保留数据?  b2c电商网站制作流程,b2c水平综合的电商平台?  香港服务器WordPress建站指南:SEO优化与高效部署策略  建站之星×万网:智能建站系统+自助建站平台一键生成  如何在万网自助建站中设置域名及备案?  ui设计制作网站有哪些,手机UI设计网址吗?  美食网站链接制作教程视频,哪个教做美食的网站比较专业点?  如何生成腾讯云建站专用兑换码?  北京网页设计制作网站有哪些,继续教育自动播放怎么设置?  广州顶尖建站服务:企业官网建设与SEO优化一体化方案  图片制作网站免费软件,有没有免费的网站或软件可以将图片批量转为A4大小的pdf?  香港服务器选型指南:免备案配置与高效建站方案解析 

您的项目需求

*请认真填写需求信息,我们会在24小时内与您取得联系。