forked from kartoza/parcel_plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
127 lines (112 loc) · 5.29 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# -*- coding: utf-8 -*-
"""
Author: Robert Moerman
Contact: [email protected]
Company: AfriSpatial
This is a postgresql database manager.
/***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/
"""
import psycopg2
class Field:
    """ Plain record describing one column of a database table

    Attributes:
        name     -- value passed as `name`
        type     -- value passed as `type`
        required -- value passed as `required`
        unique   -- value passed as `unique`
    """

    def __init__(self, name, type, required, unique):
        # `type` shadows the builtin, but it is part of the public signature
        # (callers may pass it by keyword), so the name is kept.
        self.name, self.type = name, type
        self.required, self.unique = required, unique
class Manager:
    """ PostgreSQL database manager built on psycopg2

    The connection settings are stored on the instance; `query` and
    `queryPreview` open a connection on demand and always close it again
    before returning or raising.
    """

    def __init__(self, params):
        """ Store the connection settings and check that they work
        @param params mapping with the keys HOST, NAME, USER, PASSWORD, PORT
        @raises Exception if a connection cannot be opened or closed
        """
        self.params = params
        self.conn = None
        self.cursor = None
        # test db settings: open a connection and close it straight away
        self.connect(params)
        self.disconnect()

    def connect(self, params):
        """ Create a backend postgres database connection
        Reuses the existing connection / cursor objects when present.
        @param params mapping with the keys HOST, NAME, USER, PASSWORD, PORT
        @raises Exception if the connection or cursor cannot be created
        """
        try:
            # check if connection object exist
            if getattr(self, 'conn', None) is None:
                # Keyword arguments instead of a hand-formatted DSN string:
                # a value containing a quote or backslash (typically the
                # password) would otherwise produce a malformed DSN.
                self.conn = psycopg2.connect(
                    host=params["HOST"],
                    dbname=params["NAME"],
                    user=params["USER"],
                    password=params["PASSWORD"],
                    port=params["PORT"])
            # check if cursor object exists
            if getattr(self, 'cursor', None) is None:
                self.cursor = self.conn.cursor()
        except Exception as e:
            raise Exception('Could not connect to database!\nError raised: {error}.'.format(error = str(e)))

    def disconnect(self):
        """ Terminate a backend postgres database connection
        Safe to call when no connection is open.
        @raises Exception if closing the cursor or the connection fails
        """
        try:
            cursor = getattr(self, 'cursor', None)
            conn = getattr(self, 'conn', None)
            # Drop the references first so that a failing close() cannot
            # leave a stale handle behind for the next connect() to reuse.
            self.cursor = None
            self.conn = None
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                # the connection is closed even if closing the cursor failed
                if conn is not None:
                    conn.close()
        except Exception as e:
            raise Exception('Could not disconnect from database!\nError raised: {error}.'.format(error = str(e)))

    def query(self, query, data=None):
        """ Execute query using given data against connection object
        @param query SQL text, optionally with psycopg2 placeholders
        @param data  values for the placeholders (None for no parameters)
        @returns resultset (array structure), or None when the statement
                 produces no rows
        @raises Exception if connecting, executing or committing fails
        """
        try:
            self.connect(self.params)
            try:
                if data is None:
                    self.cursor.execute(query)
                else:
                    self.cursor.execute(query, data)
                records = None
                # description is None for statements that return no rows,
                # so fetchall() is only attempted when there is a resultset
                if self.cursor.description is not None:
                    records = self.cursor.fetchall()
                self.conn.commit()
                return records
            finally:
                # Always release the connection: after a failed statement it
                # would otherwise stay open in an aborted transaction and be
                # reused by the next call. Closing without commit discards
                # the uncommitted work.
                self.disconnect()
        except Exception as e:
            raise Exception('Backend database query failed!\nError raised: %s.' %(str(e),))

    def queryPreview(self, query, data=None, multi_data=False):
        """ Preview query
        @param query      SQL text, optionally with psycopg2 placeholders
        @param data       values for the placeholders (None for none)
        @param multi_data when True, `query` contains a single %s that is
                          first expanded to one placeholder per item of data
        @returns query (str) as returned by cursor.mogrify
        @raises Exception if connecting or mogrification fails
        """
        try:
            self.connect(self.params)
            try:
                if data is None:
                    return self.cursor.mogrify(query)
                if multi_data:
                    placeholders = ','.join(['%s' for dummy in data])
                    query = query % (placeholders,)
                return self.cursor.mogrify(query, data)
            finally:
                # release the connection on failure as well as on success
                self.disconnect()
        except Exception as e:
            raise Exception('Backend database mogrification failed!\nError raised: %s.' %(str(e),))

    def _schema_query(self, tbl_name, fld_ignore):
        """ Build the parameterised column-information query for a table
        @returns (sql, data) ready to be passed to query()
        """
        ignore = list(fld_ignore or [])
        sql = (
            "SELECT c.column_name, c.data_type, "
            "CASE WHEN c.is_nullable = 'NO' THEN TRUE ELSE FALSE END AS required, "
            "CASE WHEN u.column_name IS NOT NULL THEN TRUE ELSE FALSE END AS unique "
            "FROM information_schema.columns c "
            "LEFT JOIN (SELECT kcu.column_name, tc.table_name "
            "FROM information_schema.table_constraints tc "
            "LEFT JOIN information_schema.key_column_usage kcu "
            "ON tc.constraint_catalog = kcu.constraint_catalog "
            "AND tc.constraint_schema = kcu.constraint_schema "
            "AND tc.constraint_name = kcu.constraint_name "
            "WHERE tc.constraint_type IN ('UNIQUE', 'PRIMARY KEY') "
            "AND tc.table_name = %s) u "
            "ON u.column_name = c.column_name "
            "WHERE c.table_name = %s"
        )
        data = [tbl_name, tbl_name]
        if ignore:
            # one placeholder per ignored column; the clause is left out
            # entirely when nothing is ignored because "NOT IN ()" is not
            # valid SQL
            sql += " AND c.column_name NOT IN ({0})".format(", ".join(["%s"] * len(ignore)))
            data.extend(ignore)
        return sql + ";", data

    def getSchema(self, tbl_name, fld_ignore):
        """ Get information about a specific table
        @param tbl_name   name of the table to describe
        @param fld_ignore names of columns to leave out (may be empty)
        @returns list of Field objects (name, python type, required, unique),
                 in the reverse of the order the rows are returned
        @raises Exception if the underlying query fails
        """
        sql, data = self._schema_query(tbl_name, fld_ignore)
        # values are passed as query parameters rather than formatted into
        # the SQL text, so quotes in names cannot alter the statement
        rows = self.query(sql, data) or []
        return [Field(
            row[0],
            self._pythonize_type(row[1]),
            row[2],
            row[3]
        ) for row in reversed(rows)]

    def _pythonize_type(self, db_type):
        """ Get python type
        Matches on substrings of the lower-cased database type name:
        "char" -> str, "double" -> float, "integer" -> int, else object.
        @returns type (type)
        """
        type_name = db_type.lower()
        if "char" in type_name:
            return str
        if "double" in type_name:
            return float
        if "integer" in type_name:
            return int
        return object