Commit 06da121b authored by Cristian Aguirre

Update action-exclude-records-v1.1

parent fe0e2748
......@@ -4,6 +4,7 @@ from itertools import combinations
import multiprocessing as mp
import numpy as np
import pandas as pd
from numba import njit
from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor
......@@ -187,15 +188,18 @@ class MatchAndExcludeRecordsAction(ActionInterface):
df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations))
# ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
df3 = df3.sort_values(group_cols + [amount_col])
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index()
if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else:
resultado.columns = group_cols + ["LISTA_DIFF"]
# print(resultado["LISTA_DIFF"].apply(lambda x: x if pd.notna(x) and x[0]!=-1 else x))
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
print(meged2)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty:
......@@ -217,7 +221,8 @@ class MatchAndExcludeRecordsAction(ActionInterface):
return self.output
def handle_array(self, x):
    """Normalize a ``LISTA_DIFF`` cell so that ``len()`` can be applied to it.

    The result of this method is fed to ``.apply(len)`` by the caller, so
    every cell must come back as a sized container.

    Args:
        x: A cell value from the ``LISTA_DIFF`` column.

    Returns:
        ``x`` itself when it is a ``list`` or a ``numpy.ndarray``; otherwise
        (for example ``None`` or ``NaN``) an empty list.
    """
    # Accept both container types: checking only for ndarray would silently
    # turn a plain-list result into "no combination found".
    if isinstance(x, (list, np.ndarray)):
        return x
    return []
......@@ -245,7 +250,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}")
# NOTE(review): two signatures appear back to back below. This span looks like a
# rendered diff in which the first `def` is the removed line and the second one is
# its replacement (it no longer takes `max_comb_per_group`) -- confirm against the
# repository before relying on either.
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations):
def custom_func(group, amount_field, id_field, max_combinations):
# Nothing to search for when the group's first DIFF value is missing or zero.
diff = group["DIFF"].values[0]
if pd.isna(diff) or diff == 0:
return None
......@@ -253,50 +258,84 @@ def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinati
# Work on float amounts with a fresh 0..n-1 index; `values` is the amount column
# as an array.
group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True)
values = group[amount_field].values
# NOTE(review): from here through `return final` is presumably the removed
# implementation (it still reads `max_comb_per_group`, which only the first
# signature declares). It shuffles the row positions in place, then submits one
# `buscar_combinacion(i)` task per combination size in `rang` to a thread pool.
# Each task walks `combinations(indexs, i)` and stops at the first combination
# whose rounded sum equals `diff`, or once the enumeration index exceeds
# `max_comb_per_group`.
indexs = group.index.values
np.random.shuffle(indexs)
tam = len(values)
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1)
final = None
stop_event = False
def buscar_combinacion(i):
nonlocal final, stop_event
if not stop_event:
for index, comb in enumerate(combinations(indexs, i)):
if stop_event or index > max_comb_per_group:
break
elif np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff:
final = group.loc[list(comb), id_field].tolist()
stop_event = True
break
return None
with ThreadPoolExecutor() as executor:
futures = [executor.submit(buscar_combinacion, i) for i in rang]
for future in futures:
try:
future.result()
except TimeoutError:
stop_event = True
break
if stop_event or final is not None:
break
return final
# NOTE(review): the lines below are presumably the replacement implementation.
# The helpers are tried in order (`encontrar_comb_1`, `encontrar_comb_2`,
# `encontrar_comb_5`); a result whose first element is -1 is treated as
# "no match", otherwise the returned positions are used to index `ids`.
# `max_combinations` is not referenced in this part. No return statement is
# visible after the last check, so a group with no match would yield None --
# confirm nothing else follows in the real file.
ids = group[id_field].values
n = len(values)
valores1 = encontrar_comb_1(values, diff)
if valores1[0] != -1:
indices = ids[valores1]
return indices
valores2 = encontrar_comb_2(values, diff, n)
if valores2[0] != -1:
indices = ids[valores2]
return indices
# Iterate over all possible indices
# valores4 = encontrar_comb_4(values, diff, n)
# if valores4[0] != -1:
# indices = ids[valores4]
# return indices
valores5 = encontrar_comb_5(values, diff, n)
if valores5[0] != -1:
indices = ids[valores5]
return indices
@njit
def encontrar_comb_1(valores, target):
    """Find one amount that equals ``target`` after rounding.

    Args:
        valores: 1-D array of amounts.
        target: Amount to match.

    Returns:
        A one-element list holding the position of the first amount whose
        value, rounded to ``ROUND_DECIMAL`` places, equals ``target``; or
        ``[-1]`` when no amount matches.
    """
    for idx in range(len(valores)):
        if round(valores[idx], ROUND_DECIMAL) == target:
            # Report only the matching position. Selecting every row that
            # shares the same amount would return several records whose
            # combined sum no longer equals the target.
            return [idx]
    return [-1]
