diff --git a/share/AbstractReader.py b/share/AbstractReader.py index fba659e..b98182c 100644 --- a/share/AbstractReader.py +++ b/share/AbstractReader.py @@ -1,78 +1,218 @@ +from enum import Enum + +class Aggregator(Enum): + """ + Enum to describe aggregation method to use. + Note that this aggregation functions should + be supported at the backend level. + """ + COUNT = 1 + COUNT_ERRORS = 2 + COUNT_NAN = 3 + FIRST = 4 + LAST = 5 + MIN = 6 + MAX = 7 + AVG = 8 + STD_DEV = 9 -import time - class AbstractReader(object): """ Subclass this class to create a PyTangoArchiving Reader for your specific DB - + e.g. TimeDBReader(AbstractReader) """ - def __init__(self,config='',...): + def __init__(self, config='',**kwargs): ''' - config must be an string like user:passwd@host - or a json-like dictionary "{'user':'...','passwd':'...'}" + Config can be an string like user:passwd@host + or a json-like dictionary "{'user':'...','password':'...','database':}" ''' - self.db = YourDb(config) + try: + self.db = YourDb(**(config or kwargs)) + except: + raise Exception('WrongDatabaseConfig') return - def get_database(self,epoch=-1): + def get_connection(self): """ - This method should provide the current connection object to DB - - + Return the connection object to avoid a client + to open one for custom queries. + The returned object will be implementation specific. """ return self.db - def get_attributes(self,active=False,regexp=''): - """ + def get_attributes(self, active=False, pattern=''): + """ Queries the database for the current list of archived attributes. 
arguments: - active: True/False: attributes currently archived - regexp: '' :filter for attributes to retrieve + active: True: only attributes currently archived + False: all attributes, even the one not archiving anymore + pattern: '' :filter for attributes to retrieve """ return list() - def is_attribute_archived(self,attribute): + def is_attribute_archived(self, attribute, active=False): """ Returns if an attribute has values in DB. - If active=True, only returns for value currently adding new values + + arguments: + attribute: fqdn for the attribute. + active: if true, only check for active attributes, + otherwise check all. """ return True - def load_last_values(self,attribute): + def get_last_attribute_value(self, attribute): """ Returns last value inserted in DB for an attribute - - (epoch, r_value, w_value, quality) + + arguments: + attribute: fqdn for the attribute. + returns: + (epoch, r_value, w_value, quality, error_desc) """ - return (time.time(), 0., 0., 0) - def get_attribute_values(self,attribute,start_date,stop_date=None, - decimate=False): + return self.get_last_attributes_values([attribute])[attribute] + + def get_last_attributes_values(self, attributes, columns = 'time, r_value'): """ - Returns attribute values between dates in the format: - [(epoch0, r_value, w_value, quality), - (epoch1, r_value, w_value, quality), - (epoch2, Exception, None, ATTR_INVALID), - ... ] - decimate may be False, True or an aggregation method - w_value and quality are optional, while r_value is mandatory + Returns last values inserted in DB for a list of attributes + + arguments: + attribute: fqdn for the attribute. + columns: requested columns separated by commas + returns: + {'att1':(epoch, r_value, w_value, quality, error_desc), + 'att2':(epoch, r_value, w_value, quality, error_desc), + ... 
+ } """ - return [(time.time(), 0., 0., 0)] - def get_attributes_values(self,attribute,start_date,stop_date=None, - decimate=False, correlate=False): + return {attributes[0]: (time.time(), 0., 0., 0, "")} + + def get_attribute_values(self, attribute, + start_date, stop_date=None, + decimate=None, + **params): """ - Returns attribute values between dates in the format: - [(epoch0, (r_value,w_value,quality)), - (epoch1, (r_value,w_value,quality)), - (epoch2, (Exception, None, ATTR_INVALID)), + Returns attribute values between start and stop dates. + + arguments: + attribute: fqdn for the attribute. + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. + returns: + [(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), ... ] - decimate may be False, True, an aggregation method or just an interval in seconds + """ + return self.get_attributes_values([attribute], start_date, stop_date, decimate, False, params)[attribute] + + def get_attributes_values(self, attributes, + start_date, stop_date=None, + decimate=None, + correlate = False, + columns = 'time, r_value', + **params): + """ + Returns attributes values between start and stop dates + , using decimation or not, correlating the values or not. + + arguments: + attributes: a list of the attributes' fqdn + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. + correlate: if True, data is generated so that + there is available data for each timestamp of + each attribute. 
+ columns: columns separated by commas + time, r_value, w_value, quality, error_desc - if correlate is True, attributes with no value in the interval will be correlated + returns: + {'attr0':[(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), + ... ], + 'attr1':[(…),(…)]} """ - return {'attr0':[(time.time(), 0., 0., 0)], 'attr1':[(time.time(), 0., 0., 0)]} + return {'attr0': [(time.time(), 0., 0., 0, '')] + , 'attr1': [(time.time(), 0., 0., 0, '')]} + +############################################################################### + +__usage__ = """ +Usage: + +:> reader : print this help + +:> reader [options] list [pattern] : + returns matching attributes from database + +:> reader [options] : + print last value for attribute + +:> reader [options] : + returns values for attribute between given dates + +Options (at least some is needed): + --prompt + --config=user:password@host:port/database + --database= + --host= + --port= + --user= + --password= + +""" + +def main(apiclass=AbstractReader,timeformatter=None): + import sys + + args = [a for a in sys.argv[1:] if not a.startswith('-')] + opts = dict([a.strip('-').split('=') for a in sys.argv[1:] + if a not in args and '=' in a]) + if '--prompt' in sys.argv: + opts['host'] = input('host:') + opts['database'] = input('database:') + opts['user'] = input('user:') + opts['password'] = input('password:') + opts['port'] = input('port(3306):') or '3306' + + if not args or not opts: + print(__usage__) + sys.exit(0) + + reader = apiclass(**opts) + if args[0] == 'list': + pattern = (args[1:] or [''])[0] + print('\n'.join(reader.get_attributes(pattern=pattern))) + else: + if args[1:]: + data = reader.get_attribute_values(args[0],args[1],args[2], + decimate=True) + for d in data: + l = '\t'.join(map(str,d)) + if timeformatter: + print('%s\t%s' % (timeformatter(d[0]),l)) + else: + print(l) + else: + print(reader.get_attribute_id_table(args[0])) + 
print(reader.get_last_attribute_value(args[0])) + +if __name__ == '__main__': + main() diff --git a/share/mariadb_reader.py b/share/mariadb_reader.py new file mode 100755 index 0000000..a291f32 --- /dev/null +++ b/share/mariadb_reader.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 + +import sys, re, traceback +from timeutils import * +import AbstractReader + +try: + import pymysql as mariadb +except: + import MySQLdb as mariadb + + + +class MariadbReader(AbstractReader.AbstractReader): + """ + read-only API for hdb++ databases, based on PyTangoArchiving AbstractReader + """ + + def __init__(self,config='',**kwargs): + """ + Arguments accepted by pymysql connections: + + :param host: Host where the database server is located + :param user: Username to log in as + :param password: Password to use. + :param database: Database to use, None to not use a particular one. + :param port: MySQL port to use, default is usually OK. (default: 3306) + :param bind_address: When the client has multiple network interfaces, specify + the interface from which to connect to the host. Argument can be + a hostname or an IP address. + :param unix_socket: Optionally, you can use a unix socket rather than TCP/IP. + :param read_timeout: The timeout for reading from the connection in seconds (default: None - no timeout) + :param write_timeout: The timeout for writing to the connection in seconds (default: None - no timeout) + :param charset: Charset you want to use. + :param sql_mode: Default SQL_MODE to use. + :param read_default_file: + Specifies my.cnf file to read these parameters from under the [client] section. + :param conv: + Conversion dictionary to use instead of the default one. + This is used to provide custom marshalling and unmarshaling of types. + See converters. + :param use_unicode: + Whether or not to default to unicode strings. + This option defaults to true for Py3k. + :param client_flag: Custom flags to send to MySQL. Find potential values in constants.CLIENT. 
+ :param cursorclass: Custom cursor class to use. + :param init_command: Initial SQL statement to run when connection is established. + :param connect_timeout: Timeout before throwing an exception when connecting. + (default: 10, min: 1, max: 31536000) + :param ssl: + A dict of arguments similar to mysql_ssl_set()'s parameters. + :param read_default_group: Group to read from in the configuration file. + :param compress: Not supported + :param named_pipe: Not supported + :param autocommit: Autocommit mode. None means use server default. (default: False) + :param local_infile: Boolean to enable the use of LOAD DATA LOCAL command. (default: False) + :param max_allowed_packet: Max size of packet sent to server in bytes. (default: 16MB) + Only used to limit size of "LOAD LOCAL INFILE" data packet smaller than default (16KB). + :param defer_connect: Don't explicitly connect on construction - wait for connect call. + (default: False) + :param auth_plugin_map: A dict of plugin names to a class that processes that plugin. + The class will take the Connection object as the argument to the constructor. + The class needs an authenticate method taking an authentication packet as + an argument. For the dialog plugin, a prompt(echo, prompt) method can be used + (if no authenticate method) for returning a string from the user. (experimental) + :param server_public_key: SHA256 authentication plugin public key value. (default: None) + :param db: Alias for database. (for compatibility to MySQLdb) + :param passwd: Alias for password. (for compatibility to MySQLdb) + :param binary_prefix: Add _binary prefix on bytes and bytearray. 
(default: False) + """ + if config and isinstance(config,(str,bytes)): + config = self.parse_config(config) + + + self.config = config or {} + self.config.update(kwargs) + + self.database = self.config.get('database','hdbpp') + self.user = self.config.get('user','') + self.password = self.config.get('password','') + self.port = int(self.config.get('port','3306')) + self.host = self.config.get('host','localhost') + + #print([(k,v) for k,v in self.__dict__.items() + #if k not in type(self).__dict__()]) + + self.db = mariadb.connect(database=self.database, + user=self.user, password=self.password, port=self.port, + host=self.host) + self._cursor = self.db.cursor() + + def __del__(self): + self._cursor.close() + self.db.close() + + def _query(self,query,prune=False): + """ + query: SQL code + """ + #print(query) + self._cursor.execute(query) + if prune: + r,l = [],True + while l: + try: + l = self._cursor.fetchone() + if l and (not r or l[1:] != r[-1][1:]): + r.append(l) + except: + print(r[-1:], l) + traceback.print_exc() + break + return r + else: + return self._cursor.fetchall() + + def parse_config(self,config): + """ + config string as user:password@host:port/database + or dictionary like + """ + try: + if re.match('.*[:].*[@].*',config): + h = config.split('@') + u,p = h[0].split(':') + config = {'user':u,'password':p} + if '/' in h[1]: + config['host'],config['database'] = h[1].split('/') + else: + config['host'] = h[1] + if ':' in config['host']: + config['host'],config['port'] = config['host'].split(':') + else: + if '{' not in config: + config = '{%s}' % config.replace(';',',') + config = dict(eval(config)) + except: + raise Exception('Wrong format in config, should be dict-like') + return config + + def get_attributes(self, active=False, pattern=''): + """ + Queries the database for the current list of archived attributes. 
+ arguments: + active: True: only attributes currently archived + False: all attributes, even the one not archiving anymore + regexp: '' :filter for attributes to retrieve + """ + q = 'select att_name from att_conf' + if pattern: + q += " where att_name like '%s'" % pattern.replace('*','%') + #print(q) + return [str(a[0]).lower() for a in self._query(q) if a] + + def get_attribute_name(self,attribute): + attribute = str(attribute).lower() + if ':' not in attribute: + attribute = '%' + '/' + attribute + + elif '.' not in attribute: + attribute = attribute.rsplit(':',1) + attribute = attribute[0] + '.%' + attribute[1] + + if 'tango' not in attribute: + attribute = '%' + '/' + attribute + + attrs = self.get_attributes(pattern=attribute) + if len(attrs)!=1: + raise Exception('MultipleAttributeMatches') + + return attrs[0] if attrs else '' + + def is_attribute_archived(self, attribute, active=False): + """ + Returns if an attribute has values in DB. + + arguments: + attribute: fqdn for the attribute. + active: if true, only check for active attributes, + otherwise check all. + """ + return bool(self.get_attribute_name(attribute)) + + def get_attribute_id_table(self, attribute=''): + """ + for each matching attribute returns name, ID and table name + """ + q = "select att_name,att_conf_id,data_type " + q += " from att_conf, att_conf_data_type where " + q += "att_conf.att_conf_data_type_id = att_conf_data_type.att_conf_data_type_id" + if attribute: + q += " and att_name like '%s'" % attribute + + return [(a,i,'att_'+t) for (a,i,t) in self._query(q)] + + def get_last_attributes_values(self, attributes, columns = '', n = 1): + """ + Returns last values inserted in DB for a list of attributes + + arguments: + attribute: fqdn for the attribute. + columns: requested columns separated by commas + returns: + {'att1':(epoch, r_value, w_value, quality, error_desc), + 'att2':(epoch, r_value, w_value, quality, error_desc), + ... 
+ } + """ + data = {} + columns = columns or 'data_time, value_r, quality, att_error_desc_id' + + for a in attributes: + try: + a,i,t = self.get_attribute_id_table(a)[0] + tdesc = str(self._query('describe %s'%t)) + tcol = ('int_time' if 'int_time' in tdesc else 'data_time') + cols = ','.join(c for c in columns.split(',') + if c.strip() in tdesc) + data[a] = self._query('select %s from %s where ' + 'att_conf_id = %s order by %s desc limit %s' + % (cols, t, i, tcol, n)) + except: + raise Exception('AttributeNotFound: %s' % a) + + return data + + def get_attributes_values(self, attributes, + start_date, stop_date=None, + decimate=None, + correlate = False, + columns = '', + **params): + """ + Returns attributes values between start and stop dates + , using decimation or not, correlating the values or not. + + arguments: + attributes: a list of the attributes' fqdn + start_date: datetime, beginning of the period to query. + stop_date: datetime, end of the period to query. + if None, now() is used. + decimate: aggregation function to use in the form: + {'timedelta0':(MIN, MAX, …) + , 'timedelta1':(AVG, COUNT, …) + , …} + if None, returns raw data. + correlate: if True, data is generated so that + there is available data for each timestamp of + each attribute. + columns: columns separated by commas + time, r_value, w_value, quality, error_desc + + returns: + {'attr0':[(epoch0, r_value, w_value, quality, error_desc), + (epoch1, r_value, w_value, quality, error_desc), + ... 
], + 'attr1':[(…),(…)]} + """ + data = {} + columns = columns or 'data_time, value_r, quality, att_error_desc_id' + if isinstance(start_date,(int,float)): + start_date = time2str(start_date) + if stop_date is None: + stop_date = now() + if isinstance(stop_date,(int,float)): + stop_date = time2str(stop_date) + + for a in attributes: + try: + a,i,t = self.get_attribute_id_table(a)[0] + tdesc = str(self._query('describe %s'%t)) + tcol = ('int_time' if 'int_time' in tdesc else 'data_time') + if tcol == 'int_time': + b,e = str2time(start_date),str2time(stop_date) + else: + b,e = "'%s'" % start_date, "'%s'" % stop_date + + cols = ','.join(c for c in columns.split(',') + if c.strip() in tdesc) + print(cols) + if 'data_time,' in cols: + cols = cols.replace('data_time,', + 'CAST(UNIX_TIMESTAMP(data_time) AS DOUBLE),') + data[a] = self._query('select %s from %s where ' + 'att_conf_id = %s and %s between %s and %s ' + 'order by data_time' + % (cols, t, i, tcol, b, e), prune=decimate) + except: + import traceback + traceback.print_exc() + #raise Exception('AttributeNotFound: %s' % a) + + return data + + return {'attr0': [(time.time(), 0., 0., 0, '')] + , 'attr1': [(time.time(), 0., 0., 0, '')]} + + +############################################################################## + +if __name__ == '__main__': + AbstractReader.main(apiclass=MariadbReader,timeformatter=time2str) + diff --git a/share/timeutils.py b/share/timeutils.py new file mode 100644 index 0000000..d40ce9d --- /dev/null +++ b/share/timeutils.py @@ -0,0 +1,227 @@ +######################################################################## +## Time conversion methods from Fandango +######################################################################## + +import time, datetime, re, traceback + +END_OF_TIME = 1024*1024*1024*2-1 #Jan 19 04:14:07 2038 + +TIME_UNITS = { 'ns': 1e-9, 'us': 1e-6, 'ms': 1e-3, '': 1, 's': 1, 'm': 60, + 'h': 3600, 'd': 86.4e3, 'w': 604.8e3, 'M': 30*86.4e3, 'y': 31.536e6 } 
+TIME_UNITS.update((k.upper(),v) for k,v in list(TIME_UNITS.items()) if k!='m') + +#@todo: RAW_TIME should be capable to parse durations as of ISO 8601 +RAW_TIME = ('^(?:P)?([+-]?[0-9]+[.]?(?:[0-9]+)?)(?: )?(%s)$' + % ('|').join(TIME_UNITS)) # e.g. 3600.5 s + +MYSQL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' +ISO_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' + +global DEFAULT_TIME_FORMAT +DEFAULT_TIME_FORMAT = MYSQL_TIME_FORMAT + +ALT_TIME_FORMATS = [ ('%s%s%s' % ( + date.replace('-',dash),separator if hour else '',hour)) + for date in ('%Y-%m-%d','%y-%m-%d','%d-%m-%Y', + '%d-%m-%y','%m-%d-%Y','%m-%d-%y') + for dash in ('-','/') + for separator in (' ','T') + for hour in ('%H:%M','%H:%M:%S','%H','')] + +def set_default_time_format(dtf, test = True): + """ + Usages: + + fandango.set_default_time_format('%Y-%m-%d %H:%M:%S') + + or + + fandango.set_default_time_format(fandango.ISO_TIME_FORMAT) + + """ + if test: + str2time(time2str(cad = dtf), cad = dtf) + global DEFAULT_TIME_FORMAT + DEFAULT_TIME_FORMAT = dtf + +def now(): + return time.time() + +def time2tuple(epoch=None, utc=False): + if epoch is None: epoch = now() + elif epoch<0: epoch = now()-epoch + if utc: + return time.gmtime(epoch) + else: + return time.localtime(epoch) + +def tuple2time(tup): + return time.mktime(tup) + +def date2time(date,us=True): + """ + This method would accept both timetuple and timedelta + in order to deal with times coming from different + api's with a single method + """ + try: + t = tuple2time(date.timetuple()) + us = us and getattr(date,'microsecond',0) + if us: t+=us*1e-6 + return t + except Exception as e: + try: + return date.total_seconds() + except: + raise e + +def date2str(date, cad = '', us=False): + #return time.ctime(date2time(date)) + global DEFAULT_TIME_FORMAT + cad = cad or DEFAULT_TIME_FORMAT + t = time.strftime(cad, time2tuple(date2time(date))) + us = us and getattr(date,'microsecond',0) + if us: t+='.%06d'%us + return t + +def time2date(epoch=None): + if epoch is None: epoch = now() + 
elif epoch<0: epoch = now()-epoch + return datetime.datetime.fromtimestamp(epoch) + +def utcdiff(t=None): + return now() - date2time(datetime.datetime.utcnow()) + +def time2str(epoch=None, cad='', us=False, bt=True, + utc=False, iso=False): + """ + cad: introduce your own custom format (see below) + use DEFAULT_TIME_FORMAT to set a default one + us=False; True to introduce ms precission + bt=True; negative epochs are considered relative from now + utc=False; if True it converts to UTC + iso=False; if True, 'T' will be used to separate date and time + + cad accepts the following formats: + + %a Locale’s abbreviated weekday name. + %A Locale’s full weekday name. + %b Locale’s abbreviated month name. + %B Locale’s full month name. + %c Locale’s appropriate date and time representation. + %d Day of the month as a decimal number [01,31]. + %H Hour (24-hour clock) as a decimal number [00,23]. + %I Hour (12-hour clock) as a decimal number [01,12]. + %j Day of the year as a decimal number [001,366]. + %m Month as a decimal number [01,12]. + %M Minute as a decimal number [00,59]. + %p Locale’s equivalent of either AM or PM. (1) + %S Second as a decimal number [00,61]. (2) + %U Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. + All days in a new year preceding the first Sunday are considered to be in week 0. (3) + %w Weekday as a decimal number [0(Sunday),6]. + %W Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. + All days in a new year preceding the first Monday are considered to be in week 0. (3) + %x Locale’s appropriate date representation. + %X Locale’s appropriate time representation. + %y Year without century as a decimal number [00,99]. + %Y Year with century as a decimal number. + %Z Time zone name (no characters if no time zone exists). + %% A literal '%' character. 
+ """ + if epoch is None: epoch = now() + elif bt and epoch<0: epoch = now()+epoch + global DEFAULT_TIME_FORMAT + if cad: + cad = 'T'.join(cad.split(' ',1)) if iso else cad + else: + cad = ISO_TIME_FORMAT if iso else DEFAULT_TIME_FORMAT + + t = time.strftime(cad,time2tuple(epoch,utc=utc)) + us = us and epoch%1 + if us: t+='.%06d'%(1e6*us) + return t + +epoch2str = time2str + +def str2time(seq='', cad='', throw=True, relative=False): + """ + :param seq: Date must be in ((Y-m-d|d/m/Y) (H:M[:S]?)) format or -N [d/m/y/s/h] + + See RAW_TIME and TIME_UNITS to see the units used for pattern matching. + + The conversion itself is done by time.strptime method. + + :param cad: You can pass a custom time format + :param relative: negative times will be converted to now()-time + :param throw: if False, None is returned instead of exception + """ + try: + if seq in (None,''): + return time.time() + if 'NOW-' in seq: + seq,relative = seq.replace('NOW',''),True + elif seq=='NOW': + return now() + + t, seq = None, str(seq).strip() + if not cad: + m = re.match(RAW_TIME,seq) + if m: + #Converting from a time(unit) format + value,unit = m.groups() + t = float(value)*TIME_UNITS[unit] + return t # must return here + + #Converting from a date format + ms = re.match('.*(\.[0-9]+)$',seq) #Splitting the decimal part + if ms: + ms,seq = float(ms.groups()[0]),seq.replace(ms.groups()[0],'') + + if t is None: + #tf=None will try default system format + global DEFAULT_TIME_FORMAT + time_fmts = ([cad] if cad else + [DEFAULT_TIME_FORMAT,None] + ALT_TIME_FORMATS) + for tf in time_fmts: + try: + tf = (tf,) if tf else () + t = time.strptime(seq,*tf) + break + except: + pass + + v = time.mktime(t)+(ms or 0) + if relative and v<0: + v = fn.now()-v + return v + except: + if throw: + raise Exception('PARAMS_ERROR','unknown time format: %s' % seq) + else: + return None + + +str2epoch = str2time + +def time2gmt(epoch=None): + if epoch is None: epoch = now() + return tuple2time(time.gmtime(epoch)) + +def 
timezone(): + t = now() + return old_div(int(t-time2gmt(t)),3600) + +#Auxiliary methods: +def ctime2time(time_struct): + try: + return (float(time_struct.tv_sec)+1e-6*float(time_struct.tv_usec)) + except: + return -1 + +def mysql2time(mysql_time,us=True): + try: + return date2time(mysql_time,us=us) + #t = time.mktime(mysql_time.timetuple()) + except: + return -1