Değerli KD okuyucuları, bir önce ki yazımızda Big Data kavramına giriş yapmış ve teorik bilgiler edinmiştik. Bugünki yazımda ise teoriden ziyade daha pratik konuların başlangıcını vereceğiz ki bu ilk adımda Apache Hadoop projesi demek oluyor.

İsmi son zamanlarda çokça karşımıza çıkan bu yazılımı daha yakından inceleyeceğiz. Ne zaman ve neden çıktı, içerisinde ki parçalardan, nasıl çalıştığından, hangi amaçlarla kullanıldığından bahsedeceğiz. Fakat kurulum anlatmayacağım bunun yerine Cloudera’nın hazır Sandbox ürününü kullanacağım. Kurulum ile ilgili internette fazlaca makale bulabilirsiniz ama ben hazır yapılmışını kullanmayı tercih ediyorum.

Öncelikle Hadoop’un tarihçesinde bir diğer Apache projesi olan Nutch var diyelim. Nutch; GFS(Google File System) ve MapReduce beraberliğinde baz alınarak geliştirildi. Google da yaptıklarıyla beraber örnek de teşkil etmeye bu şekilde başlamış oldu.

 

HDFS

Gelelim Hadoop ekosistemine. Hadoop Java dili ile yazılmış bir dağıtık dosya sistemidir ve açık kaynaklıdır. HDFS(Hadoop Distirbuted File System) ve MapReduce birimlerinden oluşur. Esas çalışma mantığı birden çok sunucu üzerinde bir Cluster yani küme gibi konumlanarak tek bir veri merkezi olarak davranmasıdır ki açılımında bulunan “dağıtık” kelimesi buradan gelmektedir. Verileri bloklara ayırarak küme üzerine dağıtır. Bunun en büyük avantajı veri kayıplarını azaltır ve hata toleransını arttırır.

 

NameNode

Verilerin sunucular üzerinde ki meta bilgilerini saklar. Verilerin tüm dinamikleri bu bölümde saklanır ve küme üzerinde oluşan hataları çözümler. Bu nedenle her küme üzerinde yalnızca bir tane bulunabilir, veriyi kontrol etmek maksatlı bir birim olduğundan veri saklamaz. Bu bölüme ekosistemin içerisinde “master” adı verilir.

 

DataNode

Verilerin saklandığı düğüm veya düğümlerdir. Kendi verilerini sakladığı gibi diğer DataNode’lar ile haberleşir, yedeklemesini alır. Bu nedenle DataNode’lar için verilerin saklandığı yer veya yerler diyebiliriz. Her kümede birden fazla olabilir. Bu bölüme ekosistemin içerisinde “slave” adı verilir.

MapReduce

HDFS üzerinde ki verileri işlemek için kullanılan algoritmadır. Map ve Reduce olarak iki fonksiyonu barındırır. Map fonksiyonu; istediğimiz parametrelere uygun olan verilere ulaşmamızı sağlar. Reduce fonksiyonu ise ulaştığımız bu verileri belli agrasyonlara tabi tutar. MapReduce için “zurnanın zırt dediği yer” diyebiliriz aslında 🙂

Anlaşılacağı üzere sistemsel ve yazılımsal olarak iki ana konuya ayrılmış durumdayız. Hadoop ile bir küme nasıl oluşturulur ve yönetilir?  Bu kümeye yeni bir üye tanımlamak gerekirse ne yapılmalı? Soruları var olduğu gibi, veriyi işleme noktasına geldiğimizde MapReduce fonksiyonlarını nasıl yazacağız? Sorusu da kendini belli ediyor. İlk önce sistem tarafından söz etmek istiyorum. Cloudera Sandbox’ı üzerinden aşağıda ki adımları izleyerek başlayalım.

Cloudera Servis Parametreleri

Masaüstünde bulunan “Launch Cloudera Express” e tıklayarak “Command” alanına “–force” parametresini ekleyelim ve daha sonra başlatalım.

2

Sonrasında tarayıcıyı açarak yönetim sayfasına gidelim.

HDFS ve YARN servislerini başlattıktan sonra işe koyuabiliriz.

Hadoop’u ilk tanıtırken Java ile yazıldığından bahsetmiştim, dolayısıyla MapReduce fonkiyonlarını da Java dilinde yazacağız. İlerleyen zamanlarda ayrı ele alacağımız olan MapReduce konusunu Java dili olmadan nasıl yazabiliriz sorusuna yanıt bulacağız. Bunun için ayrı makaleler yazacağım. Şimdi gelelim  Java ile olan kısmıa. Öncelikle bize ne lazım? Data 🙂

Şöyle bir senaryo çizelim, elinize bir reklam yönetim sistemi var ve bu sistemin loglarını inceliyoruz. Log dosyamız şu şekilde olsun;

1293868800864,319248,1,flickr.com,12

1293868801728,625828,1,npr.org,19

1293868802592,522177,2,wikipedia.org,16

1293868803456,535052,2,cnn.com,20

1293868804320,287430,2,sfgate.com,2

1293868805184,616809,2,sfgate.com,1

1293868806048,704032,1,nytimes.com,7

1293868806912,631825,2,amazon.com,11

1293868807776,610228,2,npr.org,6

1293868808640,454108,2,twitter.com,18

1293868809504,723726,1,foxnews.com,9

1293868810368,777690,2,cnn.com,2

1293868811232,441293,2,nytimes.com,3

1293868812096,938131,1,bbc.co.uk,18

1293868812960,931279,1,flickr.com,1

1293868813824,651914,1,yahoo.com,20

1293868814688,101535,2,facebook.com,5

1293868815552,909907,1,facebook.com,3

1293868816416,154380,2,amazon.com,1

