Skip to content

Commit

Permalink
Add copy_table_from_file to Script::Utils
Browse files Browse the repository at this point in the history
admin/MBImport.pl and admin/replication/ImportReplicationChanges contained very
similar implementations of `ImportTable`, so it would be ideal to share them.
I'd also like to use the same functionality in a future commit (to load
dbmirror2 packets into temporary tables).

The implementations in these two files had diverged slightly. For one,
MBImport.pl's version allowed fixing broken UTF-8 byte sequences. I'm not sure
how necessary that is in 2024, or what the historical reasons for adding it were,
but I kept the functionality behind a flag in `copy_table_from_file`.

MBImport.pl's version also supported the flags `$delete_first` (to empty the
table before importing) and `$fProgress` (to control whether progress is shown).
I've basically kept all of MBImport.pl's code, with these features behind
`%opts` flags.

I kept the definitions of `ImportTable`, but they now call
`copy_table_from_file` internally. I couldn't replace all of the `ImportTable`
calls with direct calls to `copy_table_from_file`, because `ImportTable` also
updates statistics local to each file and has a different return value.
  • Loading branch information
mwiencek committed Mar 8, 2024
1 parent fe40684 commit 2b06d5d
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 157 deletions.
109 changes: 16 additions & 93 deletions admin/MBImport.pl
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@
use Getopt::Long;
use DBDefs;
use Sql;
use MusicBrainz::Script::Utils qw( is_table_empty );
use MusicBrainz::Script::Utils qw(
copy_table_from_file
is_table_empty
);
use MusicBrainz::Server::Replication qw( :replication_type );
use MusicBrainz::Server::Constants qw( @FULL_TABLE_LIST );

use aliased 'MusicBrainz::Server::DatabaseConnectionFactory' => 'Databases';

my ($fHelp, $fIgnoreErrors);
my $tmpdir = '/tmp';
my $fProgress = -t STDOUT;
my $fFixUTF8 = 0;
my $skip_ensure_editor = 0;
my $update_replication_control = 1;
Expand Down Expand Up @@ -215,104 +217,25 @@ sub usage

exit($errors ? 1 : 0);



sub ImportTable
{
my ($table, $file) = @_;

print localtime() . " : load $table\n";

my $rows = 0;

my $t1 = [gettimeofday];
my $interval;

my $size = -s($file)
or return 1;

my $p = sub {
my ($pre, $post) = @_;
no integer;
printf $pre.'%-30.30s %9d %3d%% %9d'.$post,
$table, $rows, int(100 * tell(LOAD) / $size),
$rows / ($interval||1);
};

$OUTPUT_AUTOFLUSH = 1;

eval
{
# open in :bytes mode (always keep byte octets), to allow fixing of invalid
# UTF-8 byte sequences in --fix-broken-utf8 mode.
# in default mode, the Pg driver will take care of the UTF-8 transformation
# and croak on any invalid UTF-8 character
open(LOAD, '<:bytes', $file) or die "open $file: $OS_ERROR";

# If you're looking at this code because your import failed, maybe
# with an error like this:
# ERROR: copy: line 1, Missing data for column "automodsaccepted"
# then the chances are it's because the data you're trying to load
# doesn't match the structure of the database you're trying to load it
# into. Please make sure you've got the right copy of the server
# code, as described in the INSTALL file.

$sql->begin;
$sql->do("DELETE FROM $table") if $delete_first;
my $dbh = $sql->dbh; # issues a ping, must be done before COPY
$sql->do("COPY $table FROM stdin");

$p->('', '') if $fProgress;
my $t;

use Encode;
while (<LOAD>)
{
$t = $_;
if ($fFixUTF8) {
# replaces any invalid UTF-8 character with special 0xFFFD codepoint
# and warn on any such occurence
$t = Encode::decode('UTF-8', $t, Encode::FB_DEFAULT | Encode::WARN_ON_ERR);
} else {
$t = Encode::decode('UTF-8', $t, Encode::FB_CROAK);
}
if (!$dbh->pg_putcopydata($t))
{
print 'ERROR while processing: ', $t;
die;
}

++$rows;
unless ($rows & 0xFFF)
{
$interval = tv_interval($t1);
$p->("\r", '') if $fProgress;
}
}
$dbh->pg_putcopyend() or die;
$interval = tv_interval($t1);
$p->(($fProgress ? "\r" : ''), sprintf(" %.2f sec\n", $interval));

close LOAD
or die $OS_ERROR;

$sql->commit;

die 'Error loading data'
if -f $file and is_table_empty($sql, $table);
my $rows = copy_table_from_file(
$sql, $table, $file,
delete_first => $delete_first,
fix_utf8 => $fFixUTF8,
ignore_errors => $fIgnoreErrors,
);

if ($rows) {
++$tables;
$totalrows += $rows;

1;
};

return 1 unless $EVAL_ERROR;
warn "Error loading $file: $EVAL_ERROR";
$sql->rollback;

++$errors, return 0 if $fIgnoreErrors;
exit 1;
return 1;
} else {
++$errors;
return 0;
}
}

