Değerli KD okuyucuları, bir önceki yazımda temel Apache Hadoop hakkında bilgiler edinmiş ufak çaplı bir MapReduce programı yazmıştık. Bu yazımızda ise MapReduce programlarının detaylarından bahsedeceğiz. Aşağıda basit bir MapReduce programının diyagramı bulunuyor.

MapReduce nasıl çalışır

Şemada aslında her şey yeterince açık. Sonuca baktığımızda kelimelerin bulunduğu havuzda her bir kelimeden kaç tane olduğunu buluyoruz fakat her zaman adet bazında işlemlerimiz de olmuyor. Bu yazımızda MapReduce algoritmasının derinliklerine ineceğiz. Custom Writable, Combiner, Partitioner konularını ele alacağız.

Örneklerimizi açık bir veri seti olan Newyork Stock Exchange(NYSE) üzerinden yapacağız. Veri setini https://s3.amazonaws.com/hw-sandbox/tutorial1/NYSE-2000-2001.tsv.gz adresinden indirelim.

Dosyayı arşivden çıkarttıktan sonra CSV türüne dönüştürmemiz gerekiyor. Aynı zamanda kullanılabilir bir veri şeklinde sokabilmemiz için Identity türünde bir ek alana da daha ihtiyacımız var. Tüm bunlar için aşağıda ki komutu uygulayalım.

awk '{printf "%s,%s\n",NR,$0}' NYSE-2000-2001.tsv | sed '2,$y/\t/,/' > data.csv

Şimdi herhangi bir metin editöründe “NYSE,” metnini tüm satırlar için silelim. Bunun için Find And Replace türünde bir işlem yapabilirsiniz. Artık örneklerimizi yapmaya hazırız.

Custom Writable

Custom Writable dediğimiz mevzu aslında kendi classlarımızı işleyebilmemiz anlamına geliyor. Bir önce ki makalemizde hatırlarsanız key tipini IntWritable olarak kullanmıştık. Şimdi ise kendi key tipimizi tanımlayacağız. Bunun için Hadoop çekirdeğinden WritableComparable Interface’ini implement edeceğiz.

Abstract olarak üç metot içeren bu Interface, tanımladığımız bir Java Class’ı Context türünde ki nesnelerimize işleme olanağı sağlar. Şimdi bu NYSE veri setini bir class’a çevirelim. Bunun için yeni bir Java Class’ı ekledikten sonra gerekli propertyler’i tanımlayalım.

    public IntWritable id;
public Text symbol;
public Text date;
public FloatWritable priceOpen;
public FloatWritable priceHigh;
public FloatWritable priceLow;
public FloatWritable priceClose;
public IntWritable stockVolume;
public FloatWritable priceAdjClose;

Şimdi bu Class’ın Constructor metodunu kullanacağız ve propertyler’imizi set edeceğiz.

    public StockExchange()
{
this.id = new IntWritable();
this.symbol = new Text();
this.date = new Text();
this.priceOpen = new FloatWritable();
this.priceHigh = new FloatWritable();
this.priceLow = new FloatWritable();
this.priceClose = new FloatWritable();
this.stockVolume = new IntWritable();
this.priceAdjClose = new FloatWritable();
}

    public StockExchange(
IntWritable id, Text symbol, 
Text date, FloatWritable priceOpen,
FloatWritable priceHigh, FloatWritable priceLow,
FloatWritable priceClose, IntWritable stockVolume,
FloatWritable priceAdjClose)
{
this.id = id;
this.symbol = symbol;
this.date = date;
this.priceOpen = priceOpen;
this.priceHigh = priceHigh;
this.priceClose = priceClose;
this.stockVolume = stockVolume;
this.priceAdjClose = priceAdjClose;
}

Classımıza özgün setter metodumuzu tanımlayalım.

    public void set(IntWritable id, Text symbol, 
Text date, FloatWritable priceOpen,
FloatWritable priceHigh, FloatWritable priceLow,
FloatWritable priceClose, IntWritable stockVolume,
FloatWritable priceAdjClose)
{
this.id = id;
this.symbol = symbol;
this.date = date;
this.priceOpen = priceOpen;
this.priceHigh = priceHigh;
this.priceClose = priceClose;
this.stockVolume = stockVolume;
this.priceAdjClose = priceAdjClose;
}

 