1293868817280,718757,2,npr.org,13

Burada ne yazdığını şöyle açıklayayım, anlayabileceğiniz üzere delimiter fonksiyonu ile veriler ayrıştırılmış. İlk alan unix tipinde zamanı, ikinci alan kullanıcı kimliğini(Id), üçüncü alan aksiyonu(1 tıklama, 2 görüntüleme) üçüncü alan hangi domain üzerinde yayınlandığını, dördüncü alan ise hangi reklam kampanyasına ait olduğunu belirtiyor. MapReduce fonksiyonu ile bu verileri ayrıştıracağız. Her kampanya için ne kadar tıklama, ne kadar görüntüleme var bunu göreceğiz. Java kodumuz aşağıda ki gibi olacaktır.

package KodumunDunyasi;</code>
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class KampanyaYonetim extends Configured implements Tool
{
static class CustomMapper extends Mapper<Object, Text, IntWritable, IntWritable>
{
@Override
public void map(Object key, Text record, Context context) throws IOException
{
String[] veriler = record.toString().split(",");
String actionStr = veriler[2];
String campaignStr = veriler[4];
try
{
int action = Integer.parseInt(actionStr);
int campaign = Integer.parseInt(campaignStr);
IntWritable mapOutKey = new IntWritable(campaign);
IntWritable mapOutValue = new IntWritable(action);
context.write(mapOutKey, mapOutValue);
} catch (Exception e)
{
System.out.println("Hata olustu");
e.printStackTrace();
}
}
}
public static class CustomReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text>
{
public void reduce(IntWritable key, Iterable results, Context context) throws IOException,
InterruptedException
{
int campaign = key.get();
int views = 0;
int clicks = 0;
for (IntWritable i : results)
{
int action = i.get();
if (action == 1)
views++;
else if (action == 2)
clicks++;
}
String stats = "goruntuleme=" + views + ", tiklama=" + clicks;
context.write(new IntWritable(campaign), new Text(stats));
}
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(), new KampanyaYonetim(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception
{
if (args.length != 2)
{
System.out.println("Girdi ve cikti yollari girilmelidir.");
return 1;
}
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
Configuration conf = getConf();
Job job = new Job(conf, "KampanyaYonetim");
job.setJarByClass(KampanyaYonetim.class);
job.setMapperClass(CustomMapper.class);
job.setReducerClass(CustomReducer.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.setInputPaths(job, inputPath);
TextOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
}

Şimdi burada bazı satırlardan bahsetmek gerekiyor. Hadoop’un çekirdek dosyasından aldığımız bir takım şeyler var. Hatırlayacağımız üzere Map ve Reduce olmak üzere iki farklı fonksiyon olduğundan bahsetmiştik. İşte Map fonksiyonunu burada daha yakından inceliyoruz.

 

static class CustomMapper extends Mapper<Object, Text, IntWritable, IntWritable>

Hadoop çekirdeğinde bulunan Mapper objesini kullanarak kendi Mapper’ımızı yazıyoruz ve “map” metodunu override ediyoruz.  “,” işaretini split ederek bize lazım olan verileri önce string, daha sonra int türüne çeviriyoruz. Sonrada context objesine key-value olarak işletiyoruz. Aslında standart bir Java algoritması diyebiliriz bu işlem için.

 

Aynı şekilde Reduce fonksiyonunu yazdık. Yine çekirdek kütüphaneden implement ettiğimiz bir class yaratıyoruz.

public static class CustomReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text>

“reduce” fonksiyonu ilede her bir keyi, results nesneleri ile agrasyona sokuyoruz.

Daha sonra main metodunda bir alt metod kullanarak Hadoop’un MapReduce fonksiyonlarını çalıştırması için yönergeleri belirtiyoruz. Bir Job parametresi ve MapReduce parametreleri dışında hiç birşey yok aslına bakarsınız. Oldukça kolay değil mi?

Şimdi gelelim Hadoop tarafına. Burada Hdfs’in kullanımından da biraz bahsedeceğiz. Aslına bakarsanız standart linux komutlarını aynen burada uygulayabiliriz. Şimdi bu log dosyasını Hdfs içerisini atmamız lazım ki orada işleyebilelim. Önce bir bakalım içeride neler var.

6

Şimdi örnek çalışmamız için bir klasör oluşturalım

7

Tüm komut listesi için https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/FileSystemShell.html adresini ziyaret edebilirsiniz.

Buraya log dosyamızı atacağız. Cloudera’ya dosya bağlantısı yapmanın basit bir formülü var arkadaşlar. Direkt olarak /home klasörüne bağlanabilirsiniz. Filezilla kullanarak Sandbox IP adresi ve 22 portu ile. Kullanıcı adı: cloudera, şifre: cloudera

9

Dosyamızı attıktan sonra Hdfs içerisinde bir input klasörünü açacağız.

10

Şimdi bu klasöre log dosyamızı atalım.

11

Tekrar klasör içerisine bakıyoruz.

12

Dosyamız orada. Şimdi Java dosyasını da Sandbox içine gönderdikten sonra MapReduce işlemini başlatabiliriz. Java dosyasını build ettikten sonra bağımlılıklar için CLASSPATH envanterini kullanmamız gerekiyor. Cloudera için aşağıda ki terminal girdisi işimizi çözecektir.

$export CLASSPATH=/usr/lib/hadoop/client-0.20/\*

Şimdi işlemimizi başlatabiliriz.

13

Görüldüğü gibi Hadoop Map ve Reduce işlemlerini ayrı ayrı işledi. Şimdi gelelim sonuca.

14

MapReduce işlemi sonuçlanmış, agrasyona sokulmuş herşey tamam 🙂