\documentclass[nojss]{jss}
\usepackage{graphicx,keyval,thumbpdf,url}
\newcommand{\sQuote}[1]{`{#1}'}
\newcommand{\dQuote}[1]{``{#1}''}
\newcommand{\file}[1]{\sQuote{\textsf{#1}}}
\newcommand{\class}[1]{\code{"#1"}}
%% need no \usepackage{Sweave.sty}
%%\SweaveOpts{engine=R,eps=FALSE,results=verbatim,fig=FALSE,echo=TRUE,strip.white=true}
\AtBeginDocument{\setkeys{Gin}{width=0.6\textwidth}}
\date{\today}
\title{Distributed Storage and Lists}
\Plaintitle{Distributed Storage and Lists}
\Shorttitle{DSL}
\author{Stefan Theu\ss{}l}
\Plainauthor{Stefan Theussl}
%% \VignetteIndexEntry{DSL}
\Abstract{
  Distributed lists are list-type objects whose elements (i.e., arbitrary
  \proglang{R} objects) are stored in serialized form on a distributed
  storage. The latter is often used in high performance computing
  environments to process large quantities of data. Data located in such an
  environment is most efficiently processed using the MapReduce programming
  model, first proposed by Google. The \proglang{R} package~\pkg{DSL}
  provides an environment for creating and handling distributed lists. The
  package allows one to use different types of storage backends, in
  particular the Hadoop Distributed File System. Furthermore, it offers
  functionality to operate on such lists efficiently using the MapReduce
  programming model.
}
\Keywords{\proglang{R}, lists, MapReduce}
\Plainkeywords{R, lists, MapReduce}
\Address{
  Stefan Theu\ss{}l\\
  %% Institute for Statistics and Mathematics\\
  %% WU Wirtschaftsuniversit\"at Wien\\
  %% Augasse 2--6\\
  %% 1090 Wien, Austria
  E-mail: \email{Stefan.Theussl@R-project.org}\\
  %% URL: \url{http://statmath.wu.ac.at/~theussl/}\\
}

\begin{document}

<<>>=
options(width = 60)
require("DSL")
@
%
\maketitle
\sloppy{}

\section{Introduction}
\label{sec:introduction}

\emph{Distributed lists} are list-type objects using a \emph{distributed
  storage} to store their elements. Typically, distributed lists are
advantageous in environments where large quantities of data need to be
processed at once, since all data is stored outside the main memory, which
is often limited. Usually, a ``distributed file system'' (DFS) can serve as
a container to hold the data on a distributed storage. Such a container can
hold arbitrary objects by serializing them to files.

A recurrent function when computing on lists in
\proglang{R}~\citep[][]{Rcore:2011} is \code{lapply()} (and variants
thereof). Conceptually, this is similar to a ``Map'' function from
functional programming, where a given (\proglang{R}) function is applied to
each element of a vector (or, in this case, a list). Another type of
function often applied to lists combines the contained elements. In
functional programming this is called ``Reduce'', but variants thereof also
exist in other areas (e.g., in the MPI standard, see
\url{http://www.mpi-forum.org/docs/mpi22-report/node103.htm#Node103}). As
first observed by Google, the Map and Reduce functions are often sufficient
to express many tasks for analyzing large data sets. Google implemented a
framework which closely follows the MapReduce programming
model~\citep[see][and
\url{http://en.wikipedia.org/wiki/MapReduce}]{Dean+Ghemawat:2004}. Note,
however, that as pointed out, e.g., in~\cite{Laemmel:2007}, the Map and
Reduce operations in the MapReduce programming model do not necessarily
follow their definitions from functional programming. The model rather aims
to support computation (i.e., map and reduction operations) on large data
sets on clusters of workstations in a distributed manner. Provided each
mapping operation is independent of the others, all maps can be performed
in parallel.
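To fix ideas before turning to the distributed setting, the following base
\proglang{R} sketch (it does not use package~\pkg{DSL}; all object names
are chosen for illustration only) expresses a simple word count in terms of
the \code{Map()} and \code{Reduce()} functions from functional programming:

<<>>=
lines <- c("this is a line", "this is another line")
## Map: split each line into terms, i.e., emit one item per term
terms <- unlist(Map(function(line) strsplit(line, " ")[[1]],
                    lines))
## Reduce: combine the counts associated with each distinct term
Reduce(function(counts, term) {
    counts[term] <- if (term %in% names(counts))
        counts[[term]] + 1L else 1L
    counts
}, terms, integer())
@

In the distributed setting the same pattern applies, except that the
intermediate results are kept on a distributed storage and the individual
map operations may be carried out on different workstations.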
Hadoop (\url{http://hadoop.apache.org/}) is an open source variant of the
MapReduce framework.

Package~\pkg{DSL} is an extension package for \proglang{R} for creating and
handling list-type objects whose elements are stored using a distributed
storage backend. For operating on such distributed lists efficiently, the
package offers methods and functions following the MapReduce programming
model. In particular, \pkg{DSL} allows one to use the Hadoop Distributed
File System~\citep[HDFS, see][]{Borthakur:2010} and Hadoop Streaming
(MapReduce) for storing and processing data in a distributed manner.

In Section~\ref{sec:design+implementation} we describe the underlying data
structures and the MapReduce functionality. Examples are discussed in
Section~\ref{sec:examples}. Section~\ref{sec:conclusion+outlook} concludes
the paper.

\section{Design and Implementation}
\label{sec:design+implementation}

\subsection{Data Structures}

\subsubsection{Distributed Storage}

The S3 class \class{DStorage} defines a virtual storage where files are
kept on a file system which possibly spans several workstations. Data is
distributed automatically among these nodes when using such a file
system. Objects of class \class{DStorage} ``know'' how to use the
corresponding file system via supplied accessor and modifier methods. The
following file systems can be used as a distributed storage (DS):

\begin{description}
\item[\code{"LFS"}:] the local file system. This type uses functions and
  methods from the packages \pkg{base} and \pkg{utils} delivered with the
  \proglang{R} distribution to handle files.
\item[\code{"HDFS"}:] the Hadoop distributed file system. Functions and
  methods from package \pkg{hive}~\citep[][]{Theussl+Feinerer:2011} are
  used to interact with the HDFS.
\end{description}

Essentially, such a class needs methods for reading from and writing to the
distributed storage. Note, however, that files on such a file system are
typically organized according to a published standard. Thus, one should not
write or modify arbitrary files or directories on such a file system. To
account for this, class \class{DStorage} specifies a directory
\code{base\_dir} whose contents can be modified freely; read/write
operations cannot escape from this directory.

The following (\pkg{DSL}-internal) methods are available for objects of
class \class{DStorage}.

\begin{itemize}
\item \code{DS\_dir\_create()}
\item \code{DS\_get()}
\item \code{DS\_list\_directory()}
\item \code{DS\_put()}
\item \code{DS\_read\_lines()}
\item \code{DS\_unlink()}
\item \code{DS\_write\_lines()}
\end{itemize}

Depending on the type of storage, suitable functions from different
packages will be used to interact with the corresponding file
system. Whereas \code{DS\_dir\_create()}, \code{DS\_list\_directory()},
\code{DS\_read\_lines()}, \code{DS\_unlink()}, and
\code{DS\_write\_lines()} mimic the behavior of corresponding functions of
package \pkg{base} (\code{dir.create()}, \code{dir()}, \code{readLines()},
\code{unlink()}, and \code{writeLines()}, respectively), the functions
\code{DS\_get()} and \code{DS\_put()} can be used to read/write
\proglang{R} objects from/to disk.

The main reason for having such a virtual storage class in \proglang{R} is
that it allows for an easy extension of the memory available to the
\proglang{R} working environment, since this storage can be used to hold
arbitrary (serialized) \proglang{R} objects. These objects are only loaded
into the current working environment (i.e., into RAM) when they are needed
for computation.
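The following base \proglang{R} sketch illustrates this idea (it uses
\code{saveRDS()}/\code{readRDS()} rather than the \pkg{DSL}-internal
accessors; the file name is chosen for illustration only): an object is
serialized to a file and re-loaded into the working environment only when
it is actually needed.

<<>>=
## serialize an object to disk instead of keeping it in RAM
chunk <- file.path(tempdir(), "chunk_1.rds")
saveRDS(list(key = "line1", value = "some large object"), chunk)
## ... later, load it on demand for computation
readRDS(chunk)$value
@

Within \pkg{DSL}, \code{DS\_put()} and \code{DS\_get()} play this role for
the configured storage backend.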
However, for efficiency reasons it is in most cases not a good idea to
place many small files on such a file system. Putting several serialized
\proglang{R} objects into files of a certain maximum size (e.g., line by
line as key/value pairs) circumvents this issue. Indeed, frameworks like
Hadoop benefit from such a setup \citep[see Section \emph{Data
  Organization} in][]{Borthakur:2010}. Thus, a constructor function must
take the following arguments:

\begin{description}
\item[\code{type}:] the file system type,
\item[\code{base\_dir}:] the directory under which chunks of serialized
  objects are to be stored,
\item[\code{chunksize}:] the maximal size of a single chunk.
\end{description}

For example, a DS of \code{type} \code{"LFS"} using the system-wide or a
user-defined \emph{temporary directory} as the base directory
(\code{base\_dir}) and a chunk size of 10~MB can be instantiated using the
function \code{DStorage()}:

<<>>=
ds <- DStorage( type = "LFS", base_dir = tempdir(),
                chunksize = 10 * 1024^2 )
@

Further methods for class \class{DStorage} are corresponding
\code{print()} and \code{summary()} methods.

<<>>=
ds
summary(ds)
@

\subsubsection{Distributed Lists}

Distributed lists are defined in \proglang{R} by the S3 class
\class{DList}. Objects of this class behave similarly to standard
\proglang{R} lists but use a distributed storage of class \class{DStorage}
to store their elements. Distributed lists can easily be constructed using
the function \code{DList()} or coerced from other objects using the generic
function \code{as.DList()}. Available methods support coercion of
\proglang{R} lists and character vectors representing paths to data
repositories, as well as coercion of \class{DList} objects to lists.

<<>>=
dl <- DList( letters = letters, numbers = 0:9 )
l <- as.list( letters )
names(l) <- LETTERS
dl2 <- as.DList(l)
identical( as.list(dl2), l )
dl3 <- as.DList( system.file("examples", package = "DSL") )
@

Note that the above example uses the default storage type, namely
\code{"LFS"} with a \emph{temporary directory} generated by
\code{tempdir()} as the base directory. In order to use a user-defined
storage, the \code{DStorage} argument to the \code{DList()} constructor is
supplied.

<<>>=
dl <- DList( letters = letters, numbers = 0:9, DStorage = ds )
@

Conceptually, we want a distributed list to support a set of intuitive
operations, like accessing each element (stored somewhere on a DFS) in a
direct way, displaying the distributed list and each individual element,
obtaining information about basic properties (e.g., the length of the
list), or applying some operation to a range of elements. These
requirements are formalized via a set of interfaces which must be
implemented by the \class{DList} class:

\begin{description}
\item[Display] Since elements of the list are not directly available, the
  \code{print()} and \code{summary()} methods should provide other useful
  information about the distributed list (like the number of list
  elements).
\item[Length] The \code{length()} function must return the number of list
  elements.
\item[Names] Named lists must be supported.
\item[Subset] The \code{[[}%]]
  \ operator must be implemented so that individual elements of the
  distributed list can be retrieved.
\item[MapReduce] Map and Reduce operations as well as variants of
  \code{lapply()} (which are conceptually similar to Map) can be used to
  express most of the computation on \class{DList} objects.
\end{description}

<<>>=
#dl
summary(dl)
names( dl2 )
length(dl3)
dl3[[1]]
@

MapReduce is discussed in more detail in the next section.

\subsection{Methods on Distributed Lists}
\label{sec:methods}

The MapReduce programming model as defined by~\cite{Dean+Ghemawat:2004} is
as follows. The computation takes a set of input key/value pairs and
produces a set of output key/value pairs. The user expresses the
computation as two functions: Map and Reduce. The Map function takes an
input pair and produces a set of intermediate key/value pairs. The Reduce
function accepts an intermediate key and a set of values for that key
(possibly grouped by the MapReduce library). It merges these values
together to form a possibly smaller set of values. Typically, zero or one
output value is produced per reduce invocation. Furthermore, data is
usually stored on a (distributed) file system which is recognized by the
MapReduce library. This allows such a framework to handle lists of values
(here objects of class \class{DList}) that are too large to fit into main
memory (i.e., RAM).

\begin{description}
\item[\code{DGather}:] this collective operation is similar to an
  MPI\_GATHER
  (\url{http://www.mpi-forum.org/docs/mpi22-report/node95.htm#Node95}).
  However, instead of collecting results from processes running in
  parallel, \code{DGather()} collects the contents of the chunks holding
  the elements of a \class{DList}. By default, a named list whose length
  equals the number of chunks is returned; its elements are character
  vectors containing the values of the key/value pairs read line by line
  from the corresponding chunk. Alternatively, \code{DGather()} can be
  used to retrieve only the keys.
\item[\code{DLapply}:] an (l)apply-type function which is used to
  iteratively \emph{apply} a function to a set of input values. In the
  case of \code{DLapply()}, the input values are the elements of a
  \class{DList} object (i.e., the values of the key/value pairs). A
  distributed list of the same length is returned.
\item[\code{DMap}:] similar to \code{DLapply()} above, but always takes
  both the key and the value of a key/value pair as input. Thus, keys can
  also be modified. Indeed, unlike with \code{DLapply()}, the returned
  object may differ in length from the original.
\item[\code{DReduce}:] this collective operation takes a set of
  (intermediate) key/value pairs and combines values with the same
  associated key using a given directive (the reduce function). By
  default, values are concatenated using the function \code{c()}.
\end{description}

<<>>=
dl <- DList( line1 = "This is the first line.",
             line2 = "Now, the second line." )
res <- DLapply( dl, function(x) unlist(strsplit(x, " ")) )
as.list( res )
foo <- function( keypair )
    list( key = paste("next_", keypair$key, sep = ""),
          value = gsub("first", "mapped", keypair$value) )
dlm <- DMap( x = dl, MAP = foo )
## retrieve keys
unlist(DGather(dlm, keys = TRUE, names = FALSE))
## retrieve values
as.list( dlm )
@

Further methods on \class{DList} objects are prefixed with
\code{DL\_}. Currently, only methods for interacting with the underlying
\class{DStorage} are available.

\begin{description}
\item[\code{DL\_storage}:] accesses the storage of \class{DList}
  objects. Returns objects of class \class{DStorage}.
\item[\code{DL\_storage<-}:] replaces the storage in \class{DList}
  objects. Data is automatically transferred to the new storage.
\end{description}

<<>>=
l <- list( line1 = "This is the first line.",
           line2 = "Now, the second line." )
dl <- as.DList( l )
DL_storage(dl)
ds <- DStorage("HDFS", tempdir())
DL_storage(dl) <- ds
as.list(dl)
@
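To complement the above, the following small sketch illustrates the
default behavior of \code{DReduce()} described earlier (the key
\code{"all"} is an arbitrary name chosen for this illustration): both
values are mapped to a common key and then combined with \code{c()}.

<<>>=
dl <- DList( line1 = "This is the first line.",
             line2 = "Now, the second line." )
## map both elements to a common key ...
common <- DMap( dl, function( keypair )
    list( key = "all", value = keypair$value ) )
## ... and concatenate their values by key
as.list( DReduce( common, c ) )
@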
\section{Examples}
\label{sec:examples}

\subsection{Word Count}

This example demonstrates how \code{DMap()} and \code{DReduce()} can be
used to count words based on text files located somewhere on a given file
system. The following two files contained in the \file{examples} directory
of the package will be used.

<<>>=
## simple wordcount based on two files:
dir(system.file("examples", package = "DSL"))
@

We use a temporary directory as the base directory of a new
\class{DStorage} object. By setting the maximum chunk size to 1~byte we
force the name of each file to be placed in a separate chunk. Then we store
the absolute paths to the text files as elements of a \class{DList} object.

<<>>=
## first force 1 chunk per file (set max chunk size to 1 byte):
ds <- DStorage("LFS", tempdir(), chunksize = 1L)
## make "DList", i.e., read file contents and store in chunks
dl <- as.DList( system.file("examples", package = "DSL"),
                DStorage = ds )
@

Data is read into chunks (one per original file) using a simple call to
\code{DMap()} on the distributed list.

<<>>=
## read files
dl <- DMap(dl, function( keypair ){
    list( key = keypair$key,
          value = tryCatch(readLines(keypair$value),
                           error = function(x) NA) )
})
@

The contents of the files are split into words using the following call.

<<>>=
## split into terms
splitwords <- function( keypair ){
    keys <- unlist(strsplit(keypair$value, " "))
    mapply( function(key, value) list( key = key, value = value ),
            keys, rep(1L, length(keys)),
            SIMPLIFY = FALSE, USE.NAMES = FALSE )
}
res <- DMap( dl, splitwords )
as.list(res)
@

Finally, the collected intermediate results are summed by key.

<<>>=
## now aggregate by term
res <- DReduce( res, sum )
as.list( res )
@

% \subsection{Temperature}
% TODO: Example from Hadoop book. around 30 GB of raw data.
% <<>>=
% data <- "~/tmp/NCDC"
% require("hive")
% require("DSL")
% ## first force 1 chunk per file (set max chunk size to 1 byte):
% ds <- DStorage("LFS", tempdir(), chunksize = 1L)
% ## make "DList", i.e., read file contents and store in chunks
% dl <- as.DList( data,
%                 DStorage = ds )
% dl <- DMap(dl, function( keypair ){
%     con <- gzfile(keypair$value)
%     lines <- tryCatch(readLines(con),
%                       error = function(x) NA)
%     close( con )
%     mapply( function(key, value) list( key = key, value = value),
%             keypair$key, lines,
%             SIMPLIFY = FALSE, USE.NAMES = FALSE )
% })
% ds <- DStorage( "HDFS", tempdir() )
% DL_storage(dl) <- ds
% int <- DMap(dl, function( keypair ){
%     airtemp <- as.integer( substr(keypair$value, 88, 92) )
%     if( airtemp == 9999 || (! as.integer(substr(keypair$value, 93, 93))
%         %in% c(0L,1L,4L,5L,9L)) )
%         airtemp <- NA
%     list(key = substr(keypair$value, 16, 19),
%          value = airtemp)
% })
% maxtemp <- DReduce( int, function(x) max(x, na.rm = TRUE) )
% as.list(maxtemp)
% @

\section{Conclusion and Outlook}
\label{sec:conclusion+outlook}

Package~\pkg{DSL} was designed to allow for the handling of large data sets
that do not fit into main memory. The main data structure is the class
\class{DList}, a list-type object storing its elements on a virtual storage
of class \class{DStorage}. The package currently provides basic data
structures for creating and handling \class{DList} and \class{DStorage}
objects, and facilities for computing on these, including map and reduction
methods based on the MapReduce paradigm.
Possible future extensions include:
\begin{itemize}
\item a \class{DStorage} interface to NoSQL database systems,
\item better integration with the \pkg{parallel} package. Currently, only
  the \emph{multicore} variant of \code{lapply()} is used for
  \class{DStorage} objects of type \code{"LFS"}.
\end{itemize}

\subsubsection*{Acknowledgments}

We are grateful to Christian Buchta for providing efficient \proglang{C}
code for collecting partial results in \code{DReduce()}.

{\small
  \bibliography{DSL}
}

\end{document}