Thanks to the Concurrent Programming exam, which I hope to take soon :P, I tried my hand at implementing the K-means clustering algorithm using the Hadoop framework and its constructs for parallel programming; the JavaScript example can help you get a concrete feel for the kinds of problems you can solve with K-means.
In case you don't know Hadoop yet, I recommend reading our article on Hadoop to get a basic idea of how Hadoop works, of the Map-Reduce paradigm, and of how to set it up on your own machine.
Everything was developed in the Java programming language, using the Map-Combiner-Reduce paradigm.
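Before diving into the Hadoop classes, here is a minimal, Hadoop-free sketch of what one Map-Combiner-Reduce iteration of K-means does: the "map" step assigns each point to its nearest centroid, and the "combine/reduce" steps average the assigned points per centroid. This is only an illustration of the paradigm, not the project's actual code, and all the data in it is made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of one K-means Map-Combiner-Reduce step.
// All data below is made up purely for illustration.
public class KmeansSketch {
    static Map<Integer, double[]> step(double[][] points, double[][] centroids) {
        // "map" phase: assign each point to its nearest centroid
        Map<Integer, List<double[]>> assigned = new HashMap<>();
        for (double[] p : points) {
            int best = 0;
            double bestDist = Double.MAX_VALUE;
            for (int c = 0; c < centroids.length; c++) {
                double d = 0;
                for (int j = 0; j < p.length; j++) {
                    double diff = centroids[c][j] - p[j];
                    d += diff * diff;
                }
                if (d < bestDist) { bestDist = d; best = c; }
            }
            assigned.computeIfAbsent(best, k -> new ArrayList<>()).add(p);
        }
        // "combine" + "reduce" phase: per-centroid sums, then the new mean
        Map<Integer, double[]> newCentroids = new HashMap<>();
        for (Map.Entry<Integer, List<double[]>> e : assigned.entrySet()) {
            double[] mean = new double[centroids[0].length];
            for (double[] p : e.getValue())
                for (int j = 0; j < mean.length; j++) mean[j] += p[j];
            for (int j = 0; j < mean.length; j++) mean[j] /= e.getValue().size();
            newCentroids.put(e.getKey(), mean);
        }
        return newCentroids;
    }

    public static void main(String[] args) {
        double[][] points = {{1, 1}, {1.5, 2}, {8, 8}, {9, 10}};
        double[][] centroids = {{1, 1}, {9, 9}};
        for (Map.Entry<Integer, double[]> e : step(points, centroids).entrySet())
            System.out.println("centroid " + e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
}
```

In the Hadoop version below the same three roles are played by KmeansMapper, KmeansCombiner and KmeansReducer, with the partial sums travelling as Text key/value pairs.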
Let me give you the code of the various classes right away, since it will be much clearer than any explanation of mine (wow, am I putting myself down all on my own 😀).
Unfortunately, because of the blog's pagination the code will be hard to read, so just click on a class name to view the .java source file.
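For reference, both the mapper and the convergence check parse whitespace-separated text lines: each point line starts with a point id followed by its coordinates, and each centroid line (the initial cluster file, or part-r-00000 after an iteration) starts with a cluster id followed by the centroid's coordinates. The values below are made-up examples with 3 coordinates per point (points file on top, cluster file below):

```
p1 1.0 2.0 3.5
p2 0.5 1.5 2.0

0 1.0 2.0 3.0
1 4.0 5.0 6.0
```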
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class KmeansDriver {
public static void main(String[] args) throws Exception {
long start, start_loop, stop, stop_loop;
//start global timer
start = System.currentTimeMillis();
Configuration conf = new Configuration();
boolean converged=true;
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 4) {
System.err.println("Usage: KmeansDriver <in_directory> <out_directory> <in_cluster/cluster_file> num_coord_point");
System.exit(2);
}
int i=0;
do{
//set global coord value from commandline
conf.set("my.num.coord", otherArgs[3]);
if(i==0){
conf.set("my.cluster.coord",otherArgs[2] );
}
else
if(i==1)
conf.set("my.cluster.coord",otherArgs[1]+"0"+"/part-r-00000" );
else
conf.set("my.cluster.coord",otherArgs[1]+(i-1)+"/part-r-00000");
Job job = new Job(conf, "Kmeans_unicondor");
//only one reduce
job.setNumReduceTasks(1);
job.setJarByClass(KmeansDriver.class);
job.setMapperClass(KmeansMapper.class);
job.setCombinerClass(KmeansCombiner.class);
job.setReducerClass(KmeansReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]+i));
//start timer loop
start_loop = System.currentTimeMillis();
job.waitForCompletion(true);
//check if cluster_centroid are equal
if(i>=2)
converged=isConverged(i,Integer.parseInt(otherArgs[3]), otherArgs[1]);
i++;
//stop timer loop
stop_loop = System.currentTimeMillis();
System.out.println("Time_loop_"+(i-1) +": "+ (stop_loop - start_loop)/1000 + " s");
}
while(i<500 && converged);
System.out.println("Clustering...");
//start timer clustering
long start_clustering = System.currentTimeMillis();
clustering(otherArgs[0],otherArgs[1]+"points",conf.get("my.cluster.coord"),otherArgs[3]);
//stop timer clustering
long stop_clustering = System.currentTimeMillis();
System.out.println("Time Clustering: " + (stop_clustering - start_clustering)/1000 + " s");
//stop timer programma
stop = System.currentTimeMillis();
System.out.println("Time Total: " + (stop - start)/1000 + " s");
}
private static boolean isConverged(int iteration, int num_coord, String dir_output) throws IOException {
boolean ret=true;
ByteArrayOutputStream byte1=new ByteArrayOutputStream();
PrintStream out2 = new PrintStream(byte1);
HashMap<String,double[]> cluster= new HashMap<String,double[]>();
HashMap<String,double[]> cluster1= new HashMap<String,double[]>();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new Path(dir_output+(iteration-1)).toUri(),conf);
FSDataInputStream in = null, in1=null;
try {
in = fs.open(new Path(dir_output+(iteration-1)+"/part-r-00000"));
IOUtils.copyBytes(in, out2, 4096, false);
String s=byte1.toString();
String lines[]= s.split("\n");
for(int i=0; i<lines.length; i++){
double[] centers= new double[num_coord];
StringTokenizer itr = new StringTokenizer(lines[i]);
String id_cluster= itr.nextElement().toString();
int j=0;
while(itr.hasMoreElements()){
centers[j]=(Double.parseDouble(itr.nextElement().toString()));
j++;
}
cluster.put(id_cluster, centers);
}
ByteArrayOutputStream byte2=new ByteArrayOutputStream();
PrintStream out3 = new PrintStream(byte2);
FileSystem fs1 = FileSystem.get(new Path(dir_output+iteration).toUri(),conf);
in1 = fs1.open(new Path(dir_output+iteration+"/part-r-00000"));
IOUtils.copyBytes(in1, out3, 4096, false);
String s1=byte2.toString();
String lines1[]= s1.split("\n");
for(int i=0; i<lines1.length; i++){
double[] centers1= new double[num_coord];
StringTokenizer itr = new StringTokenizer(lines1[i]);
String id_cluster= itr.nextElement().toString();
int j=0;
while(itr.hasMoreElements()){
centers1[j]=(Double.parseDouble(itr.nextElement().toString()));
j++;
}
cluster1.put(id_cluster, centers1);
}
}
finally {
IOUtils.closeStream(in);
IOUtils.closeStream(in1);
}
int cont=0;
double[] first_cluster;
double[] second_cluster;
for(String key :cluster.keySet()){
first_cluster = cluster.get(key);
second_cluster = cluster1.get(key);
if(KmeansUtil.isEqualThresHold(first_cluster, second_cluster)) {
cont++;
}
}
if(cont==cluster.size()) {
ret=false;
//debug
System.out.println("PATH is: " +dir_output+(iteration)+"/part-r-00000");
System.out.println("PATH is: " +dir_output+(iteration-1)+"/part-r-00000");
System.out.println("debug all clusters are equal");
System.out.println("HashMap size:"+ cluster.size());
System.out.println("HashMap1 size:"+ cluster1.size());
System.out.println("Cont :" + cont);
}
return ret;
}
public static void clustering(String input, String output,String cluster, String num_coord) throws Exception {
Configuration conf = new Configuration();
conf.set("my.num.coord", num_coord);
conf.set("my.cluster.coord",cluster );
Job job = new Job(conf, "Kmeans_unicondor_clustering");
job.setJarByClass(KmeansDriver.class);
job.setMapperClass(KmeansMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
}
}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class KmeansMapper extends Mapper<Object, Text, Text, Text>{
private HashMap<String,double[]> meansCluster= new HashMap<String,double[]>();
@Override
public void setup(Context context){
try {
ByteArrayOutputStream byte1=new ByteArrayOutputStream();
PrintStream out2 = new PrintStream(byte1);
String uri = context.getConfiguration().get("my.cluster.coord");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new Path(uri).toUri(),conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, out2, 4096, false);
String s=byte1.toString();
String lines[]= s.split("\n");
for(int i=0; i<lines.length; i++){
double[] centers= new double[Integer.parseInt(context.getConfiguration().get("my.num.coord"))];
StringTokenizer itr = new StringTokenizer(lines[i]);
String id_cluster= itr.nextElement().toString();
int j=0;
while(itr.hasMoreElements()){
centers[j]=(Double.parseDouble(itr.nextElement().toString()));
j++;
}
meansCluster.put(id_cluster, centers);
}
}
finally {
IOUtils.closeStream(in);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//get global value passing from main arguments
int num_coord= Integer.parseInt(context.getConfiguration().get("my.num.coord"));
Text word = new Text();
word.set(itr.nextElement().toString());
double[] array_points = new double[num_coord];
int k=0;
while (itr.hasMoreTokens()) {
array_points[k]=Double.parseDouble(itr.nextToken());
k++;
}
//get coordinate cluster
double nearestDistance = Double.MAX_VALUE;
double[] array_cluster;
String cluster = null;
Set<String> list = meansCluster.keySet();
for (String s : list) {
array_cluster = meansCluster.get(s);
double distance = KmeansUtil.getEuclideanDistance(array_cluster, array_points);
if (distance < nearestDistance) {
nearestDistance = distance;
cluster = s;
}
}
context.write(new Text(cluster), new Text(KmeansUtil.getString(array_points)));
}
}
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class KmeansCombiner extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int num_coord=Integer.parseInt(context.getConfiguration().get("my.num.coord"));
int i;
// all points vector
Vector<double[]> all_points_vector = new Vector<double[]>();
for (Text val : values) {
// single vector of points only one centroid
double[] vec_point_by_key= new double[num_coord];
StringTokenizer itr = new StringTokenizer(val.toString());
i=0;
while(itr.hasMoreElements()) {
vec_point_by_key[i]=Double.parseDouble(itr.nextElement().toString());
i++;
}
all_points_vector.add(vec_point_by_key);
}
double sum;
double[] vec_point_get_by_key= new double[num_coord];
double[] centroid_cluster_partial_sum = new double[num_coord];
int cont=0;
for(int j=0; j<num_coord;j++){
cont=0;
sum=0;
vec_point_get_by_key= new double[num_coord];
for(int k=0; k<all_points_vector.size(); k++){
vec_point_get_by_key=(double[])all_points_vector.get(k);
sum+= vec_point_get_by_key[j];
cont++;
}
centroid_cluster_partial_sum[j]=sum;
}
context.write(key, new Text(Integer.toString(cont)+ " "+ KmeansUtil.getStringPadding(centroid_cluster_partial_sum)));
}
}
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class KmeansReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int num_coord = Integer.parseInt(context.getConfiguration().get("my.num.coord"));
// vector of all partial sums belonging to a given centroid
Vector<double[]> all_points_vector = new Vector<double[]>();
int i;
int total_points = 0;
for (Text val : values) {
StringTokenizer itr = new StringTokenizer(val.toString());
// sum the number of elements
total_points += Integer.parseInt(itr.nextToken());
double[] vec_point_by_key = new double[num_coord];
i = 0;
while (itr.hasMoreElements()) {
vec_point_by_key[i] = Double.parseDouble(itr.nextElement().toString());
i++;
}
all_points_vector.add(vec_point_by_key);
}
double sum;
double[] vec_point_get_by_key;
double[] new_centroid_cluster = new double[num_coord];
for (int j = 0; j < num_coord; j++) {
sum = 0;
for (int k = 0; k < all_points_vector.size(); k++) {
vec_point_get_by_key = all_points_vector.get(k);
sum += vec_point_get_by_key[j];
}
new_centroid_cluster[j] = sum / total_points;
}
context.write(key, new Text(KmeansUtil.getStringPadding(new_centroid_cluster)));
}
}
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.List;
public class KmeansUtil {
public static double getEuclideanDistance(double[] a,double[] b){
if ( a.length != b.length ){
throw new RuntimeException("Attempting to compare two array (cluster and point) of different dimensions");
}
double sum = 0;
for ( int i = 0; i < a.length; i++ ){
double diff = a[i] - b[i];
sum += diff*diff;
}
return Math.sqrt(sum);
}
public static double getEuclideanDistance(float[] a,float[] b){
if ( a.length != b.length ){
throw new RuntimeException("Attempting to compare two array (cluster and point) of different dimensions");
}
double sum = 0;
for ( int i = 0; i < a.length; i++ ){
double diff = a[i] - b[i];
sum += diff*diff;
}
return Math.sqrt(sum);
}
public static double sumDifferences(List<Double> a, List<Double> b){
assert(a.size() == b.size());
double sumDiff = 0;
double aSum = 0;
double bSum = 0;
for ( int i = 0; i < a.size(); i++ ){
sumDiff += Math.abs(a.get(i) - b.get(i));
aSum += a.get(i);
bSum += b.get(i);
}
return sumDiff;
}
public static String getString(double[] array){
StringBuilder result = new StringBuilder();
for ( int idx = 0 ; idx < array.length ; idx++ ) {
result.append(String.valueOf(array[idx]));
result.append(" ");
}
return result.toString();
}
public static String getStringPadding(double[] array){
StringBuilder result = new StringBuilder();
for ( int idx = 0 ; idx < array.length ; idx++ ) {
// format with 6 decimal places and a '.' decimal separator regardless of locale
result.append(String.format(java.util.Locale.US, "%.6f", array[idx]));
result.append(" ");
}
return result.toString();
}
public static boolean isEqual(double[] a,double[] b){
boolean ret=false;
if ( a.length != b.length ){
throw new RuntimeException("Attempting to compare two array (cluster_old and cluster_new) of different dimensions");
}
int p=0;
for ( int i = 0; i < a.length; i++ ){
if (a[i] == b[i]) p++;
}
if(p==a.length) ret= true;
return ret;
}
public static boolean isEqualThresHold(double[] a,double[] b){
//ThresHold = 0.001
boolean ret=false;
if ( a.length != b.length ){
throw new RuntimeException("Attempting to compare two array (cluster_old and cluster_new) of different dimensions");
}
int p=0;
for ( int i = 0; i < a.length; i++ ){
if (a[i] == b[i]|| Math.abs(a[i]-b[i])<0.001) p++;
}
if(p==a.length) ret= true;
return ret;
}
}
Complete example project
Kmeans.jar
All that remains is for me to thank Dott.ssa Rita Di Candia for her invaluable help in carrying out the project 😀
Hi,
which version of Hadoop did you use?
hadoop-0.20.2 — when I started the project it was the only stable release. If I'm not mistaken the stable one is now 0.21, but nothing changes in the constructs or in the code compared to 0.21.
In fact this version of my algorithm runs on the newer Hadoop releases since, as you will surely have noticed, the driver uses the new Job handling (Job job = new Job(conf, "Kmeans_unicondor");) and not the old JobConf.
Anyway, it was a nice experience getting my hands on the framework 😀
see you soon
Flavio
Hi Flavio,
Thank you for making available your source code. I am very interested in testing it in our cluster. Do you have some examples of input datasets and/or running command lines? I already have a Hadoop cluster configured here.
Thank you again.
Marcelo
Hi Marcelo,
Below this post there is a link to my final paper comparing OpenMPI and Hadoop on my university's cluster. Hadoop ran on 4 quad-core 2.8 GHz nodes, with 1 node acting as NameNode and also as JobTracker.
The input files were:
a) 1 million points, each with 20 coordinates
b) 5 million points, each with 20 coordinates
first computing 10 cluster centers, then 20 and finally 50 cluster centers.
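If it helps, based on the driver's usage string a run would look roughly like this (the HDFS paths here are just placeholders, and 20 is the number of coordinates per point):

```
hadoop jar Kmeans.jar KmeansDriver in_points out cluster_file 20
```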
I'm sorry, but the documentation is in Italian only... I hope you can understand it 😀
http://www.vitadastudente.it/file_pdf/OpenMPI_vs_Hadoop.pdf
Flavio
Thank you very much for the quick response. Your paper will be very useful for me. Now I will try to reproduce your tests here.
Regards,
Marcelo
Hi Flavio,
I am trying to build your program, but it seems to be missing a class (KmeansUtil). I found a page on Google Code for your project:
http://code.google.com/p/kmeans-hadoop/
Which code do you recommend using?
Thank you in advance,
Marcelo
Thanks Marcelo,
I updated the post: at the end of the post you can download both KmeansUtil.java and the complete project Kmeans.jar.
Flavio
Good start