Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

how can I create external table with selected parquet files in apache spark thrift server #12749

Open
Suraj-estuate opened this issue Jan 31, 2025 · 0 comments

Comments

@Suraj-estuate
Copy link

Suraj-estuate commented Jan 31, 2025

Image

**I have 4 Parquet files.
I have stored the paths of two of these Parquet files in a list:
paequet_files=[/home/rajesh/yashwanth/af_files/thurs_1/parquet/OPTIM_UK_ADDRESSES.parquet/part-00000-1c615841-690e-40f3-9ee3-9d2eff405d82-c000.zstd.parquet,/home/rajesh/yashwanth/af_files/thurs_1/parquet/OPTIM_UK_ADDRESSES.parquet/part-00000-1e0ed4ac-bf95-4606-9b71-3c135fac4290-c000.zstd.parquet]

How can I create an external table named OPTIM_UK_ADDRESSES from only these two files in paequet_files, using the Apache Spark Thrift Server?**

I was able to do this with a directory: all four Parquet files were loaded into an external table named OPTIM_UK_ADDRESSES.
This is my directory-based code:
Method to connect to thrift server
def thrift_server():
    """
    Establish a connection with the Thrift server.

    Reads the first ``Queryservermaster`` row (server IP, HTTP port, username
    and stored password) and passes those values to ``connect``. The stored
    password is run through ``psswrd_decrypt`` before use.

    Returns:
        The cursor obtained from the opened connection, or ``None`` when no
        server details row exists or when connecting raises
        ``TTransportException`` (the exception is logged, not re-raised).
    """
    archive_viewer_log = application_logger()

    ormclass_dict = get_orm_classes()
    Queryservermaster = ormclass_dict["Queryservermaster"]

    from src.connections.connect_db import engine

    connection = None

    Session = sessionmaker(bind=engine)
    session = Session()
    try:
        # Only plain column values are selected, so the returned row stays
        # usable after the session has been closed.
        server_details = session.query(
            Queryservermaster.serverip,
            Queryservermaster.httpport,
            Queryservermaster.username,
            Queryservermaster.psswrd,
        ).first()
    finally:
        # Always release the session; the original never closed it.
        session.close()

    if server_details is not None:
        host = str(server_details.serverip)
        port = str(server_details.httpport)
        username = str(server_details.username)

        psswrd = psswrd_decrypt(str(server_details.psswrd))

        try:
            fileName = read_files_from_parent_dir(3)

            ssl_val = with_open_read_json_file(fileName)

            # NOTE(review): auth_mechanism is read from `properties` while
            # use_ssl is read from the freshly loaded `ssl_val` -- confirm
            # that both sources are intended.
            connect_server = connect(
                host=host,
                port=port,
                auth_mechanism=properties["queryserver"]["auth_mechanism"],
                user=username,
                password=psswrd,
                use_ssl=ssl_val["queryserver"]["use_ssl"],
                timeout=None,
            )

            connection = connect_server.cursor()

        except TTransportException as e:
            # Transport failures are logged and reported to the caller as
            # a ``None`` return value.
            archive_viewer_log.exception(e, exc_info=True)

    return connection

def create_schema(application_name):
    """
    Create the database for *application_name* if needed and switch to it.

    Obtains a connection from ``thrift_server()``, runs
    ``CREATE DATABASE IF NOT EXISTS <application_name>`` followed by
    ``USE <application_name>``, and hands the same connection back.

    Args:
        application_name: Name of the database to create and select. It is
            interpolated into the SQL text as-is, so it must be a trusted,
            valid identifier.

    Returns:
        The connection returned by ``thrift_server()``, or ``None`` when no
        connection could be obtained.
    """
    # Kept as a function-local import, as in the original code.
    from src.connections.connect_thrift import thrift_server

    connection = thrift_server()
    if connection is None:
        # thrift_server() yields None when it cannot connect; return that
        # instead of failing with AttributeError on ``None.execute``.
        return None

    connection.execute(f"CREATE DATABASE IF NOT EXISTS {application_name}")
    connection.execute(f"USE {application_name}")

    return connection

def create_specific_external_table(
    db, connection, application_name, tablename, parquet_location, queryServerId
):
    """
    Create a PARQUET-backed table for *tablename* at *parquet_location*.

    After ``connection_test(db, queryServerId)`` succeeds, the function runs
    ``create table if not exists <tablename> using PARQUET location
    '<parquet_location>'`` and then, best-effort, ``MSCK REPAIR TABLE`` and
    ``REFRESH TABLE`` on ``<application_name>.<tablename>``.

    Args:
        db: Passed through to ``connection_test``.
        connection: Object whose ``execute`` method runs the SQL statements.
        application_name: Database name used to qualify the table in the
            MSCK REPAIR / REFRESH statements.
        tablename: Name of the table to create.
        parquet_location: Location placed in the ``location '...'`` clause.
        queryServerId: Passed through to ``connection_test``.

    Returns:
        ``messages["messagecode"]["106"]`` when table creation fails with an
        error whose text contains ``errormessages["errormessagecode"]["850"]``
        (presumably the "parquet file not found" case -- confirm against the
        message catalogue); otherwise ``None``. ``None`` is also the result
        when the connection test fails or an unexpected error is logged by
        the outer handler.
    """
    archive_viewer_log = application_logger()
    try:
        archive_viewer_log_info = process_logger()
        response = connection_test(db, queryServerId)
        if not response:
            archive_viewer_log.error(debugmessages["debug_messages"]["4005"])
            exceptions(404, "404-B", errormessages["errormessagecode"]["714"])
        else:
            return_msg = None

            # NOTE(review): the statement is assembled by string
            # concatenation; tablename and parquet_location must be trusted.
            create_table = (
                """create table if not exists """
                + tablename
                + """ using PARQUET location """
                + """'"""
                + str(parquet_location)
                + """'"""
            )

            try:
                connection.execute(create_table)
                archive_viewer_log_info.info(f"Table {tablename} created successfully.")

                try:
                    connection.execute("MSCK REPAIR TABLE " + application_name + "." + tablename)
                except OperationalError:
                    # Ignore exceptions from MSCK REPAIR TABLE
                    pass
                except Exception as ex:
                    # Bind the exception here: the original logged an unbound
                    # `ex`, which raised UnboundLocalError and caused the
                    # REFRESH TABLE step below to be skipped.
                    archive_viewer_log.exception(ex, exc_info=True)

                try:
                    connection.execute(
                        "REFRESH TABLE " + application_name + "." + tablename
                    )
                    archive_viewer_log_info.info(f"REFRESH TABLE executed successfully for {tablename}.")

                except Exception as ex:
                    # A failed refresh is logged but does not fail the call.
                    archive_viewer_log.exception(ex, exc_info=True)
            except Exception as er:
                exception_string = str(er)
                archive_viewer_log.error(f"Error creating table {tablename}: {exception_string}")
                # Map the known error text onto the caller-facing message.
                if errormessages["errormessagecode"]["850"] in exception_string:
                    return_msg = messages["messagecode"]["106"]

            return return_msg
    except Exception as e:
        # Any other failure is logged and the function returns None.
        archive_viewer_log.exception(e, exc_info=True)
        archive_viewer_log.error(debugmessages["debug_messages"]["1929"])

Parameters used in the call:
parquet_location=/home/rajesh/yashwanth/af_files/thurs_1/parquet/OPTIM_UK_ADDRESSES.parquet
table_name=OPTIM_UK_ADDRESSES
application name = devita

Is it possible to create an external table from only selected Parquet files? If so, how can I do it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant