-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoracle_to_doris.py
46 lines (41 loc) · 1.57 KB
/
oracle_to_doris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when, lit
# Build the Spark session for this job, or reuse one that is already active.
session_builder = SparkSession.builder.appName("oracle_to_doris")
spark = session_builder.getOrCreate()

# Keep the driver log limited to warnings and errors.
spark.sparkContext.setLogLevel("WARN")
# read oracle
# Connection settings for the Spark JDBC data source. The values below are
# placeholders and must be replaced with the real Oracle connection details.
oracle_jdbc_options = {
    "driver": "oracle.jdbc.driver.OracleDriver",
    "url": "jdbc:oracle:thin:@host:port:service",
    "dbtable": "schema.table",
    "user": "user",
    "password": "password",
}

# Load the source table as a DataFrame through JDBC.
df = spark.read.format("jdbc").options(**oracle_jdbc_options).load()

# Print a preview of the loaded rows to stdout.
df.show()
# Carriage returns, line feeds and tabs are removed from every column before
# the rows are written out. The previous alternation listed "\r" twice; this
# character class matches exactly the same characters (a "\r\n" pair is
# removed completely either way).
pattern = r"[\r\n\t]"


def _strip_line_breaks(column_name):
    """Return the named source column with every match of ``pattern`` removed.

    Args:
        column_name: Name of a column in the source DataFrame.

    Returns:
        A Column expression; it is null wherever the source value is null.
    """
    return regexp_replace(col(column_name), pattern, "")


# RATING is used for two output columns, so build its cleaned form once.
cleaned_rating = _strip_line_breaks("RATING")

# Project the Oracle columns onto the snake_case names used by the target.
processed_df = df.select(
    _strip_line_breaks("BOOKID").alias("book_id"),
    _strip_line_breaks("TITLE").alias("title"),
    _strip_line_breaks("AUTHOR").alias("author"),
    _strip_line_breaks("PUBLICATIONYEAR").alias("publication_year"),
    _strip_line_breaks("GENRE").alias("genre"),
    cleaned_rating.alias("rating"),
    # NOTE(review): this output column is derived from RATING (null replaced
    # by 0) yet is named "wo_create_date". It looks like a leftover from
    # another job; confirm the intended source column and alias against the
    # target table before relying on it.
    when(cleaned_rating.isNull(), lit(0))
    .otherwise(cleaned_rating)
    .alias("wo_create_date"),
    _strip_line_breaks("STATUS").alias("status"),
)
# save doris
# Append the cleaned rows to the target table through the "doris" data source.
# The option values below are placeholders for the real connection details.
# NOTE(review): DataFrameWriter.save() does not return a result object, so
# `ds` carries no useful value; the name is kept only so the module's
# top-level names stay unchanged.
ds = (
    processed_df.write
    .mode("append")
    .format("doris")
    .option("checkpointLocation", "./checkpoint/table")
    .option("doris.table.identifier", "database.table")
    .option("doris.fenodes", "host:port")
    .option("user", "user")
    .option("password", "password")
    .save()
)