Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broad, sweeping improvements #37

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a8ecd89
Improve retry system
brandonsturgeon Jun 18, 2023
ef50967
Add tests, refactor retries slightly
brandonsturgeon Jun 19, 2023
afa20d4
lol
brandonsturgeon Jun 19, 2023
da23cd9
Print anyway
brandonsturgeon Jun 19, 2023
bd933fc
wip
brandonsturgeon Jun 24, 2023
46d891d
Fully implement Range header
brandonsturgeon Jun 30, 2023
cb44110
Incr version, fix range header usage
brandonsturgeon Jun 30, 2023
5b236e2
Refine final chunk checking
brandonsturgeon Jun 30, 2023
c9f89b0
Fixes, cleanup
brandonsturgeon Jun 30, 2023
26bdb45
Change revision, print instead
brandonsturgeon Jun 30, 2023
6104db3
Increase default
brandonsturgeon Jun 30, 2023
0eee0e3
Header changes, new cache disable convar, debug prints
brandonsturgeon Jul 3, 2023
30ddea3
Add hacky fix for 404s
brandonsturgeon Jul 4, 2023
8e256af
Remove send delay
brandonsturgeon Jul 6, 2023
f1d89ab
WIP - raw text sending
brandonsturgeon Jul 7, 2023
0309ac1
Add timeout convar
brandonsturgeon Jul 17, 2023
c65aac6
Improve loading resilience, add HTTP override option
brandonsturgeon Jul 18, 2023
65e9a3b
Quiet print
brandonsturgeon Jul 19, 2023
8c72f32
Fix error call
brandonsturgeon Jul 20, 2023
5e00516
Allow disabling Ranges (#39)
brandonsturgeon Aug 1, 2023
8e6bbd5
Remove print
brandonsturgeon Sep 17, 2023
d607f2f
Get size on server, prevent duplicate sends
brandonsturgeon Sep 25, 2023
6d41bed
Small net fallback
brandonsturgeon Sep 27, 2023
61c8711
Remove prints - remove check
brandonsturgeon Sep 27, 2023
c64923f
Properly(?) handle physics objects
brandonsturgeon Nov 22, 2023
b9fdd99
Add clarification
brandonsturgeon Mar 5, 2024
bd44518
Fix missing cache key
brandonsturgeon Mar 11, 2024
c7c5463
Use SFS! (slight cleanup too)
brandonsturgeon May 23, 2024
0d370d1
Fix typo
brandonsturgeon May 24, 2024
3ea1fb8
add express.Seed (sv)
brandonsturgeon May 28, 2024
85b8614
Update sfs.lua
brandonsturgeon Aug 5, 2024
311beb7
Remove 404 retries, improve message about revision mismatch
brandonsturgeon Aug 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion lua/autorun/gm_express_init.lua
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
AddCSLuaFile( "includes/modules/pon.lua" )
express = {}
function express.Receive()
error( "express.Receive called before Express has loaded! Try using the ExpressLoaded hook to know when it's safe" )
end

AddCSLuaFile( "includes/modules/sfs.lua" )
include( "gm_express/sh_init.lua" )
376 changes: 376 additions & 0 deletions lua/autorun/netstream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,376 @@
--A net extension which allows sending large streams of data without overflowing the reliable channel
--Keep it in lua/autorun so it will be shared between addons
AddCSLuaFile()

net.Stream = {}
net.Stream.ReadStreamQueues = {} --This holds a read stream for each player, or one read stream for the server if running on the CLIENT
net.Stream.WriteStreams = {} --This holds the write streams
net.Stream.SendSize = 20000 --This is the maximum size of each stream to send
net.Stream.Timeout = 30 --How long the data should exist in the store without being used before being destroyed
net.Stream.MaxServerReadStreams = 128 --The maximum number of keep-alives to have queued. This should prevent naughty players from flooding the network with keep-alive messages.
net.Stream.MaxServerChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
net.Stream.MaxKeepalive = 15 --Maximum times the client may request data stay live

net.Stream.ReadStream = {}
--Send the data sender a request for data
function net.Stream.ReadStream:Request()
if self.downloads == net.Stream.MaxTries * self.numchunks then self:Remove() return end
self.downloads = self.downloads + 1
-- print("Requesting",self.identifier,false,false,#self.chunks)

net.Start("NetStreamRequest")

Check warning on line 22 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis
net.WriteUInt(self.identifier, 32)

Check warning on line 23 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis

Check warning on line 23 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space before parenthesis"

Style: Please add a space before the parenthesis
net.WriteBit(false)

Check warning on line 24 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis
net.WriteBit(false)

Check warning on line 25 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis
net.WriteUInt(#self.chunks, 32)

Check warning on line 26 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis

Check warning on line 26 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space before parenthesis"

Style: Please add a space before the parenthesis
if CLIENT then net.SendToServer() else net.Send(self.player) end

Check warning on line 27 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis

Check warning on line 27 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space before parenthesis"

Style: Please add a space before the parenthesis

timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout/2, 1, function() self:Request() end)

Check warning on line 29 in lua/autorun/netstream.lua

View workflow job for this annotation

GitHub Actions / Lint / lint

"Space after parenthesis"

Style: Please add a space after the parenthesis
end

--Received data so process it
function net.Stream.ReadStream:Read(size)
timer.Remove("NetStreamReadTimeout" .. self.identifier)

local progress = net.ReadUInt(32)
if self.chunks[progress] then return end

local crc = net.ReadString()
local data = net.ReadData(size)

if crc == util.CRC(data) then
self.chunks[progress] = data
else
pac.Message("net.Stream.ReadStream:Read(): hash received and hash of chunk do not match match")
end

if #self.chunks == self.numchunks then
self.returndata = table.concat(self.chunks)

if self.compressed then
self.returndata = util.Decompress(self.returndata)

if not self.returndata then
pac.Message("net.Stream.ReadStream:Read(): Failed to decompress data")
end
end

self:Remove()
else
self:Request()
end
end

--Gets the download progress
function net.Stream.ReadStream:GetProgress()
return #self.chunks/self.numchunks
end

--Pop the queue and start the next task
function net.Stream.ReadStream:Remove()
local ok, err = xpcall(self.callback, debug.traceback, self.returndata)
if not ok then ErrorNoHalt(err) end

net.Start("NetStreamRequest")
net.WriteUInt(self.identifier, 32)
net.WriteBit(false)
net.WriteBit(true)
if CLIENT then net.SendToServer() else net.Send(self.player) end

timer.Remove("NetStreamReadTimeout" .. self.identifier)
timer.Remove("NetStreamKeepAlive" .. self.identifier)

if self == self.queue[1] then
table.remove(self.queue, 1)
local nextInQueue = self.queue[1]
if nextInQueue then
timer.Remove("NetStreamKeepAlive" .. nextInQueue.identifier)
nextInQueue:Request()
else
net.Stream.ReadStreamQueues[self.player] = nil
end
else
for k, v in ipairs(self.queue) do
if v == self then
table.remove(self.queue, k)
break
end
end
end
end

net.Stream.ReadStream.__index = net.Stream.ReadStream

net.Stream.WriteStream = {}

-- The player wants some data
function net.Stream.WriteStream:Write(ply)
local progress = net.ReadUInt(32)+1
local chunk = self.chunks[progress]
if chunk then
self.clients[ply].progress = progress
net.Start("NetStreamDownload")
net.WriteUInt(#chunk.data, 32)
net.WriteUInt(progress, 32)
net.WriteString(chunk.crc)
net.WriteData(chunk.data, #chunk.data)
if CLIENT then net.SendToServer() else net.Send(ply) end
end
end

-- The player notified us they finished downloading or cancelled
function net.Stream.WriteStream:Finished(ply)
self.clients[ply].finished = true

if self.callback then
local ok, err = xpcall(self.callback, debug.traceback, ply)
if not ok then ErrorNoHalt(err) end
end
end

-- Get player's download progress
function net.Stream.WriteStream:GetProgress(ply)
return self.clients[ply].progress / #self.chunks
end

-- If the stream owner cancels it, notify everyone who is subscribed
function net.Stream.WriteStream:Remove()
local sendTo = {}
for ply, client in pairs(self.clients) do
if not client.finished then
client.finished = true
if ply:IsValid() then sendTo[#sendTo+1] = ply end
end
end

net.Start("NetStreamDownload")
net.WriteUInt(0, 32)
net.WriteUInt(self.identifier, 32)
if SERVER then net.Send(sendTo) else net.SendToServer() end
net.Stream.WriteStreams[self.identifier] = nil
end

net.Stream.WriteStream.__index = net.Stream.WriteStream

--Store the data and write the file info so receivers can request it.
local identifier = 1

function net.WriteStream(data, callback, dontcompress)
if not isstring(data) then
error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2)
end

if callback ~= nil and not isfunction(callback) then
error("bad argument #2 to 'WriteStream' (function expected, got " .. type(callback) .. ")", 2)
end

local compressed = not dontcompress
if compressed then
data = util.Compress(data) or ""
end

if #data == 0 then
net.WriteUInt(0, 32)
return
end

local numchunks = math.ceil(#data / net.Stream.SendSize)
if CLIENT and numchunks > net.Stream.MaxServerChunks then
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
net.WriteUInt(0, 32)
return
end

local chunks = {}
for i=1, numchunks do
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
chunks[i] = {
data = datachunk,
crc = util.CRC(datachunk),
}
end

local startid = identifier
while net.Stream.WriteStreams[identifier] do
identifier = identifier % 1024 + 1
if identifier == startid then
ErrorNoHalt("Netstream is full of WriteStreams!\n" .. debug.traceback() .. "\n")
net.WriteUInt(0, 32)
return
end
end

local stream = {
identifier = identifier,
chunks = chunks,
compressed = compressed,
numchunks = numchunks,
callback = callback,
clients = setmetatable({},{__index = function(t,k)
local r = {
finished = false,
downloads = 0,
keepalives = 0,
progress = 0,
} t[k]=r return r
end})
}

setmetatable(stream, net.Stream.WriteStream)

net.Stream.WriteStreams[identifier] = stream
timer.Create("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1, function() stream:Remove() end)

net.WriteUInt(numchunks, 32)
net.WriteUInt(identifier, 32)
net.WriteBool(compressed)

return stream
end

--If the receiver is a player then add it to a queue.
--If the receiver is the server then add it to a queue for each individual player
function net.ReadStream(ply, callback)
if CLIENT then
ply = NULL
else
if type(ply) ~= "Player" then
error("bad argument #1 to 'ReadStream' (Player expected, got " .. type(ply) .. ")", 2)
elseif not ply:IsValid() then
error("bad argument #1 to 'ReadStream' (Tried to use a NULL entity!)", 2)
end
end

if not isfunction(callback) then
error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2)
end

local queue = net.Stream.ReadStreamQueues[ply]
if queue then
if SERVER and #queue == net.Stream.MaxServerReadStreams then
ErrorNoHalt("Receiving too many ReadStream requests from ", ply)
return
end
else
queue = {} net.Stream.ReadStreamQueues[ply] = queue
end

local numchunks = net.ReadUInt(32)

if numchunks == nil then
return
elseif numchunks == 0 then
local ok, err = xpcall(callback, debug.traceback, "")
if not ok then ErrorNoHalt(err) end
return
end

if SERVER and numchunks > net.Stream.MaxServerChunks then
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
return
end

local identifier = net.ReadUInt(32)
local compressed = net.ReadBool()
--print("Got info", numchunks, identifier, compressed)

for _, v in ipairs(queue) do
if v.identifier == identifier then
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!\n" .. debug.traceback() .. "\n")
return
end
end

local stream = {
identifier = identifier,
chunks = {},
compressed = compressed,
numchunks = numchunks,
callback = callback,
queue = queue,
player = ply,
downloads = 0
}

setmetatable(stream, net.Stream.ReadStream)

queue[#queue + 1] = stream
if #queue > 1 then
timer.Create("NetStreamKeepAlive" .. identifier, net.Stream.Timeout / 2, 0, function()
net.Start("NetStreamRequest")
net.WriteUInt(identifier, 32)
net.WriteBit(true)
if CLIENT then net.SendToServer() else net.Send(ply) end
end)
else
stream:Request()
end

return stream
end

if SERVER then

util.AddNetworkString("NetStreamRequest")
util.AddNetworkString("NetStreamDownload")

end

--Stream data is requested
net.Receive("NetStreamRequest", function(len, ply)

local identifier = net.ReadUInt(32)
local stream = net.Stream.WriteStreams[identifier]

if stream then
ply = ply or NULL
local client = stream.clients[ply]

if not client.finished then
local keepalive = net.ReadBit() == 1
if keepalive then
if client.keepalives < net.Stream.MaxKeepalive then
client.keepalives = client.keepalives + 1
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
end
else
local completed = net.ReadBit() == 1
if completed then
stream:Finished(ply)
else
if client.downloads < net.Stream.MaxTries * #stream.chunks then
client.downloads = client.downloads + 1
stream:Write(ply)
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
else
client.finished = true
end
end
end
end
end

end)

--Download the stream data
net.Receive("NetStreamDownload", function(len, ply)

ply = ply or NULL
local queue = net.Stream.ReadStreamQueues[ply]
if queue then
local size = net.ReadUInt(32)
if size > 0 then
queue[1]:Read(size)
else
local id = net.ReadUInt(32)
for k, v in ipairs(queue) do
if v.identifier == id then
v:Remove()
break
end
end
end
end

end)
Loading
Loading