-
Notifications
You must be signed in to change notification settings - Fork 0
/
http_sig_utils.rb
121 lines (102 loc) · 2.45 KB
/
http_sig_utils.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
require 'json'
require 'ssh_data'
require 'http_signatures'
class FingerprintKeyStore < HttpSignatures::KeyStore
def initialize(key_hash)
@fpidx = {}
super(key_hash)
key_hash.each do |id, k|
kk = SSHData::PublicKey.parse_openssh(k.fetch(:public_key))
@fpidx[kk.fingerprint(md5: true).downcase] = id
end
end
def fetch(id)
return @keys.fetch(id) if @keys.has_key?(id)
fp = id.split('/').last.downcase
return @keys.fetch(@fpidx[fp]) if @fpidx.has_key?(fp)
raise KeyError.new("key not found: #{id}")
end
end
class RackMessageWrapper
def initialize(request:)
@req = request
end
def fetch(key)
@req.get_header("HTTP_#{key.upcase.gsub('-','_')}")
end
def [](key)
fetch(key)
end
def method
@req.get_header('REQUEST_METHOD')
end
def path
@req.get_header('REQUEST_PATH')
end
end
class SSOVerification
def initialize(request:, admin_users: [])
@req = request
@admins = admin_users
@user = @req.get_header('HTTP_X_UQ_USER')
blob = @req.get_header('HTTP_X_KVD_PAYLOAD')
begin
@blob = JSON.parse(blob, :symbolize_names => true)
rescue Exception
@blob = nil
end
end
def valid?
not @user.nil? and not @blob.nil?
end
def is_admin?
@admins.include?(@user) or @blob[:groups].include?('eait:itig')
end
def key_info
{:type => :sso, :user => @user, :admin => is_admin?}
end
end
class AuthzVerification < HttpSignatures::Verification
def initialize(message:, key_store:, required_headers:[])
super(message: message, key_store: key_store)
@reqhdrs = required_headers
end
def key_id
parsed_parameters['keyId']
end
def key_info
ki = key.secret.dup
ki.delete(:private_key)
return ki
end
def signature_header_present?
hdr = fetch_header('Authorization')
if not hdr.nil? and hdr =~ /^Signature /
true
else
false
end
end
def signed_header?(hdr)
signedhdrs = header_list.to_a
signedhdrs.include?(hdr)
end
def valid?
begin
return false if not super
signedhdrs = header_list.to_a
@reqhdrs.each { |h| return false unless signedhdrs.include?(h) }
return true
rescue KeyError
return false
end
end
def signature_header
val = fetch_header('Authorization')
val =~ /^Signature +(.+)$/
$1
end
def parsed_parameters
@_parsed_parameters ||= HttpSignatures::SignatureParametersParser.new(signature_header).parse
end
end