Daha sonra Context nesnemize işlemek üzere önemli olan iki alanımız id ve symbol propertyler’i için getter metodlarını da ekleyelim.

 

    public IntWritable getId(){
return this.id;
}
public Text getSymbol(){
return this.symbol;
}

 

Şu ana kadar standart bir Java Class’ı yazdık. Gelelim Interface tarafına. Classımızı WritableComparable Interface’ine implement edelim.

public class StockExchange implements WritableComparable<StockExchange>

Şimdi abstract metotlarımızı ekliyoruz.

 

    @Override
public void write(DataOutput out) throws IOException {
this.id.write(out);
this.symbol.write(out);
this.date.write(out);
this.priceOpen.write(out);
this.priceHigh.write(out);
this.priceClose.write(out);
this.stockVolume.write(out);
this.priceAdjClose.write(out);
}

 

    @Override
public void readFields(DataInput in) throws IOException {
this.id.readFields(in);
this.symbol.readFields(in);
this.date.readFields(in);
this.priceOpen.readFields(in);
this.priceHigh.readFields(in);
this.priceClose.readFields(in);
this.stockVolume.readFields(in);
this.priceAdjClose.readFields(in);
}

 

    @Override
public int compareTo(StockExchange o) {
return symbol.compareTo(o.symbol);
}
@Override 
public boolean equals(Object o)
{
if (o instanceof StockExchange)
{
StockExchange se = (StockExchange)o;
return this.symbol.equals(se.symbol);
}
return false;
}
@Override
public int hashCode() {
return this.symbol.hashCode();
}

 

Classımızı hazır. Şimdi basit bir senaryo ile MapReduce fonksiyonlarımızı yazalım. Senaryomuz şu şekilde olsun; Her bir stok biriminin toplam hareketini ve hacmini bulalım.

public class CustomTypesSample extends Configured implements Tool

Diyerek başlayalım ve hemen Map fonksiyonumuzu ekleyelim.

   public static class StockMapper extends Mapper<LongWritable, Text, StockExchange, IntWritable>
{
private static final IntWritable one = new IntWritable(1);
private final StockExchange stockExchange = new StockExchange();
private final IntWritable id = new IntWritable();
private final Text symbol = new Text();
private final Text date = new Text();
private final FloatWritable priceOpen = new FloatWritable();
private final FloatWritable priceHigh = new FloatWritable();
private final FloatWritable priceLow = new FloatWritable();
private final FloatWritable priceClose = new FloatWritable();
private final IntWritable stockVolume = new IntWritable();
private final FloatWritable priceAdjClose = new FloatWritable();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] values = value.toString().split(",");
id.set(Integer.parseInt(values[0]));
symbol.set(values[1]);
date.set(values[2]);
priceOpen.set(Float.parseFloat(values[3]));
priceHigh.set(Float.parseFloat(values[4]));
priceLow.set(Float.parseFloat(values[5]));
priceClose.set(Float.parseFloat(values[6]));
stockVolume.set(Integer.parseInt(values[7]));
priceAdjClose.set(Float.parseFloat(values[8]));
stockExchange.set(id, symbol, date, priceOpen, priceHigh, priceLow, priceClose, stockVolume, priceAdjClose);
context.write(stockExchange, stockVolume);
}
}

 

Map Class’ımızın extends parametrelerine ve map metoduna baktığımızda, context nesnemize işlenmek üzere önceden tanımladığımız StockExchange tipi var. Aslında genel adıyla “entity” nesnemizi işletmiş oluyoruz. Şimdi reduce metodumuzu yazalım.

   public static class StockReducer extends Reducer<StockExchange, IntWritable, Text, IntWritable>
{
private final IntWritable result = new IntWritable();
private Text symbol = new Text();
@Override
public void reduce(StockExchange key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
symbol = key.getSymbol();
for(IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(symbol, result);
}
}

 

Reduce metodunun içerisinde bir satıra dikkat çekmek istiyorum:  symbol = key.getSymbol(); yani key nesnesi bizim belirlediğimiz bir tip ve yazdığımız bir metod üzerinden geliyor. Buna bağlı olarakta stok adını reduce işlemine farklı efor sarfettirmeden sokabiliyoruz. Son olarakta map fonksiyonunda context’e stockVolume property’sini value olarak işlettirdiğimiz için, reduce içerisinde toplamasını da rahatlıkla yapabiliyoruz. main metodumuza geçelim.

    @Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job();
job.setJobName("Stock Exchange Job");
job.setJarByClass(CustomTypesSample.class);
job.setMapperClass(StockMapper.class);
job.setCombinerClass(StockCombiner.class);
job.setReducerClass(StockReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(StockExchange.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return (job.waitForCompletion(true) ? 1 : 0);
}

 

   public static void main(String[] args) throws IOException, Exception
{
int res = ToolRunner.run(new Configuration(), new CustomTypesSample(), args);
System.exit(res);
}

 

Önceki makalede gördüğümüz gibi aynı şekilde run adında bir metod tanımlayıp input/output parametrelerini belirtiyoruz.

Programı derledikten sonra çalıştıralım.

hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.CustomTypesSample kdOrnekler/in kdOrnekler/out
bash-4.1# hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.Cu
stomTypesSample kdOrnekler/in kdOrnekler/out
16/05/19 09:07:20 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0
:8032
16/05/19 09:07:21 WARN mapreduce.JobResourceUploader: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your applicatio
n with ToolRunner to remedy this.
16/05/19 09:07:21 INFO input.FileInputFormat: Total input paths to process : 1
16/05/19 09:07:21 INFO mapreduce.JobSubmitter: number of splits:1
16/05/19 09:07:21 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_14
63645504864_0014
16/05/19 09:07:22 INFO impl.YarnClientImpl: Submitted application application_14
63645504864_0014
16/05/19 09:07:22 INFO mapreduce.Job: The url to track the job: http://8baa64519
29f:8088/proxy/application_1463645504864_0014/
16/05/19 09:07:22 INFO mapreduce.Job: Running job: job_1463645504864_0014
16/05/19 09:07:30 INFO mapreduce.Job: Job job_1463645504864_0014 running in uber
mode : false
16/05/19 09:07:30 INFO mapreduce.Job:  map 0% reduce 0%
16/05/19 09:07:43 INFO mapreduce.Job:  map 67% reduce 0%
16/05/19 09:07:47 INFO mapreduce.Job:  map 100% reduce 0%
16/05/19 09:07:56 INFO mapreduce.Job:  map 100% reduce 100%
16/05/19 09:07:57 INFO mapreduce.Job: Job job_1463645504864_0014 completed succe
ssfully
16/05/19 09:07:58 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=36518537
FILE: Number of bytes written=73267467
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=45520833
HDFS: Number of bytes written=22879
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=14529
Total time spent by all reduces in occupied slots (ms)=7267
Total time spent by all map tasks (ms)=14529
Total time spent by all reduce tasks (ms)=7267
Total vcore-seconds taken by all map tasks=14529
Total vcore-seconds taken by all reduce tasks=7267
Total megabyte-seconds taken by all map tasks=14877696
Total megabyte-seconds taken by all reduce tasks=7441408
Map-Reduce Framework
Map input records=812989
Map output records=812989
Map output bytes=34892553
Map output materialized bytes=36518537
Input split bytes=122
Combine input records=0
Combine output records=0
Reduce shuffle bytes=36518537
Reduce output records=17349
Shuffled Maps =11625978
Merged Map outputs=1
CPU time spent (ms)=12140
Virtual memory (bytes) snapshot=1391702016
Shuffle Errorscommitted heap usage (bytes)=198053888
CONNECTION=0
WRONG_LENGTH=0
WRONG_REDUCE=0
File InpBytes Read=45520711
File OutBytes Written=22879

Herşey tamam, sonuca bakalım

SNH     36582800
SNN     6724200
SNP     21844800
SNS     1116000
SNV     204124500
SO      948512200
SON     79370900
SOR     3458400
SPA     2569200
SPF     195048200
SPG     188042000
SPH     24934500
SPN     30090800
SPP     87433700
SPW     167380800
SQM     97164000
SR      17715200
SRE     339118200
SRI     12150900
SRT     30680400
SRZ     275164400
SSD     17488400
SSL     24065800
SSP     32296700
SSS     16306900
STC     17453400

devam eder….

 

Combiner

Combiner özelliği yardımcı reducer olarak ta tanımlanabilir. Opsiyonel bir özelliktir. Performans açısından önem arz eder. Map işleminden sonra, Reduce işleminden önce çalışır. Yazılış biçimini Reduce Class’ından farksızdır ancak belli bir formüle uyarlanması gerekir. Map işleminden sonra ki key-value parçalarını Reduce işlemine iletir. Aynı key değelerini Reduce fonksiyonuna özetleyerek iletir böylelikle Reduce fonksiyonunun yükü azaltılır.

 

combiner

Şemada görüldüğü üzere Map üzerinde ki işlemleri tekrar birleştirme işlemi yaparak yerine yerleştirir.

Şimdi Combiner Classımızı yazalım.

   public static class StockCombiner extends Reducer<StockExchange, IntWritable, StockExchange, IntWritable>
{
@Override
public void reduce(StockExchange key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for(IntWritable val : values)
{
sum++;
}
context.write(key, new IntWritable(sum));
}
}

 

Combiner classımızı job nesnesine belirtmemiz gerekiyor.

job.setCombinerClass(StockCombiner.class);

çalıştırdığımızda alınacak sonuç aşağıda ki gibidir.

bash-4.1# hadoop jar MapReduceSamples-1.0.jar kodumundunyasi.mapreducesamples.Cu
stomTypesSample kdOrnekler/in kdOrnekler/out
16/05/21 05:13:22 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0
:8032
16/05/21 05:13:23 WARN mapreduce.JobResourceUploader: Hadoop command-line option
parsing not performed. Implement the Tool interface and execute your applicatio
n with ToolRunner to remedy this.
16/05/21 05:13:24 INFO input.FileInputFormat: Total input paths to process : 1
16/05/21 05:13:24 INFO mapreduce.JobSubmitter: number of splits:1
16/05/21 05:13:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_14
63820534230_0002
16/05/21 05:13:24 INFO impl.YarnClientImpl: Submitted application application_14
63820534230_0002
16/05/21 05:13:24 INFO mapreduce.Job: The url to track the job: http://8baa64519
29f:8088/proxy/application_1463820534230_0002/
16/05/21 05:13:24 INFO mapreduce.Job: Running job: job_1463820534230_0002
16/05/21 05:13:34 INFO mapreduce.Job: Job job_1463820534230_0002 running in uber
mode : false
16/05/21 05:13:34 INFO mapreduce.Job:  map 0% reduce 0%
16/05/21 05:13:47 INFO mapreduce.Job:  map 67% reduce 0%
16/05/21 05:13:52 INFO mapreduce.Job:  map 100% reduce 0%
16/05/21 05:14:01 INFO mapreduce.Job:  map 100% reduce 100%
16/05/21 05:14:01 INFO mapreduce.Job: Job job_1463820534230_0002 completed succe
ssfully
16/05/21 05:14:01 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=77898
FILE: Number of bytes written=386581
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=45520833
HDFS: Number of bytes written=13696
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=15438
Total time spent by all reduces in occupied slots (ms)=6964
Total time spent by all map tasks (ms)=15438
Total time spent by all reduce tasks (ms)=6964
Total vcore-seconds taken by all map tasks=15438
Total vcore-seconds taken by all reduce tasks=6964
Total megabyte-seconds taken by all map tasks=15808512
Total megabyte-seconds taken by all reduce tasks=7131136
Map-Reduce Framework
Map input records=812989
Map output records=812989
Map output bytes=34892553
Map output materialized bytes=77898
Input split bytes=122
Combine input records=812989
Combine output records=1734
Reduce shuffle bytes=77898
Reduce output records=1734
Shuffled Maps =13468
Merged Map outputs=1
CPU time spent (ms)=6490
Virtual memory (bytes) snapshot=1391874048
Shuffle Errorscommitted heap usage (bytes)=198053888
CONNECTION=0
WRONG_LENGTH=0
WRONG_REDUCE=0
File InpBytes Read=45520711
File OutBytes Written=13696

Burada bazı çıktılara dikkat çekmek istiyorum

Shuffle, Combine ve CPU için harcanan değerlere baktığımızda Combine olmadan ki halinden çok çok daha az efor sarfedilmiş. Petabyte boyutlarında ki veriler için bu fark korkunç derecede artacak ve performansa etki edecektir. Bu nedenle Combiner doğru yerde kullanıldığında büyük bir avantaj yakalamamıza olanak sağlıyor.

Partitioner

Partitioner özelliği verileri farklı kombinasyonlar ile işlememizi sağlar. Combiner sınıfı gibi Map sonrası, Reduce öncesi çalışır. Partitioner sayısı Reduce sayısı ile eşit olmalıdır. Çünkü veriyi aynı simetride bölmesi gereklidir. Bunun için job nesnesinde reduce sayısı belirtilir. Hash metotları ile aynı mantık içerisinde çalışır.

Örnek senaryomuz şu şekilde olsun; Her yıl içerisinde ki en yüksek stok hareketini bulan bir MapReduce programı yazalım.

Mapper sınıfımız:

    public static class MapperClass extends Mapper<LongWritable, Text, IntWritable, IntWritable>
{
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] values = value.toString().split(",");
String year = values[2].split("-")[0];
String volume = values[7];
context.write(new IntWritable(Integer.parseInt(year)), new IntWritable(Integer.parseInt(volume)));   
}
}

