Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests for formato_sql.py module #157

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
125 changes: 68 additions & 57 deletions formatos/formato_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@


def esquema_sql(tipos_registro, conf={}):
from formato_txt import A, N, I
from .formato_txt import A, N, I

for tabla, formato in tipos_registro:
sql = []
Expand Down Expand Up @@ -92,16 +92,20 @@ def configurar(schema):
tablas = {}
campos = {}
campos_rev = {}
if not schema:
for tabla in "encabezado", "detalle", "cmp_asoc", "permiso", "tributo", "iva":
tablas[tabla] = tabla
campos[tabla] = {"id": "id"}
campos_rev[tabla] = dict([(v, k) for k, v in list(campos[tabla].items())])
for tabla in "encabezado", "detalle", "cmp_asoc", "permiso", "tributo", "iva":
tablas[tabla] = tabla
campos[tabla] = {"id": "id"}
campos_rev[tabla] = {"id": "id"}
if schema and tabla in schema:
for campo, valor in schema[tabla].items():
campos[tabla][campo] = valor
campos_rev[tabla][valor] = campo
return tablas, campos, campos_rev


def ejecutar(cur, sql, params=None):
##print sql, params
if DEBUG:
print(sql, params)
if params is None:
return cur.execute(sql)
else:
Expand All @@ -117,7 +121,8 @@ def max_id(db, schema={}):
ret = None
try:
ejecutar(cur, query)
for row in cur:
row = cur.fetchone()
if row:
ret = row[0]
if not ret:
ret = 0
Expand All @@ -128,19 +133,19 @@ def max_id(db, schema={}):


def redondear(formato, clave, valor):
from formato_txt import A, N, I
from pyafipws.formatos.formato_txt import A, N, I

# corregir redondeo (aparentemente sqlite no guarda correctamente los decimal)
import decimal

try:
long = [fmt[1] for fmt in formato if fmt[0] == clave]
longitud = [fmt[1] for fmt in formato if fmt[0] == clave]
tipo = [fmt[2] for fmt in formato if fmt[0] == clave]
if not tipo:
return valor
tipo = tipo[0]
if DEBUG:
print("tipo", tipo, clave, valor, int)
print("tipo", tipo, clave, valor, longitud)
if valor is None:
return None
if valor == "":
Expand All @@ -153,8 +158,8 @@ def redondear(formato, clave, valor):
valor = str(valor)
if isinstance(valor, basestring):
valor = Decimal(valor)
if int and isinstance(int[0], (tuple, list)):
decimales = old_div(Decimal("1"), Decimal(10 ** (int[0][1])))
if longitud and isinstance(longitud[0], (tuple, list)):
decimales = old_div(Decimal("1"), Decimal(10 ** (longitud[0][1])))
else:
decimales = Decimal(".01")
valor1 = valor.quantize(decimales, rounding=decimal.ROUND_DOWN)
Expand All @@ -163,10 +168,11 @@ def redondear(formato, clave, valor):
return valor1
except Exception as e:
print("IMPOSIBLE REDONDEAR:", clave, valor, e)
return None


def escribir(facts, db, schema={}, commit=True):
from formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO
from .formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO

tablas, campos, campos_rev = configurar(schema)
cur = db.cursor()
Expand Down Expand Up @@ -326,7 +332,7 @@ def escribir(facts, db, schema={}, commit=True):


def modificar(fact, db, schema={}, webservice="wsfev1", ids=None, conf_db={}):
from formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO
from .formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO

update = [
"cae",
Expand Down Expand Up @@ -372,7 +378,7 @@ def modificar(fact, db, schema={}, webservice="wsfev1", ids=None, conf_db={}):


def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
from formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO
from .formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, PERMISO, DATO

tablas, campos, campos_rev = configurar(schema)
cur = db.cursor()
Expand All @@ -399,11 +405,7 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
encabezado = {}
for i, k in enumerate(description):
val = row[i]
if isinstance(val, str):
val = val.decode(CHARSET)
if isinstance(val, basestring):
val = val.strip()
key = campos_rev["encabezado"].get(k[0], k[0].lower())
key = campos_rev["encabezado"].get(k[0], k[0]) #Instead of calling lower() on k[0], directly use k[0] as the default value if the key is not found
val = redondear(ENCABEZADO, key, val)
encabezado[key] = val
##print encabezado
Expand All @@ -412,23 +414,22 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(detalle)s WHERE %%(id)s = ?" % tablas)
% campos["detalle"],
[encabezado["id"]],
[encabezado.get("id")], #Instead of directly accessing the "id" key, we can use encabezado.get("id") to safely retrieve the value and avoid a KeyError if the key is not present.
)
ejecutar(
cur,
("SELECT * FROM %(detalle)s WHERE %%(id)s = ?" % tablas)
% campos["detalle"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
detalle = {}
for i, k in enumerate(cur.description):
val = it[i]
if isinstance(val, str):
val = val.decode(CHARSET)
key = campos_rev["detalle"].get(k[0], k[0].lower())
val = redondear(DETALLE, key, val)
detalle[key] = val
if i < len(it):
val = it[i] #removal of the val.decode(CHARSET) line, as it is not needed in Python 3.
key = campos_rev["detalle"].get(k[0], k[0])
val = redondear(DETALLE, key, val)
detalle[key] = val
detalles.append(detalle)
encabezado["detalles"] = detalles

Expand All @@ -437,20 +438,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(cmp_asoc)s WHERE %%(id)s = ?" % tablas)
% campos["cmp_asoc"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(cmp_asoc)s WHERE %%(id)s = ?" % tablas)
% campos["cmp_asoc"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
cmp_asoc = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["cmp_asoc"].get(k[0], k[0].lower())
cmp_asoc[key] = val
if i < len(it):
val = it[i]
key = campos_rev["cmp_asoc"].get(k[0], k[0])
cmp_asoc[key] = val
cmps_asoc.append(cmp_asoc)
if cmps_asoc:
encabezado["cbtes_asoc"] = cmps_asoc
Expand All @@ -460,20 +462,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(permiso)s WHERE %%(id)s = ?" % tablas)
% campos["permiso"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(permiso)s WHERE %%(id)s = ?" % tablas)
% campos["permiso"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
permiso = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["permiso"].get(k[0], k[0].lower())
permiso[key] = val
if i < len(it):
val = it[i]
key = campos_rev["permiso"].get(k[0], k[0])
permiso[key] = val
permisos.append(permiso)
if permisos:
encabezado["permisos"] = permisos
Expand All @@ -483,20 +486,21 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(iva)s WHERE %%(id)s = ?" % tablas)
% campos["iva"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(iva)s WHERE %%(id)s = ?" % tablas) % campos["iva"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
iva = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["iva"].get(k[0], k[0].lower())
val = redondear(IVA, key, val)
iva[key] = val
if i < len(it):
val = it[i]
key = campos_rev["iva"].get(k[0], k[0])
val = redondear(IVA, key, val)
iva[key] = val
ivas.append(iva)
if ivas:
encabezado["ivas"] = ivas
Expand All @@ -506,21 +510,22 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
print(
("SELECT * FROM %(tributo)s WHERE %%(id)s = ?" % tablas)
% campos["tributo"],
[encabezado["id"]],
[encabezado.get("id")],
)
ejecutar(
cur,
("SELECT * FROM %(tributo)s WHERE %%(id)s = ?" % tablas)
% campos["tributo"],
[encabezado["id"]],
[encabezado.get("id")],
)
for it in cur.fetchall():
tributo = {}
for i, k in enumerate(cur.description):
val = it[i]
key = campos_rev["tributo"].get(k[0], k[0].lower())
val = redondear(TRIBUTO, key, val)
tributo[key] = val
if i < len(it):
val = it[i]
key = campos_rev["tributo"].get(k[0], k[0])
val = redondear(TRIBUTO, key, val)
tributo[key] = val
tributos.append(tributo)
if tributos:
encabezado["tributos"] = tributos
Expand All @@ -531,9 +536,9 @@ def leer(db, schema={}, webservice="wsfev1", ids=None, **kwargs):
cur.close()


def ayuda():
print("-- Formato:")
from formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, DATO, PERMISO
def ayuda(file=None):
print("-- Formato:", file=file) # modified to include the file parameter, which redirects the output to the specified file-like object.
from .formato_txt import ENCABEZADO, DETALLE, TRIBUTO, IVA, CMP_ASOC, DATO, PERMISO

tipos_registro = [
("encabezado", ENCABEZADO),
Expand All @@ -544,10 +549,16 @@ def ayuda():
("permiso", PERMISO),
("dato", DATO),
]
print("-- Esquema:")
for sql in esquema_sql(tipos_registro):
print(sql)
print("-- Esquema:", file=file)
try:
for sql in esquema_sql(tipos_registro):
print(sql, file=file)
except Exception as e:
print(f"Error al generar esquema SQL: {str(e)}", file=file)

# The file parameter is added to the function signature,
# Which allows specifying the file-like object to which the output should be redirected.
# It defaults to None, which means the output will be printed to the console.

if __name__ == "__main__":
ayuda()
Loading
Loading