add stream module

This commit is contained in:
Serge Bazanski 2025-06-16 10:27:06 +02:00
parent 992573a375
commit aedd2a5f73
4 changed files with 226 additions and 132 deletions

22
README.md Normal file
View file

@ -0,0 +1,22 @@
fafomo
======
The FAFO Matrix Box Amalgam.
Needs a username/password - ask leah or q3k. Use with `-matrix_username` and `-matrix_password_file` flags.
Made up of two modules: presence and streaming. Can be run separately on separate machines.
Presence
--------
Spams the presence room (currently, the 'Local' room) with updates from at.lab.fa-fo.de. Needs `-presence_matrix_room` and `-presence_backend_url` to be set.
Currently running on conway.i.fa-fo.de.
Stream
------
Reads messages posted in the 'Stream' room, and calls `/home/u/speak.sh` on every message with the text to be spoken as an argument. Needs `-stream_matrix_room` to be set (to the _internal_ Matrix room ID).
Running on demand on the streambox.

156
fafomo.go
View file

@ -14,11 +14,8 @@ package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
@ -26,115 +23,27 @@ import (
"k8s.io/klog"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
var folks map[string]struct{}
type JSONTop struct {
Users []JSONUser `json:"users"`
}
type JSONUser struct {
Login string `json:"login"`
}
func ircquote(s string) string {
if len(s) < 1 {
return s // bad luck, V
}
ZWJ := "\u200d"
return s[0:1] + ZWJ + s[1:]
}
func listFmt(list []string) string {
var listQuoted []string
for _, l := range list {
listQuoted = append(listQuoted, ircquote(l))
}
return "[ " + strings.Join(listQuoted, " ") + " ]"
}
func update(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", flagBackendURL, nil)
if err != nil {
return "", err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
var users JSONTop
if res.StatusCode != 200 {
return "", fmt.Errorf("unexpected API status code %d", res.StatusCode)
}
if err := json.NewDecoder(res.Body).Decode(&users); err != nil {
return "", fmt.Errorf("could not decode API response: %v", err)
}
var left []string
var arrived []string
var unchanged []string
newFolks := make(map[string]struct{})
for _, user := range users.Users {
newFolks[user.Login] = struct{}{}
if _, ok := folks[user.Login]; !ok {
arrived = append(arrived, user.Login)
} else {
unchanged = append(unchanged, user.Login)
}
}
for user, _ := range folks {
if _, ok := newFolks[user]; !ok {
left = append(left, user)
}
}
folks = newFolks
// no change
if len(arrived) == 0 && len(left) == 0 {
return "", nil
}
var parts []string
if len(arrived) > 0 {
parts = append(parts, fmt.Sprintf("arrived: %s", listFmt(arrived)))
}
if len(left) > 0 {
parts = append(parts, fmt.Sprintf("left: %s", listFmt(left)))
}
if len(unchanged) > 0 {
adjective := "also"
if len(left) > 0 && len(arrived) == 0 {
adjective = "still"
}
parts = append(parts, fmt.Sprintf("%s there: %s", adjective, listFmt(unchanged)))
}
return strings.Join(parts, "; "), nil
}
var (
flagMatrixHomeserver = "matrix.org"
flagMatrixUsername = "@fafomo:matrix.org"
flagMatrixPasswordFile = "password.txt"
flagMatrixRoom string
flagBackendURL string
flagPresenceMatrixRoom string
flagPresenceBackendURL string
flagStreamMatrixRoom string
)
func main() {
flag.StringVar(&flagMatrixHomeserver, "matrix_homeserver", flagMatrixHomeserver, "Address of Matrix homeserver")
flag.StringVar(&flagMatrixUsername, "matrix_username", flagMatrixUsername, "Matrix login username")
flag.StringVar(&flagMatrixPasswordFile, "matrix_password_file", flagMatrixPasswordFile, "Path to file containing matrix login password")
flag.StringVar(&flagMatrixRoom, "matrix_room", flagMatrixRoom, "Matrix room MXID")
flag.StringVar(&flagBackendURL, "backend_url", flagBackendURL, "Checkinator (backend) addresss")
flag.StringVar(&flagPresenceMatrixRoom, "presence_matrix_room", flagPresenceMatrixRoom, "Matrix room MXID")
flag.StringVar(&flagPresenceBackendURL, "presence_backend_url", flagPresenceBackendURL, "Checkinator (backend) addresss")
flag.StringVar(&flagStreamMatrixRoom, "stream_matrix_room", flagStreamMatrixRoom, "Matrix room MXID")
flag.Parse()
for _, s := range []struct {
@ -144,8 +53,6 @@ func main() {
{flagMatrixHomeserver, "matrix_homeserver"},
{flagMatrixUsername, "matrix_username"},
{flagMatrixPasswordFile, "matrix_password_file"},
{flagMatrixRoom, "matrix_room"},
{flagBackendURL, "backend_url"},
} {
if s.value == "" {
klog.Exitf("-%s must be set", s.name)
@ -155,10 +62,6 @@ func main() {
startCtx, startCtxC := context.WithTimeout(context.Background(), 20*time.Second)
defer startCtxC()
if _, err := update(startCtx); err != nil {
klog.Exitf("Initial update failed: %v", err)
}
client, err := mautrix.NewClient(flagMatrixHomeserver, id.UserID(flagMatrixUsername), "")
if err != nil {
klog.Exitf("NewClient failed: %v", err)
@ -183,11 +86,6 @@ func main() {
client.AccessToken = login.AccessToken
syncer := client.Syncer.(*mautrix.DefaultSyncer)
syncer.OnEventType(event.EventMessage, func(ctx context.Context, evt *event.Event) {
klog.V(1).Infof("Sender: %s, type: %s, id: %s, body: %q", evt.Sender, evt.Type, evt.ID, evt.Content.AsMessage().Body)
})
klog.Infof("Now running...")
ctx, ctxC := signal.NotifyContext(context.Background(), os.Interrupt)
@ -196,37 +94,31 @@ func main() {
// Run Matrix sync process in the background.
syncDone := make(chan struct{})
go func() {
err = client.SyncWithContext(ctx)
err := client.SyncWithContext(ctx)
defer close(syncDone)
if err != nil && !errors.Is(err, ctx.Err()) {
klog.Exitf("Sync failed: %v", err)
}
}()
// Update space members every minute.
ticker := time.NewTicker(60 * time.Second)
process:
for {
select {
case <-ticker.C:
ctx, _ := context.WithTimeout(ctx, 5*time.Second)
message, err := update(ctx)
if err != nil {
klog.Errorf("Update failed: %v", err)
} else if message != "" {
klog.Infof("Sent update: %s\n", message)
_, err := client.SendText(ctx, id.RoomID(flagMatrixRoom), message)
if err != nil {
klog.Errorf("Failed to send event: %v\n", err)
}
}
case <-ctx.Done():
break process
if flagPresenceMatrixRoom != "" || flagPresenceBackendURL != "" {
if flagPresenceMatrixRoom == "" || flagPresenceBackendURL == "" {
klog.Exitf("If presence_matrix_room is set, presence_backend_url must be set (and vice versa)")
}
klog.Infof("Starting presence module.")
go presence(ctx, client)
} else {
klog.Infof("NOT starting presence module.")
}
if flagStreamMatrixRoom != "" {
klog.Infof("Starting streaming module.")
go streaming(ctx, client)
} else {
klog.Infof("NOT starting streaming module.")
}
<-ctx.Done()
klog.Infof("Waiting for graceful sync before exiting...")
<-syncDone
klog.Infof("Done.")

135
presence.go Normal file
View file

@ -0,0 +1,135 @@
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"k8s.io/klog"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/id"
)
func presence(ctx context.Context, client *mautrix.Client) {
if _, err := update(ctx); err != nil {
klog.Exitf("Initial update failed: %v", err)
}
// Update space members every minute.
ticker := time.NewTicker(60 * time.Second)
process:
for {
select {
case <-ticker.C:
ctx, _ := context.WithTimeout(ctx, 5*time.Second)
message, err := update(ctx)
if err != nil {
klog.Errorf("Update failed: %v", err)
} else if message != "" {
klog.Infof("Sent update: %s\n", message)
_, err := client.SendText(ctx, id.RoomID(flagPresenceMatrixRoom), message)
if err != nil {
klog.Errorf("Failed to send event: %v\n", err)
}
}
case <-ctx.Done():
break process
}
}
}
var folks map[string]struct{}
type JSONTop struct {
Users []JSONUser `json:"users"`
}
type JSONUser struct {
Login string `json:"login"`
}
func ircquote(s string) string {
if len(s) < 1 {
return s // bad luck, V
}
ZWJ := "\u200d"
return s[0:1] + ZWJ + s[1:]
}
func listFmt(list []string) string {
var listQuoted []string
for _, l := range list {
listQuoted = append(listQuoted, ircquote(l))
}
return "[ " + strings.Join(listQuoted, " ") + " ]"
}
func update(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", flagPresenceBackendURL, nil)
if err != nil {
return "", err
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()
var users JSONTop
if res.StatusCode != 200 {
return "", fmt.Errorf("unexpected API status code %d", res.StatusCode)
}
if err := json.NewDecoder(res.Body).Decode(&users); err != nil {
return "", fmt.Errorf("could not decode API response: %v", err)
}
var left []string
var arrived []string
var unchanged []string
newFolks := make(map[string]struct{})
for _, user := range users.Users {
newFolks[user.Login] = struct{}{}
if _, ok := folks[user.Login]; !ok {
arrived = append(arrived, user.Login)
} else {
unchanged = append(unchanged, user.Login)
}
}
for user, _ := range folks {
if _, ok := newFolks[user]; !ok {
left = append(left, user)
}
}
folks = newFolks
// no change
if len(arrived) == 0 && len(left) == 0 {
return "", nil
}
var parts []string
if len(arrived) > 0 {
parts = append(parts, fmt.Sprintf("arrived: %s", listFmt(arrived)))
}
if len(left) > 0 {
parts = append(parts, fmt.Sprintf("left: %s", listFmt(left)))
}
if len(unchanged) > 0 {
adjective := "also"
if len(left) > 0 && len(arrived) == 0 {
adjective = "still"
}
parts = append(parts, fmt.Sprintf("%s there: %s", adjective, listFmt(unchanged)))
}
return strings.Join(parts, "; "), nil
}

45
streaming.go Normal file
View file

@ -0,0 +1,45 @@
package main
import (
"context"
"fmt"
"os"
"os/exec"
"time"
"k8s.io/klog"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
)
func streaming(ctx context.Context, client *mautrix.Client) {
start := time.Now()
var nameLastMentioned time.Time
var previousNameMentioned string
syncer := client.Syncer.(*mautrix.DefaultSyncer)
syncer.OnEventType(event.EventMessage, func(ctx context.Context, evt *event.Event) {
if evt.Timestamp < start.UnixMilli() {
return
}
if evt.Type == event.EventMessage && evt.RoomID.String() == flagStreamMatrixRoom {
line := evt.Content.AsMessage().Body
name := evt.Sender.Localpart()
if name != previousNameMentioned || time.Since(nameLastMentioned) > time.Minute {
line = fmt.Sprintf("%s says: %s", name, line)
nameLastMentioned = time.Now()
previousNameMentioned = name
}
klog.Info(line)
cmd := exec.CommandContext(ctx, "/home/u/speak.sh", line)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Run()
}
})
<-ctx.Done()
}