-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds helper class for unbounded read test. #73
Adds helper class for unbounded read test. #73
Conversation
This module is similar to the BigQueryExample. A few changes to count the number of records and log them.
This test reads a simpleTable. Shell script and python script to check the number of records read.
This test reads a simpleTable. Shell script and python script to check the number of records read.
This test reads a simpleTable. Shell script and python script to check the number of records read.
This test reads a simpleTable. Shell script and python script to check the number of records read.
This test reads a simpleTable. Shell script and python script to check the number of records read.
This test reads a simpleTable. Shell script and python script to check the number of records read.
comments CODECOV_TOKEN usage.
…bounded and unbounded source.
…ble with complex schema.
…ble with complex schema.
…ds to different tables required for the e2e tests.
avro.io.DatumWriter(), | ||
self.schema, | ||
) | ||
self.table_type.write_rows( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
) | ||
writer.close() | ||
|
||
def transfer_avro_to_bq_table(self, thread_number): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
transfer_avro_data_to_bq_table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or transfer_avro_rows_to_bq_table
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
local_avro_file = self.avro_file_local.replace( | ||
'.', '_' + thread_number + '.' | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly is being done here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avro_file_local
has a generic names e.g. "filename.avro". But, we write and upload several
avro files concurrently, to prevent race conditions we write and read via separate
files having names according to the thread numbers. "filename.avro" is changed to
"filename_<thread_number>.avro"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you assign and keep track of "thread numbers"?
Also, as I understand it, thread number
is an identifier/suffix and should be named as such. This method (or file in general) does not need to care how the identifier/suffix is derived. Please consider renaming to filename_identifier
or filename_suffix
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The user/we (since this is for testing) can decide the number of threads we wanna deploy.
thread number is simply obtained from the iterator in a loop where we create the threads. In other words thread_number is simply an integer in [0,1,..., n-1] where n is the total number of threads we/user decides to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed now.
thread_number: The number of threads to perform the function concurrently | ||
to add the avro rows to. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really get this description. What does thread_number
represent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number of threads that concurrently perform the operation
. The
operation
here refers to generation of records, storing them to avro files,and uploading
them to a BQ table.
file_name = self.avro_file_local.replace('.', '_' + thread_number + '.') | ||
os.remove(file_name) | ||
|
||
def create_transfer_records( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need a better name here. This is ambiguous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
self.required_arguments = required_arguments | ||
self.acceptable_arguments = acceptable_arguments | ||
|
||
def __get_arguments(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to enforce name mangling
here. Single underscore prefix (indicating internal use
) is sufficient.
) from exc | ||
return argument_dictionary | ||
|
||
def __validate_arguments(self, arguments_dictionary): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
…into nightly-tests-unbouned-read-1
…riting entries to various table types.
Removed name mangling.
) | ||
|
||
|
||
def generate_string(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: generate_random_string
is more precise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
|
||
|
||
def generate_long(): | ||
return random.choice(range(0, 10000000)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can also do this instead random.randint(0, 10000000)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still seeing return random.choice(range(0, 10000000)
self.delete_local_file(avro_file_local_identifier) | ||
|
||
|
||
class ArgumentInputUtils: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ok, but we can also use argparse: https://docs.python.org/3/library/argparse.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. Seems more pythonic
…ment intake in table_read.sh
…ment intake in table_read.sh
Adds utils.py, containing helper functions for dynamic addition of data during unbounded read test.
Aslo modifies parse_logs.py to use the same for argument input.
/gcbrun