GoLang Channels und SSE Connections?

1 Antwort

Vom Fragesteller als hilfreich ausgezeichnet

Fangen wir am Anfang an.

132: main()
135: router.HandleFunc("/sse/", sseHandler)

hier wird ein HTTP Server erstellt, und zu dem Pfad /sse/ die Funktion sseHandler registriert. Wir müssen uns also "sseHandler" ansehen

102: sseHandler

hier haben wir einen Standard HTTP Request Handler.

  • Es wird eine Client ID aus dem Pfad extrahiert
  • Zu unserer "SSE Con", die wir uns später genauer anschauen, wird ein Client hinzugefügt
  • In einer For Loop (line 118) und einem Select statement wird gewartet bis 1. eine neue Nachricht an den Client durch den Channel gesendet wird oder 2. die Verbindung geschlossen wird. Hier haben wir quasi schon die Antwort wo eine "message" aus dem Go channel an den Client gelangt. Der channel wird von "104: ch := sseConn.addClient(id)" zurück gegeben, also schauen wir uns diese Methode an.

noch mal die for loop und das select statement aus sseHandler beginnend in Zeile 118 (w ist der http.ResponseWriter, womit wir über die HTTP Verbindung Daten senden können).

for {
		select {
		case message := <-*ch:
			fmt.Println("case message... sending message")
			fmt.Println(message)
			fmt.Fprintf(w, message)
			flusher.Flush()
		case <-r.Context().Done():
			fmt.Println("Client closed connection")
			return
		}
	}
21: (p *SSEConn) addClient(id string) *chan string

hier wird die addClient Methode auf den Typ *SSEConn implementiert. Also eine Methode um einer *SSEConn einen Client hinzuzufügen

  • der Mutex wird gelockt, um zu vermeiden, dass irgendwelche Operationen in verschiedenen Threads sich behindern können
  • nach einem defer mit etwas logging und um sicher zu stellen, dass der mutex wieder unlocked wird…
  • … finden wir heraus, dass in Zeile 32 auf die "clients" map zugegriffen wird, die übergebene ID wird als key genutzt, und wenn kein (Slice mit) Channel(s) existiert, wird ein neuer erstellt
  • Hier habe ich gesehen, dass ich selber einen Fehler gemacht habe, die map bezieht sich gar nicht auf einen einzelnen Channel sondern einen Slice mit Channels! Von der Methode wird allerdings ein Pointer auf den neuen, gerade erstellte Channel zurück gegeben.
Zwischenergebnis

Jetzt haben wir so ziemlich das Geheimnis gelüftet, wie man ja schon an dem Typen SSEConn (line 12) erkannt hat bestimmt eine "SSEConn" aus einem Mutex zur Synchronisation und einer Map aus IDs und mehreren Channel. Aber jetzt wissen wir auch wie diese genutzt werden. Wie wir oben in SSE Handler gesehen haben, werden dort die Messages aus einem channel zu der offenen HTTP Verbindung an den Client gesendet, bis diese geschlossen wird. Jetzt liegt es natürlich nahe, dass es eine Methode gibt um an alle Channel zu einer ID eine Nachricht zu senden.

Zurück zu SSE Handler, in Zeile 111:  sseConn.removeClient(id, *ch) finden wir wie zu erwarten auch eine Methode um einen Channel wieder einer ID zu entfernen, sobald die HTTP Verbindung geschlossen wird. Ich denke die remove Client Methode ist uns egal, wir können uns ja sehr gut erschließen was sie macht (den channel entfernen und eventuell cleanup falls kein channel mehr existiert).

Schauen wir uns also an wo Nachrichten an Clients gesendet werden. Wir erinnern uns, es gab auch einen zweiten HTTP Handler in Main. Zurück zur Main. Hier schauen wir uns noch mal den Handler zu /time/ "getTime" an

94: getTime

wie auch in SSE Handler ein Standard HTTP Request Handler.

  • Auch hier entnehmen wir wieder eine ID aus der URL.
  • Als nächstes nehmen wir die aktuelle zeit und speichern sie in "msg"
  • Und wie zu erwarten wird hier eine Nachricht gesendet, über die Broadcast Methode der SSE Conn: nämlich die aktuelle Zeit. Also schauen wir uns zuletzt die Broadcast Methode an.
78: (p *SSEConn) broadcast(id string, data, event string)
  • wieder wird unser Mutex gelockt und in einem Defer wieder unlocked
  • Klar, wir nehmen die ID um den Slice an Channel zu finden, mit etwas error handling falls es zu der ID keine Channel gibt. In dem Fall passiert einfach nichts (return in Zeile 84).
  • Ansonsten loopen wir über alle Channel und senden an jede Verbindung (also an jeden Channel) die msg, also in diesem konkreten Fall die Zeit aus "getTime"
  • Wie wir schon oben gesehen haben, wartet in Zeile 120 innerhalb des sseHandler auf neue Nachrichten, und sendet sie in der zu /sse/ aufgebauten http Verbindung an den jeweiligen Client zurück. Da sseHandler ja eine for loop offen häft, bis die Verbindung beendet wird, wird der Client dann über die HTTP Verbindung die neue Nachricht empfangen.
Fazit
  • Alle guten Dinge sind drei: in der Loop/dem Select aus sseHandler wird die Nachricht aus jedem Channel zurück über die Verbindung aus dem Client gesendet :)
  • Es gibt eine ID die über die URL an den Server gesendet wird, zu jeder ID kann es beliebig viele Verbindungen geben und an jede ID können Nachrichten gebroadcastet werden
  • Jeder Channel stellt eine offene HTTP Verbindung dar
  • Das ist jetzt kein Code Review, ich habe auch nicht nach Fehlern gesucht oder mir überhaupt überleget ob das alles so funktioniert, eventuell könnte man aber einen Buffer einbauen, vermutlich wird ein Client die Verbindung so lange wie möglich offen halten und dann eine neue Verbindung aufbauen, es gibt also einen kurzen Moment wo er eventuell keine Nachrichten empfängt. Aus dem Kopf raus gibt es ja auch keinen Indicator ob überhaupt eine Nachricht über einen Broadcast gesendet wurde, also auch Code der die SSE Conn nutzt könnte dies vermutlich nicht implementieren. Etwas doof.
  • Ich stelle fest, dass ich nicht nur die Frage beantwortet habe, sondern den gesamten Code erklärt. Ich bin einfach gut.
Inkognito-Nutzer   08.04.2024, 20:10

Du bist ein Schatz. Hab ein bisschen weiter geguckt und das was ich komplett übersehen hab war der For-Loop, dadurch hab ich überhaupt nicht verstanden wie der http handler und somit der user an sich weiter mit den channeln interagieren solle - wäre ja dann direkt geschlossen wenn ich mich nicht täusche.

Könntest du mir noch erklären wieso man die SEEConn Instanz die über NewSEEConn erstellt wird nur als Pointer an die Funktionen gibt und nicht einfach "normal", sprich ohne das "&" und somit nicht als Pointer?

0
BeamerBen  08.04.2024, 20:15
@Inkognito-Fragesteller

Naja dann würde man die Instanz ja kopieren, das würde auch funktionieren weil innerhalb der NewSSEConn ja nie wieder was mit der erstellten Instanz passiert, aber wäre ineffizienter.

Und ja, wenn ich das richtig im Kopf habe wäre ohne die For Loop die Request direkt nach durchlauf der Funktion "beantwortet" und der Client würde nicht weiter auf Messages warten.

0
Inkognito-Nutzer   08.04.2024, 20:20
@Inkognito-Fragesteller

Oder wieso generell vieles über Pointer gemacht wird, auch zB der Channel der im http Handler an die removeClient Funktion übergeben wird...

0
Inkognito-Nutzer   08.04.2024, 20:21
@BeamerBen

Ah die Antwort hat gar nicht geladen, kannst du mir vielleicht noch die Frage aus dem anderem Kommentar beantworten? Bzw den Nachschub zur oberen Frage. Danke für deine Antworten bisher, wirklich sehr hilfreich um das Ganze nen bisschen besser zu verstehen

0
BeamerBen  08.04.2024, 23:37
@Inkognito-Fragesteller

Erst mal ganz wichtig, in Go wird alles pass by value gemacht. Also der Wert wird immer kopiert.

Bei Structs und ähnlichen Daten Typen gibt es zwei gute Gründe warum man das lieber als Pointer macht.

  1. verhinderst du, dass Daten kopiert werden müssen, das ist gerade bei größeren Daten wichtig weil ein Pointer dereferenzieren einfach "günstiger" ist als viele Daten zu kopieren.
  2. du erlaubst, dass eine Instanz eines Structs oder auch anderen Wertes von mehreren Stellen modifiziert werden kann.

Anders ist es aber bei Datentypen die selber quasi eine Referenz sind. Also Functions, Pointern, Interfaces, Maps, Slices und auch Channel. Diese haben selber nur Referenzen auf Daten und sind quasi auch nicht wirklich anders als ein Pointer. Das heißt auch, dass hier das dereferenzieren nicht wirklich günstiger wäre als die Daten zu kopieren. Hier war ich mir selber auch nicht sicher, und ich habe noch ma lauf Stack Overflow nachgeforscht, in der Praxis gibt es da wohl in der Regel keinen Grund einen Channel als Pointer zu passen. https://stackoverflow.com/questions/44351159/using-pointer-to-channel#44351176

Ich hab dazu auch mal ChatGPT gefragt, das hat ein paar Vermutungen geäußert die ich nicht so relevant finde, am ehesten noch dass man ja vielleicht an mehreren Stellen eine Referenz auf einen bestimmten Channel haben könnte den man austauschen möchte, aber dass es in der Praxis eigentlich eher unüblich ist (und ich bin selber auch nicht sicher wie gut das funktionieren würde). In diesem Code Beispiel was du gezeigt hast sehe ich tatsächlich keinen Grund einen Channel als Pointer zu returnen, also entweder ist das tatsächlich bad practice oder es gibt einen Grund der mein Wissen übersteigt.

1