Değerli KD okuyucuları, bir önceki yazımda temel Apache Hadoop hakkında bilgiler edinmiş ufak çaplı bir MapReduce programı yazmıştık. Bu yazımızda ise MapReduce programlarının detaylarından bahsedeceğiz. Aşağıda basit bir MapReduce programının diyagramı bulunuyor.

MapReduce nasıl çalışır

Şemada aslında her şey yeterince açık. Sonuca baktığımızda kelimelerin bulunduğu havuzda her bir kelimeden kaç tane olduğunu buluyoruz fakat her zaman adet bazında işlemlerimiz de olmuyor. Bu yazımızda MapReduce algoritmasının derinliklerine ineceğiz. Custom Writable, Combiner, Partitioner konularını ele alacağız.

Örneklerimizi açık bir veri seti olan Newyork Stock Exchange(NYSE) üzerinden yapacağız. Veri setini https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz adresinden indirelim.

Dosyayı arşivden çıkarttıktan sonra CSV türüne dönüştürmemiz gerekiyor. Aynı zamanda kullanılabilir bir veri şeklinde sokabilmemiz için Identity türünde bir ek alana da daha ihtiyacımız var. Tüm bunlar için aşağıda ki komutu uygulayalım.

awk '{printf "%s,%s\n",NR,$0}' NYSE-2000-2001.tsv | sed '2,$y/\t/,/' > data.csv

Şimdi herhangi bir metin editöründe “NYSE,” metnini tüm satırlar için silelim. Bunun için Find And Replace türünde bir işlem yapabilirsiniz. Artık örneklerimizi yapmaya hazırız.

Custom Writable

Custom Writable dediğimiz mevzu aslında kendi classlarımızı işleyebilmemiz anlamına geliyor. Bir önce ki makalemizde hatırlarsanız key tipini IntWritable olarak kullanmıştık. Şimdi ise kendi key tipimizi tanımlayacağız. Bunun için Hadoop çekirdeğinden WritableComparable Interface’ini implement edeceğiz.

Abstract olarak üç metot içeren bu Interface, tanımladığımız bir Java Class’ı Context türünde ki nesnelerimize işleme olanağı sağlar. Şimdi bu NYSE veri setini bir class’a çevirelim. Bunun için yeni bir Java Class’ı ekledikten sonra gerekli propertyler’i tanımlayalım.

    public IntWritable id;
    public Text symbol;
    public Text date;
    public FloatWritable priceOpen;
    public FloatWritable priceHigh;
    public FloatWritable priceLow;
    public FloatWritable priceClose;
    public IntWritable stockVolume;
    public FloatWritable priceAdjClose;

Şimdi bu Class’ın Constructor metodunu kullanacağız ve propertyler’imizi set edeceğiz.

    public StockExchange()
    {
        this.id = new IntWritable();
        this.symbol = new Text();
        this.date = new Text();
        this.priceOpen = new FloatWritable();
        this.priceHigh = new FloatWritable();
        this.priceLow = new FloatWritable();
        this.priceClose = new FloatWritable();
        this.stockVolume = new IntWritable();
        this.priceAdjClose = new FloatWritable();
    }

    public StockExchange(
            IntWritable id, Text symbol, 
            Text date, FloatWritable priceOpen,
            FloatWritable priceHigh, FloatWritable priceLow,
            FloatWritable priceClose, IntWritable stockVolume,
            FloatWritable priceAdjClose)
    {
        this.id = id;
        this.symbol = symbol;
        this.date = date;
        this.priceOpen = priceOpen;
        this.priceHigh = priceHigh;
        this.priceClose = priceClose;
        this.stockVolume = stockVolume;
        this.priceAdjClose = priceAdjClose;
    }

Classımıza özgün setter metodumuzu tanımlayalım.

    public void set(IntWritable id, Text symbol, 
            Text date, FloatWritable priceOpen,
            FloatWritable priceHigh, FloatWritable priceLow,
            FloatWritable priceClose, IntWritable stockVolume,
            FloatWritable priceAdjClose)
    {
        this.id = id;
        this.symbol = symbol;
        this.date = date;
        this.priceOpen = priceOpen;
        this.priceHigh = priceHigh;
        this.priceClose = priceClose;
        this.stockVolume = stockVolume;
        this.priceAdjClose = priceAdjClose;
    }

 

