Commit eb0ad02e authored by Cristian Aguirre's avatar Cristian Aguirre

Update match-and-exclude-records-actions_v1.py. Add new parameter...

Update match-and-exclude-records-actions_v1.py. Add new parameter create-tmp-table and force numeric field in 'amount-column'
parent 8d0caf60
...@@ -30,11 +30,13 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -30,11 +30,13 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.identifier = None self.identifier = None
self.max_combinations = None self.max_combinations = None
self.timeout = None self.timeout = None
self.create_tmp_table = None
self.exclude_pivot = None self.exclude_pivot = None
self.pivot_params = None self.pivot_params = None
self.ctp_params = None self.ctp_params = None
self.output = None self.output = None
self.config_params = ["max-record-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot"] self.config_params = ["max-record-per-combinations", "max-timeout-per-combinations", "exclude-entity-pivot",
"create-tmp-table"]
def parser(self, descriptor: Dict[str, Any]): def parser(self, descriptor: Dict[str, Any]):
self.app.logger.info(f"Descriptor recogido: {descriptor}") self.app.logger.info(f"Descriptor recogido: {descriptor}")
...@@ -65,6 +67,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -65,6 +67,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
self.max_combinations = configs["max-record-per-combinations"] self.max_combinations = configs["max-record-per-combinations"]
self.timeout = int(configs["max-timeout-per-combinations"]) // 1000 # Miliseconds self.timeout = int(configs["max-timeout-per-combinations"]) // 1000 # Miliseconds
self.exclude_pivot = configs["exclude-entity-pivot"] self.exclude_pivot = configs["exclude-entity-pivot"]
self.create_tmp_table = configs["create-tmp-table"]
self.pivot_params = pivot_params self.pivot_params = pivot_params
self.ctp_params = ctp_params self.ctp_params = ctp_params
...@@ -88,7 +91,25 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -88,7 +91,25 @@ class MatchAndExcludeRecordsAction(ActionInterface):
ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"], ctp_df = dd.read_sql_table(ctp_table, dialect, index_col=self.ctp_params["id-column"],
npartitions=mp.cpu_count()*2, columns=total_ctp_cols) npartitions=mp.cpu_count()*2, columns=total_ctp_cols)
ctp_df = ctp_df.reset_index() ctp_df = ctp_df.reset_index()
self.app.logger.debug(f"Insumos cargados - {self.identifier}")
# Forzamos a que los datos que se restaran (usualmente el monto) sea númerico
pivot_df[self.pivot_params["amount-column"]] = pivot_df[self.pivot_params["amount-column"]].astype(float).round(ROUND_DECIMAL)
ctp_df[self.ctp_params["amount-column"]] = ctp_df[self.ctp_params["amount-column"]].astype(float).round(ROUND_DECIMAL)
self.app.logger.info(f"Insumos cargados - {self.identifier}")
if self.create_tmp_table:
import random
from sqlalchemy import text
# Creamos las tablas temporales del pivot y contraparte con sufijo N° aleatorio de 6 dígitos
random_number = random.randint(100000, 999999)
pivot_query = f"CREATE TABLE ENGINE_{pivot_table}_{random_number} AS SELECT * FROM {pivot_table}"
ctp_query = f"CREATE TABLE ENGINE_{ctp_table}_{random_number} AS SELECT * FROM {ctp_table}"
source_obj.create_engine()
engine = source_obj.engine
with engine.connect() as conn:
conn.execute(text(pivot_query))
conn.execute(text(ctp_query))
# Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input # Agregar un prefijo a cada columna, tanto del pivot como contraparte. Actualizar campos del input
# pivot: 'PIVOT_', contraparte: 'COUNTERPART_' # pivot: 'PIVOT_', contraparte: 'COUNTERPART_'
# Iterar sobre las columnas del DataFrame # Iterar sobre las columnas del DataFrame
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment