Skip to content

Commit 2268704

Browse files
committed
test parsing of parquet file
1 parent 99ca9fa commit 2268704

File tree

8 files changed

+208
-62
lines changed

8 files changed

+208
-62
lines changed

ApplicationLibCode/Commands/ApplicationCommands/RicSumoDataFeature.cpp

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "Sumo/RimSumoConnector.h"
2727

2828
#include "RiaLogging.h"
29+
#include "RifOsduWellLogReader.h"
2930
#include <QAction>
3031

3132
CAF_CMD_SOURCE_INIT( RicSumoDataFeature, "RicSumoDataFeature" );
@@ -64,6 +65,10 @@ SimpleDialog::SimpleDialog( QWidget* parent )
6465
connect( parquetDownloadButton, &QPushButton::clicked, this, &SimpleDialog::onParquetClicked );
6566
layout->addWidget( parquetDownloadButton );
6667

68+
showContentParquetButton = new QPushButton( "Show Content Parquet", this );
69+
connect( showContentParquetButton, &QPushButton::clicked, this, &SimpleDialog::onShowContentParquetClicked );
70+
layout->addWidget( showContentParquetButton );
71+
6772
okButton = new QPushButton( "OK", this );
6873
connect( okButton, &QPushButton::clicked, this, &SimpleDialog::onOkClicked );
6974
layout->addWidget( okButton );
@@ -196,12 +201,27 @@ void SimpleDialog::onParquetClicked()
196201

197202
if ( !m_sumoConnector->blobIds().empty() )
198203
{
199-
m_sumoConnector->requestParquet( m_sumoConnector->blobIds().back() );
204+
m_sumoConnector->requestBlobDownload( m_sumoConnector->blobIds().back() );
200205

201206
label->setText( "Requesting blob ID for vector name (see log for response" );
202207
}
203208
}
204209

210+
//--------------------------------------------------------------------------------------------------
211+
///
212+
//--------------------------------------------------------------------------------------------------
213+
void SimpleDialog::onShowContentParquetClicked()
214+
{
215+
if ( m_sumoConnector->blobContents().empty() ) return;
216+
217+
auto blob = m_sumoConnector->blobContents().back();
218+
219+
auto content = blob.contents;
220+
// TODO: show content using parquet reader
221+
auto tableText = RifOsduWellLogReader::readSummaryData( content );
222+
RiaLogging::info( tableText );
223+
}
224+
205225
//--------------------------------------------------------------------------------------------------
206226
///
207227
//--------------------------------------------------------------------------------------------------
@@ -229,6 +249,8 @@ void SimpleDialog::onTokenReady( const QString& token )
229249

230250
QSettings settings;
231251
settings.setValue( m_registryKeyBearerToken_DEBUG_ONLY, token );
252+
253+
m_sumoConnector->setToken( token );
232254
}
233255

234256
void SimpleDialog::onOkClicked()

ApplicationLibCode/Commands/ApplicationCommands/RicSumoDataFeature.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class SimpleDialog : public QDialog
4747
void onVectorNamesClicked();
4848
void onFindBlobIdClicked();
4949
void onParquetClicked();
50+
void onShowContentParquetClicked();
5051

5152
bool isTokenValid();
5253

@@ -60,6 +61,7 @@ class SimpleDialog : public QDialog
6061
QPushButton* vectorNamesButton;
6162
QPushButton* blobIdButton;
6263
QPushButton* parquetDownloadButton;
64+
QPushButton* showContentParquetButton;
6365

6466
QPointer<RimSumoConnector> m_sumoConnector;
6567

ApplicationLibCode/FileInterface/RifArrowTools.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,38 @@ std::vector<double> RifArrowTools::convertChunkedArrayToStdVector( const std::sh
6060
}
6161

6262
return result;
63-
};
63+
}
64+
65+
//--------------------------------------------------------------------------------------------------
66+
///
67+
//--------------------------------------------------------------------------------------------------
68+
std::vector<float> RifArrowTools::convertChunkedArrayToStdFloatVector( const std::shared_ptr<arrow::ChunkedArray>& column )
69+
{
70+
auto convertChunkToFloatVector = []( const std::shared_ptr<arrow::Array>& array ) -> std::vector<float>
71+
{
72+
std::vector<float> result;
73+
74+
auto arrowFloatArray = std::static_pointer_cast<arrow::FloatArray>( array );
75+
result.resize( arrowFloatArray->length() );
76+
for ( int64_t i = 0; i < arrowFloatArray->length(); ++i )
77+
{
78+
result[i] = arrowFloatArray->Value( i );
79+
}
80+
81+
return result;
82+
};
83+
84+
CAF_ASSERT( column->type()->id() == arrow::Type::FLOAT );
85+
86+
std::vector<float> result;
87+
88+
// Iterate over each chunk in the column
89+
for ( int i = 0; i < column->num_chunks(); ++i )
90+
{
91+
std::shared_ptr<arrow::Array> chunk = column->chunk( i );
92+
std::vector<float> chunk_vector = convertChunkToFloatVector( chunk );
93+
result.insert( result.end(), chunk_vector.begin(), chunk_vector.end() );
94+
}
95+
96+
return result;
97+
}

ApplicationLibCode/FileInterface/RifArrowTools.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ class RifArrowTools
3131
{
3232
public:
3333
static std::vector<double> convertChunkedArrayToStdVector( const std::shared_ptr<arrow::ChunkedArray>& column );
34+
static std::vector<float> convertChunkedArrayToStdFloatVector( const std::shared_ptr<arrow::ChunkedArray>& column );
3435
};

ApplicationLibCode/FileInterface/RifOsduWellLogReader.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727

2828
#include "RifArrowTools.h"
2929

30+
#include "RifCsvDataTableFormatter.h"
31+
#include <algorithm>
3032
#include <arrow/array/array_primitive.h>
3133
#include <arrow/csv/api.h>
3234
#include <arrow/io/api.h>
3335
#include <arrow/scalar.h>
3436
#include <parquet/arrow/reader.h>
3537

38+
#pragma optimize( "", off )
39+
3640
//--------------------------------------------------------------------------------------------------
3741
///
3842
//--------------------------------------------------------------------------------------------------
@@ -72,3 +76,78 @@ std::pair<cvf::ref<RigOsduWellLogData>, QString> RifOsduWellLogReader::readWellL
7276

7377
return { logData, "" };
7478
}
79+
80+
//--------------------------------------------------------------------------------------------------
81+
///
82+
//--------------------------------------------------------------------------------------------------
83+
QString RifOsduWellLogReader::readSummaryData( const QByteArray& contents )
{
    // Parses 'contents' as a Parquet file and returns the first rows (at most 20) of its DOUBLE and FLOAT
    // columns as ";"-separated text. Returns an empty string if the file cannot be opened or read, or if it
    // holds no DOUBLE/FLOAT columns.

    arrow::MemoryPool* pool = arrow::default_memory_pool();

    std::shared_ptr<arrow::io::RandomAccessFile> input = std::make_shared<RifByteArrayArrowRandomAccessFile>( contents );

    // Open Parquet file reader
    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
    if ( !parquet::arrow::OpenFile( input, pool, &arrow_reader ).ok() )
    {
        return {};
    }

    // Read entire file as a single Arrow table
    std::shared_ptr<arrow::Table> table;
    if ( !arrow_reader->ReadTable( &table ).ok() )
    {
        return {};
    }

    // Header entries and value vectors are collected in the same pass, so a column that is skipped
    // (unsupported type) never gets a header entry and header and rows stay aligned.
    std::vector<RifTextDataTableColumn> header;
    std::vector<std::vector<double>>    columnVectors;

    const auto columnNames = table->ColumnNames();
    for ( const std::string& columnName : columnNames )
    {
        std::shared_ptr<arrow::ChunkedArray> column = table->GetColumnByName( columnName );

        // Guard against a lookup that does not resolve to a column
        if ( !column ) continue;

        const auto columnType = column->type()->id();

        if ( columnType == arrow::Type::DOUBLE )
        {
            columnVectors.push_back( RifArrowTools::convertChunkedArrayToStdVector( column ) );
        }
        else if ( columnType == arrow::Type::FLOAT )
        {
            const auto floatVector = RifArrowTools::convertChunkedArrayToStdFloatVector( column );
            columnVectors.emplace_back( floatVector.begin(), floatVector.end() );
        }
        else
        {
            // Only floating point columns are shown
            continue;
        }

        header.push_back( RifTextDataTableColumn( QString::fromStdString( columnName ) ) );
    }

    if ( columnVectors.empty() )
    {
        return {};
    }

    QString                  tableText;
    QTextStream              stream( &tableText );
    RifCsvDataTableFormatter formatter( stream, ";" );

    formatter.header( header );

    // Limit the preview to the first rows, and never index past the shortest column
    const size_t maxRowCount = 20;
    size_t       rowCount    = maxRowCount;
    for ( const auto& columnVector : columnVectors )
    {
        rowCount = std::min( rowCount, columnVector.size() );
    }

    for ( size_t rowIndex = 0; rowIndex < rowCount; ++rowIndex )
    {
        for ( const auto& columnVector : columnVectors )
        {
            formatter.add( columnVector[rowIndex] );
        }
        formatter.rowCompleted();
    }

    formatter.tableCompleted();

    return tableText;
}