sub ImportAllTables
Expand Down
78 changes: 14 additions & 64 deletions admin/replication/ImportReplicationChanges
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use FindBin;
use lib "$FindBin::Bin/../../lib";

use Getopt::Long;
use MusicBrainz::Script::Utils qw( is_table_empty );
use MusicBrainz::Script::Utils qw(
copy_table_from_file
is_table_empty
);
use MusicBrainz::Server::Context;
use DBDefs;
use Sql;
Expand Down Expand Up @@ -105,72 +108,19 @@ sub ImportTable
{
my ($table, $file) = @_;

print localtime() . " : load $table\n";

my $rows = 0;

my $t1 = [gettimeofday];
my $interval;

my $size = -s($file) || 1;

my $p = sub {
my ($pre, $post) = @_;
no integer;
printf $pre.'%-30.30s %9d %3d%% %9d'.$post,
$table, $rows, int(100 * tell(LOAD) / $size),
$rows / ($interval||1);
};

$OUTPUT_AUTOFLUSH = 1;

eval
{
open(LOAD, '<:encoding(utf8)', $file) or die "open $file: $OS_ERROR";

$sql->begin;
my $dbh = $sql->dbh; # issues a ping, must be done before COPY
$sql->do("COPY $table FROM stdin");

$p->('', '');

while (<LOAD>)
{
$dbh->pg_putcopydata($_) or die;

++$rows;
unless ($rows & 0xFFF)
{
$interval = tv_interval($t1);
$p->("\r", '');
}
}

$dbh->pg_putcopyend() or die;

$interval = tv_interval($t1);
$p->("\r", sprintf(" %.2f sec\n", $interval));

close LOAD
or die $OS_ERROR;

$sql->commit;

die 'Error loading data'
if -f $file and is_table_empty($sql, $table);
my $rows = copy_table_from_file(
$sql, $table, $file,
ignore_errors => $fIgnoreErrors,
);

if ($rows) {
++$tables;
$totalrows += $rows;

1;
};

return 1 unless $EVAL_ERROR;
warn "Error loading $file: $EVAL_ERROR";
$sql->rollback;

++$errors, return 0 if $fIgnoreErrors;
exit 1;
return 1;
} else {
++$errors;
return 0;
}
}

sub ImportReplicationTables
Expand Down
113 changes: 113 additions & 0 deletions lib/MusicBrainz/Script/Utils.pm
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,132 @@ package MusicBrainz::Script::Utils;
use strict;
use warnings;

use Encode;
use English;
use Time::HiRes qw( gettimeofday tv_interval );

use feature 'state';

use base 'Exporter';

our @EXPORT_OK = qw(
copy_table_from_file
get_primary_keys
is_table_empty
log
retry
);

=sub copy_table_from_file

    my $rows = copy_table_from_file($sql, $table, $file, %opts);

Imports C<$file> into C<$table> via PostgreSQL's C<COPY> statement. The
import runs inside a transaction opened with C<< $sql->begin >>; every line
of the file is decoded as UTF-8 and passed to C<pg_putcopydata>.

Recognized C<%opts>:

=over 4

=item delete_first

If true, C<DELETE FROM $table> is run in the same transaction before the
C<COPY>.

=item fix_utf8

If true, invalid UTF-8 byte sequences are replaced (with a warning) instead
of aborting the import.

=item ignore_errors

If true, a failed import returns 0 instead of terminating the process.

=item quiet

If true, neither the "load" line nor any progress output is printed.

=item show_progress

Whether the progress line is redrawn while loading. Defaults to whether
STDOUT is a terminal, and is always off when C<quiet> is set.

=back

Returns the number of rows imported. If C<$file> is missing or empty, returns
1 without touching the database. On failure a warning is emitted, the
transaction opened by this call (if still open) is rolled back, and the
function either returns 0 (with C<ignore_errors>) or calls C<exit 1>.

=cut

