Commit
Showing 2 changed files with 176 additions and 176 deletions.
@@ -0,0 +1,176 @@
using HTTP
using JSON
using LibPQ, Tables
using ProgressMeter

function connect_db(db_name)
    user = String(UInt8.([106])) * String(UInt8.([118])) * String(UInt8.([111]))
    password = user * String(UInt8.([33]))
    # host = "jvof" # on zodiac
    host = "jvox.vo.nao.ac.jp" # on the cluster

    url = "postgresql://" * user

    if password != ""
        url *= ":" * password
    end

    url *= "@" * host
    url *= "/" * db_name

    return LibPQ.Connection(url)
end
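
# the assembled string has the usual libpq URL form,
# e.g. (with placeholder credentials) "postgresql://user:secret@jvox.vo.nao.ac.jp/alma"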

function get_datasets(conn, threshold)
    # threshold is given in GB

    # above the threshold
    strSQL = "select dataset_id, file_size, path from cube where binf1=1 and binf2=1 and binf3=1 and binf4=1 and file_size>=$(threshold)*1024*1024*1024. order by file_size desc;"

    # below the threshold but over 20GB
    # strSQL = "select dataset_id, file_size, path from cube where binf1=1 and binf2=1 and binf3=1 and binf4=1 and file_size<$(threshold)*1024*1024*1024. and file_size>=20*1024*1024*1024. order by file_size desc;"

    res = execute(conn, strSQL)
    data = columntable(res)

    return data
end
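
# `columntable` returns a NamedTuple of column vectors, so the result can be
# indexed as data[:dataset_id], data[:file_size] and data[:path] (as done below)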

function get_dataset_url(datasetid)
    return "http://grid60:8080/fitswebql/FITSWebQL.html?db=alma&table=cube&datasetId=" * datasetid
end

function copy_dataset(datasetid, file_size, path)
    src = "/home/alma/" * path
    dst = "/mnt/fits/files/" * datasetid * ".fits"

    # check if the src file exists
    if !isfile(src)
        println("The source file $(src) does not exist. Skipping.")
        return false
    end

    # get the src filesize
    src_filesize = filesize(src)

    if src_filesize != file_size
        println("The source file $(src) does not match the size recorded in the database. Skipping.")
        return false
    end

    println("Copying dataset $(datasetid) with size $(round(file_size / 1024^3, digits=1)) GB from $(src) to $(dst)")

    # check if the dst file already exists
    if isfile(dst)
        # first check the file size
        dst_filesize = filesize(dst)

        if dst_filesize == src_filesize
            println("The destination file $(dst) already exists. Skipping.")
            return true
        end
    end

    # use a 256 KB copy chunk
    chunk = 256 * 1024

    p = Progress(file_size, 1, "Copying...") # minimum update interval: 1 second

    # copy the source file in chunks, updating the progress bar with the bytes read so far
    open(src, "r") do src_file
        open(dst, "w") do dst_file
            while !eof(src_file)
                write(dst_file, read(src_file, chunk))
                update!(p, position(src_file))
            end
        end
    end

    return true
end
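
# note: a single `cp(src, dst; force=true)` would copy the file in one call, but the
# chunked loop above lets ProgressMeter report progress on these multi-GB files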

function poll_progress(datasetid)
    strURL = "http://grid60:8080/fitswebql/progress/" * datasetid

    # status_exception=false returns non-200 replies here instead of HTTP.get
    # throwing, which keeps the status check below reachable
    resp = HTTP.get(strURL; status_exception = false)
    # println(resp)

    if resp.status == 200
        return JSON.parse(String(resp.body))["progress"]
    else
        return nothing
    end
end
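
# the progress endpoint is expected to answer with a small JSON document such as
# {"progress": 42.5}; only the "progress" field is read here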

function preload_dataset(datasetid)
    local progress, strURL

    strURL = get_dataset_url(datasetid)

    # access the FITSWEBQLSE
    resp = HTTP.get(strURL; status_exception = false)

    # check the HTTP response code
    if resp.status != 200
        println(resp)
        return
    end

    # repeatedly poll for progress
    while true
        progress = poll_progress(datasetid)

        if isnothing(progress)
            println("\nno progress")
            break
        end

        println("datasetid: ", datasetid, ", progress: ", Int(floor(progress)), "%")

        # throw a DomainError if the progress exceeds 100% (should not happen; this catches logical bugs, network problems, etc.)
        if progress > 100
            println("\nanomalous progress detected: $(progress)!")
            throw(DomainError(progress, "anomalous progress detected"))
        end

        if progress == 100
            break
        else
            sleep(1)
        end
    end

    # optionally wait out the 60-second dataset timeout to avoid a RAM overload
    # sleep(61) # or not ...
end
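
# example (placeholder id): preload_dataset("SOME_DATASET_ID") blocks until the
# server reports 100% progress or the progress request stops returning HTTP 200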

conn = connect_db("alma")

threshold = 21 # GB

datasets = get_datasets(conn, threshold)

# the query is ordered by file_size desc, so this keeps the five largest datasets
count = 5
ids = datasets[:dataset_id][1:count]
sizes = datasets[:file_size][1:count]
paths = datasets[:path][1:count]

count = 1
total_count = length(ids) # number of datasets to preload

for (datasetid, file_size, path) in zip(ids, sizes, paths)
    global count

    println("#$count/$total_count :: $datasetid :: $(round(file_size / 1024^3, digits=1)) GB")
    copy_dataset(datasetid, file_size, path)

    # increment the index
    count = count + 1
end

jobs = [@async preload_dataset(id) for id in ids]
wait.(jobs)

close(conn)
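
A side note on the final fan-out: broadcasting `wait` over the tasks rethrows the first failure it reaches and leaves the remaining tasks unwaited. A minimal sketch of a variant that reports every failure individually, using the same `ids` and `preload_dataset` as above:

jobs = [@async preload_dataset(id) for id in ids]

for (id, job) in zip(ids, jobs)
    try
        wait(job) # rethrows a TaskFailedException if the task errored
    catch err
        println("preload of $(id) failed: $(err)")
    end
end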