ApplicationLibCode/FileInterface/RifOsduWellLogReader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ class RifOsduWellLogReader
3333
{
3434
public:
3535
static std::pair<cvf::ref<RigOsduWellLogData>, QString> readWellLogData( const QByteArray& contents );
36+
37+
static QString readSummaryData( const QByteArray& contents );
3638
};

ApplicationLibCode/ProjectDataModel/Sumo/RimSumoConnector.cpp

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -104,17 +104,14 @@ void RimSumoConnector::requestFailed( const QAbstractOAuth::Error error )
104104
//--------------------------------------------------------------------------------------------------
105105
///
106106
//--------------------------------------------------------------------------------------------------
107-
void RimSumoConnector::parquetDownloadComplete( const QByteArray& contents, const QString& url )
107+
void RimSumoConnector::parquetDownloadComplete( const QString& blobId, const QByteArray& contents, const QString& url )
108108
{
109-
if ( m_redirect.isEmpty() )
110-
{
111-
m_parquetData = contents;
112-
}
113-
else
114-
{
115-
auto url = m_redirect;
116-
requestBlobWithoutTokenHeader( url );
117-
}
109+
SumoRedirect obj;
110+
obj.objectId = blobId;
111+
obj.contents = contents;
112+
obj.url = url;
113+
114+
m_redirectInfo.push_back( obj );
118115
}
119116

