-
Notifications
You must be signed in to change notification settings - Fork 0
/
example_pyflyt.jl
78 lines (64 loc) · 3.04 KB
/
example_pyflyt.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
using Revise
# import Pkg; Pkg.add("BayesianSafetyValidation")
using BayesianSafetyValidation
using Distributions
# System-parameter container for the PyFlyt waypoint example. An instance is
# passed to `bayesian_safety_validation` and is the type `System.evaluate`
# dispatches on.
#
# Fields (all default to 0.0):
#   x1, y1, z1 -- presumably the nominal waypoint coordinates; in this file
#                 they are only read for logging (TODO confirm intended use).
#
# The fields are concretely typed so they are stored as `Float64` rather than
# boxed `Any`; other real numbers passed to the constructor are converted.
@with_kw mutable struct PyFlytWaypointSystem <: System.SystemParameters
    x1::Float64 = 0.0
    y1::Float64 = 0.0
    z1::Float64 = 0.0
end
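# Earlier one-line version of `System.evaluate`, kept for reference only.
# NOTE: it yields `true` when the script prints "success", which is the
# opposite of the failure = 1 / success = 0 convention of the active method.
# function System.evaluate(sparams::PyFlytWaypointSystem, inputs::Vector; kwargs...)
# return [readchomp(`wsl python3 src/pyflyt_evaluate.py --waypoints $(x[1]) $(x[2]) $(x[3])`) == "success" for x in inputs]
# end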
# Evaluate the PyFlyt waypoint system at each input point.
#
# For every element `x` of `inputs` (its first three components are used as the
# waypoint), runs `src/pyflyt_evaluate.py` through `wsl python3` and classifies
# the text the script prints.
#
# Arguments:
#   sparams -- system parameters; only logged here.
#   inputs  -- vector of indexable points with at least three components.
#   kwargs  -- accepted for interface compatibility and ignored.
#
# Returns a `Vector{Int}` with one entry per input, following the system
# convention failure = 1, success = 0. An entry is 1 exactly when the script
# output contains "failure"; every other output maps to 0.
#
# Output containing neither "success" nor "failure" is still mapped to 0 (as
# before), but now raises a warning so a misbehaving script is not silently
# counted as a safe run. A non-zero exit status of the external command makes
# `readchomp` throw.
function System.evaluate(sparams::PyFlytWaypointSystem, inputs::Vector; kwargs...)
    @info "Evaluating with parameters:"
    @info "System Parameters: x1=$(sparams.x1), y1=$(sparams.y1), z1=$(sparams.z1)"
    @info "Inputs: $inputs"

    final_results = Int[]
    sizehint!(final_results, length(inputs))

    for (i, x) in enumerate(inputs)
        cmd = `wsl python3 src/pyflyt_evaluate.py --waypoints $(x[1]) $(x[2]) $(x[3])`
        result = strip(readchomp(cmd))
        @info "Raw result from Python evaluation for input $i: $result"

        # Classify once, and use the same value for both the log line and the
        # returned vector, so the two can never disagree.
        failed = occursin("failure", result)
        if !failed && !occursin("success", result)
            @warn "Evaluation result for input $i: unknown " *
                "(neither \"success\" nor \"failure\" found); counted as 0"
        end

        # !!! important: system convention: failure = 1, success = 0
        status = failed ? 1 : 0
        @info "Evaluation result for input $i: $status"
        push!(final_results, status)
    end

    @info "Final interpreted results for BSV: $final_results"
    return final_results
end
# Driver script: builds a three-parameter operational model, fits a surrogate
# with `bayesian_safety_validation`, derives failure quantities from it, and
# saves three plots as PNG files in the current working directory.
#
# hold z constant and vary x/y
# Each parameter is given a name, a two-element vector (presumably the
# [lower, upper] search bounds -- confirm against the BSV docs) and a Normal
# distribution. z1 uses identical bounds [1.5, 1.5], which is what pins z.
px1 = OperationalParameters("x1", [-5.0, 5.0], Normal(0.0, 2.0))
py1 = OperationalParameters("y1", [-5.0, 5.0], Normal(0.0, 2.0))
pz1 = OperationalParameters("z1", [1.5, 1.5], Normal(1.5, 0.5))
# Disabled extra parameters:
# px3 = OperationalParameters("x3", [-5.0, 5.0], Normal(0.0, 1.0))
# px4 = OperationalParameters("x4", [-5.0, 5.0], Normal(0.0, 1.0))
model = [px1, py1, pz1]
# System parameters, constructed with all defaults.
system_params = PyFlytWaypointSystem()
@info "Fitting surrogate"
# NOTE(review): the meaning of the T, m and d keywords is defined by
# BayesianSafetyValidation, not by this file -- check the package docs before
# changing them.
surrogate = bayesian_safety_validation(system_params, model; T=100, m=50, d=50)
@info "Done with surrogate"
# Idea (not implemented): warm start - modify BSV to accept points
# surrogate = gp_fit(initialize_gp(; gp_args...), X, Y)
# The three quantities below are computed from the fitted surrogate but are
# not referenced again in this part of the file.
X_failures = falsification(surrogate.x, surrogate.y)
ml_failure = most_likely_failure(surrogate.x, surrogate.y, model)
p_failure = p_estimate(surrogate, model)
# NOTE(review): this message is logged even though the truth-estimate call
# below is commented out, so no truth estimate is actually computed.
@info "Truth estimate"
#truth = truth_estimate(system_params, model) # when using simple systems
@info "Plotting"
# Presumably the signature of the plotting function called next, kept as a
# keyword reference -- verify against the installed BSV version:
# (gp, models, sparams; inds=[:, :, fill(1, length(models) - 2)...], num_steps=200, m=fill(num_steps, length(models)), latex_labels=true, show_data=false, overlay=false, tight=true, use_heatmap=false, hide_model=true, hide_ranges=false, titlefontsize=12, add_phantom_point=true)
p = plot_surrogate_truth_combined(surrogate, model, system_params; num_steps=10, show_data=true)
savefig(p, "pyflyt_bsv.png")
# Disabled data-point plot; the "lunarlander" file name looks like a leftover
# from another example (TODO confirm before re-enabling).
# t = plot_data!(surrogate.x, surrogate.y)
# savefig(t, "lunarlander_points.png")
# Most-likely-failure plot.
j = plot_most_likely_failure(surrogate, model)
savefig(j, "pyflyt_mlf.png")
# Distribution-of-failures plot.
l = plot_distribution_of_failures(surrogate, model)
savefig(l, "dist_of_failures_pyflyt.png")