python – Broadcasting an Annoy object in Spark (for nearest neighbors)?

Since Spark's MLlib has no nearest-neighbor functionality, I am trying to use Annoy for approximate nearest neighbors. I tried broadcasting the Annoy object and passing it to the workers, but it does not work as expected.

Below is reproducible code (to be run in PySpark). The problem shows up in the difference between using Annoy with and without Spark:

from annoy import AnnoyIndex
import random

random.seed(42)

f = 40
t = AnnoyIndex(f)  # length of the item vectors that will be indexed
allvectors = []
for i in range(20):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10)  # 10 trees

# Use Annoy with Spark
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1], n=5))
print("Five closest neighbors for first vector with Spark:", end=" ")
print(x.first())

# Use Annoy without Spark
print("Five closest neighbors for first vector without Spark:", end=" ")
print(t.get_nns_by_vector(vector=allvectors[0][1], n=5))

Output:

Five closest neighbors for first vector with Spark: None

Five closest neighbors for first vector without Spark: [0, 13, 12, 6, 4]

Solution:

I have never used Annoy, but I am fairly sure the package description explains what is happening here:

It also creates large read-only file-based data structures that are mmapped into memory so that many processes may share the same data.

Because the index is memory-mapped, its data is not carried along when the object is serialized and shipped to the workers, so all of it is lost.
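As a rough illustration of why mmapped state does not survive serialization, a plain `mmap` object cannot be pickled at all (Annoy's wrapper behaves a little differently: the Python object itself serializes, but the underlying index data is not included). A minimal sketch that does not use Annoy or Spark:

```python
import mmap
import pickle
import tempfile

# Plain mmap objects refuse to pickle -- the same reason an
# mmap-backed index cannot be shipped intact via sc.broadcast.
with tempfile.TemporaryFile() as fh:
    fh.write(b"0123456789")
    fh.flush()
    m = mmap.mmap(fh.fileno(), 0)
    try:
        pickle.dumps(m)
        picklable = True
    except (TypeError, pickle.PicklingError):
        picklable = False
    finally:
        m.close()

print(picklable)  # False: the mmapped data never makes it into the pickle
```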

Try something like this instead:

from pyspark import SparkFiles

t.save("index.ann")
sc.addFile("index.ann")  # distribute the saved index file to every worker

def find_neighbors(part):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))  # mmap the worker-local copy
    return (t.get_nns_by_vector(vector=x[1], n=5) for x in part)

sparkvectors.mapPartitions(find_neighbors).first()
## [0, 13, 12, 6, 4]
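The point of `mapPartitions` (rather than `map`) is that the index is loaded once per partition instead of once per record. The same pattern, sketched in plain Python with a hypothetical `load_index` stand-in for `AnnoyIndex(f)` + `t.load(...)`, so no Spark is required:

```python
# Count how often the expensive load runs when the work
# is applied once per partition rather than once per record.
load_count = 0

def load_index():
    global load_count
    load_count += 1
    return {0: [0, 13], 1: [1, 7]}  # fake id -> neighbors lookup

def find_neighbors(part):
    index = load_index()            # runs once per partition
    return ((i, index[i]) for i, v in part)

partitions = [[(0, "vec0")], [(1, "vec1")]]
results = [list(find_neighbors(p)) for p in partitions]
print(results, load_count)  # two partitions -> two loads
```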