improved how throttling based on the websocket works
This commit is contained in:
		
							parent
							
								
									a8be74d77e
								
							
						
					
					
						commit
						6fa7f50894
					
				
							
								
								
									
										23
									
								
								wstunneld.js
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								wstunneld.js
									
									
									
									
									
								
							| @ -182,9 +182,18 @@ module.exports.create = function (copts) { | |||||||
| 
 | 
 | ||||||
|       token.pausedConns = []; |       token.pausedConns = []; | ||||||
|       ws._socket.on('drain', function () { |       ws._socket.on('drain', function () { | ||||||
|  |         // the websocket library has it's own buffer apart from node's socket buffer, but that one
 | ||||||
|  |         // is much more difficult to watch, so we watch for the lower level buffer to drain and
 | ||||||
|  |         // then check to see if the upper level buffer is still too full to write to. Note that
 | ||||||
|  |         // the websocket library buffer has something to do with compression, so I'm not requiring
 | ||||||
|  |         // that to be 0 before we start up again.
 | ||||||
|  |         if (ws.bufferedAmount > 128*1024) { | ||||||
|  |           return; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         token.pausedConns.forEach(function (conn) { |         token.pausedConns.forEach(function (conn) { | ||||||
|           if (!conn.manualPause) { |           if (!conn.manualPause) { | ||||||
|             console.log('resuming', conn.tunnelCid, 'now that the web socket has caught up'); |             // console.log('resuming', conn.tunnelCid, 'now that the web socket has caught up');
 | ||||||
|             conn.resume(); |             conn.resume(); | ||||||
|           } |           } | ||||||
|         }); |         }); | ||||||
| @ -309,16 +318,18 @@ module.exports.create = function (copts) { | |||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         browserConn.write(opts.data); |         browserConn.write(opts.data); | ||||||
|  |         // tunnelRead is how many bytes we've read from the tunnel, and written to the browser.
 | ||||||
|  |         browserConn.tunnelRead = (browserConn.tunnelRead || 0) + opts.data.byteLength; | ||||||
|         // If we have more than 1MB buffered data we need to tell the other side to slow down.
 |         // If we have more than 1MB buffered data we need to tell the other side to slow down.
 | ||||||
|         // Once we've finished sending what we have we can tell the other side to keep going.
 |         // Once we've finished sending what we have we can tell the other side to keep going.
 | ||||||
|         // If we've already sent the 'pause' message though don't send it again, because we're
 |         // If we've already sent the 'pause' message though don't send it again, because we're
 | ||||||
|         // probably just dealing with data queued before our message got to them.
 |         // probably just dealing with data queued before our message got to them.
 | ||||||
|         if (!browserConn.remotePaused && browserConn.bufferSize > 1024*1024) { |         if (!browserConn.remotePaused && browserConn.bufferSize > 1024*1024) { | ||||||
|           sendTunnelMsg(opts, null, 'pause'); |           sendTunnelMsg(opts, browserConn.tunnelRead, 'pause'); | ||||||
|           browserConn.remotePaused = true; |           browserConn.remotePaused = true; | ||||||
| 
 | 
 | ||||||
|           browserConn.once('drain', function () { |           browserConn.once('drain', function () { | ||||||
|             sendTunnelMsg(opts, null, 'resume'); |             sendTunnelMsg(opts, browserConn.tunnelRead, 'resume'); | ||||||
|             browserConn.remotePaused = false; |             browserConn.remotePaused = false; | ||||||
|           }); |           }); | ||||||
|         } |         } | ||||||
| @ -433,7 +444,7 @@ module.exports.create = function (copts) { | |||||||
|     console.log('[pipeWs] browser is', cid, 'home-cloud is', packer.socketToId(remote.ws.upgradeReq.socket)); |     console.log('[pipeWs] browser is', cid, 'home-cloud is', packer.socketToId(remote.ws.upgradeReq.socket)); | ||||||
| 
 | 
 | ||||||
|     function sendWs(data, serviceOverride) { |     function sendWs(data, serviceOverride) { | ||||||
|       if (remote.ws && !conn.tunnelClosing) { |       if (remote.ws && (!conn.tunnelClosing || serviceOverride)) { | ||||||
|         try { |         try { | ||||||
|           remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); |           remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); | ||||||
|           // If we can't send data over the websocket as fast as this connection can send it to us
 |           // If we can't send data over the websocket as fast as this connection can send it to us
 | ||||||
| @ -442,8 +453,8 @@ module.exports.create = function (copts) { | |||||||
|           // to make things more fair so a connection doesn't get stuck waiting for everyone else
 |           // to make things more fair so a connection doesn't get stuck waiting for everyone else
 | ||||||
|           // to finish because it got caught on the boundary. Also if serviceOverride is set it
 |           // to finish because it got caught on the boundary. Also if serviceOverride is set it
 | ||||||
|           // means the connection is over, so no need to pause it.
 |           // means the connection is over, so no need to pause it.
 | ||||||
|           if (!serviceOverride && (remote.pausedConns.length || remote.ws._socket.bufferSize > 1024*1024)) { |           if (!serviceOverride && (remote.pausedConns.length || remote.ws.bufferedAmount > 1024*1024)) { | ||||||
|             console.log('pausing', cid, 'to allow web socket to catch up'); |             // console.log('pausing', cid, 'to allow web socket to catch up');
 | ||||||
|             conn.pause(); |             conn.pause(); | ||||||
|             remote.pausedConns.push(conn); |             remote.pausedConns.push(conn); | ||||||
|           } |           } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user