Apache Spark: Applying a function from sklearn in parallel on partitions

I'm new to Big Data and Apache Spark (and an undergrad doing work under a supervisor).

Is it possible to apply a function (e.g. fitting a spline) to individual partitions of an RDD? I'm trying to implement some of the work in the paper here.

The book "Learning Spark" seems to indicate that this is possible, but doesn't explain how.

"If you instead have many small datasets on which you want to train different learning models, it would be better to use a single- node learning library (e.g., Weka or SciKit-Learn) on each node, perhaps calling it in parallel across nodes using a Spark map()."

There are 3 best solutions below


Actually, we have a library which does exactly that. We have several sklearn transformers and predictors up and running. Its name is sparkit-learn.
From our examples:

import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
X_rdd = sc.parallelize(X, 4)  # sc is an existing SparkContext
y_rdd = sc.parallelize(y, 4)
Z = DictRDD((X_rdd, y_rdd),
            columns=('X', 'y'),
            dtype=[np.ndarray, np.ndarray])

local_pipeline = Pipeline((
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
    ('vect', SparkHashingVectorizer()),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])

You can find it here.


I'm not 100% sure that I'm following, but there are a number of partition-level methods, such as mapPartitions. These operators hand you an iterator over each partition's data, and you can do whatever you want with it and pass the results back through a new iterator:

rdd.mapPartitions(iter => {
  // Spin up something expensive that you only want to do once per partition
  for (item <- iter) yield {
    // do stuff to each item using your expensive resource
  }
})
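
The same idea in PySpark, closer to the original question about splines: fit one single-node model per partition. This is only a sketch under assumptions (an existing SparkContext sc, SciPy available on the workers, and data already partitioned so that each partition holds one small curve); UnivariateSpline is just an illustrative choice, not necessarily what the paper uses:

import numpy as np
from scipy.interpolate import UnivariateSpline

def fit_partition(rows):
    # rows is an iterator over the (x, y) pairs of a single partition
    pts = sorted(rows)                  # spline fitting needs increasing x values
    if len(pts) < 4:                    # too few points (or an empty partition)
        return iter([])
    x, y = zip(*pts)
    spline = UnivariateSpline(np.array(x), np.array(y))
    return iter([spline(np.array(x))])  # yield one fitted result per partition

data = [(float(i), float(i) ** 2) for i in range(1000)]   # placeholder curve
fits = sc.parallelize(data, 8).mapPartitions(fit_partition).collect()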

If your data set is small (i.e. it can be loaded and trained on a single worker), you can do something like this:

def trainModel[T](modelId: Int, trainingSet: List[T]) = {
  //trains model with modelId and returns it
}

//fake data
val data = List()
val numberOfModels = 100
val broadcastedData = sc.broadcast(data)
val trainedModels = sc.parallelize(Range(0, numberOfModels))
  .map(modelId => (modelId, trainModel(modelId, broadcastedData.value)))

I assume you have a list of models (or somehow parametrized models) and that you can give them IDs. Then, in the trainModel function, you pick one depending on the ID. As a result, you get an RDD of pairs of model IDs and their trained models.
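
A rough PySpark equivalent of the sketch above, assuming a SparkContext sc and scikit-learn on the workers; the Ridge models and the id-based regularization strength are illustrative stand-ins for "100 differently parametrized models":

import numpy as np
from sklearn.linear_model import Ridge

X = np.random.rand(500, 10)            # small dataset that fits on one worker
y = np.random.rand(500)
broadcast_data = sc.broadcast((X, y))  # ship the data to every worker once

def train_model(model_id):
    X_b, y_b = broadcast_data.value
    model = Ridge(alpha=0.01 * (model_id + 1)).fit(X_b, y_b)  # pick params by id
    return model_id, model

number_of_models = 100
trained_models = sc.parallelize(range(number_of_models), 10).map(train_model).collect()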