Burada key olarak yılı kullandık, çünkü yıl bazında sonuca ulaşmaya çalışıyoruz. Daha sonra stock_volume alanını okuyarak context nesnemize işliyoruz.

Reducer sınıfımız:

    public static class ReducerClass extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>
{
public int max = -1;
@Override
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
max = -1;
for(IntWritable value : values)
{
if(value.get() > max){
max = value.get();
}
}
context.write(key, new IntWritable(max));
}
}

Reducer sınıfımızda yaptığımız işlem; max adında bir nesnemiz var ve bu nesne döngü içerisinde en yüksek rakamsal değeri bulana kadar dönüyor. Döndükten sonra da context nesnemize işliyoruz.

Partitioner sınıfımız:

    public static class PartitionerClass extends Partitioner<IntWritable, IntWritable>
{
@Override
public int getPartition(IntWritable key, IntWritable value, int numReduceTasks)
{
int year = key.get();
if (numReduceTasks == 0)
{
return 0;
}
if (year == 2000)
{
return 0;
}
else
{
return 1 % numReduceTasks;
}
}
}

Yıla göre hangi partitionera yönerge vereceğini hesaplattırıyoruz. Burada dönüş 0 ve 1 olabilir. Çünkü iki yıl üzerinden partitioner işlemini başlatıyoruz. Index olarak düşündüğümüzde 0 2000 yılına, 1 2001 yılına işleniyor. Eğer veri setimiz 2000, 2001 ve 2002 yılları olsaydı bir mod işlemi daha eklememiz gerekecekti.

