From aedd2a5f730cf558470a9052bbe43a63d3d6ae69 Mon Sep 17 00:00:00 2001 From: Serge Bazanski Date: Mon, 16 Jun 2025 10:27:06 +0200 Subject: [PATCH] add stream module --- README.md | 22 ++++++++ fafomo.go | 156 ++++++++------------------------------------------- presence.go | 135 ++++++++++++++++++++++++++++++++++++++++++++ streaming.go | 45 +++++++++++++++ 4 files changed, 226 insertions(+), 132 deletions(-) create mode 100644 README.md create mode 100644 presence.go create mode 100644 streaming.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..2ac98ee --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +fafomo +====== + +The FAFO Matrix Box Amalgam. + +Needs a username/password - ask leah or q3k. Use with `-matrix_username` and `-matrix_password_file` flags. + +Made up of two modules: presence and streaming. Can be run separately on separate machines. + +Presence +-------- + +Spams the presence room (currently, the 'Local' room) with updates from at.lab.fa-fo.de. Needs `-presence_matrix_room` and `-presence_backend_url` to be set. + +Currently running on conway.i.fa-fo.de. + +Stream +------ + +Reads messages posted in the 'Stream' room, and calls `/home/u/speak.sh` on every message with the text to be spoken as an argument. Needs `-stream_matrix_room` to be set (to the _internal_ Matrix room ID). + +Running on demand on the streambox. diff --git a/fafomo.go b/fafomo.go index 1dda2a9..c2780df 100644 --- a/fafomo.go +++ b/fafomo.go @@ -14,11 +14,8 @@ package main import ( "context" - "encoding/json" "errors" "flag" - "fmt" - "net/http" "os" "os/signal" "strings" @@ -26,115 +23,27 @@ import ( "k8s.io/klog" "maunium.net/go/mautrix" - "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" ) -var folks map[string]struct{} - -type JSONTop struct { - Users []JSONUser `json:"users"` -} - -type JSONUser struct { - Login string `json:"login"` -} - -func ircquote(s string) string { - if len(s) < 1 { - return s // bad luck, V - } - - ZWJ := "\u200d" - return s[0:1] + ZWJ + s[1:] -} - -func listFmt(list []string) string { - var listQuoted []string - for _, l := range list { - listQuoted = append(listQuoted, ircquote(l)) - } - return "[ " + strings.Join(listQuoted, " ") + " ]" -} - -func update(ctx context.Context) (string, error) { - req, err := http.NewRequestWithContext(ctx, "GET", flagBackendURL, nil) - if err != nil { - return "", err - } - res, err := http.DefaultClient.Do(req) - if err != nil { - return "", err - } - defer res.Body.Close() - - var users JSONTop - - if res.StatusCode != 200 { - return "", fmt.Errorf("unexpected API status code %d", res.StatusCode) - } - if err := json.NewDecoder(res.Body).Decode(&users); err != nil { - return "", fmt.Errorf("could not decode API response: %v", err) - } - - var left []string - var arrived []string - var unchanged []string - - newFolks := make(map[string]struct{}) - for _, user := range users.Users { - newFolks[user.Login] = struct{}{} - if _, ok := folks[user.Login]; !ok { - arrived = append(arrived, user.Login) - } else { - unchanged = append(unchanged, user.Login) - } - } - for user, _ := range folks { - if _, ok := newFolks[user]; !ok { - left = append(left, user) - } - } - - folks = newFolks - - // no change - if len(arrived) == 0 && len(left) == 0 { - return "", nil - } - - var parts []string - if len(arrived) > 0 { - parts = append(parts, fmt.Sprintf("arrived: %s", listFmt(arrived))) - } - if len(left) > 0 { - parts = append(parts, fmt.Sprintf("left: %s", listFmt(left))) - } - if len(unchanged) > 0 { - adjective := "also" - if len(left) > 0 && len(arrived) == 0 { - adjective = "still" - } - parts = append(parts, fmt.Sprintf("%s there: %s", adjective, listFmt(unchanged))) - } - - return strings.Join(parts, "; "), nil -} - var ( flagMatrixHomeserver = "matrix.org" flagMatrixUsername = "@fafomo:matrix.org" flagMatrixPasswordFile = "password.txt" - flagMatrixRoom string - flagBackendURL string + + flagPresenceMatrixRoom string + flagPresenceBackendURL string + + flagStreamMatrixRoom string ) func main() { flag.StringVar(&flagMatrixHomeserver, "matrix_homeserver", flagMatrixHomeserver, "Address of Matrix homeserver") flag.StringVar(&flagMatrixUsername, "matrix_username", flagMatrixUsername, "Matrix login username") flag.StringVar(&flagMatrixPasswordFile, "matrix_password_file", flagMatrixPasswordFile, "Path to file containing matrix login password") - flag.StringVar(&flagMatrixRoom, "matrix_room", flagMatrixRoom, "Matrix room MXID") - flag.StringVar(&flagBackendURL, "backend_url", flagBackendURL, "Checkinator (backend) addresss") + flag.StringVar(&flagPresenceMatrixRoom, "presence_matrix_room", flagPresenceMatrixRoom, "Matrix room MXID") + flag.StringVar(&flagPresenceBackendURL, "presence_backend_url", flagPresenceBackendURL, "Checkinator (backend) addresss") + flag.StringVar(&flagStreamMatrixRoom, "stream_matrix_room", flagStreamMatrixRoom, "Matrix room MXID") flag.Parse() for _, s := range []struct { @@ -144,8 +53,6 @@ func main() { {flagMatrixHomeserver, "matrix_homeserver"}, {flagMatrixUsername, "matrix_username"}, {flagMatrixPasswordFile, "matrix_password_file"}, - {flagMatrixRoom, "matrix_room"}, - {flagBackendURL, "backend_url"}, } { if s.value == "" { klog.Exitf("-%s must be set", s.name) @@ -155,10 +62,6 @@ func main() { startCtx, startCtxC := context.WithTimeout(context.Background(), 20*time.Second) defer startCtxC() - if _, err := update(startCtx); err != nil { - klog.Exitf("Initial update failed: %v", err) - } - client, err := mautrix.NewClient(flagMatrixHomeserver, id.UserID(flagMatrixUsername), "") if err != nil { klog.Exitf("NewClient failed: %v", err) @@ -183,11 +86,6 @@ func main() { client.AccessToken = login.AccessToken - syncer := client.Syncer.(*mautrix.DefaultSyncer) - syncer.OnEventType(event.EventMessage, func(ctx context.Context, evt *event.Event) { - klog.V(1).Infof("Sender: %s, type: %s, id: %s, body: %q", evt.Sender, evt.Type, evt.ID, evt.Content.AsMessage().Body) - }) - klog.Infof("Now running...") ctx, ctxC := signal.NotifyContext(context.Background(), os.Interrupt) @@ -196,37 +94,31 @@ func main() { // Run Matrix sync process in the background. syncDone := make(chan struct{}) go func() { - err = client.SyncWithContext(ctx) + err := client.SyncWithContext(ctx) defer close(syncDone) if err != nil && !errors.Is(err, ctx.Err()) { klog.Exitf("Sync failed: %v", err) } }() - // Update space members every minute. - ticker := time.NewTicker(60 * time.Second) -process: - for { - select { - case <-ticker.C: - ctx, _ := context.WithTimeout(ctx, 5*time.Second) - - message, err := update(ctx) - if err != nil { - klog.Errorf("Update failed: %v", err) - } else if message != "" { - klog.Infof("Sent update: %s\n", message) - _, err := client.SendText(ctx, id.RoomID(flagMatrixRoom), message) - if err != nil { - klog.Errorf("Failed to send event: %v\n", err) - } - } - - case <-ctx.Done(): - break process + if flagPresenceMatrixRoom != "" || flagPresenceBackendURL != "" { + if flagPresenceMatrixRoom == "" || flagPresenceBackendURL == "" { + klog.Exitf("If presence_matrix_room is set, presence_backend_url must be set (and vice versa)") } + klog.Infof("Starting presence module.") + go presence(ctx, client) + } else { + klog.Infof("NOT starting presence module.") } + if flagStreamMatrixRoom != "" { + klog.Infof("Starting streaming module.") + go streaming(ctx, client) + } else { + klog.Infof("NOT starting streaming module.") + } + + <-ctx.Done() klog.Infof("Waiting for graceful sync before exiting...") <-syncDone klog.Infof("Done.") diff --git a/presence.go b/presence.go new file mode 100644 index 0000000..40dc844 --- /dev/null +++ b/presence.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "k8s.io/klog" + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/id" +) + +func presence(ctx context.Context, client *mautrix.Client) { + if _, err := update(ctx); err != nil { + klog.Exitf("Initial update failed: %v", err) + } + + // Update space members every minute. + ticker := time.NewTicker(60 * time.Second) +process: + for { + select { + case <-ticker.C: + ctx, _ := context.WithTimeout(ctx, 5*time.Second) + + message, err := update(ctx) + if err != nil { + klog.Errorf("Update failed: %v", err) + } else if message != "" { + klog.Infof("Sent update: %s\n", message) + _, err := client.SendText(ctx, id.RoomID(flagPresenceMatrixRoom), message) + if err != nil { + klog.Errorf("Failed to send event: %v\n", err) + } + } + + case <-ctx.Done(): + break process + } + } +} + +var folks map[string]struct{} + +type JSONTop struct { + Users []JSONUser `json:"users"` +} + +type JSONUser struct { + Login string `json:"login"` +} + +func ircquote(s string) string { + if len(s) < 1 { + return s // bad luck, V + } + + ZWJ := "\u200d" + return s[0:1] + ZWJ + s[1:] +} + +func listFmt(list []string) string { + var listQuoted []string + for _, l := range list { + listQuoted = append(listQuoted, ircquote(l)) + } + return "[ " + strings.Join(listQuoted, " ") + " ]" +} + +func update(ctx context.Context) (string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", flagPresenceBackendURL, nil) + if err != nil { + return "", err + } + res, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer res.Body.Close() + + var users JSONTop + + if res.StatusCode != 200 { + return "", fmt.Errorf("unexpected API status code %d", res.StatusCode) + } + if err := json.NewDecoder(res.Body).Decode(&users); err != nil { + return "", fmt.Errorf("could not decode API response: %v", err) + } + + var left []string + var arrived []string + var unchanged []string + + newFolks := make(map[string]struct{}) + for _, user := range users.Users { + newFolks[user.Login] = struct{}{} + if _, ok := folks[user.Login]; !ok { + arrived = append(arrived, user.Login) + } else { + unchanged = append(unchanged, user.Login) + } + } + for user, _ := range folks { + if _, ok := newFolks[user]; !ok { + left = append(left, user) + } + } + + folks = newFolks + + // no change + if len(arrived) == 0 && len(left) == 0 { + return "", nil + } + + var parts []string + if len(arrived) > 0 { + parts = append(parts, fmt.Sprintf("arrived: %s", listFmt(arrived))) + } + if len(left) > 0 { + parts = append(parts, fmt.Sprintf("left: %s", listFmt(left))) + } + if len(unchanged) > 0 { + adjective := "also" + if len(left) > 0 && len(arrived) == 0 { + adjective = "still" + } + parts = append(parts, fmt.Sprintf("%s there: %s", adjective, listFmt(unchanged))) + } + + return strings.Join(parts, "; "), nil +} diff --git a/streaming.go b/streaming.go new file mode 100644 index 0000000..c3578ba --- /dev/null +++ b/streaming.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/exec" + "time" + + "k8s.io/klog" + "maunium.net/go/mautrix" + "maunium.net/go/mautrix/event" +) + +func streaming(ctx context.Context, client *mautrix.Client) { + start := time.Now() + + var nameLastMentioned time.Time + var previousNameMentioned string + + syncer := client.Syncer.(*mautrix.DefaultSyncer) + syncer.OnEventType(event.EventMessage, func(ctx context.Context, evt *event.Event) { + if evt.Timestamp < start.UnixMilli() { + return + } + if evt.Type == event.EventMessage && evt.RoomID.String() == flagStreamMatrixRoom { + line := evt.Content.AsMessage().Body + + name := evt.Sender.Localpart() + if name != previousNameMentioned || time.Since(nameLastMentioned) > time.Minute { + line = fmt.Sprintf("%s says: %s", name, line) + nameLastMentioned = time.Now() + previousNameMentioned = name + } + + klog.Info(line) + cmd := exec.CommandContext(ctx, "/home/u/speak.sh", line) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Run() + } + }) + + <-ctx.Done() +}