-
Notifications
You must be signed in to change notification settings - Fork 7
huemul_Table
huemul_Table permite definir una estructura de tabla en BigData. La definición se hace en forma similar a la definición de una clase, y conceptualmente tiene los mismos principios que una tabla tradicional, pero con las ventajas de definir el almacenamiento en HDFS, y la conexión con Hive.
La clase se inicializa con la instancia de huemul_BigDataGovernance y Control
import com.huemulsolutions.bigdata.common._
import com.huemulsolutions.bigdata.control._
import com.huemulsolutions.bigdata.tables._
//El nombre de la tabla es el nombre de la clase
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
/********** C O N F I G U R A C I O N D E L A T A B L A ****************************************/
/********** S E T E O O P T I M I Z A C I O N ****************************************/
/********** S E T E O I N F O R M A T I V O ****************************************/
/********** D A T A Q U A L I T Y T A B L E ****************************************/
/********** S E G U R I D A D ****************************************/
/********** C O L U M N A S ****************************************/
/********** F O R E I N G K E Y ****************************************/
/********** D A T A Q U A L I T Y C O L U M N A S ****************************************/
this.ApplyTableDefinition()
}
class huemul_Table(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control)
Estos métodos permiten configurar el tipo de tabla, comportamiento al agregar/modificar datos y tipo de almacenamiento, entre otros. Adicionalmente estos parámetros son almacenados en el modelo de Gobierno de Datos en PostgreSQL.
- setDataBase(value: ArrayBuffer[huemul_KeyValuePath]): Asigna la base de datos en Hive donde se creará la tabla externa, value se obtiene desde la configuración de GlobalSettings
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setDataBase(huemulBigDataGov.GlobalSettings.MASTER_DataBase)
...
- setTableType(value: huemulType_Tables): Define el tipo de tabla. Las tablas de tipo "Master" y "Reference" son tablas de catálogos tipo maestras, es decir no tienen partición por fechas y los datos pueden ser actualizables, aunque su variación es poco probable (por ejemplo tabla de países). Las tablas de tipo "Transaction" tienen partición, por lo general la partición es por período, y cuando hay reprocesamiento de una partición los datos son eliminados y se vuelven a cargar.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setTableType(huemulType_Tables.Transaction)
...
- setStorageType(value: huemulType_StorageType): Establece el tipo de almacenamiento de la tabla en HDFS, la recomendación es usar Parquet como almacenamiento.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setStorageType(huemulType_StorageType.PARQUET)
...
- setPartitionField(value: String): Para las tablas de tipo "Transaction", define el campo que contiene la partición de la tabla, por lo general es una columna con datos por periodo. Para las tablas de tipo "Master" y "Reference" este método no se debe incluir en la definición de la tabla.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setPartitionField("periodo_mes")
...
- setGlobalPaths(value: ArrayBuffer[huemul_KeyValuePath]): Establece la ruta en HDFS donde se almacenará la tabla. La ruta completa se compone por GlobalPath + LocalPath. La ruta de GlobalPath es establecida por el administrador del ambiente, y está disponible en los parámetros de GlobalSettings.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setGlobalPaths(huemulBigDataGov.GlobalSettings.MASTER_BigFiles_Path)
...
- setLocalPath(value: String): Establece la sub-carpeta en HDFS donde se almacenará el archivo físico.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setLocalPath("yourapplication/")
...
- setFrequency(value: huemulType_Frequency): Establece la frecuencia de datos que tiene esta tabla. Este parámetro es relevante para poder monitorear y controlar los datos.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setFrequency(huemulType_Frequency.MONTHLY)
...
- setNumPartitions(): (disponible desde versión 1.3) Indica el número de particiones (distribuciones físicas de un archivo parquet) que se almacena al escribir en HDFS. Por lo general para un archivo de datos se crean N distribuciones del archivo, lo que implica que el almacenamiento de tablas pequeñas es muy poco óptimo. El valor predeterminado es null, por tanto se determina en forma automática por Spark. En tablas chicas se sugiere un valor pequeño (por ejemplo 2)
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setNumPartitions(2)
...
- ApplyTableDefinition(): Aplica todas las definiciones sobre la tabla, y realiza una validación del seteo. Este método debe ser invocado al final de la definición de la clase.
Estos métodos permiten configurar el rendimiento.
- setNumPartitions(value: Integer): Indica la cantidad de archivos que generará al guardar el fichero (1 para archivos pequeños, de esta forma se evita el problema de los archivos pequeños en HDFS.
this.setNumPartitions(1)
Estos métodos permiten agregar información de documentación de la tabla. Cuando se crea la instancia de la clase se almacena esta información en forma automática en el modelo de Gobierno de Datos en PostgreSQL, y puede ser consultado por cualquier persona con acceso al diccionario de datos.
- setDescription(value: String): Descripción del contenido de la tabla
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setDescription("Contenido de la tabla, ideal incluir algunos ejemplos para que cualquier persona pueda entender")
...
- setIT_ResponsibleName(value: String): Responsable desde la perspectiva de TI, se puede especificar el área y/o persona responsable.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setIT_ResponsibleName("Area de archivos normativos")
...
- setBusiness_ResponsibleName(value: String): Responsable desde la perspectiva del negocio, se puede especificar el área y/o persona responsable.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.setBusiness_ResponsibleName("Area de Contabilidad")
...
- setDQ_MaxNewRecords_Num(value: Long): Limita la cantidad de registros nuevos que pueden ser insertados en la tabla. Esta validación es relevante para evitar insertar en forma masiva sobre tablas maestras que, en teoría, no deben crecer.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
//Máximo se pueden insertar 100 registros cada vez que se ejecuta el proceso
this.setDQ_MaxNewRecords_Num(100)
...
- setDQ_MaxNewRecords_Perc(value: Decimal): Limita el porcentaje de registros nuevos que pueden ser insertados en la tabla. Esta validación es relevante para evitar insertar en forma masiva sobre tablas maestras que, en teoría, no deben crecer.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
//Máximo se pueden insertar 15% registros cada vez que se ejecuta el proceso
this.setDQ_MaxNewRecords_Perc(0.15)
...
- WhoCanRun_executeFull_addAccess(ClassName: String, PackageName: String): Establece autorización de ejecutar una carga full de datos en la tabla (método ExecuteFull). La omisión de este método indica que la carga de datos full sobre la tabla es pública.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.WhoCanRun_executeFull_addAccess("process_entidad_mes", "com.yourcompany.yourapplication")
...
- WhoCanRun_executeOnlyInsert_addAccess(ClassName: String, PackageName: String): Establece autorización para insertar datos nuevos en la tabla, sin modificar los datos ya existentes (método ExecuteOnlyInsert). La omisión de este método indica que la carga de datos sobre la tabla es pública.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.WhoCanRun_executeOnlyInsert_addAccess("process_entidad_mes", "com.yourcompany.yourapplication")
...
- WhoCanRun_executeOnlyUpdate_addAccess(ClassName: String, PackageName: String): Establece autorización para actualizar datos en la tabla, sin insertar nuevos datos (método ExecuteOnlyUpdate). La omisión de este método indica que la actualización de datos sobre la tabla es pública.
class tbl_NombreTabla(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
this.WhoCanRun_executeOnlyUpdate_addAccess("process_entidad_mes", "com.yourcompany.yourapplication")
...
Estos métodos permiten agregar validaciones de datos más complejas.
Estos métodos permiten agregar DataQuality de integridad referencial entre tablas.
- huemul_Table_Relationship(Class_TableName: Object, allowNull: Boolean): Permite establecer una relación de integridad referencial entre tablas. El parámetro Class_TableName recibe la tabla que tiene los datos maestros, y el parámetro allowNull establece la posibilidad de nulos (relación 0 a muchos).
- huemul_Table_RelationshipColumns (pk: huemul_Columns, fk: huemul_Columns): Permite especificar las columnas que conforman la relación entre las tablas. El parámetro "pk" es la columna de la tabla externa, y el parámetro "fk" recibe la columna de la tabla que se está definiendo.
- setExternalCode(value: String): huemul_Table_Relationship: Indica el código externo para indicar la validación de FK
-
setNotification(value: huemulType_DQNotification ): huemul_Table_Relationship: indica el nivel de criticidad para la notificación de la validación de FK. El valor por default es "ERROR", de esta forma se mantiene el comportamiento y compatibilidad con versiones anteriores. las opciones ahora son las siguientes:
- huemulType_DQNotification.WARNING_ERROR: Genera un error que detiene la ejecución.
- huemulType_DQNotification.WARNING_EXCLUDE: Genera un warning, excluye los registros que no cumplen con la condición (son almacenados en la tabla [nombre_tabla]_dq), y la ejecución continúa con el resto de las filas.
- huemulType_DQNotification.WARNING: Se genera un warning, los registros con fallas son almacenados en la tabla detallada de DQ (en caso de estar activa) y la ejecución continúa con el 100% de las filas (buenas y malas).
class tbl_sbif_eerr_mes(huemulBigDataGov: huemul_BigDataGovernance, Control: huemul_Control) extends huemul_Table(huemulBigDataGov, Control) with Serializable {
/********** C O N F I G U R A C I O N D E L A T A B L A ****************************************/
//Tipo de tabla, Master y Reference son catalogos sin particiones de periodo
this.setTableType(huemulType_Tables.Transaction)
...
/********** C O L U M N A S ****************************************/
//Columna de periodo
val periodo_mes = new huemul_Columns (StringType, true,"periodo de los datos")
periodo_mes.setIsPK(true)
val ins_id = new huemul_Columns (StringType, true, "Código institución.")
ins_id.setIsPK(true)
ins_id.setARCO_Data(false)
ins_id.setSecurityLevel(huemulType_SecurityLevel.Public)
ins_id.setDQ_MinLen(3)
ins_id.setDQ_MaxLen(3)
...
//ESTABLECE INSTANCIA DE LA TABLA QUE CONTIENE LA PK
val itbl_comun_institucion = new tbl_comun_institucion(huemulBigDataGov,Control)
//huemul_Table_Relationship: CREA RELACIÓN ENTRE TABLAS, NO PERMITE VALORES EN NULOS
val fk_tbl_comun_institucion = new huemul_Table_Relationship(itbl_comun_institucion, false).setNotification(huemulType_DQNotification.WARNING_EXCLUDE).setExternalCode("USER_FK_CODE").broadcastJoin(true)
//AGREGA RELACION DE COLUMNAS ENTRE TABLAS
fk_tbl_comun_institucion.AddRelationship(itbl_comun_institucion.ins_id, ins_id)
Estos métodos se establecen al instanciar la clase de la tabla, es decir, en el momento en que se requiera ejecutar una carga de datos.
- DF_from_SQL(Alias: String, sql: String, SaveInTemp: Boolean = true, NumPartitions: Integer = null): Permite ejecutar una sentencia SQL desde el parámetro "sql", y el DataFrame resultante es creado como vista temporal con el alias "Alias" para ser accedido desde Spark-SQL. El parámetro opcional NumPartitions (desde la versión 1.3) indica la cantidad de particiones en Spark, esto permite administrar de mejor forma la performance del aplicativo. Es equivalente a invocar el método DataFramehuemul.DF_from_SQL
- DF_from_DF(Alias: String, DF: DataFrame, SaveInTemp: Boolean = true): Asigna un DataFrame a partir de un DataFrame existente, y crea una vista temporal con el alias "Alias" para ser accedido desde Spark-SQL
- setMappingAuto(): Mapea en forma automática las columnas que están definidas en la tabla, con las columnas que tiene el DataFrame que será cargado. Asume que el 100% de las columnas serán cargadas.
- setAutoCast(value: Boolean): Permite hacer cast del DataFrame a los tipos de datos definidos en la tabla. Por default al procesar los datos hace autocast, si se quiere deshabilitar el autocast invocar con valor false.
- setApplyDistinct(value: Boolean): Permite hacer distinct del dataframe en forma automática para eliminar filas duplicadas. Por default al procesar los datos hace distinct, si se quiere omitir el distinct al cargar los datos se debe invocar setApplyDistinct(false), esto hará que la carga de datos sea más eficiente, sin embargo se corre el riesgo de errores en la carga por datos duplicados.
- executeFull(NewAlias: String, storageLevelOfDF: org.apache.spark.storage.StorageLevel = null): Boolean: Ejecuta la carga de datos completa. Para las tablas de tipo "Transaction" borra el contenido anterior de la partición, y vuelve a cargar los datos del nuevo DataFrame. Para las tablas de tipo "Master" y "Reference" inserta los datos nuevos, actualiza los datos que ya existían y marca como eliminados los datos que antes existían y ahora no están en el DataFrame. El resultante de la ejecución queda disponible en el atributo "DataFramehuemul", y también se puede acceder a el mediante spark-SQL utilizando el alias del parámetro "NewAlias". El parámetro opcional storageLevelOfDF (desde versión 1.3) permite dejar resultado del DataFrame Final en memoria, lo que optimiza las tareas posteriores de DataQuality y validación de integridad referencia.
val MyTableInstance = new tbl_NombreTabla(huemulBigDataGov, Control)
MyTableInstance.DF_from_SQL("DatosNuevos","select 56 as codigo, 'chile' as chile ")
MyTableInstance.setMappingAuto()
if (!MyTableInstance.executeFull("Final"))
Control.RaiseError(s"User: Error al intentar masterizar paises(${MyTableInstance.Error_Code}): ${MyTableInstance.Error_Text}")
...
- executeOnlyInsert(NewAlias: String, storageLevelOfDF: org.apache.spark.storage.StorageLevel = null): Boolean: Ejecuta la carga de datos en modo solo insert. Para las tablas de tipo "Transaction" no es posible llamar este método. Para las tablas de tipo "Master" y "Reference" inserta los datos nuevos sin hacer modificación alguna sobre los datos existentes. El resultante de la ejecución queda disponible en el atributo "DataFramehuemul", y también se puede acceder a el mediante spark-SQL utilizando el alias del parámetro "NewAlias". El parámetro opcional storageLevelOfDF (desde versión 1.3) permite dejar resultado del DataFrame Final en memoria, lo que optimiza las tareas posteriores de DataQuality y validación de integridad referencia.
val MyTableInstance = new tbl_NombreTabla(huemulBigDataGov, Control)
MyTableInstance.DF_from_SQL("DatosNuevos","select 56 as codigo, 'chile' as chile ")
MyTableInstance.setMappingAuto()
if (!MyTableInstance.executeOnlyInsert("Final"))
Control.RaiseError(s"User: Error al intentar insertar paises(${MyTableInstance.Error_Code}): ${MyTableInstance.Error_Text}")
...
- executeOnlyUpdate(NewAlias: String, storageLevelOfDF: org.apache.spark.storage.StorageLevel = null): Boolean: Ejecuta la carga de datos en modo solo update. Para las tablas de tipo "Transaction" no es posible llamar este método. Para las tablas de tipo "Master" y "Reference" actualiza la información de los datos actuales, sin marcar registros como eliminados ni agregar datos nuevos. El resultante de la ejecución queda disponible en el atributo "DataFramehuemul", y también se puede acceder a el mediante spark-SQL utilizando el alias del parámetro "NewAlias". El parámetro opcional storageLevelOfDF (desde versión 1.3) permite dejar resultado del DataFrame Final en memoria, lo que optimiza las tareas posteriores de DataQuality y validación de integridad referencia.
val MyTableInstance = new tbl_NombreTabla(huemulBigDataGov, Control)
MyTableInstance.DF_from_SQL("DatosNuevos","select 56 as codigo, 'chile' as chile ")
MyTableInstance.setMappingAuto()
if (!MyTableInstance.executeOnlyUpdate("Final"))
Control.RaiseError(s"User: Error al intentar actualizar paises(${MyTableInstance.Error_Code}): ${MyTableInstance.Error_Text}")
...
- executeSelectiveUpdate(NewAlias: String, PartitionValueForSelectiveUpdate: String, storageLevelOfDF: org.apache.spark.storage.StorageLevel = null): Boolean: Hay veces en que solo queremos actualizar algunos campos y algunos registros en particular, por ejemplo actualizaciones poco frecuentes, o incluso correcciones puntuales de datos. Supongamos que tenemos una tabla de paises con código y nombre, pero además tenemos atributos adicionales que son actualizados por otros procesos (por ejemplo la población). La población estará en un fichero independiente, donde no estará el 100% de los paises todas la veces, sino que en cada carga se pueden cargar 1 o 2 paises, y se puede actualizar en cualquier momento. El comportamiento de la actualización es similar para las tablas "Master", "Reference" y "Transaction". Para el caso de las tablas de tipo "Transaction", se debe especificar la partición en particular que se desea actualizar. El parámetro opcional storageLevelOfDF (desde versión 1.3) permite dejar resultado del DataFrame Final en memoria, lo que optimiza las tareas posteriores de DataQuality y validación de integridad referencia.
val MyTableInstance = new tbl_NombreTabla(huemulBigDataGov, Control)
MyTableInstance.DF_from_SQL("DatosNuevos","select 56 as codigo, 17000000 as poblacion ")
MyTableInstance.codigo.setMapping("codigo")
MyTableInstance.poblacion .setMapping("poblacion ")
if (!MyTableInstance.executeSelectiveUpdate("Final"))
Control.RaiseError(s"User: Error al intentar actualizar registros de poblacion (${MyTableInstance.Error_Code}): ${MyTableInstance.Error_Text}")
...
- RaiseError(txt: String, code: Integer): Permite lanzar un error con la descripción "txt" y un código de error "code".
Atributos y métodos disponibles para obtener información de la tabla.
- TableName: Obtiene el nombre de la tabla
- GetTable(): String Obtiene el nombre de la tabla con la base de datos incluida (DataBase.TableName), para ser usada en consultas de Spark-SQL.
- GetCurrentDataBase(): String: Retorna el nombre de la base de datos en Hive, en función del "environment" donde está ejecutando la aplicación.
- DataFramehuemul: huemul_DataFrame: Si se accede antes de ejecutar la carga de datos, obtendrá acceso a los datos que serán procesados (DataFrame). Si se accede posterior a la ejecución de los datos (después de invocar al método executeFull, executeOnlyInsert o executeOnlyUpdate), entonces obtiene acceso al resultado del procesamiento.
- Error_isError: Boolean: Indica si tuvo algún error en el procesamiento de los datos
- Error_Text: String: Si tuvo algún error, obtiene el mensaje de error.
- Error_Code: Integer: Si tuvo algún error, retorna el código del error.