# dependency_parser.py
"""
Train a Spark NLP DependencyParserApproach on a small CoNLL-U file, then
unpack each token's head and children indices from the resulting
dependency annotation metadata.

Tested configuration:
    spark-nlp 2.7.5
    pyspark   2.4.7
    Spark     2.4.7
    Java      8
    Python    3.6.9
"""
import sparknlp
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)
from sparknlp.annotator import (DependencyParserApproach, PerceptronModel,
                                SentenceDetector, Tokenizer)
from sparknlp.base import DocumentAssembler
def get_heads_children_info(dependency_metadata):
    """Extract head and children indices from dependency annotation metadata.

    Returns a pair (heads, children): for each token (numbered from 1),
    heads holds the index of its head token as a string, and children holds
    the list of token indices whose head is that token. A '-1' sentinel is
    prepended internally so that token numbering starts at 1.
    """
    heads = ['-1'] + [dm['head'] for dm in dependency_metadata]
    children = []
    for index in range(1, len(heads)):
        # Tokens whose head points at `index` are its children.
        children.append([i for i, e in enumerate(heads) if e == str(index)])
    return heads[1:], children
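# For example (hypothetical metadata for a three-token sentence where token 2
# is the root and tokens 1 and 3 attach to it):
#   get_heads_children_info([{'head': '2'}, {'head': '0'}, {'head': '2'}])
#   returns (['2', '0', '2'], [[], [1, 3], []])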
if __name__ == '__main__':
    spark = sparknlp.start()  # SparkSession with the Spark NLP jar loaded

    # Small ad-hoc dataset: one sentence per row in a "text" column.
    test_ds = spark.createDataFrame(
        ["So what happened?",
         "It should continue to be defanged.",
         "That too was stopped."],
        StringType()).toDF("text")
    test_ds.show(5, False)
    main_path = "/home/dburbano/IdeaProjects/JSL/spark-nlp/src/test/resources/"
    conllU_training_file = main_path + "parser/labeled/train_small.conllu.txt"

    # Pretrained English POS tagger; its output column ("pos") feeds the
    # dependency parser below.
    pos_tagger = PerceptronModel.pretrained()
    document_assembler = DocumentAssembler() \
        .setInputCol("text") \
        .setOutputCol("document")
    sentence_detector = SentenceDetector() \
        .setInputCols(["document"]) \
        .setOutputCol("sentence")
    tokenizer = Tokenizer() \
        .setInputCols(["sentence"]) \
        .setOutputCol("token")
    dependency_parser = DependencyParserApproach() \
        .setInputCols(["sentence", "pos", "token"]) \
        .setOutputCol("dependency") \
        .setConllU(conllU_training_file) \
        .setNumberOfIterations(10)
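    # The training file is in standard CoNLL-U format: tab-separated rows of
    #   ID  FORM  LEMMA  UPOS  XPOS  FEATS  HEAD  DEPREL  DEPS  MISC
    # with a blank line between sentences; the parser learns from the HEAD
    # (and relation) annotations.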
    pipeline_dependency_parser = Pipeline().setStages([
        document_assembler,
        sentence_detector,
        tokenizer,
        pos_tagger,
        dependency_parser
    ])

    # fit() trains the dependency parser; transform() annotates the test data.
    dp_df = pipeline_dependency_parser.fit(test_ds).transform(test_ds)
    dp_df.printSchema()
    dp_df.select(dp_df["dependency.metadata"]).show(5, False)
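    # dependency.metadata is, per row, an array with one map per token. Each
    # map carries (at least) a 'head' entry: the 1-based index of the token's
    # head, with '0' marking the sentence root. Only 'head' is used below;
    # any other keys in the map are ignored.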
    # `children` is a list of integer lists, so it needs a nested ArrayType
    # (declaring it ArrayType(StringType()) would not match the UDF's output).
    schema = StructType([
        StructField("heads", ArrayType(StringType()), False),
        StructField("children", ArrayType(ArrayType(IntegerType())), False)
    ])
    get_heads_and_children_udf = udf(get_heads_children_info, schema)
    metadata_df = dp_df \
        .withColumn("metadata", get_heads_and_children_udf(dp_df["dependency.metadata"]))
    metadata_df.withColumn("heads", metadata_df["metadata.heads"]) \
        .withColumn("children", metadata_df["metadata.children"]) \
        .select("heads", "children") \
        .show(5, False)
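    # Each output row pairs the parallel arrays: heads[i] is the head index
    # of token i+1 and children[i] its child indices, as in the
    # get_heads_children_info example above.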