"""Count two-hop paths A -> B -> C in a small directed edge list with PySpark.

The same edge list is loaded twice, once with columns (A, B) and once with
columns (B, C). Joining the two DataFrames on B pairs every edge A -> B with
every edge B -> C, i.e. enumerates the two-edge paths; grouping by (A, C)
then counts how many such paths connect each pair of endpoints.
"""
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

sparkSession = SparkSession.builder.enableHiveSupport().master('local').getOrCreate()

# Directed edges as (src, dst) pairs.
# NOTE(review): the original listing ended this list with an `(xx, xx)`
# placeholder, which is not valid data (NameError); append further edges here.
Edgelist = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 4)]

# Python 3 removed tuple parameters in lambdas (PEP 3113), so each edge is
# unpacked explicitly into a positional Row instead of `lambda (src, dst): ...`.
graphData = sparkSession.sparkContext.parallelize(Edgelist).map(lambda edge: Row(*edge))

# NOTE(review): in both schemas the second column is declared StringType
# although the edge endpoints are ints, so the join below compares a string
# column (abDF.B) with an integer column (bcDF.B) — confirm this is intended.
graphSchemaAB = StructType([
    StructField('A', IntegerType(), nullable=False),
    StructField('B', StringType(), nullable=False),
])
abDF = sparkSession.createDataFrame(graphData, graphSchemaAB)

graphSchemaBC = StructType([
    StructField('B', IntegerType(), nullable=False),
    StructField('C', StringType(), nullable=False),
])
bcDF = sparkSession.createDataFrame(graphData, graphSchemaBC)

abDF.show()

# Each joined row is one two-edge path A -> B -> C.
joinDF = abDF.join(bcDF, abDF.B == bcDF.B)
joinDF.show()

# B is only the intermediate node; drop it, count paths per (A, C) pair and
# keep the ones that start at node 1. The original referenced an undefined
# `abcDF` here; the join result is `joinDF`.
joinDF.drop('B').groupBy('A', 'C').count().filter('A=1').show()