Processing data from AWS S3 using Cascading on Apache Hadoop YARN

S3 Native FileSystem (URI scheme: s3n)

A native filesystem for reading and writing regular files on S3. Its advantage is that you can access files on S3 that were written with other tools, and other tools can access files written using Hadoop. Its disadvantage is the 5GB limit on file size imposed by S3, which makes it unsuitable as a replacement for HDFS (which supports very large files).

S3 Block FileSystem (URI scheme: s3)

A block-based filesystem backed by S3. Files are stored as blocks, just as they are in HDFS, which permits an efficient implementation of renames. This filesystem requires a dedicated bucket: do not use an existing bucket that contains files, and do not write other files to the same bucket. Files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

Note

For input, you must create the bucket manually and upload the files to it. For output, you must also create the bucket, but the output directory you specify must not already exist.
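Because the output directory must not exist before the flow starts, a rerun against a fixed output path is likely to be rejected. One possible way around this, sketched below under assumptions, is to derive a fresh directory name for each run. The class name OutputPaths, the "run-" prefix, and the bucket path are hypothetical and are not part of Hadoop or Cascading.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class OutputPaths {
    // Timestamp pattern for the run directory, always rendered in UTC.
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(ZoneOffset.UTC);

    private OutputPaths() {}

    /** Returns base + "/run-<UTC timestamp>", intended as the sink path of a flow. */
    static String unique(String base, Instant now) {
        // Drop trailing slashes so the result never contains "//" after the base path.
        String trimmed = base;
        while (trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        return trimmed + "/run-" + STAMP.format(now);
    }

    public static void main(String[] args) {
        // Hypothetical bucket and directory names, for illustration only.
        System.out.println(unique("s3n://my-bucket/output/", Instant.now()));
    }
}
```

The returned string could be passed as the path of the output Hfs tap. Two runs that start within the same second would still collide, so this is only a starting point.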

Configuration for S3

Add the following entries to hdfs-site.xml. For the native filesystem, use the fs.s3n.* property names instead.

$ vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
 <property>
     <name>fs.s3.awsAccessKeyId</name>
     <value>AWS-ACCESS-KEY</value>
 </property>
 <property>
     <name>fs.s3.awsSecretAccessKey</name>
     <value>AWS-SECRET-KEY</value>
 </property>

OR

You can supply these properties to the FlowConnector, as done in the Cascading example at the end of this post.

OR

Embed the credentials directly in the URL:

 s3n://ID:SECRET@BUCKET

Example:

 $ $HADOOP_HOME/bin/hadoop distcp hdfs://ip:9001/user/nutch/file.csv \
         s3://ACCESSKEY:SECRETKEY@bucket_name/
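One pitfall with credentials embedded in a URL is that an AWS secret key may contain a slash, which confuses URL parsing. The workaround usually suggested is to replace each / in the secret with %2F. The following is a minimal sketch of that escaping, assuming a hypothetical helper class S3Uris; whether the escaped form is accepted may depend on the Hadoop version, and a URL built this way can expose the secret in logs and shell history.

```java
final class S3Uris {
    private S3Uris() {}

    /**
     * Builds scheme://ACCESSKEY:SECRETKEY@bucket/objectPath,
     * replacing each slash in the secret key with %2F.
     */
    static String withCredentials(String scheme, String accessKey, String secretKey,
                                  String bucket, String objectPath) {
        String escapedSecret = secretKey.replace("/", "%2F");
        // Avoid a doubled slash when the caller passes an absolute-looking object path.
        String path = objectPath.startsWith("/") ? objectPath.substring(1) : objectPath;
        return scheme + "://" + accessKey + ":" + escapedSecret + "@" + bucket + "/" + path;
    }

    public static void main(String[] args) {
        // Placeholder values only; never print real credentials.
        System.out.println(withCredentials("s3n", "ACCESSKEY", "SECRET/KEY", "bucket_name", "file.csv"));
    }
}
```

Keeping the keys in the Hadoop configuration avoids the escaping problem altogether.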

What are the Access Key and Secret Key?

AWS access key

This is effectively a username: an alphanumeric text string that uniquely identifies the user who owns the account. No two accounts can have the same AWS access key.

AWS secret key

This key plays the role of a password. It is called secret because it is assumed to be known only to its owner, which is why it is displayed as asterisks or dots when you type it into an input box. Together, the access key and the secret key form a credential pair that confirms the user's identity. Keep your secret key in a safe place.
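One way to keep the secret key out of command-line arguments and source code is to read both keys from environment variables and copy them into the properties that Hadoop expects. The sketch below assumes the variable names AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (the names commonly used by AWS tooling) and a hypothetical helper class S3CredentialProps; the property names follow the fs.s3.* and fs.s3n.* pattern used elsewhere in this post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class S3CredentialProps {
    private S3CredentialProps() {}

    /**
     * Copies the keys found in env into fs.<scheme>.awsAccessKeyId and
     * fs.<scheme>.awsSecretAccessKey, where scheme is "s3" or "s3n".
     */
    static Properties fromEnvironment(Map<String, String> env, String scheme) {
        String accessKey = env.get("AWS_ACCESS_KEY_ID");
        String secretKey = env.get("AWS_SECRET_ACCESS_KEY");
        if (accessKey == null || secretKey == null) {
            throw new IllegalStateException(
                    "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must both be set");
        }
        Properties properties = new Properties();
        properties.setProperty("fs." + scheme + ".awsAccessKeyId", accessKey);
        properties.setProperty("fs." + scheme + ".awsSecretAccessKey", secretKey);
        return properties;
    }

    public static void main(String[] args) {
        // Sample values stand in for the real environment in this demonstration.
        Map<String, String> sample = new HashMap<>();
        sample.put("AWS_ACCESS_KEY_ID", "ACCESSKEY");
        sample.put("AWS_SECRET_ACCESS_KEY", "SECRETKEY");
        // Print only the property names, never the values.
        System.out.println(fromEnvironment(sample, "s3n").stringPropertyNames());
    }
}
```

In a real application the first argument would be System.getenv(), and the resulting Properties object could be handed to the flow connector.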

Disadvantages

Note that by using S3 as an input you lose the data locality optimization, which may be significant.

General Practice

The general best practice is to copy the data into HDFS with distcp at the start of a workflow, run the processing against the transient HDFS, and copy the results back out to S3 with distcp at the end.
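The copy-in, process, copy-out pattern can be sketched as three command lines run in order. The helper below only builds the argument lists; the class name CopyInCopyOut, the jar name, and all paths are hypothetical, and the job command is whatever launches your own application.

```java
import java.util.Arrays;
import java.util.List;

final class CopyInCopyOut {
    private CopyInCopyOut() {}

    /** Argument list for: hadoop distcp <source> <target>. */
    static List<String> distcp(String source, String target) {
        return Arrays.asList("hadoop", "distcp", source, target);
    }

    /** The three steps in order: copy in from S3, run the job on HDFS, copy out to S3. */
    static List<List<String>> workflow(String s3Input, String hdfsInput, String hdfsOutput,
                                       String s3Output, List<String> jobCommand) {
        return Arrays.asList(
                distcp(s3Input, hdfsInput),
                jobCommand,
                distcp(hdfsOutput, s3Output));
    }

    public static void main(String[] args) {
        // Hypothetical job launch command and paths, for illustration only.
        List<String> job = Arrays.asList("hadoop", "jar", "my-app.jar", "/tmp/in", "/tmp/out");
        List<List<String>> steps =
                workflow("s3n://my-bucket/in", "/tmp/in", "/tmp/out", "s3n://my-bucket/out", job);
        for (List<String> step : steps) {
            // Printed rather than executed; a ProcessBuilder could run each step instead.
            System.out.println(String.join(" ", step));
        }
    }
}
```

With this arrangement only the two distcp steps touch S3, so the processing step keeps HDFS data locality.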


Cascading example app

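import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class Main {
    public void run(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException("usage: Main <accessKey> <secretKey>");
        }
        Properties properties = new Properties();
        String accessKey = args[0];
        String secretKey = args[1];
        // Prefer keeping these keys in the Hadoop XML configuration.
        // Credentials for the S3 block filesystem (s3://):
        properties.setProperty("fs.s3.awsAccessKeyId", accessKey);
        properties.setProperty("fs.s3.awsSecretAccessKey", secretKey);
        // Credentials for the S3 native filesystem (s3n://):
        // properties.setProperty("fs.s3n.awsAccessKeyId", accessKey);
        // properties.setProperty("fs.s3n.awsSecretAccessKey", secretKey);
        // properties.setProperty("fs.defaultFS", "hdfs://localhost:8020/");
        // properties.setProperty("fs.permissions.umask-mode", "007");
        AppProps.setApplicationJarClass(properties, Main.class);
        HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

        // Source: a semicolon-delimited file in S3, read without a header row.
        String input = "s3://my-bucket/my-log.csv";
        Tap inTap = new Hfs(new TextDelimited(false, ";"), input);
        Pipe copyPipe = new Pipe("copy");
        // Sink: a relative path on the default filesystem; it must not already exist.
        Tap outTap = new Hfs(new TextDelimited(false, ";"), "data/output");

        // Wire source -> pipe -> sink, then run the flow and block until it finishes.
        FlowDef flowDef = FlowDef.flowDef()
            .addSource(copyPipe, inTap)
            .addTailSink(copyPipe, outTap);
        flowConnector.connect(flowDef).complete();
    }

    public static void main(String[] args) {
        new Main().run(args);
    }
}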

6 comments

When running this example I get the error below:

2016-11-24 20:22:49.838 INFO util.Util: resolving application jar from found main method on: testing.S3Testing
2016-11-24 20:22:49.842 INFO planner.HadoopPlanner: using application jar: null
2016-11-24 20:22:50.176 INFO property.AppProps: using app.id: 179B7D571F0E4994AB28EFC183250531
2016-11-24 20:22:53.607 INFO flow.Flow: [] executed rule registry: MapReduceHadoopRuleRegistry, completed as: SUCCESS, in: 00:00.165
2016-11-24 20:22:53.614 INFO flow.Flow: [] rule registry: MapReduceHadoopRuleRegistry, supports assembly with steps: 1, nodes: 1
2016-11-24 20:22:53.615 INFO flow.Flow: [] rule registry: MapReduceHadoopRuleRegistry, result was selected using: 'default comparator: selects plan with fewest steps and fewest nodes'
2016-11-24 20:22:53.676 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
Exception in thread "main" cascading.flow.planner.PlannerException: [] could not build flow from assembly: [unable to get handle to get filesystem for: s3n]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:749)
at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:209)
at cascading.flow.FlowConnector.connect(FlowConnector.java:456)
at testing.S3Testing.run(S3Testing.java:36)
at testing.S3Testing.main(S3Testing.java:40)
Caused by: cascading.tap.TapException: unable to get handle to get filesystem for: s3n
at cascading.tap.hadoop.Hfs.getFileSystem(Hfs.java:301)
at cascading.tap.hadoop.Hfs.getFullIdentifier(Hfs.java:330)
at cascading.tap.hadoop.Hfs.sourceConfInit(Hfs.java:336)
at cascading.tap.hadoop.Hfs.sourceConfInit(Hfs.java:109)
at cascading.flow.hadoop.HadoopFlowStep.initFromSources(HadoopFlowStep.java:405)
at cascading.flow.hadoop.HadoopFlowStep.createInitializedConfig(HadoopFlowStep.java:118)
at cascading.flow.hadoop.HadoopFlowStep.createInitializedConfig(HadoopFlowStep.java:82)
at cascading.flow.planner.BaseFlowStep.getCreateFlowStepJob(BaseFlowStep.java:910)
at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1353)
at cascading.flow.BaseFlow.initialize(BaseFlow.java:247)
at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:203)
... 3 more
Caused by: java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at cascading.tap.hadoop.Hfs.getFileSystem(Hfs.java:297)
... 13 more