sub copy_table_from_file {
    my ($sql, $table, $file, %opts) = @_;

    my $delete_first = $opts{delete_first};
    my $fix_utf8 = $opts{fix_utf8};
    my $ignore_errors = $opts{ignore_errors};
    my $quiet = $opts{quiet};
    my $show_progress = !$quiet && ($opts{show_progress} // (-t STDOUT));

    print localtime() . " : load $table\n"
        unless $quiet;

    my $rows = 0;
    my $t1 = [gettimeofday];
    my $interval;

    # A missing or zero-length file is reported as success (a true value)
    # without opening a transaction. Note that this returns 1, not 0.
    my $size = -s($file)
        or return 1;

    # Lexical handle: closed automatically on every exit path, including
    # when the eval below dies part-way through the file.
    my $fh;

    # True only between a successful begin and a successful commit, so the
    # error path never rolls back a transaction this call does not own.
    my $in_transaction = 0;

    # Prints one progress line: table name, rows so far, percentage of the
    # file consumed, and rows per second. $pre/$post wrap the line (e.g.
    # "\r" to redraw in place, or the trailing elapsed-time suffix).
    my $p = sub {
        my ($pre, $post) = @_;
        no integer;
        printf $pre.'%-30.30s %9d %3d%% %9d'.$post,
            $table, $rows, int(100 * tell($fh) / $size),
            $rows / ($interval || 1);
    };

    # Not localized: autoflush stays enabled on the selected handle after
    # this function returns, exactly as before.
    $OUTPUT_AUTOFLUSH = 1;

    my $ok = eval {
        # Open in :bytes mode (always keep byte octets), to allow fixing of
        # invalid UTF-8 byte sequences in --fix-broken-utf8 mode.
        # In default mode, the Pg driver will take care of the UTF-8
        # transformation and croak on any invalid UTF-8 character.
        open($fh, '<:bytes', $file) or die "open $file: $OS_ERROR";

        # If you're looking at this code because your import failed, maybe
        # with an error like this:
        #   ERROR:  copy: line 1, Missing data for column "automodsaccepted"
        # then the chances are it's because the data you're trying to load
        # doesn't match the structure of the database you're trying to load
        # it into. Please make sure you've got the right copy of the server
        # code, as described in the INSTALL file.

        $sql->begin;
        $in_transaction = 1;
        $sql->do("DELETE FROM $table") if $delete_first;

        my $dbh = $sql->dbh; # issues a ping, must be done before COPY
        $sql->do("COPY $table FROM stdin");

        $p->('', '') if $show_progress;

        # Read into a lexical rather than $_, so the caller's $_ is not
        # clobbered.
        while (my $line = <$fh>) {
            my $decoded;
            if ($fix_utf8) {
                # Replaces any invalid UTF-8 character with the special
                # 0xFFFD codepoint and warns on any such occurrence.
                $decoded = Encode::decode('UTF-8', $line,
                                          Encode::FB_DEFAULT |
                                          Encode::WARN_ON_ERR);
            } else {
                $decoded = Encode::decode('UTF-8', $line, Encode::FB_CROAK);
            }

            if (!$dbh->pg_putcopydata($decoded)) {
                print 'ERROR while processing: ', $decoded;
                die "COPY $table: pg_putcopydata failed";
            }

            ++$rows;
            # Refresh the progress line once every 4096 rows.
            unless ($rows & 0xFFF) {
                $interval = tv_interval($t1);
                $p->("\r", '') if $show_progress;
            }
        }

        $dbh->pg_putcopyend
            or die "COPY $table: pg_putcopyend failed";

        # The final summary line is printed even without live progress,
        # unless output is suppressed entirely.
        $interval = tv_interval($t1);
        $p->(($show_progress ? "\r" : ''),
             sprintf(" %.2f sec\n", $interval))
            unless $quiet;

        close($fh)
            or die $OS_ERROR;

        $sql->commit;
        $in_transaction = 0;

        # The file was non-empty (checked above), so an empty table after
        # the commit means the load silently did nothing.
        die 'Error loading data'
            if -f $file and is_table_empty($sql, $table);

        1;
    };

    return $rows if $ok;

    my $error = $EVAL_ERROR || 'unknown error';
    warn "Error loading $file: $error";

    # open() failures happen before begin, and the emptiness check runs
    # after commit; in both cases there is nothing of ours to roll back.
    $sql->rollback if $in_transaction;

    return 0 if $ignore_errors;
    exit 1;
}

=sub get_primary_keys
Get a list of primary key column names for $schema.$table.
Expand Down

0 comments on commit 2b06d5d

Please sign in to comment.