I could get your example working but on two conditions. I am using crossJoin
on the two dataframes and using contains
.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (col, to_date,)
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext, SparkConf
config = SparkConf().setAll([('spark.sql.crossJoin.enabled', 'true')])
sc = SparkContext('local', conf=config)
sqlContext = SQLContext(sc)
data1 = [("http://example.com", -1),
("http://example2.com/index.html",-1)
]
df1Columns = ["url_first", "id"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1 = df1.drop("id")
df1.show(truncate=False)
data2 = [
("http://example.com/contact", 12),
("http://example2.com/index.html/pif", 45),
("http://example.com/about", 68),
("http://example2.com/index.html/juk/er", 96)
]
df2Columns = ["url_second", "id"]
df2 = sqlContext.createDataFrame(data=data2, schema = df2Columns)
df2.show(truncate=False)
joined_df = df2.crossJoin(df1)
joined_df.show(truncate=False)
inter_result = joined_df.withColumn("myjoin", col("url_second").contains(col("url_first")))
inter_result.show(n=200, truncate=False)
final_result = inter_result.filter(col("myjoin") == True).groupBy("url_first").agg(collect_list(col("id")).alias("id_list"))
final_result.show(n=200, truncate=False)
Output is as follows.
+------------------------------+
|url_first |
+------------------------------+
|http://example.com |
|http://example2.com/index.html|
+------------------------------+
+-------------------------------------+---+
|url_second |id |
+-------------------------------------+---+
|http://example.com/contact |12 |
|http://example2.com/index.html/pif |45 |
|http://example.com/about |68 |
|http://example2.com/index.html/juk/er|96 |
+-------------------------------------+---+
+-------------------------------------+---+------------------------------+
|url_second |id |url_first |
+-------------------------------------+---+------------------------------+
|http://example.com/contact |12 |http://example.com |
|http://example.com/contact |12 |http://example2.com/index.html|
|http://example2.com/index.html/pif |45 |http://example.com |
|http://example2.com/index.html/pif |45 |http://example2.com/index.html|
|http://example.com/about |68 |http://example.com |
|http://example.com/about |68 |http://example2.com/index.html|
|http://example2.com/index.html/juk/er|96 |http://example.com |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|
+-------------------------------------+---+------------------------------+
+-------------------------------------+---+------------------------------+------+
|url_second |id |url_first |myjoin|
+-------------------------------------+---+------------------------------+------+
|http://example.com/contact |12 |http://example.com |true |
|http://example.com/contact |12 |http://example2.com/index.html|false |
|http://example2.com/index.html/pif |45 |http://example.com |false |
|http://example2.com/index.html/pif |45 |http://example2.com/index.html|true |
|http://example.com/about |68 |http://example.com |true |
|http://example.com/about |68 |http://example2.com/index.html|false |
|http://example2.com/index.html/juk/er|96 |http://example.com |false |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|true |
+-------------------------------------+---+------------------------------+------+
+------------------------------+--------+
|url_first |id_list |
+------------------------------+--------+
|http://example.com |[12, 68]|
|http://example2.com/index.html|[45, 96]|
+------------------------------+--------+
CLICK HERE to find out more related problems solutions.