@njit
def encontrar_comb_2(valores, target, n):
    """Find two distinct positions whose amounts add up to ``target``.

    Args:
        valores: 1-D array of amounts.
        target: Amount to match.
        n: Number of leading elements of ``valores`` to consider (callers
            pass ``len(valores)``).

    Returns:
        ``[i, j]`` with ``i < j`` for the first pair whose sum, rounded to
        ``ROUND_DECIMAL`` places, equals ``target``; or ``[-1]`` when no pair
        matches.
    """
    # Scanning i < j visits every unordered pair exactly once and needs no
    # temporary copies of the array.
    for i in range(n):
        for j in range(i + 1, n):
            if round(valores[i] + valores[j], ROUND_DECIMAL) == target:
                # Return the two positions themselves; matching back by value
                # would also pick up unrelated rows with duplicate amounts.
                return [i, j]
    return [-1]
@njit
def encontrar_comb_4(valores, target, n):
    """Find four distinct positions whose amounts add up to ``target``.

    Args:
        valores: 1-D array of amounts.
        target: Amount to match.
        n: Number of leading elements of ``valores`` to consider (callers
            pass ``len(valores)``).

    Returns:
        A list of four strictly increasing positions for the first
        combination whose sum, rounded to ``ROUND_DECIMAL`` places, equals
        ``target``; or ``[-1]`` when no combination matches.
    """
    # Strictly increasing indices enumerate each 4-element combination once
    # (instead of every ordering of it) and avoid per-iteration array copies.
    for i in range(n):
        for j in range(i + 1, n):
            sum_ij = valores[i] + valores[j]
            for k in range(j + 1, n):
                sum_ijk = sum_ij + valores[k]
                for m in range(k + 1, n):
                    if round(sum_ijk + valores[m], ROUND_DECIMAL) == target:
                        # Return positions, not value matches, so rows with
                        # duplicate amounts are not selected by accident.
                        return [i, j, k, m]
    return [-1]
@njit
def encontrar_comb_5(valores, target, n):
    """Find five distinct positions whose amounts add up to ``target``.

    Args:
        valores: 1-D array of amounts.
        target: Amount to match.
        n: Number of leading elements of ``valores`` to consider (callers
            pass ``len(valores)``).

    Returns:
        A list of five strictly increasing positions for the first
        combination whose sum, rounded to ``ROUND_DECIMAL`` places, equals
        ``target``; or ``[-1]`` when no combination matches.
    """
    # Strictly increasing indices guarantee the five terms come from five
    # different rows. Building the candidates by repeatedly deleting elements
    # makes it easy to delete from the wrong intermediate array and add the
    # same row twice.
    for i in range(n):
        for j in range(i + 1, n):
            sum_ij = valores[i] + valores[j]
            for k in range(j + 1, n):
                sum_ijk = sum_ij + valores[k]
                for p in range(k + 1, n):
                    sum_ijkp = sum_ijk + valores[p]
                    for q in range(p + 1, n):
                        if round(sum_ijkp + valores[q], ROUND_DECIMAL) == target:
                            # Return positions, not value matches, so rows
                            # with duplicate amounts are not over-selected.
                            return [i, j, k, p, q]
    return [-1]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment