Hadoop: Implementing the K-means Clustering Algorithm

Thanks to my Concurrent Programming exam, which I hope to take soon :P, I tried my hand at implementing the K-means clustering algorithm with the Hadoop framework and its parallel programming constructs. The JavaScript example can help you get a concrete feel for the kinds of problems K-means can solve.

If you are not yet familiar with Hadoop, I recommend reading our article on Hadoop to get a basic idea of how Hadoop and the Map-Reduce paradigm work, and of how to configure it on your own computer.

Everything was developed in Java, following the Map-Combiner-Reduce paradigm.
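To make the Map-Combiner-Reduce split concrete, here is a minimal in-memory sketch of a single K-means iteration in plain Java, without Hadoop. The names (KmeansIterationSketch, nearest, iterate) are made up for illustration and are not part of the project, and keeping the old centroid when a cluster receives no points is an assumption of this sketch.

```java
import java.util.Arrays;

// Hypothetical in-memory sketch of one K-means iteration; not the Hadoop classes of this post.
final class KmeansIterationSketch {

    // "Map" step: index of the centroid closest to the point (squared Euclidean distance).
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centroids.length; c++) {
            double d = 0;
            for (int j = 0; j < point.length; j++) {
                double diff = point[j] - centroids[c][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    // "Combine" step: accumulate per-cluster sums and counts.
    // "Reduce" step: divide each sum by its count to obtain the new centroid.
    static double[][] iterate(double[][] points, double[][] centroids) {
        int k = centroids.length;
        int dim = centroids[0].length;
        double[][] sums = new double[k][dim];
        int[] counts = new int[k];
        for (double[] p : points) {
            int c = nearest(p, centroids);
            counts[c]++;
            for (int j = 0; j < dim; j++) {
                sums[c][j] += p[j];
            }
        }
        double[][] next = new double[k][dim];
        for (int c = 0; c < k; c++) {
            for (int j = 0; j < dim; j++) {
                // Assumption: an empty cluster keeps its previous centroid.
                next[c][j] = counts[c] == 0 ? centroids[c][j] : sums[c][j] / counts[c];
            }
        }
        return next;
    }

    public static void main(String[] args) {
        double[][] points = {{1, 1}, {1, 3}, {9, 9}, {11, 9}};
        double[][] centroids = {{0, 0}, {10, 10}};
        // prints [[1.0, 2.0], [10.0, 9.0]]
        System.out.println(Arrays.deepToString(iterate(points, centroids)));
    }
}
```

In the Hadoop version each of these steps runs on a different machine, and the loop over iterations lives in the driver.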

 

 

I'll attach the code of the various classes right away, since it will be much clearer than any explanation I could give (wow, I really do sell myself short 😀).

Unfortunately, because of the blog's page layout the code will be hard to read here, so just click a class name to view the corresponding .java source file.

 

KmeansDriver.java


import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import java.util.HashMap;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class KmeansDriver {

	public static void main(String[] args) throws Exception {

        long start, start_loop, stop, stop_loop;

		//start global timer
		start = System.currentTimeMillis();

		Configuration conf = new Configuration();

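		//NOTE: despite its name, this flag stays true while the loop must continue;
		//isConverged() returns false once the centroids have stopped moving
		boolean converged=true;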

		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

			if (otherArgs.length != 4) {
				System.err.println("Usage: KmeansDriver <in_directory> <out_directory> <in_cluster/cluster_file> num_coord_point");
				System.exit(2);
			}

			int i=0;

			do{
			//set global coord value from commandline
			conf.set("my.num.coord", otherArgs[3]);

			if(i==0){
				//first iteration: read the initial centroids given on the command line
				conf.set("my.cluster.coord",otherArgs[2] );
			}
			else{
				//later iterations: read the centroids written by the previous job
				conf.set("my.cluster.coord",otherArgs[1]+(i-1)+"/part-r-00000");
			}

			Job job = new Job(conf, "Kmeans_unicondor");
			//only one reduce
			job.setNumReduceTasks(1);
			job.setJarByClass(KmeansDriver.class);
			job.setMapperClass(KmeansMapper.class);
			job.setCombinerClass(KmeansCombiner.class);
			job.setReducerClass(KmeansReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			//job.setMapOutputKeyClass(Text.class);
			//job.setMapOutputValueClass(Text.class);
			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]+i));

			//start timer loop
			start_loop = System.currentTimeMillis();

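			//stop if the job failed: the next iteration would read centroids that were never written
			if(!job.waitForCompletion(true)){
				System.err.println("Job failed at iteration "+i);
				System.exit(1);
			}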

            //check if cluster_centroid are equal
			if(i>=2)

            		converged=isConverged(i,Integer.parseInt(otherArgs[3]), otherArgs[1]);

             	i++;

               	//stop timer loop
               	stop_loop = System.currentTimeMillis();

               	System.out.println("Time_loop_"+(i-1) +": "+ (stop_loop - start_loop)/1000 + " s");

				}

			while(i<500 && converged);

			System.out.println("Clustering...");

			//start timer clustering
			long start_clustering = System.currentTimeMillis();

			clustering(otherArgs[0],otherArgs[1]+"points",conf.get("my.cluster.coord"),otherArgs[3]);

			//stop timer clustering
			long stop_clustering = System.currentTimeMillis();

			System.out.println("Time Clustering: " + (stop_clustering - start_clustering)/1000 + " s");

			//stop global timer
			stop = System.currentTimeMillis();
			System.out.println("Time Total: " + (stop - start)/1000 + " s");

	}

	private static boolean isConverged(int iteration, int num_coord, String dir_output) throws IOException {

		    boolean ret=true;

		    ByteArrayOutputStream byte1=new ByteArrayOutputStream();
            PrintStream out2 = new PrintStream(byte1);

            HashMap<String,double[]> cluster= new HashMap<String,double[]>();
            HashMap<String,double[]> cluster1= new HashMap<String,double[]>();

    		Configuration conf = new Configuration();

            FileSystem fs = FileSystem.get(new Path(dir_output+(iteration-1)).toUri(),conf);

            FSDataInputStream in = null, in1=null;

            try {

            in = fs.open(new Path(dir_output+(iteration-1)+"/part-r-00000"));

            IOUtils.copyBytes(in, out2, 4096, false);
            String s=byte1.toString();

            String lines[]= s.split("\n");

            	for(int i=0; i<lines.length; i++){

            		double[] centers= new double[num_coord];

            		StringTokenizer itr = new StringTokenizer(lines[i]);
            		String id_cluster= itr.nextElement().toString();
                    int j=0;
                    	while(itr.hasMoreElements()){
                    		centers[j]=(Double.parseDouble(itr.nextElement().toString()));
                    		j++;
                    	}

            		cluster.put(id_cluster, centers);

            	}

            	ByteArrayOutputStream byte2=new ByteArrayOutputStream();
                PrintStream out3 = new PrintStream(byte2);

                FileSystem fs1 = FileSystem.get(new Path(dir_output+iteration).toUri(),conf);
            	in1 = fs1.open(new Path(dir_output+iteration+"/part-r-00000"));

                IOUtils.copyBytes(in1, out3, 4096, false);
                String s1=byte2.toString();

               String lines1[]= s1.split("\n");

            	for(int i=0; i<lines1.length; i++){

            		double[] centers1= new double[num_coord];           

            		StringTokenizer itr = new StringTokenizer(lines1[i]);
            		String id_cluster= itr.nextElement().toString();
                    int j=0;

                    	while(itr.hasMoreElements()){
                    		centers1[j]=(Double.parseDouble(itr.nextElement().toString()));
                    		j++;
                    	}

            		cluster1.put(id_cluster, centers1);

            	}

            } 

            finally {
            //close both streams (closeStream ignores null)
            IOUtils.closeStream(in);
            IOUtils.closeStream(in1);
            }

            int cont=0;

            double[] first_cluster;
            double[] second_cluster;

            for(String key :cluster.keySet()){

     		   first_cluster = cluster.get(key);
     		   second_cluster = cluster1.get(key);

     		   		if(KmeansUtil.isEqualThresHold(first_cluster, second_cluster)) {
     		   			cont++;

     		   		}

            }

            	if(cont==cluster.size()) {
            		ret=false;

        		//debug
            		System.out.println("PATH is: " +dir_output+(iteration)+"/part-r-00000");
            		System.out.println("PATH is: " +dir_output+(iteration-1)+"/part-r-00000");
              		System.out.println("debug all clusters are equal");
            		System.out.println("HashMap size:"+ cluster.size());
            		System.out.println("HashMap1 size:"+ cluster1.size());
            		System.out.println("Cont :" + cont);
            	}

            return ret;

		  }

	public static void clustering(String input, String output,String cluster, String num_coord) throws Exception {

		Configuration conf = new Configuration();
		conf.set("my.num.coord", num_coord);
		conf.set("my.cluster.coord",cluster );

		Job job = new Job(conf, "Kmeans_unicondor_clustering");
		job.setJarByClass(KmeansDriver.class);
		job.setMapperClass(KmeansMapper.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);	

		FileInputFormat.addInputPath(job, new Path(input));
		FileOutputFormat.setOutputPath(job, new Path(output));

		 job.waitForCompletion(true);

	}

}
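
The stopping rule used by the driver (compare the centroids of two consecutive iterations and stop once every coordinate has moved by less than 0.001) can be sketched outside Hadoop roughly as follows. The class and method names are hypothetical, and treating a cluster that is missing from one of the two files as "not yet stable" is an assumption of this sketch rather than what the driver does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the convergence test; not the project's KmeansDriver.
final class ConvergenceSketch {

    // Parses lines of the form "<cluster_id> <c1> <c2> ..." into a map.
    static Map<String, double[]> parseCentroids(String text) {
        Map<String, double[]> out = new HashMap<>();
        for (String line : text.split("\n")) {
            String[] tok = line.trim().split("\\s+");
            if (tok.length < 2) {
                continue; // skip blank or malformed lines
            }
            double[] c = new double[tok.length - 1];
            for (int j = 1; j < tok.length; j++) {
                c[j - 1] = Double.parseDouble(tok[j]);
            }
            out.put(tok[0], c);
        }
        return out;
    }

    // True when both snapshots hold the same clusters and no coordinate moved by eps or more.
    static boolean centroidsStable(Map<String, double[]> prev, Map<String, double[]> curr, double eps) {
        if (!prev.keySet().equals(curr.keySet())) {
            return false; // assumption: a missing cluster means "keep iterating"
        }
        for (Map.Entry<String, double[]> e : prev.entrySet()) {
            double[] a = e.getValue();
            double[] b = curr.get(e.getKey());
            if (a.length != b.length) {
                return false;
            }
            for (int j = 0; j < a.length; j++) {
                if (Math.abs(a[j] - b[j]) >= eps) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, double[]> prev = parseCentroids("c0 1.000000 2.000000 \nc1 10.000000 9.000000 \n");
        Map<String, double[]> curr = parseCentroids("c0 1.000400 2.000000 \nc1 10.000000 9.000000 \n");
        System.out.println(centroidsStable(prev, curr, 0.001)); // prints true
    }
}
```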

KmeansMapper.java



import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import java.util.HashMap;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KmeansMapper extends Mapper<Object, Text, Text, Text>{

	//centroids of the current iteration, keyed by cluster id
	private HashMap<String,double[]> meansCluster= new HashMap<String,double[]>();

	@Override
	public void setup(Context context){

		try {

			ByteArrayOutputStream byte1=new ByteArrayOutputStream();
			PrintStream out2 = new PrintStream(byte1);

			//path of the centroid file, set by the driver
			String uri = context.getConfiguration().get("my.cluster.coord");
			Configuration conf = new Configuration();
			FileSystem fs = FileSystem.get(new Path(uri).toUri(),conf);

			FSDataInputStream in = null;

			try {

				in = fs.open(new Path(uri));
				IOUtils.copyBytes(in, out2, 4096, false);
				String s=byte1.toString();

				//one centroid per line: <cluster_id> <coord_1> ... <coord_n>
				String lines[]= s.split("\n");

				for(int i=0; i<lines.length; i++){

					double[] centers= new double[Integer.parseInt(context.getConfiguration().get("my.num.coord"))];

					StringTokenizer itr = new StringTokenizer(lines[i]);
					String id_cluster= itr.nextElement().toString();
					int j=0;
					while(itr.hasMoreElements()){
						centers[j]=(Double.parseDouble(itr.nextElement().toString()));
						j++;
					}

					meansCluster.put(id_cluster, centers);

				}

			}
			finally {
				IOUtils.closeStream(in);
			}

		} catch (IOException e) {
			e.printStackTrace();
		}

	}

	   

			   

		

	@Override
	public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {

		StringTokenizer itr = new StringTokenizer(value.toString());

		//get global value passing from main arguments
		int num_coord= Integer.parseInt(context.getConfiguration().get("my.num.coord"));

		//the first token is the point id; it is read and then discarded
		itr.nextToken();

		double[] array_points = new double[num_coord];
		int k=0;
		while (itr.hasMoreTokens()) {
			array_points[k]=Double.parseDouble(itr.nextToken());
			k++;
		}

		//find the nearest centroid; start from the largest possible distance
		//so that the first centroid examined is always accepted
		//(a fixed start value of 100 left cluster==null for points farther away)
		double nearestDistance = Double.MAX_VALUE;
		String cluster=null;

		Set<String> list = meansCluster.keySet();
		for(String s:list) {

			double[] array_cluster = meansCluster.get(s);
			double distance = KmeansUtil.getEuclideanDistance( array_cluster, array_points);

			if (distance < nearestDistance) {
				nearestDistance = distance;
				cluster=s;
			}

		}

		//emit <cluster_id> -> coordinates of the point
		context.write(new Text(cluster), new Text(KmeansUtil.getString(array_points)));

	}

}
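
Judging from the tokenizer logic in the mapper, each input line seems to be a point id followed by my.num.coord whitespace-separated coordinates, and the centroid file appears to use the same layout with a cluster id in front. Below is a small sketch for producing and checking test data in that layout. The p<id> naming, the 0 to 100 value range and the class name SampleInputSketch are assumptions made for illustration.

```java
import java.util.Locale;
import java.util.Random;

// Hypothetical helper for building test input; not part of the project.
final class SampleInputSketch {

    // Builds one line "<point_id> <x1> ... <xn>", using '.' as decimal separator.
    static String pointLine(int id, double[] coords) {
        StringBuilder sb = new StringBuilder("p" + id);
        for (double c : coords) {
            sb.append(' ').append(String.format(Locale.US, "%.6f", c));
        }
        return sb.toString();
    }

    // Reads a line back: skips the id, then expects exactly numCoord values.
    static double[] parseCoords(String line, int numCoord) {
        String[] tok = line.trim().split("\\s+");
        if (tok.length != numCoord + 1) {
            throw new IllegalArgumentException("expected " + numCoord + " coordinates: " + line);
        }
        double[] coords = new double[numCoord];
        for (int j = 0; j < numCoord; j++) {
            coords[j] = Double.parseDouble(tok[j + 1]);
        }
        return coords;
    }

    // Generates n pseudo-random points, one per line (assumed range: 0 to 100).
    static String randomPoints(int n, int numCoord, long seed) {
        Random rnd = new Random(seed);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            double[] coords = new double[numCoord];
            for (int j = 0; j < numCoord; j++) {
                coords[j] = rnd.nextDouble() * 100.0;
            }
            sb.append(pointLine(i, coords)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Three sample points with four coordinates each.
        System.out.print(randomPoints(3, 4, 42L));
    }
}
```

Checking the coordinate count before filling the array avoids the ArrayIndexOutOfBoundsException that a line with too many values would otherwise cause.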

KmeansCombiner.java


import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class KmeansCombiner extends Reducer<Text,Text,Text,Text> {

	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

		int num_coord=Integer.parseInt(context.getConfiguration().get("my.num.coord"));

		int i;

		// all points vector
		Vector<double[]> all_points_vector = new Vector<double[]>();

		 for (Text val : values) {

			 // single vector of points only one centroid
			 double[] vec_point_by_key= new double[num_coord];

			StringTokenizer itr = new StringTokenizer(val.toString());

			    i=0;

			 	while(itr.hasMoreElements()) {

			 		vec_point_by_key[i]=Double.parseDouble(itr.nextElement().toString());

			 		i++;

			 	}

			 	all_points_vector.add(vec_point_by_key);

		  }

		//sum the points of this cluster coordinate by coordinate
		double[] centroid_cluster_partial_sum = new double[num_coord];

		for(int j=0; j<num_coord;j++){

			double sum=0;

			for(int k=0; k<all_points_vector.size(); k++){
				sum+= all_points_vector.get(k)[j];
			}

			centroid_cluster_partial_sum[j]=sum;

		}

		//number of points summed by this combiner call
		int cont=all_points_vector.size();

		//emit "<count> <sum_1> ... <sum_n>" for the reducer
		context.write(key, new Text(Integer.toString(cont)+ " "+ KmeansUtil.getStringPadding(centroid_cluster_partial_sum)));

	}
}
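
One caveat worth keeping in mind: Hadoop treats a combiner as an optimisation and may run it zero, one or several times on a given map output. In the listing above the combiner reads raw coordinates but writes "count sum..." records, so it appears to depend on running exactly once. One possible way around this, shown here only as a plain-Java sketch with hypothetical names, is to give every record the same layout (a count followed by the coordinate sums) so that merging can be repeated any number of times.

```java
import java.util.Arrays;

// Hypothetical sketch: partial aggregates that can be merged any number of times.
final class PartialSumSketch {

    // A raw point becomes an aggregate [count=1, x1, ..., xn].
    static double[] fromPoint(double[] point) {
        double[] agg = new double[point.length + 1];
        agg[0] = 1;
        System.arraycopy(point, 0, agg, 1, point.length);
        return agg;
    }

    // Merging adds counts and coordinate sums; input and output share one layout.
    static double[] merge(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    // Final step (the reducer's job): divide each sum by the total count.
    static double[] centroid(double[] agg) {
        double[] c = new double[agg.length - 1];
        for (int j = 0; j < c.length; j++) {
            c[j] = agg[j + 1] / agg[0];
        }
        return c;
    }

    public static void main(String[] args) {
        double[] p1 = fromPoint(new double[]{1, 1});
        double[] p2 = fromPoint(new double[]{1, 3});
        double[] p3 = fromPoint(new double[]{4, 5});
        // Combining (p1, p2) first or (p2, p3) first gives the same centroid.
        // Both lines print [2.0, 3.0]
        System.out.println(Arrays.toString(centroid(merge(merge(p1, p2), p3))));
        System.out.println(Arrays.toString(centroid(merge(p1, merge(p2, p3)))));
    }
}
```

With this layout the mapper would emit the count 1 in front of each point, and combiner and reducer could share the same merge logic.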

KmeansReducer.java

 

 

 

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Vector;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KmeansReducer extends Reducer<Text,Text,Text,Text> {

	public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

		int num_coord=Integer.parseInt(context.getConfiguration().get("my.num.coord"));

		// Double vector all points of certain centroid
		Vector<double[]> all_points_vector = new Vector<double[]>();

		int i;
		int total_points=0;

		for( Text val : values){

			StringTokenizer itr = new StringTokenizer(val.toString());

			//sum number of element
			total_points+= Integer.parseInt(itr.nextToken());

			double[] vec_point_by_key= new double[num_coord];

			i=0;

			while(itr.hasMoreElements()) {
				vec_point_by_key[i]=Double.parseDouble(itr.nextElement().toString());
				i++;
			}

			all_points_vector.add(vec_point_by_key);

		}

		double sum;
		double[] vec_point_get_by_key= new double[num_coord];
		double[] new_centroid_cluster = new double[num_coord];

		for(int j=0; j<num_coord;j++){

			sum=0;
			vec_point_get_by_key= new double[num_coord];

			for(int k=0; k<all_points_vector.size(); k++){
				vec_point_get_by_key=(double[])all_points_vector.get(k);
				sum+= vec_point_get_by_key[j];
			}

			new_centroid_cluster[j]= sum/total_points;

		}

		context.write(key, new Text(KmeansUtil.getStringPadding(new_centroid_cluster)));

	}
}

KmeansUtil.java



import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.List;

public class KmeansUtil {
        public static double getEuclideanDistance(double[] a,double[] b){
                if ( a.length != b.length ){
                        throw new RuntimeException("Attempting to compare two array (cluster and point) of different dimensions");
                }

                double sum = 0;
                for ( int i = 0; i < a.length; i++ ){
                        double diff = a[i] - b[i];
                        sum += diff*diff;
                }
                return Math.sqrt(sum);
        }

        public static double getEuclideanDistance(float[] a,float[] b){
                if ( a.length != b.length ){
                        throw new RuntimeException("Attempting to compare two array (cluster and point) of different dimensions");
                }

                double sum = 0;
                for ( int i = 0; i < a.length; i++ ){
                        double diff = a[i] - b[i];
                        sum += diff*diff;
                }
                return Math.sqrt(sum);
        }

        public static double sumDifferences(List<Double> a, List<Double> b){
                assert(a.size() == b.size());
                double sumDiff = 0;
                for ( int i = 0; i < a.size(); i++ ){
                        sumDiff += Math.abs(a.get(i) - b.get(i));
                }
                return sumDiff;
        }

        public static String getString(double[] array){

             StringBuilder result = new StringBuilder();

        	for ( int idx = 0 ; idx < array.length ; idx++ ) {
            //= Array.get(aArray, idx);
        		result.append(String.valueOf(array[idx]));
        		result.append(" ");

        	}
        	return result.toString();
        }

        public static String getStringPadding(double[] array){

            StringBuilder result = new StringBuilder();

       	for ( int idx = 0 ; idx < array.length ; idx++ ) {
       		//six decimals; Locale.US forces '.' as the decimal separator
       		result.append(String.format(java.util.Locale.US, "%.6f",array[idx]));
       		result.append(" ");

       	}
       	return result.toString();
       }

        public static boolean isEqual(double[] a,double[] b){

        	boolean ret=false;

        	if ( a.length != b.length ){
                throw new RuntimeException("Attempting to compare two array (cluster_old and cluster_new) of different dimensions");

           }
        	 int p=0;

        	 for ( int i = 0; i < a.length; i++ ){
                 if (a[i] == b[i]) p++;

            }

        	 if(p==a.length) ret= true;

        	return ret;
        }

 public static boolean isEqualThresHold(double[] a,double[] b){
	       //ThresHold = 0.001

        	boolean ret=false;

        	if ( a.length != b.length ){
                throw new RuntimeException("Attempting to compare two array (cluster_old and cluster_new) of different dimensions");

           }
        	 int p=0;

        	 for ( int i = 0; i < a.length; i++ ){
                 if (a[i] == b[i]|| Math.abs(a[i]-b[i])<0.001) p++;

            }

        	 if(p==a.length) ret= true;

        	return ret;
        }
}

Complete project example
Kmeans.jar

All that remains is to thank Dr. Rita Di Candia for her invaluable help in carrying out the project 😀

 

8 responses to “Hadoop: Implementing the K-means Clustering Algorithm”

    1. hadoop-0.20.2; when I started the project it was the only stable release. Now, if I'm not mistaken, the stable one is 0.21, but the constructs and the way the code is implemented are no different from 0.21.
      In fact, this version of my algorithm runs on the newer versions of Hadoop since, as you have surely noticed, the Driver uses the new Job handling, Job job = new Job(conf, “Kmeans_unicondor”);, rather than the old JobConf.
      Anyway, it was a great experience getting my hands on the framework 😀

      See you soon

      Flavio

  1. Hi Favio,

    Thank you for making available your source code. I am very interested in testing it in our cluster. Do you have some examples of input datasets and/or running command lines? I already have a Hadoop cluster configured here.

    Thank you again.
    Marcelo

    1. Hi Marcello,

      Below this post there is a link to my final paper comparing OpenMPI and Hadoop on my university's cluster. Hadoop ran on 4 quad-core 2.8 GHz nodes, with one node acting as both NameNode and JobTracker.

      The input files were:
      a) 1 million points, each with 20 coordinates

      b) 5 million points, each with 20 coordinates

      The tests computed 10, 20 and 50 cluster centers.

      I'm sorry, but the documentation is only in Italian. I hope you can understand it 😀

      http://www.vitadastudente.it/file_pdf/OpenMPI_vs_Hadoop.pdf

      Flavio

      1. Thank you very much for the quick response. Your paper will be very useful to me. Now I will try to reproduce your tests here.

        Regards,
        Marcelo

        1. Thanks Marcelo,

          I updated the post. At the end of the post you can download both KmeansUtil.java and the complete project, Kmeans.jar.

          Flavio
