Adds python script for incremental partition insertion. #76

Conversation

prashastia (Collaborator):

insert_dynamic_partitions.py: A Python script to insert partitions incrementally into the partitioned table. It is used by the e2e unbounded source testing script.

/gcbrun

This module is similar to the BigQueryExample, with a few changes to count the number of records and log them.
This test reads a simpleTable.
Shell script and python script to check the number of records read.
comments CODECOV_TOKEN usage.
…ds to different tables required for the e2e tests.
parser.add_argument(
    '--refresh_interval',
    dest='refresh_interval',
    help='.',
Reviewer:

Please add a proper description.

Author (@prashastia):

Added.
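For illustration, a filled-in version of the argument definition might look like the sketch below; the help text here is hypothetical, not the wording committed in the PR:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--refresh_interval',
        dest='refresh_interval',
        type=int,
        # The description below is hypothetical, not the one used in the PR.
        help='Interval (in minutes) between split discoveries of the '
             'unbounded source.',
    )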

Comment on lines 26 to 27
    '--now_timestamp',
    dest='now_timestamp',
Reviewer:

Not a great name. Something like execution_timestamp makes more sense. Btw, why do we need this argument?

Author (@prashastia):

Fixed the name. We need this argument to make sure we insert the partitions for the current date. If we provide a fixed date, they will not be read (as those partitions have already been marked as closed).

Reviewer:

The current date can be found during execution using Python's time or date libraries. Why do we need to take it as input?
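For illustration, a minimal sketch of computing the current UTC timestamp inside the script instead of passing it in (standard library only):

    import datetime

    # Computed at execution time; no command-line argument needed.
    execution_timestamp = datetime.datetime.now(tz=datetime.timezone.utc)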

Comment on lines 67 to 70
now_timestamp = args.now_timestamp
now_timestamp = datetime.datetime.strptime(
    now_timestamp, '%Y-%m-%d'
).astimezone(datetime.timezone.utc)
Reviewer:

Either use different variables, or replace with:

    now_timestamp = datetime.datetime.strptime(
        args.now_timestamp, '%Y-%m-%d'
    ).astimezone(datetime.timezone.utc)

Author (@prashastia):

Fixed.

Comment on lines 83 to 88
simple_avro_schema_string = (
    '{"namespace": "project.dataset","type": "record","name":'
    ' "table","doc": "Avro Schema for project.dataset.table",'
    + simple_avro_schema_fields_string
    + '}'
)
Reviewer:

Replace with:

simple_avro_schema_string = (
    '{"namespace": "project.dataset","type": "record","name":'
    ' "table","doc": "Avro Schema for project.dataset.table",'
    f'{simple_avro_schema_fields_string}'
    '}'
)

Author (@prashastia):

Fixed.

avro_file_local_identifier = avro_file_local.replace(
    '.', '_' + str(thread_number) + '.'
)
x = threading.Thread(
Reviewer:

Please use a proper name. x is not sufficient.
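A sketch of the kind of rename being asked for; the target function and its arguments below are placeholders, not the actual ones in the script:

    import threading

    def upload_avro_to_bq(thread_number, avro_file_local_identifier):
        """Placeholder for the per-thread work done in the script."""
        pass

    # A name that says what the thread does, instead of `x`.
    avro_upload_thread = threading.Thread(
        target=upload_avro_to_bq,
        args=(0, 'rows_0.avro'),
    )
    avro_upload_thread.start()
    avro_upload_thread.join()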

thread.join()

time_elapsed = time.time() - start_time
prev_partitions_offset += number_of_partitions
Reviewer:

Why is prev_partitions_offset being incremented multiple times in the same iteration?

Author (@prashastia):

Within the same iteration we are adding rows spread across one or more partitions, so that on a new read we make sure multiple partitions are being read from.

Reviewer:

Let's take an example.

First iteration:

    for number_of_partitions in partitions:   # number_of_partitions is 2
        ...
        prev_partitions_offset += 1   # prev_partitions_offset is 1
        ...
        # called avro_to_bq_with_cleanup with partition_number as 1
        # called avro_to_bq_with_cleanup with partition_number as 2
        ...
        prev_partitions_offset += number_of_partitions   # prev_partitions_offset is 3
        ...

Second iteration:

    for number_of_partitions in partitions:   # number_of_partitions is 1
        ...
        prev_partitions_offset += 1   # prev_partitions_offset is 4
        ...
        # called avro_to_bq_with_cleanup with partition_number as 4
        ...
        prev_partitions_offset += number_of_partitions   # prev_partitions_offset is 5
        ...

Third iteration:

    for number_of_partitions in partitions:   # number_of_partitions is 2
        ...
        prev_partitions_offset += 1   # prev_partitions_offset is 6
        ...
        # called avro_to_bq_with_cleanup with partition_number as 6
        # called avro_to_bq_with_cleanup with partition_number as 7
        ...
        prev_partitions_offset += number_of_partitions   # prev_partitions_offset is 8
        ...

So, we've skipped partition offsets 3 and 5. If that is intentional, then why?

Author (@prashastia):

Yeah, this is correct.
To maintain time consistency, I took the time in UTC, which would be 18:30 hrs.
So (18:30 + 2 = 20:30) to (18:30 + 3 = 21:30) will generate values in the 20 hrs and 21 hrs partitions.

Then, if the next iteration generated values for 18:30 + 3 to 18:30 + 4, the partitions would clash.

I think this is getting too confusing.
I'll fix this.

Author (@prashastia):

This is fixed now.

dataset_name = args.dataset_name
table_name = args.table_name

execution_timestamp = datetime.datetime.now(tz=datetime.timezone.utc).replace(hour=0,
Author (@prashastia), Jan 2, 2024:

Now generating a midnight timestamp, so we can insert partitions incrementally irrespective of the time of day the script is executed.
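A minimal sketch of the midnight-UTC timestamp described here; it completes the truncated line above for illustration only, assuming the remaining keyword arguments zero out the smaller time fields:

    import datetime

    # Midnight (00:00 UTC) of the day the script is executed.
    execution_timestamp = datetime.datetime.now(
        tz=datetime.timezone.utc
    ).replace(hour=0, minute=0, second=0, microsecond=0)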

Comment on lines 93 to 95
number_of_rows_per_thread = int(
    number_of_rows_per_partition / number_of_threads
)
Reviewer:

Check out floor division: number_of_rows_per_partition // number_of_threads.

Author (@prashastia):

Fixed.

Comment on lines 19 to 21
# This is a buffer time to allow the read streams to be formed.
# Allows conflicting conditions by making sure that
# new (to be generated) partitions are not part of the current read stream.
Reviewer:

Buffer time to ensure that new partitions are created after previous read session and before next split discovery.

Author (@prashastia):

Fixed.

table_id,
)

# Insert in phases.
Reviewer:

# Insert iteratively.
Iteration is more appropriate than phase. Please apply that to other comments in this loop.

Author (@prashastia):

Fixed.

from utils import utils


def wait():
Reviewer:

It's unclear to the caller how long the wait is. This function can be named sleepForMinutes or sleepForSeconds with an argument.
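A sketch of the suggested rename with the duration made explicit (the snake_case name is illustrative, following Python convention rather than the literal sleepForSeconds):

    import time

    def sleep_for_seconds(duration_seconds):
        """Block the caller for the given number of seconds."""
        time.sleep(duration_seconds)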

partitions = [2, 1, 2]
# Insert 10000 - 30000 rows per partition.
# So, in a read up to 60000 new rows are read.
number_of_rows_per_partition = random.randint(1, 3) * 10000
Reviewer:

What is the reason for randomizing this? Will an optional argument with a default value serve the purpose?

Author (@prashastia), Jan 3, 2024:

The idea behind randomization was to introduce the possibility of a different number of rows being inserted on every execution.
But yes, we could hardcode a fixed value or even take it as an argument.
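A sketch of taking the row count as an optional argument with a default, as discussed; the argument name and default value below are illustrative:

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--number_of_rows_per_partition',
        dest='number_of_rows_per_partition',
        type=int,
        default=10000,  # illustrative default
        help='Number of rows inserted into each new partition.',
    )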

prev_partitions_offset += number_of_partitions
# We wait for the refresh to happen
# so that the data just created can be read.
while time_elapsed < float(60 * 2 * refresh_interval):
Reviewer:

It's more efficient to sleep here for the required amount of time.

Author (@prashastia):

The step before this includes the generation and insertion of rows into the BQ table. The time taken by this insertion is variable. To make sure the next insertion does not take place before the next read stream is formed, we explicitly wait for the stipulated amount of time to pass.
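If the variable insertion time is the concern, a sketch of the reviewer's suggestion that still accounts for it: measure how much of the interval has already elapsed and sleep once for the remainder instead of looping (names follow the snippet quoted above; the refresh_interval value is illustrative):

    import time

    refresh_interval = 30  # illustrative value
    start_time = time.time()

    # ... generate and insert rows here; duration is variable ...

    # Sleep only for whatever is left of the interval, instead of busy-waiting.
    time_elapsed = time.time() - start_time
    remaining_seconds = float(60 * 2 * refresh_interval) - time_elapsed
    if remaining_seconds > 0:
        time.sleep(remaining_seconds)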

jayehwhyehentee merged commit ff65e15 into GoogleCloudDataproc:main on Jan 8, 2024. 4 checks passed.