120117
//--------------------------------------------------------------------------------------------------
@@ -333,78 +330,75 @@ void RimSumoConnector::requestBlobIdForEnsemble( const QString& caseId, const QS
333330
//--------------------------------------------------------------------------------------------------
334331
///
335332
//--------------------------------------------------------------------------------------------------
336-
void RimSumoConnector::requestParquet( const QString& blobId )
333+
void RimSumoConnector::requestBlobDownload( const QString& blobId )
337334
{
338335
QString url = constructDownloadUrl( m_server, blobId );
339336

340-
QNetworkRequest m_networkRequest;
341-
m_networkRequest.setUrl( url );
342-
m_networkRequest.setAttribute( QNetworkRequest::RedirectPolicyAttribute, QNetworkRequest::ManualRedirectPolicy );
337+
QNetworkRequest networkRequest;
338+
networkRequest.setUrl( url );
343339

344-
addStandardHeader( m_networkRequest, m_token, RiaDefines::contentTypeJson() );
340+
// Other redirection policies are NoLessSafeRedirectPolicy, SameOriginRedirectPolicy, UserVerifiedRedirectPolicy. They were tested, but
341+
// did not work. Use ManualRedirectPolicy instead, and inspect the reply for the redirection target.
342+
networkRequest.setAttribute( QNetworkRequest::RedirectPolicyAttribute, QNetworkRequest::ManualRedirectPolicy );
345343

346-
auto reply = m_networkAccessManager->get( m_networkRequest );
344+
addStandardHeader( networkRequest, m_token, RiaDefines::contentTypeJson() );
347345

348-
connect( this,
349-
SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ),
350-
this,
351-
SLOT( parquetDownloadComplete( const QByteArray&, const QString& ) ) );
346+
auto reply = m_networkAccessManager->get( networkRequest );
352347

353348
connect( reply,
354349
&QNetworkReply::finished,
355-
[this, reply, url]()
350+
[this, reply, blobId, url]()
356351
{
357352
if ( reply->error() == QNetworkReply::NoError )
358353
{
359354
auto contents = reply->readAll();
360355

361356
QVariant redirectUrl = reply->attribute( QNetworkRequest::RedirectionTargetAttribute );
362-
363-
// Post the request to the redirected URL
364-
// QNetworkRequest redirectRequest( redirectUrl.toUrl() );
365-
m_redirect = redirectUrl.toString();
366-
367-
emit parquetDownloadFinished( contents, url );
357+
if ( redirectUrl.isValid() )
358+
{
359+
requestBlobByRedirectUri( blobId, redirectUrl.toString() );
360+
}
361+
else
362+
{
363+
QString errorMessage = "Not able to parse and interpret valid redirect Url";
364+
RiaLogging::error( errorMessage );
365+
}
368366
}
369367
else
370368
{
371369
QString errorMessage = "Download failed: " + url + " failed." + reply->errorString();
372370
RiaLogging::error( errorMessage );
373-
emit parquetDownloadFinished( QByteArray(), errorMessage );
374371
}
375372
} );
376373
}
377374

378375
//--------------------------------------------------------------------------------------------------
379376
///
380377
//--------------------------------------------------------------------------------------------------
381-
void RimSumoConnector::requestBlobWithoutTokenHeader( const QString& url )
378+
void RimSumoConnector::requestBlobByRedirectUri( const QString& blobId, const QString& redirectUri )
382379
{
383-
QNetworkRequest m_networkRequest;
384-
m_networkRequest.setUrl( url );
380+
QNetworkRequest networkRequest;
381+
networkRequest.setUrl( redirectUri );
385382

386-
auto reply = m_networkAccessManager->get( m_networkRequest );
387-
388-
connect( this,
389-
SIGNAL( parquetDownloadFinished( const QByteArray&, const QString& ) ),
390-
this,
391-
SLOT( parquetDownloadComplete( const QByteArray&, const QString& ) ) );
383+
auto reply = m_networkAccessManager->get( networkRequest );
392384

393385
connect( reply,
394386
&QNetworkReply::finished,
395-
[this, reply, url]()
387+
[this, reply, blobId, redirectUri]()
396388
{
397389
if ( reply->error() == QNetworkReply::NoError )
398390
{
399391
auto contents = reply->readAll();
400392

401-
emit parquetDownloadFinished( contents, url );
393+
QString msg = "Received data from : " + redirectUri;
394+
RiaLogging::info( msg );
395+
396+
parquetDownloadComplete( blobId, contents, redirectUri );
402397
}
403398
else
404399
{
405-
QString errorMessage = "Download failed: " + url + " failed." + reply->errorString();
400+
QString errorMessage = "Download failed: " + redirectUri + " failed." + reply->errorString();
406401
RiaLogging::error( errorMessage );
407-
emit parquetDownloadFinished( QByteArray(), errorMessage );
408402
}
409403
} );
410404
}
@@ -763,3 +757,11 @@ std::vector<QString> RimSumoConnector::blobIds() const
763757
{
764758
return m_blobName;
765759
}
760+
761+
//--------------------------------------------------------------------------------------------------
762+
///
763+
//--------------------------------------------------------------------------------------------------
764+
std::vector<SumoRedirect> RimSumoConnector::blobContents() const
765+
{
766+
return m_redirectInfo;
767+
}

0 commit comments

Comments
 (0)