Skip to content

WIP: Websocket replacement for Eventstream #20543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
merge in #20544 and remove a lot of unnecessary logging
Signed-off-by: Andrew Thornton <[email protected]>
  • Loading branch information
zeripath committed Jul 30, 2022
commit 4100a72d1f9fc68371c33ed56e1b2c864bbbdefe
2 changes: 2 additions & 0 deletions modules/context/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r *Response) Before(f func(ResponseWriter)) {
r.befores = append(r.befores, f)
}

// hijackerResponse wraps the Response to allow casting as a Hijacker if the underlying response is a hijacker
type hijackerResponse struct {
*Response
http.Hijacker
Expand All @@ -102,6 +103,7 @@ func NewResponse(resp http.ResponseWriter) ResponseWriter {
befores: make([]func(ResponseWriter), 0),
}
if ok {
// ensure that the Response we return is also hijackable
return hijackerResponse{
Response: response,
Hijacker: hijacker,
Expand Down
84 changes: 50 additions & 34 deletions routers/web/events/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10

maximumMessageSize = 2048
readMessageChanSize = 20 // <- I've put 20 here because it seems like a reasonable buffer but it may to increase
)

type readMessage struct {
Expand Down Expand Up @@ -89,38 +92,8 @@ func Websocket(ctx *context.Context) {
}
defer unregister()

readChan := make(chan readMessage, 20)
go func() {
defer conn.Close()
conn.SetReadLimit(2048)
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
conn.SetPongHandler(func(string) error {
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
}
return nil
})

for {
messageType, message, err := conn.ReadMessage()
readChan <- readMessage{
messageType: messageType,
message: message,
err: err,
}
if err != nil {
close(readChan)
return
}
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
}
}()
readMessageChan := make(chan readMessage, readMessageChanSize)
go readMessagesFromConnToChan(conn, readMessageChan)

pingTicker := time.NewTicker(pingPeriod)

Expand All @@ -139,6 +112,7 @@ func Websocket(ctx *context.Context) {
return
default:
}

if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Error("unable to SetWriteDeadline: %v", err)
return
Expand All @@ -147,10 +121,11 @@ func Websocket(ctx *context.Context) {
log.Error("unable to send PingMessage: %v", err)
return
}
case message, ok := <-readChan:
case message, ok := <-readMessageChan:
if !ok {
break
}

// ensure that we're not already cancelled
select {
case <-notify:
Expand All @@ -159,11 +134,14 @@ func Websocket(ctx *context.Context) {
return
default:
}

// FIXME: HANDLE MESSAGES
log.Info("Got Message: %d:%s:%v", message.messageType, message.message, message.err)
case event, ok := <-eventChan:
if !ok {
break
}

// ensure that we're not already cancelled
select {
case <-notify:
Expand All @@ -172,6 +150,8 @@ func Websocket(ctx *context.Context) {
return
default:
}

// Handle events
if event.Name == "logout" {
if ctx.Session.ID() == event.Data {
event = &eventsource.Event{
Expand All @@ -196,14 +176,50 @@ func Websocket(ctx *context.Context) {
}
}

func readMessagesFromConnToChan(conn *websocket.Conn, messageChan chan readMessage) {
defer func() {
close(messageChan) // Please note: this has to be within a wrapping anonymous func otherwise it will be evaluated when creating the defer
Copy link
Member

@silverwind silverwind Sep 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I didn't know this. Is this evaluation behaviour something specific to the close builtin?

_ = conn.Close()
}()
conn.SetReadLimit(maximumMessageSize)
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
conn.SetPongHandler(func(string) error {
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
}
return nil
})

for {
messageType, message, err := conn.ReadMessage()
messageChan <- readMessage{
messageType: messageType,
message: message,
err: err,
}
if err != nil {
// don't need to handle the error here as it is passed down the channel
return
}
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
}
}

func writeEvent(conn *websocket.Conn, event *eventsource.Event) error {
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Error("unable to SetWriteDeadline: %v", err)
return err
}

w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Warn("Unable to get writer for websocket %v", err)
log.Error("Unable to get writer for websocket %v", err)
return err
}

Expand Down
21 changes: 5 additions & 16 deletions web_src/js/features/notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,13 @@ export function initNotificationsTable() {
}

async function receiveUpdateCount(data, document) {
console.log(data);
try {
console.log(window, document);
const notificationCount = document.querySelector('.notification_count');
if (data.Count > 0) {
notificationCount.classList.remove('hidden');
} else {
notificationCount.classList.add('hidden');
const notificationCounts = document.querySelectorAll('.notification_count');
for (const count of notificationCounts) {
count.classList.toggle('hidden', data.Count === 0);
count.textContent = `${data.Count}`;
}

notificationCount.textContent = `${data.Count}`;
console.log(notificationCount);
const oldDisplay = notificationCount.style.display;
notificationCount.style.display = 'none';
notificationCount.style.display = oldDisplay;
await updateNotificationTable();
} catch (error) {
console.error(error, data);
Expand Down Expand Up @@ -84,10 +76,8 @@ export function initNotificationCount() {
console.error('Unexpected event:', event);
return;
}
console.log(event);
console.log(currentDocument === document);
if (event.data.type === 'notification-count') {
const _promise = receiveUpdateCount(event.data.data, currentDocument).then(console.log('done'));
const _promise = receiveUpdateCount(event.data.data, currentDocument);
} else if (event.data.type === 'error') {
console.error(event.data);
} else if (event.data.type === 'logout') {
Expand All @@ -105,7 +95,6 @@ export function initNotificationCount() {
});
worker.port.close();
}
console.log('done eventlistenter');
});
worker.port.addEventListener('error', (e) => {
console.error(e);
Expand Down