Son olarak main metodumuz ve run metodumuz

    @Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "Partitioner Example");
job.setJarByClass(PartitionerSample.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(MapperClass.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setPartitionerClass(PartitionerClass.class);
job.setReducerClass(ReducerClass.class);
job.setNumReduceTasks(2);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}

Job nesnesine setNumReduceTasks metodu ile 2 tane reduce olacağını belirttik ve partitioner classımızı belirttik. Çalıştırdığımızda out klasörümüzde iki tane dosya olacaktır.

bash-4.1# hadoop dfs -ls kdOrnekler/out
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
Found 3 items
-rw-r--r--   1 root supergroup          0 2016-05-21 07:48 kdOrnekler/out/_SUCCE
SS
-rw-r--r--   1 root supergroup         15 2016-05-21 07:47 kdOrnekler/out/part-r
-00000
-rw-r--r--   1 root supergroup         15 2016-05-21 07:48 kdOrnekler/out/part-r
-00001
bash-4.1# hadoop dfs -cat kdOrnekler/out/part-r-00000
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
2000    143929200
bash-4.1# hadoop dfs -cat kdOrnekler/out/part-r-00001
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
2001    358280000

Tüm dosyaları GitHub sayfamdan indirebilirsiniz.

Bir sonra ki makale de görüşmek üzere.

Facebook'dan yorumla