MapReduce
MapReduce es un modelo de programación para dar soporte a la computación paralela sobre grandes colecciones de datos en grupos de computadoras y al commodity computing. El nombre del framework está inspirado en los nombres de dos importantes métodos, macros o funciones en programación funcional: Map y Reduce. MapReduce ha sido adoptado mundialmente, ya que existe una implementación OpenSource denominada Hadoop. Su desarrollo fue liderado inicialmente por Yahoo y actualmente lo realiza el proyecto Apache. En esta década de los años 2010 existen diversas iniciativas similares a Hadoop tanto en la industria como en el ámbito académico. Se han escrito implementaciones de bibliotecas de MapReduce en diversos lenguajes de programación como C++, Java y Python.
MapReduce se emplea en la resolución práctica de algunos algoritmos susceptibles de ser paralelizados.[1] No obstante MapReduce no es la solución para cualquier problema, de la misma forma que cualquier problema no puede ser resuelto eficientemente por MapReduce.[2] Por regla general se abordan problemas con datasets de gran tamaño, alcanzando los petabytes de tamaño. Es por esta razón por la que este framework suele ejecutarse en sistema de archivos distribuidos (HDFS).
Historia
Las primeras implementaciones de Google necesitaban realizar operaciones de multiplicación de grandes matrices para calcular el PageRank, esto es, la clasificación de páginas en una búsqueda. De esta forma se hizo popular MapReduce como un método de cálculo de álgebra lineal. La preocupación por tratar grandes colecciones de datos, llevó a crear algoritmos y frameworks capaces de poder procesar terabytes de información. Una de las primeras aplicaciones capaces de programar MapReduce fue implementado inicialmente en Hadoop, diseñado inicialmente por Doug Cutting,[3] que lo nombró así por el elefante de juguete de su hijo.[4] Fue desarrollado originalmente para apoyar la distribución del proyecto de motor de búsqueda Nutch.[5]
Concepto
No todos los procesos pueden ser abordados desde el framework MapReduce. Concretamente son abordables sólo aquellos que se pueden disgregar en las operaciones de map() y de reduce() y esto es importante a la hora de poder elegir este framework para resolver un problema. Las funciones Map y Reduce están definidas ambas con respecto a datos estructurados en tuplas del tipo (clave, valor).
Función Map()
Map toma uno de estos pares de datos con un tipo en un dominio de datos, y devuelve una lista de pares en un dominio diferente:
Map(k1,v1) -> list(k2,v2)
- La función map(): se encarga del mapeo y es aplicada en paralelo para cada ítem en la entrada de datos. Esto produce una lista de pares (k2,v2) por cada llamada. Después de eso, el framework de MapReduce junta todos los pares con la misma clave de todas las listas y los agrupa, creando un grupo por cada una de las diferentes claves generadas. Desde el punto de vista arquitectural el nodo master toma el input, lo divide en pequeñas piezas o problemas de menor identidad, y los distribuye a los denominados worker nodes. Un worker node puede volver a sub-dividir, dando lugar a una estructura arbórea. El worker node procesa el problema y pasa la respuesta al nodo maestro.
Función Reduce()
La función reduce es aplicada en paralelo para cada grupo, produciendo una colección de valores para cada dominio:
Reduce(k2, list (v2)) -> list(v3)
- La función reduce(): cada llamada a Reduce típicamente produce un valor v3 o una llamada vacía, aunque una llamada puede retornar más de un valor. El retorno de todas esas llamadas se recoge como la lista de resultado deseado.
Por lo tanto, el framework MapReduce transforma una lista de pares (clave, valor) en una lista de valores. Este comportamiento es diferente de la combinación "map and reduce" de programación funcional, que acepta una lista arbitraria de valores y devuelve un valor único que combina todos los valores devueltos por mapa.
Arquitectura del MapReduce
La función map() se ejecuta de forma distribuida a lo largo de varias máquinas. Los datos de entrada, procedentes por regla general de un gran archivo (fichero), se dividen en un conjunto de M particiones de entrada de generalmente 16 a 64 megabytes. Estas particiones pueden ser procesadas en diversas máquinas. En una invocación de MapReduce suelen ocurrir varias operaciones:
- Se procede a dividir las entradas en M particiones de tamaño aproximado de 16 a 64 megabytes. El programa MapReduce se comienza a instanciar en las diversas máquinas del cluster. Por regla general, el número de instancias se configura en las aplicaciones.
- Una de las copias del programa es especial y toma el papel de "maestro". El resto de copias se denominan como "workers" y reciben la asignación de sus tareas desde el master. Se considera que existen una cantidad de M map() tareas y de R reduce(). El "maestro" se encarga de recopilar "workers" en reposo (es decir sin tarea asignada) y le asignará una tarea específica de map() o de reduce(). Un worker sólo puede tener tres estados: reposo, trabajando, completo.
- Un worker que tenga asignada una tarea específica de map() tomará como entrada la partición que le corresponda. Se dedicará a parsear los pares (clave, valor) para crear una nueva pareja de salida, tal y como se específica en su programación. Los pares clave y valor producidos por la función map() se almacenan como buffer en la memoria.
- Periódicamente, los pares clave-valor almacenados en el buffer se escriben en el disco local, repartidos en R regiones. Las regiones de estos pares clave-valor son pasados al master, que es responsable de redirigir a los "workers" que tienen tareas de reduce().
- Cuando un worker de tipo reduce es notificado por el "maestro" con la localización de una partición, éste emplea llamadas remotas para hacer lecturas de la información almacenada en los discos duros de los diversos workers de tipo map(). Cuando un worker de tipo reduce() lee todos los datos intermedios, ordena las claves de tal modo que se agrupen los datos encontrados que poseen la misma clave. El ordenamiento es necesario debido a que, por regla general, muchas claves de funciones map() diversas pueden ir a una misma función reduce(). En aquellos casos en los que la cantidad de datos intermedios sean muy grandes, se suele emplear un ordenamiento externo.
- El worker de tipo reduce() itera sobre el conjunto de valores ordenados intermedios, y lo hace por cada una de las claves únicas encontradas. Toma la clave y el conjunto de valores asociados a ella y se los pasa a la función reduce(). La salida de reduce() se añade al archivo (fichero) de salida de MapReduce.
- Cuando todas las tareas map() y reduce() se han completado, el "maestro" levanta al programa del usuario. Llegados a este punto la llamada MapReduce retorna el control al código de un usuario.
Se considera que ha habido un final de las tareas cuando este control se ha devuelto al usuario. Las salidas se ditribuyen en un fichero completo, o en su defecto se reparten en R ficheros. Estos R ficheros pueden ser la entrada de otro MapReduce o puede ser procesado por cualquier otro programa que necesite estos datos.
Combinador (Agregadores locales)
En un entorno de clusterización, una de las límitaciones se encuentra en el transporte de grandes ficheros entre ordenadores debido a lo limitado de su ancho de banda. En el framework MapReduce la función map() escribe en una memoria intermedia de carácter local, como puede ser un disco duro. La información que se escribe localmente es agregada y ordenada por una función agregadora encargada de realizar esta operación. Los valores ordenados son de la forma [k, [v1, v2, v3, ..., vn]]. De esta forma la función reduce() recibe una lista de valores asociados a una única clave procedente del combinador. Debido a que la latencia de red de ordenadores, y de sus discos suele ser mayor que cualquier otra de las operaciones, cualquier reducción en la cantidad de datos intermedios incrementará la eficiencia de los algoritmos. En MapReduce, cualquier agregación local de los resultados intermedios causa una mejora real de la eficiencia global.
Es por esta razón por la que muchas distribuciones oficiales de MapReduce suelen incluir operaciones de agregación en local, mediante el uso de funciones capaces de agregar datos localmente. Evitando, o reduciendo en la medida de lo posible el movimiento de grandes ficheros. Bien sea añadidas a las funciones map(), o a los agregadores locales.
Tolerancia a Fallos
El mecanismo de MapReduce es tolerante a fallos cuando uno de los workers se ve sometido a un fallo. Como MapReduce se ha diseñado para procesos en los que se encuentran involucrados grandes tamaños de datos mediante el empleo de cientos o miles de ordenadores. Aún siendo la probabilidad de fallo baja, es muy posible que uno (o varios) de los workers quede desactivado precisamente por fallo de la máquina que le daba soporte. El "master" periódicamente hace ping a cada worker para comprobar su estatus.
Si no existe respuesta tras un cierto instante de espera, el master interpreta que el worker está desactivado. Cualquier tarea map() que ha sido completa por el worker regresa de inmediato a su estado de espera, y por lo tanto puede resultar elegible para su asignación en otros workers. De forma similar, cualquier función map() (o reduce) que se encuentre en progreso durante el fallo, se resetea a estado de reposo pudiendo ser elegida para su nueva re-asignación.
Las tareas de map() completados se vuelven a re-ejecutar ante un fallo debido en parte a que su salida se almacena en los discos locales de la máquina que falló, y por lo tanto se consideran inaccesibles. Las tareas reduce() completas no son necesarias volver a ser re-ejecutadas debido a que su salida se ha almacenado en el sistema global. cuando la tarea de map() se ejecuta por un worker A y luego por un worker B (debido principalmente a un fallo), en este caso todas las tareas reduce() son notificadas para que eliminen datos procedentes del worker A y acepten las del worker B. De esta forma la ejecución de MapReduce es resiliente.
Ejemplos
En la descripción de los ejemplos de uso de MapReduce sólo es necesario describir en detalle como se implementan las operaciones de map() y de reduce() en cada caso. La literatura muestra ejemplos reiterados de conteo de palabras en un documento, de operaciones matriciales y de operaciones de consulta a bases de datos relacionales.
Conteo de palabras
Este ejemplo de MapReduce es un proceso para contar las apariciones de cada palabra en un conjunto de documentos:
map(String name, String document):
// clave: nombre del documento
// valor: contenido del documento
for each word w in document:
EmitIntermediate(w, 1);
La función map() en este caso divide un documento en palabras (es decir lo tokeniza) mediante el empleo de un simple analizador léxico, y emite una serie de tuplas de la foma (clave, valor) donde la clave es la palabra y el valor es "1". Es decir, por ejemplo, del documento "La casa de la pradera" la función map retornaría: ("la", "1"), ("casa", "1"), ("de", "1"), ("la", "1"), ("pradera", "1").
reduce(String word, Iterator partialCounts):
// word: una palabra
// partialCounts: una [[Iterador (patrón de diseño)|lista parcial]] para realizar cuentas agregadas
int result = 0;
for each v in partialCounts:
result += ParseInt(v);
Emit(result);
Aquí, cada documento es dividido en palabras, y cada palabra se cuenta con valor inicial "1" por la función Map, utilizando la palabra como el resultado clave. El framework reúne todos los pares con la misma clave y se alimenta a la misma llamada Reduce, por lo tanto, esta función sólo necesita la suma de todos los valores de su entrada para encontrar el total de las apariciones de esa palabra. En el ejemplo anterior ("la", "1") aparece dos veces debido a que la clave "la" tiene dos ocurrencias, el resto de claves sólo aparece una vez.
Multiplicación de una matriz por un vector
Los ejemplos de álgebra lineal para operaciones de matrices son los más adecuados por la idoneidad del framework en estos casos. Supongamos que tenemos una matriz cuadrada M de tamaño nxn. Al elemento ubicado en la fila i y columna j le denominamos mij. Supongamos que tenemos un vector v de tal forma que en la posición j se tiene el elemento vj. De esta forma la resultante de la multiplicación entre la matriz M y el vector v será un vector x de longitud n, de tal forma que el elemento xi es tal que:
Esta operación se realiza sin problema alguno para matrices de varios miles de elementos, siendo costoso para varios millones. El problema de su computación proviene cuando se pretende realizar con centenares de billones. Es por esta razón por la que se asume en la aplicación de MapReduce que n es del orden de 1012. La función map () en este caso toma una fila i de la matriz y el vector v completo para formar pares: (i, mijvj). Es decir de la forma (1, m11v1), (1, m12v2), (1, mi3v3) ... (1, mijvj).
map(Vector rowMatrix, Vector vector):
// clave: i -> índice del vector
// valor: producto de m<sub>ij</sub> por v<sub>j</sub>.
for each position i in vector:
EmitIntermediate(i, value);
La función reduce() en este caso sólo tiene que colectar los pares que poseen la misma clave i y sumarlos.
reduce(String word, Iterator partialCounts):
// word: una palabra
// partialCounts: una [[Iterador (patrón de diseño)|lista parcial]] para realizar cuentas agregadas
int result = 0;
for each v in partialCounts:
result += ParseInt(v);
Emit(result);
Flujo de datos
El framework de MapReduce es un gran algoritmo distribuido de ordenamiento. Los módulos principales que la aplicación define son:
- Un lector de entrada
- Una función Map
- Una función de partición
- Una función de comparación
- Una función Reduce
- Un escritor de salida
Lector de entrada
El lector de entrada divide la entrada en 'divisiones' de tamaño apropiado (típicamente entre 64 MB a 128 MB) y el framework asigna una división a cada función Map. El lector de entrada lee los datos desde almacenamiento estable (generalmente un Sistema de archivos distribuido) y genera pares llave/valor.
Un ejemplo común leerá un directorio lleno de archivos de texto y retornara cada línea como un registro.
Función Map
La función Map toma una serie de pares clave/valor, los procesa, y genera cero o más pares llave/valor de salida. Los tipos de los Mapas de entrada y salida pueden (y a menudo son) diferentes entre si.
Si la aplicación está realizando conteo de palabras, la función Map partirá las líneas en palabras y generará un par llave/valor de salida para cada palabra. Cada par de salida contendrá la palabra como llave y el número de instancias de la misma en la línea como valor.
Función de partición
Cada salida de la función Map es asignada a un reductor mediante la función de partición para generar fragmentación. La función de partición recibe la llave y el número de reductores y retorna el índice del reductor deseado.
El comportamiento por defecto es obtener el hash de la llave y utilizar el hash módulo el número de reductores. Es importante elegir una función de partición que genere una distribución aproximadamente uniforme de datos por fragmento para mantener el balance, de otra forma la operación MapReduce puede enlentecerse esperando a que reductores lentos (reductores asignados a más datos de los contenidos en su fragmento) finalicen.
Entre las etapas de mapeo y reducción los datos son barajados (ordenados paralelamente / intercambiados entre nodos) en orden de mover los datos desde el fragmento donde fueron producidos hacia el fragmento en el que serán reducidos. El barajamiento puede en algunos casos tomar más tiempo que el procesamiento dependiendo del ancho de banda, velocidades de CPU, datos producidos y tiempo consumido entre los procesamientos de mapeo y reducción.
Función de comparación
La entrada para cada Reducción es obtenida desde la máquina donde se ejecutó el Map y se ordenó utilizando la función de comparación
Función de Reducción
El framework llama a la función de Reducción de la aplicación una vez para cada llave única en la lista ordenada. La Reducción puede iterar entre los valores que están asociados con esa llave y producir cero o más salidas.
En el ejemplo de conteo de palabras, la función Reducción toma los valores de entrada, los suma y genera una salida única para la palabra y la suma final.
Escritor de salida
El Escritor de salida escribe la salida de la función Reducción a las tablas de almacenamiento, usualmente un sistema de archivos distribuido.
Usos
Por regla general se emplea MapReduce en aquellos problemas de Computación concurrente entre los que se encuentran involucrados grandes datasets que deben ser procesandos por una gran cantidad de computadoras (nodos), a los que se refiere de forma colectiva como clusteres (si todos los nodos se encuentran en la misma red de área local y empleando el mismo hardware), o a grids (si los nodos se comparten de forma distribuida a lo largo de extensas zonas geográficas o administrativas, y que generalmente poseen un hardware más heterogéneo). El procesamiento paralelo puede ocurrir con el empleo de datos almacenados tanto en filesystem (no estructurado) o en una database (estructurados).[1] Es por esta razón por la que se emplea en aplicaciones que poseen datos a gran escala, tales como aplicaciones paralelas, indexación web, data mining, y simulación científica.
Ver también
Implementaciones de MapReduce
Referencias
- ↑ a b Jeffrey Dean, Sanjay Ghemawat, (2008), MapReduce: simplified data processing on large clusters, Communications of the ACM - 50th anniversary issue: 1958 - 2008, Volume 51 Issue 1, January 2008 Pages 107-113
- ↑ Anand Rajaraman,Jeffrey David Ullman, (2012), Mining of Massive Datasets
- ↑ Hadoop creator goes to Cloudera
- ↑ Ashlee Vance (17 de marzo de 2009). «Hadoop, a Free Software Program, Finds Uses Beyond Search». New York Times. Consultado el 20 de enero de 2010.
- ↑ "Hadoop contains the distributed computing platform that was formerly a part of Nutch. This includes the Hadoop Distributed Filesystem (HDFS) and an implementation of map/reduce." About Hadoop