Daha sonra Context nesnemize işlemek üzere önemli olan iki alanımız id ve symbol propertyler’i için getter metodlarını da ekleyelim.

 

    public IntWritable getId(){
        return this.id;
    }
    
    public Text getSymbol(){
        return this.symbol;
    }

 

Şu ana kadar standart bir Java Class’ı yazdık. Gelelim Interface tarafına. Classımızı WritableComparable Interface’ine implement edelim.

public class StockExchange implements WritableComparable<StockExchange>

Şimdi abstract metotlarımızı ekliyoruz.

 

    @Override
    public void write(DataOutput out) throws IOException {
        this.id.write(out);
        this.symbol.write(out);
        this.date.write(out);
        this.priceOpen.write(out);
        this.priceHigh.write(out);
        this.priceClose.write(out);
        this.stockVolume.write(out);
        this.priceAdjClose.write(out);
    }

 

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id.readFields(in);
        this.symbol.readFields(in);
        this.date.readFields(in);
        this.priceOpen.readFields(in);
        this.priceHigh.readFields(in);
        this.priceClose.readFields(in);
        this.stockVolume.readFields(in);
        this.priceAdjClose.readFields(in);
    }

 

    @Override
    public int compareTo(StockExchange o) {
        return symbol.compareTo(o.symbol);
    }
    
    @Override 
    public boolean equals(Object o)
    {
        if (o instanceof StockExchange)
        {
            StockExchange se = (StockExchange)o;
            return this.symbol.equals(se.symbol);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return this.symbol.hashCode();
    }

 

Classımızı hazır. Şimdi basit bir senaryo ile MapReduce fonksiyonlarımızı yazalım. Senaryomuz şu şekilde olsun; Her bir stok biriminin toplam hareketini ve hacmini bulalım.

public class CustomTypesSample extends Configured implements Tool

Diyerek başlayalım ve hemen Map fonksiyonumuzu ekleyelim.

   public static class StockMapper extends Mapper<LongWritable, Text, StockExchange, IntWritable>
   {
       private static final IntWritable one = new IntWritable(1);
       private final StockExchange stockExchange = new StockExchange();
       private final IntWritable id = new IntWritable();
       private final Text symbol = new Text();
       private final Text date = new Text();
       private final FloatWritable priceOpen = new FloatWritable();
       private final FloatWritable priceHigh = new FloatWritable();
       private final FloatWritable priceLow = new FloatWritable();
       private final FloatWritable priceClose = new FloatWritable();
       private final IntWritable stockVolume = new IntWritable();
       private final FloatWritable priceAdjClose = new FloatWritable();
       @Override
       public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
       {
           String[] values = value.toString().split(",");
           id.set(Integer.parseInt(values[0]));
           symbol.set(values[1]);
           date.set(values[2]);
           priceOpen.set(Float.parseFloat(values[3]));
           priceHigh.set(Float.parseFloat(values[4]));
           priceLow.set(Float.parseFloat(values[5]));
           priceClose.set(Float.parseFloat(values[6]));
           stockVolume.set(Integer.parseInt(values[7]));
           priceAdjClose.set(Float.parseFloat(values[8]));
           stockExchange.set(id, symbol, date, priceOpen, priceHigh, priceLow, priceClose, stockVolume, priceAdjClose);
           context.write(stockExchange, stockVolume);
       }
   }

 

Map Class’ımızın extends parametrelerine ve map metoduna baktığımızda, context nesnemize işlenmek üzere önceden tanımladığımız StockExchange tipi var. Aslında genel adıyla “entity” nesnemizi işletmiş oluyoruz. Şimdi reduce metodumuzu yazalım.

   public static class StockReducer extends Reducer<StockExchange, IntWritable, Text, IntWritable>
   {
       private final IntWritable result = new IntWritable();
       private Text symbol = new Text();
       
       @Override
       public void reduce(StockExchange key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
       {
          int sum = 0;
          symbol = key.getSymbol();
          for(IntWritable val : values)
          {
              
              sum += val.get();
          }
          result.set(sum);
          context.write(symbol, result);
       }
   }

 

Reduce metodunun içerisinde bir satıra dikkat çekmek istiyorum:  symbol = key.getSymbol(); yani key nesnesi bizim belirlediğimiz bir tip ve yazdığımız bir metod üzerinden geliyor. Buna bağlı olarakta stok adını reduce işlemine farklı efor sarfettirmeden sokabiliyoruz. Son olarakta map fonksiyonunda context’e stockVolume property’sini value olarak işlettirdiğimiz için, reduce içerisinde toplamasını da rahatlıkla yapabiliyoruz. main metodumuza geçelim.

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job();
        job.setJobName("Stock Exchange Job");
        job.setJarByClass(CustomTypesSample.class);
        
        job.setMapperClass(StockMapper.class);
        job.setCombinerClass(StockCombiner.class);
        job.setReducerClass(StockReducer.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        job.setMapOutputKeyClass(StockExchange.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return (job.waitForCompletion(true) ? 1 : 0);
    }

 

   public static void main(String[] args) throws IOException, Exception
   {
       int res = ToolRunner.run(new Configuration(), new CustomTypesSample(), args);
       System.exit(res);
       
   }

 

Önceki makalede gördüğümüz gibi aynı şekilde run adında bir metod tanımlayıp input/output parametrelerini belirtiyoruz.

Programı derledikten sonra çalıştıralım.

hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.CustomTypesSample kdOrnekler/in kdOrnekler/out

bash-4.1# hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.Cu

stomTypesSample kdOrnekler/in kdOrnekler/out

16/05/19 09:07:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0

:8032

16/05/19 09:07:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option

parsing not performed. Implement the Tool interface and execute your applicatio

n with ToolRunner to remedy this.

16/05/19 09:07:21 INFO input.FileInputFormat: Total input paths to process : 1

16/05/19 09:07:21 INFO mapreduce.JobSubmitter: number of splits:1

16/05/19 09:07:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_14

63645504864_0014

16/05/19 09:07:22 INFO impl.YarnClientImpl: Submitted application application_14

63645504864_0014

16/05/19 09:07:22 INFO mapreduce.Job: The url to track the job: http://8baa64519

29f:8088/proxy/application_1463645504864_0014/

16/05/19 09:07:22 INFO mapreduce.Job: Running job: job_1463645504864_0014

16/05/19 09:07:30 INFO mapreduce.Job: Job job_1463645504864_0014 running in uber

mode : false

16/05/19 09:07:30 INFO mapreduce.Job:  map 0% reduce 0%

16/05/19 09:07:43 INFO mapreduce.Job:  map 67% reduce 0%

16/05/19 09:07:47 INFO mapreduce.Job:  map 100% reduce 0%

16/05/19 09:07:56 INFO mapreduce.Job:  map 100% reduce 100%

16/05/19 09:07:57 INFO mapreduce.Job: Job job_1463645504864_0014 completed succe

ssfully

16/05/19 09:07:58 INFO mapreduce.Job: Counters: 49

File System Counters

FILE: Number of bytes read=36518537

FILE: Number of bytes written=73267467

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=45520833

HDFS: Number of bytes written=22879

HDFS: Number of read operations=6

HDFS: Number of large read operations=0

HDFS: Number of write operations=2

Job Counters

Launched map tasks=1

Launched reduce tasks=1

Data-local map tasks=1

Total time spent by all maps in occupied slots (ms)=14529

Total time spent by all reduces in occupied slots (ms)=7267

Total time spent by all map tasks (ms)=14529

Total time spent by all reduce tasks (ms)=7267

Total vcore-seconds taken by all map tasks=14529

Total vcore-seconds taken by all reduce tasks=7267

Total megabyte-seconds taken by all map tasks=14877696

Total megabyte-seconds taken by all reduce tasks=7441408

Map-Reduce Framework

Map input records=812989

Map output records=812989

Map output bytes=34892553

Map output materialized bytes=36518537

Input split bytes=122

Combine input records=0

Combine output records=0

Reduce shuffle bytes=36518537

Reduce output records=17349

Shuffled Maps =11625978

Merged Map outputs=1

CPU time spent (ms)=12140

Virtual memory (bytes) snapshot=1391702016

Shuffle Errorscommitted heap usage (bytes)=198053888

CONNECTION=0

WRONG_LENGTH=0

WRONG_REDUCE=0

File InpBytes Read=45520711

File OutBytes Written=22879

Herşey tamam, sonuca bakalım

SNH     36582800

SNN     6724200

SNP     21844800

SNS     1116000

SNV     204124500

SO      948512200

SON     79370900

SOR     3458400

SPA     2569200

SPF     195048200

SPG     188042000

SPH     24934500

SPN     30090800

SPP     87433700

SPW     167380800

SQM     97164000

SR      17715200

SRE     339118200

SRI     12150900

SRT     30680400

SRZ     275164400

SSD     17488400

SSL     24065800

SSP     32296700

SSS     16306900

STC     17453400

devam eder….

 

Combiner

Combiner özelliği yardımcı reducer olarak ta tanımlanabilir. Opsiyonel bir özelliktir. Performans açısından önem arz eder. Map işleminden sonra, Reduce işleminden önce çalışır. Yazılış biçimini Reduce Class’ından farksızdır ancak belli bir formüle uyarlanması gerekir. Map işleminden sonra ki key-value parçalarını Reduce işlemine iletir. Aynı key değelerini Reduce fonksiyonuna özetleyerek iletir böylelikle Reduce fonksiyonunun yükü azaltılır.

 

combiner

Şemada görüldüğü üzere Map üzerinde ki işlemleri tekrar birleştirme işlemi yaparak yerine yerleştirir.

Şimdi Combiner Classımızı yazalım.

   public static class StockCombiner extends Reducer<StockExchange, IntWritable, StockExchange, IntWritable>
   {
       @Override
       public void reduce(StockExchange key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
       {
           int sum = 0;
           for(IntWritable val : values)
           {
               sum++;
           }
           context.write(key, new IntWritable(sum));
       }
   }

 

Combiner classımızı job nesnesine belirtmemiz gerekiyor.

job.setCombinerClass(StockCombiner.class);

çalıştırdığımızda alınacak sonuç aşağıda ki gibidir.

bash-4.1# hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.Cu

stomTypesSample kdOrnekler/in kdOrnekler/out

16/05/21 05:13:22 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0

:8032

16/05/21 05:13:23 WARN mapreduce.JobResourceUploader: Hadoop command-line option

parsing not performed. Implement the Tool interface and execute your applicatio

n with ToolRunner to remedy this.

16/05/21 05:13:24 INFO input.FileInputFormat: Total input paths to process : 1

16/05/21 05:13:24 INFO mapreduce.JobSubmitter: number of splits:1

16/05/21 05:13:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_14

63820534230_0002

16/05/21 05:13:24 INFO impl.YarnClientImpl: Submitted application application_14

63820534230_0002

16/05/21 05:13:24 INFO mapreduce.Job: The url to track the job: http://8baa64519

29f:8088/proxy/application_1463820534230_0002/

16/05/21 05:13:24 INFO mapreduce.Job: Running job: job_1463820534230_0002

16/05/21 05:13:34 INFO mapreduce.Job: Job job_1463820534230_0002 running in uber

mode : false

16/05/21 05:13:34 INFO mapreduce.Job:  map 0% reduce 0%

16/05/21 05:13:47 INFO mapreduce.Job:  map 67% reduce 0%

16/05/21 05:13:52 INFO mapreduce.Job:  map 100% reduce 0%

16/05/21 05:14:01 INFO mapreduce.Job:  map 100% reduce 100%

16/05/21 05:14:01 INFO mapreduce.Job: Job job_1463820534230_0002 completed succe

ssfully

16/05/21 05:14:01 INFO mapreduce.Job: Counters: 49

File System Counters

FILE: Number of bytes read=77898

FILE: Number of bytes written=386581

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=45520833

HDFS: Number of bytes written=13696

HDFS: Number of read operations=6

HDFS: Number of large read operations=0

HDFS: Number of write operations=2

Job Counters

Launched map tasks=1

Launched reduce tasks=1

Data-local map tasks=1

Total time spent by all maps in occupied slots (ms)=15438

Total time spent by all reduces in occupied slots (ms)=6964

Total time spent by all map tasks (ms)=15438

Total time spent by all reduce tasks (ms)=6964

Total vcore-seconds taken by all map tasks=15438

Total vcore-seconds taken by all reduce tasks=6964

Total megabyte-seconds taken by all map tasks=15808512

Total megabyte-seconds taken by all reduce tasks=7131136

Map-Reduce Framework

Map input records=812989

Map output records=812989

Map output bytes=34892553

Map output materialized bytes=77898

Input split bytes=122

Combine input records=812989

Combine output records=1734

Reduce shuffle bytes=77898

Reduce output records=1734

Shuffled Maps =13468

Merged Map outputs=1

CPU time spent (ms)=6490

Virtual memory (bytes) snapshot=1391874048

Shuffle Errorscommitted heap usage (bytes)=198053888

CONNECTION=0

WRONG_LENGTH=0

WRONG_REDUCE=0

File InpBytes Read=45520711

File OutBytes Written=13696

Burada bazı çıktılara dikkat çekmek istiyorum

Shuffle, Combine ve CPU için harcanan değerlere baktığımızda Combine olmadan ki halinden çok çok daha az efor sarfedilmiş. Petabyte boyutlarında ki veriler için bu fark korkunç derecede artacak ve performansa etki edecektir. Bu nedenle Combiner doğru yerde kullanıldığında büyük bir avantaj yakalamamıza olanak sağlıyor.

Partitioner

Partitioner özelliği verileri farklı kombinasyonlar ile işlememizi sağlar. Combiner sınıfı gibi Map sonrası, Reduce öncesi çalışır. Partitioner sayısı Reduce sayısı ile eşit olmalıdır. Çünkü veriyi aynı simetride bölmesi gereklidir. Bunun için job nesnesinde reduce sayısı belirtilir. Hash metotları ile aynı mantık içerisinde çalışır.

Örnek senaryomuz şu şekilde olsun; Her yıl içerisinde ki en yüksek stok hareketini bulan bir MapReduce programı yazalım.

Mapper sınıfımız:

    public static class MapperClass extends Mapper<LongWritable, Text, IntWritable, IntWritable>
    {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String[] values = value.toString().split(",");
            String year = values[2].split("-")[0];
            String volume = values[7];
            
            
            context.write(new IntWritable(Integer.parseInt(year)), new IntWritable(Integer.parseInt(volume)));   
            
        }
    }

Burada key olarak yılı kullandık, çünkü yıl bazında sonuca ulaşmaya çalışıyoruz. Daha sonra stock_volume alanını okuyarak context nesnemize işliyoruz.

Reducer sınıfımız:

    public static class ReducerClass extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
    {
        public int max = -1;
      
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            max = -1;
            
            for(IntWritable value : values)
            {
            
                if(value.get() > max){
                    max = value.get();
                }
            }
            context.write(key, new IntWritable(max));
            
            
        }
    }

Reducer sınıfımızda yaptığımız işlem; max adında bir nesnemiz var ve bu nesne döngü içerisinde en yüksek rakamsal değeri bulana kadar dönüyor. Döndükten sonra da context nesnemize işliyoruz.

Partitioner sınıfımız:

    public static class PartitionerClass extends Partitioner<IntWritable, IntWritable>
    {
        @Override
        public int getPartition(IntWritable key, IntWritable value, int numReduceTasks)
        {
            
            int year = key.get();
            if (numReduceTasks == 0)
            {
                return 0;
            }
            if (year == 2000)
            {
                return 0;
            }
            else
            {
                return 1 % numReduceTasks;
            }
        }
    }

Yıla göre hangi partitionera yönerge vereceğini hesaplattırıyoruz. Burada dönüş 0 ve 1 olabilir. Çünkü iki yıl üzerinden partitioner işlemini başlatıyoruz. Index olarak düşündüğümüzde 0 2000 yılına, 1 2001 yılına işleniyor. Eğer veri setimiz 2000, 2001 ve 2002 yılları olsaydı bir mod işlemi daha eklememiz gerekecekti.

Son olarak main metodumuz ve run metodumuz

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
		
        Job job = new Job(conf, "Partitioner Example");
        job.setJarByClass(PartitionerSample.class);
		
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        job.setMapperClass(MapperClass.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);


        job.setPartitionerClass(PartitionerClass.class);
        job.setReducerClass(ReducerClass.class);
        job.setNumReduceTasks(2);
        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true)? 0 : 1);
        return 0;
    }

Job nesnesine setNumReduceTasks metodu ile 2 tane reduce olacağını belirttik ve partitioner classımızı belirttik. Çalıştırdığımızda out klasörümüzde iki tane dosya olacaktır.

bash-4.1# hadoop dfs -ls kdOrnekler/out

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.



Found 3 items

-rw-r--r--   1 root supergroup          0 2016-05-21 07:48 kdOrnekler/out/_SUCCE

SS

-rw-r--r--   1 root supergroup         15 2016-05-21 07:47 kdOrnekler/out/part-r

-00000

-rw-r--r--   1 root supergroup         15 2016-05-21 07:48 kdOrnekler/out/part-r

-00001

bash-4.1# hadoop dfs -cat kdOrnekler/out/part-r-00000

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.



2000    143929200

bash-4.1# hadoop dfs -cat kdOrnekler/out/part-r-00001

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.



2001    358280000

Tüm dosyaları GitHub sayfamdan indirebilirsiniz.

Bir sonra ki makale de görüşmek üzere.

Facebook'dan yorumla