-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclean.py
21 lines (19 loc) · 891 Bytes
/
clean.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import pyspark.sql.functions as F
# Load the country/border table; header=True takes column names from the
# first CSV row. `spark` is not defined in this script and must already be in
# scope (e.g. supplied by the pyspark shell).
df = spark.read.csv('newcountries.csv', header=True)

# Long official names mapped to the short names wanted in the output.
# DataFrame.replace is called without a subset, so a matching value is
# swapped in whichever column it appears.
short_names = {
    'Congo (the Democratic Republic of the)': 'Democratic Republic of Congo',
    'Korea (Democratic People\'s Republic of)': 'North Korea',
    'Korea (the Republic of)': 'South Korea',
    'Lao People\'s Democratic Republic': 'Laos',
    'Russian Federation': 'Russia',
    'Viet Nam': 'Vietnam',
    'Palestine, State of': 'Palestine',
    'United Kingdom of Great Britain and Northern Ireland': 'United Kingdom',
    'Syrian Arab Republic': 'Syria',
    'Brunei Darussalam': 'Brunei',
}
renamed = df.replace(short_names)

# Drop any remaining " (...)" fragment from both name columns and expose them
# as `country` / `neighbour`. Python turns each `\\\\` into `\\`; with Spark's
# default SQL string-literal escaping that becomes `\`, so the regex matches
# literal parentheses.
border_pairs = renamed.selectExpr(
    "regexp_replace(country_name, ' \\\\(.*\\\\)', '') country",
    "regexp_replace(country_border_name, ' \\\\(.*\\\\)', '') neighbour",
)

# Collapse to one row per country, gathering its neighbours into a list.
neighbour_lists = border_pairs.groupBy('country').agg(
    F.collect_list('neighbour').alias('neighbours')
)

# Sort by country, reduce to a single partition, and write the result as JSON
# to the 'newcountries' output path.
neighbour_lists.orderBy('country').coalesce(1).write.json('newcountries')