徐志远

A recommendation algorithm engineer's study notes

graphframe

counting common friends pseudocode without graphframe

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

sparkSession = SparkSession.builder.enableHiveSupport().master('local').getOrCreate()

# Directed edge list (src, dst). The original elides further edges as (xx, xx).
Edgelist = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]
graphData = sparkSession.sparkContext.parallelize(Edgelist)

# The same edges under two column namings, so that joining on B yields paths A -> B -> C.
# Both columns hold integer vertex ids, so both fields are IntegerType.
graphSchemaAB = StructType([StructField('A', IntegerType(), nullable=False), StructField('B', IntegerType(), nullable=False)])
abDF = sparkSession.createDataFrame(graphData, graphSchemaAB)
graphSchemaBC = StructType([StructField('B', IntegerType(), nullable=False), StructField('C', IntegerType(), nullable=False)])
bcDF = sparkSession.createDataFrame(graphData, graphSchemaBC)

abDF.show()

# Join on the shared column name so the result keeps a single B column.
joinDF = abDF.join(bcDF, 'B')
joinDF.show()
# For each pair (A, C), count the intermediate vertices B that link A to C.
joinDF.drop('B').groupBy('A', 'C').count().filter('A = 1').show()
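To check what the self-join above computes, the same counting can be sketched in plain Python without Spark. This is only an illustrative cross-check: the helper name `two_hop_counts` is hypothetical, and the edge list is the visible part of the sample data with the elided `(xx,xx)` entry left out.

```python
from collections import Counter, defaultdict

def two_hop_counts(edges):
    """Count, for each pair (a, c), the vertices b with edges a -> b and b -> c."""
    out = defaultdict(list)          # adjacency list: src -> [dst, ...]
    for src, dst in edges:
        out[src].append(dst)
    counts = Counter()
    for a, b in edges:               # first hop a -> b
        for c in out.get(b, ()):     # second hop b -> c
            counts[(a, c)] += 1
    return counts

edges = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]
counts = two_hop_counts(edges)
# Same restriction as filter('A = 1') in the DataFrame version.
print({pair: n for pair, n in counts.items() if pair[0] == 1})
# {(1, 3): 1, (1, 4): 1}
```

Each edge is treated as directed here, exactly as in the join, so only paths that follow the stored direction are counted.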

graphframe

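from graphframes import GraphFrame
# vertices is assumed to be a DataFrame with an 'id' column matching the src/dst values; the original does not define it.
edges = sparkSession.createDataFrame([('xx', 'xx', 'friend'), ('xx', 'xx', 'friend')], ['src', 'dst', 'relationship'])
g = GraphFrame(vertices, edges)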
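A GraphFrame needs a vertex DataFrame with an `id` column whose values cover every `src` and `dst` in the edges. One possible way to derive those rows from the edge tuples is sketched below in plain Python. The helper `vertex_rows` and the names `alice`, `bob` and `carol` are hypothetical stand-ins, since the original elides the actual values as `xx`.

```python
def vertex_rows(edge_rows):
    """Return one (id,) tuple per distinct vertex found in (src, dst, relationship) rows."""
    ids = sorted({v for src, dst, _ in edge_rows for v in (src, dst)})
    return [(v,) for v in ids]

# Hypothetical sample edges; not taken from the original post.
edge_rows = [('alice', 'bob', 'friend'), ('bob', 'carol', 'friend')]
print(vertex_rows(edge_rows))
# [('alice',), ('bob',), ('carol',)]

# With a SparkSession available, the rows could then be turned into the vertex DataFrame:
# vertices = sparkSession.createDataFrame(vertex_rows(edge_rows), ['id'])
```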

DSL

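  1. Edge: the basic unit of a pattern, written (a)-[e]->(b)
  2. Union of Edges: a pattern joins several edges with semicolons, e.g. (a)-[e]->(b); (b)-[e2]->(c)
  3. Names: vertices and edges can be given names, which serve two purposes:
    identify common elements among edges
    identify names of columns in the result DataFrame
  4. Anonymous edges and vertices: a name can be omitted when it is not needed, e.g. (a)-[]->(b) or (a)-[e]->()
  5. Negation: an edge prefixed with ! must be absent from the graph, e.g. (a)-[]->(b); !(b)-[]->(a)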
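The DSL rules listed above can be illustrated with a few motif strings for `GraphFrame.find`. This is a minimal sketch: the dictionary `MOTIFS` and the helper `run_motifs` are hypothetical names, and `g` is assumed to be an existing GraphFrame.

```python
# One motif pattern per DSL rule.
MOTIFS = {
    # A single named edge between two named vertices.
    "edge": "(a)-[e]->(b)",
    # Union of edges: the shared name b ties the two edges together.
    "union": "(a)-[e1]->(b); (b)-[e2]->(c)",
    # Anonymous edge: no column for the edge appears in the result.
    "anonymous_edge": "(a)-[]->(b)",
    # Anonymous vertex: the destination is not named.
    "anonymous_vertex": "(a)-[e]->()",
    # Negation: a -> b exists but b -> a does not.
    "negation": "(a)-[]->(b); !(b)-[]->(a)",
}

def run_motifs(g):
    """Run every pattern on `g` and return {rule name: result of g.find(pattern)}."""
    return {name: g.find(pattern) for name, pattern in MOTIFS.items()}
```

Each named element becomes a column in the returned DataFrame, so the `union` pattern would yield columns `a`, `e1`, `b`, `e2` and `c`.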

triangles
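GraphFrames provides `g.triangleCount()`, which returns the vertices with an added `count` column. As a rough illustration of what such a count means, here is a plain-Python sketch that treats edges as undirected and counts the triangles through each vertex. The helper `triangle_counts` is a hypothetical name, and the edge list reuses the visible sample edges from the first section.

```python
from itertools import combinations

def triangle_counts(edges):
    """Return {vertex: number of triangles containing it}, ignoring edge direction."""
    nbrs = {}
    for u, v in edges:
        if u == v:
            continue                      # self-loops cannot form triangles
        nbrs.setdefault(u, set()).add(v)
        nbrs.setdefault(v, set()).add(u)
    counts = {v: 0 for v in nbrs}
    for v, ns in nbrs.items():
        # Every pair of neighbours that is itself connected closes a triangle at v.
        for x, y in combinations(sorted(ns), 2):
            if y in nbrs[x]:
                counts[v] += 1
    return counts

edges = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]
print(triangle_counts(edges))
# {1: 2, 2: 2, 3: 1, 4: 1}
```

The two triangles in this sample are {1, 2, 3} and {1, 2, 4}, which is why vertices 1 and 2 each get a count of 2.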