Commit 6fa897ba authored by Erly Villaroel's avatar Erly Villaroel

Merge remote-tracking branch 'origin/developer_ca' into developer_ev

parents 5c4528b1 06da121b
...@@ -4,6 +4,7 @@ from itertools import combinations ...@@ -4,6 +4,7 @@ from itertools import combinations
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from numba import njit
from parallel_pandas import ParallelPandas from parallel_pandas import ParallelPandas
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
...@@ -187,15 +188,18 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -187,15 +188,18 @@ class MatchAndExcludeRecordsAction(ActionInterface):
df3 = df3.toPandas() df3 = df3.toPandas()
total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"] total_cols = group_cols + [amount_col, id_col, EXCLUDE_ROWS_FIELD, "DIFF"]
ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True) # ParallelPandas.initialize(n_cpu=mp.cpu_count(), split_factor=8, disable_pr_bar=True)
resultado = df3[total_cols].groupby(group_cols).p_apply(lambda x: custom_func(x, amount_col, id_col, comb_per_group, max_combinations)) df3 = df3.sort_values(group_cols + [amount_col])
resultado = df3[total_cols].groupby(group_cols).apply(lambda x: custom_func(x, amount_col, id_col, max_combinations))
resultado = resultado.reset_index() resultado = resultado.reset_index()
if len(resultado.columns) == 1: if len(resultado.columns) == 1:
resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"]) resultado = pd.DataFrame([], columns=group_cols + ["LISTA_DIFF"])
else: else:
resultado.columns = group_cols + ["LISTA_DIFF"] resultado.columns = group_cols + ["LISTA_DIFF"]
# print(resultado["LISTA_DIFF"].apply(lambda x: x if pd.notna(x) and x[0]!=-1 else x))
meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols) meged2 = resultado.merge(merged_df.toPandas(), 'left', group_cols)
print(meged2)
meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array) meged2["LISTA_DIFF"] = meged2["LISTA_DIFF"].apply(self.handle_array)
meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))] meged2 = meged2[(meged2['DIFF'] == 0) | ((meged2['DIFF'] != 0) & (meged2['LISTA_DIFF'].apply(len) > 0))]
if meged2.empty: if meged2.empty:
...@@ -217,7 +221,8 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -217,7 +221,8 @@ class MatchAndExcludeRecordsAction(ActionInterface):
return self.output return self.output
def handle_array(self, x): def handle_array(self, x):
if isinstance(x, List): # print(type(x))
if isinstance(x, np.ndarray):
return x return x
else: else:
return [] return []
...@@ -245,7 +250,7 @@ class MatchAndExcludeRecordsAction(ActionInterface): ...@@ -245,7 +250,7 @@ class MatchAndExcludeRecordsAction(ActionInterface):
raise Exception(f"Error creando sesion Spark. {e}") raise Exception(f"Error creando sesion Spark. {e}")
def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinations): def custom_func(group, amount_field, id_field, max_combinations):
diff = group["DIFF"].values[0] diff = group["DIFF"].values[0]
if pd.isna(diff) or diff == 0: if pd.isna(diff) or diff == 0:
return None return None
...@@ -253,50 +258,84 @@ def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinati ...@@ -253,50 +258,84 @@ def custom_func(group, amount_field, id_field, max_comb_per_group, max_combinati
group[amount_field] = group[amount_field].astype(float) group[amount_field] = group[amount_field].astype(float)
group = group.reset_index(drop=True) group = group.reset_index(drop=True)
values = group[amount_field].values values = group[amount_field].values
indexs = group.index.values ids = group[id_field].values
np.random.shuffle(indexs)
tam = len(values) n = len(values)
rang = range(1, tam + 1) if tam <= max_combinations else range(1, max_combinations + 1) valores1 = encontrar_comb_1(values, diff)
if valores1[0] != -1:
final = None indices = ids[valores1]
stop_event = False return indices
valores2 = encontrar_comb_2(values, diff, n)
def buscar_combinacion(i): if valores2[0] != -1:
nonlocal final, stop_event indices = ids[valores2]
if not stop_event: return indices
for index, comb in enumerate(combinations(indexs, i)): # Iterar sobre todos los índices posibles
if stop_event or index > max_comb_per_group: # valores4 = encontrar_comb_4(values, diff, n)
break # if valores4[0] != -1:
elif np.sum(values[list(comb)]).round(ROUND_DECIMAL) == diff: # indices = ids[valores4]
final = group.loc[list(comb), id_field].tolist() # return indices
stop_event = True valores5 = encontrar_comb_5(values, diff, n)
break if valores5[0] != -1:
indices = ids[valores5]
return None return indices
with ThreadPoolExecutor() as executor:
futures = [executor.submit(buscar_combinacion, i) for i in rang] @njit
def encontrar_comb_1(valores, target):
for future in futures: indice = [-1]
try: for idx, value in enumerate(valores):
future.result() suma = value
except TimeoutError: if round(suma, ROUND_DECIMAL) == target:
stop_event = True indice = [idx for idx, val in enumerate(valores) if val in [value]]
break return indice
if stop_event or final is not None: return indice
break
@njit
return final def encontrar_comb_2(valores, target, n):
indice = [-1]
for i in range(n):
array_except = np.delete(valores, i)
for idx, value in enumerate(array_except):
suma = value + valores[i]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i]]]
return indice
return indice
@njit
def encontrar_comb_4(valores, target, n):
indice = [-1]
for i in range(n):
a1 = np.delete(valores, i)
for j in range(len(a1)):
a2 = np.delete(a1, j)
for k in range(len(a2)):
array_except = np.delete(a2, k)
for idx, value in enumerate(array_except):
suma = value + valores[i] + a1[j] + a2[k]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k]]]
return indice
return indice
@njit
def encontrar_comb_5(valores, target, n):
indice = [-1]
for i in range(n):
a1 = np.delete(valores, i)
for j in range(len(a1)):
a2 = np.delete(a1, j)
for k in range(len(a2)):
a3 = np.delete(a2, k)
for l in range(len(a3)):
array_except = np.delete(a2, l)
for idx, value in enumerate(array_except):
suma = value + valores[i] + a1[j] + a2[k] + a3[l]
if round(suma, ROUND_DECIMAL) == target:
indice = [idx for idx, val in enumerate(valores) if val in [value, valores[i], a1[j], a2[k], a3[l]]]
return indice
return indice
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment