在上一篇博文中,我们讲了如何编写、运行、测试一个MR,但是hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。
然而,需要说明一点,在上一篇博文中,笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR。其实,这个调度平台就是使用了quartz,记得笔者在之前的博文中也讲过大致的使用。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。
首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase,这个会在后面的博文中进行介绍。
然后,还要知道hdfs中块(block)的概念,默认为64MB。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。
然后,再就是要知道namenode和datanode,这个在之前的博文已经介绍过,下面看看cm环境中hdfs的管理者(namenode)和工作者(datanode),如下
这里写图片描述
在yarn环境中是可以有多个nameNode的。此环境中没有SecondaryNameNode,当然也可以有。
好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。

一、java实现上传本地文件至hdfs

这里,可以直接使用hdfs提供的java api即可实现,代码如下:

package org.qiyongkang.hdfs.local;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


/**
 * ClassName:UploadLocalFileToHdfs <br/>
 * Function: 本地文件上传至hdfs. <br/>
 * Date:     2016年3月28日 下午10:06:05 <br/>
 * @author   qiyongkang
 * @version  
 * @since    JDK 1.6
 * @see      
 */
public class UploadLocalFileToHdfs {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String localDir = "/home/qiyongkang";
        String hdfsDir = "/qiyongkang";
        try{
            Path localPath = new Path(localDir);
            Path hdfsPath = new Path(hdfsDir);
            FileSystem hdfs = FileSystem.get(conf);
            hdfs.copyFromLocalFile(localPath, hdfsPath);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。
打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,然后执行hadoop fs -ls /qiyongkang便可看到:
这里写图片描述

二、java实现上传ftp上的文件至hdfs

首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。
其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。
下面,直接贴出代码:

package org.qiyongkang.hdfs.ftp;

import java.io.InputStream;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * ClassName:UploadFtpFileToHdfs <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason: TODO ADD REASON. <br/>
 * Date: 2016年3月28日 下午10:50:37 <br/>
 * 
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class UploadFtpFileToHdfs {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
    }

    /**
     * 
     * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
     *
     * @author qiyongkang
     * @param ip
     * @param username
     * @param password
     * @param filePath
     * @param outputPath
     * @param conf
     * @return
     * @since JDK 1.6
     */
    private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,
            String outputPath, Configuration conf) {
        FTPClient ftp = new FTPClient();
        InputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        boolean flag = true;
        try {
            ftp.connect(ip);
            ftp.login(username, password);
            ftp.setFileType(FTP.BINARY_FILE_TYPE);
            ftp.setControlEncoding("UTF-8");
            int reply = ftp.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                ftp.disconnect();
            }
            FTPFile[] files = ftp.listFiles(filePath);
            FileSystem hdfs = FileSystem.get(conf);
            for (FTPFile file : files) {
                if (!(file.getName().equals(".") || file.getName().equals(".."))) {
                    inputStream = ftp.retrieveFileStream(filePath + file.getName());
                    outputStream = hdfs.create(new Path(outputPath + file.getName()));
                    IOUtils.copyBytes(inputStream, outputStream, conf, false);
                    if (inputStream != null) {
                        inputStream.close();
                        ftp.completePendingCommand();
                    }
                }
            }
            ftp.disconnect();
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }
}

然后同样打包上传后执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到:
这里写图片描述

好了,关于hdfs的文件上传就讲到这儿了,希望给大家提供点帮助。

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