Compare commits

...

7 Commits

24 changed files with 577 additions and 140 deletions

View File

@ -1,60 +0,0 @@
# Equivalent Makefile for Taskfile.yaml
.PHONY: ftl fmt tidy lint run-consumer run-feedgen-az run-api run-manager generate-env
# Default value for ARGS if not provided on the command line
ARGS ?=
# Runs fmt, tidy, and lint sequentially
ftl:
$(MAKE) fmt
$(MAKE) tidy
$(MAKE) lint
# Format Go code
fmt:
gofmt -w -d .
# Tidy Go modules
tidy:
go mod tidy
# Run golangci-lint
lint:
golangci-lint run
# Run the consumer application, loading environment from dotenv files
run-consumer:
set -a; \
. config/app/.consumer.env; \
. config/app/.mongodb.env; \
set +a; \
go run cmd/consumer/main.go $(ARGS)
# Run the feedgen-az application, loading environment from dotenv files
run-feedgen-az:
set -a; \
. config/app/feedgen/.az.env; \
. config/app/.mongodb.env; \
set +a; \
go run cmd/feedgen/az/main.go $(ARGS)
# Run the api application, loading environment from dotenv files
run-api:
set -a; \
. config/app/.api.env; \
. config/app/.mongodb.env; \
set +a; \
go run cmd/api/main.go
# Run the manager application with arguments (no dotenv)
run-manager:
go run cmd/manager/main.go $(ARGS)
# Generate env files from templates
generate-env:
cp config/app/consumer.env.example config/app/.consumer.env
cp config/app/api.env.example config/app/.api.env
cp config/app/mongodb.env.example config/app/.mongodb.env
cp config/app/feedgen/az.env.example config/app/feedgen/.az.env
cp config/mongodb/env.example config/mongodb/.env

View File

@ -2,6 +2,9 @@
version: "3"
vars:
DOCKER_REGISTRY: "git.aykhans.me/bsky/"
tasks:
ftl:
cmds:
@ -16,19 +19,19 @@ tasks:
lint: golangci-lint run
run-consumer:
cmd: go run cmd/consumer/main.go {{.CLI_ARGS}}
cmd: go run ./cmd/consumer {{.CLI_ARGS}}
dotenv:
- config/app/.consumer.env
- config/app/.mongodb.env
run-feedgen-az:
cmd: go run cmd/feedgen/az/main.go {{.CLI_ARGS}}
cmd: go run ./cmd/feedgen/az {{.CLI_ARGS}}
dotenv:
- config/app/feedgen/.az.env
- config/app/.mongodb.env
run-api:
cmd: go run cmd/api/main.go
cmd: go run ./cmd/api {{.CLI_ARGS}}
dotenv:
- config/app/.api.env
- config/app/.mongodb.env
@ -55,21 +58,55 @@ tasks:
docker-publish-api:
desc: Publish docker image for api service
vars:
GO_VERSION_FILE: ./cmd/api/version.go
IMAGE_NAME: feedgen-api
VERSION:
sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
preconditions:
- test -f {{.GO_VERSION_FILE}}
- sh: '[ -n "{{.VERSION}}" ]'
msg: "Could not extract version from {{.GO_FILE}}"
cmds:
- docker build -t git.aykhans.me/bsky/feedgen-api:latest -f ./cmd/api/Dockerfile .
- docker push git.aykhans.me/bsky/feedgen-api:latest
- docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/api/Dockerfile .
- docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
- docker push {{.VERSIONED_IMAGE}}
- docker push {{.LATEST_IMAGE}}
- echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"
docker-publish-consumer:
desc: Publish docker image for consumer service
vars:
GO_VERSION_FILE: ./cmd/consumer/version.go
IMAGE_NAME: feedgen-consumer
VERSION:
sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
cmds:
- docker build -t git.aykhans.me/bsky/feedgen-consumer:latest -f ./cmd/consumer/Dockerfile .
- docker push git.aykhans.me/bsky/feedgen-consumer:latest
- docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/consumer/Dockerfile .
- docker push {{.VERSIONED_IMAGE}}
- docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
- docker push {{.LATEST_IMAGE}}
- echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"
docker-publish-feedgen-az:
desc: Publish docker image for feedgen-az service
vars:
GO_VERSION_FILE: ./cmd/feedgen/az/version.go
IMAGE_NAME: feedgen-generator-az
VERSION:
sh: grep -o 'const version = "[^"]*"' {{.GO_VERSION_FILE}} | grep -o '"[^"]*"' | tr -d '"'
VERSIONED_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:{{.VERSION}}"
LATEST_IMAGE: "{{.DOCKER_REGISTRY}}{{.IMAGE_NAME}}:latest"
cmds:
- docker build -t git.aykhans.me/bsky/feedgen-generator-az:latest -f ./cmd/feedgen/az/Dockerfile .
- docker push git.aykhans.me/bsky/feedgen-generator-az:latest
- docker build -t {{.VERSIONED_IMAGE}} -f ./cmd/feedgen/az/Dockerfile .
- docker push {{.VERSIONED_IMAGE}}
- docker tag {{.VERSIONED_IMAGE}} {{.LATEST_IMAGE}}
- docker push {{.LATEST_IMAGE}}
- echo "Published {{.VERSIONED_IMAGE}} and {{.LATEST_IMAGE}}"
docker-publish-manager:
desc: Publish docker image for manager service

View File

@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/api ./cmd/api
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o api ./cmd/api/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o api ./cmd/api
FROM gcr.io/distroless/static-debian12:latest

View File

@ -2,8 +2,11 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/aykhans/bsky-feedgen/pkg/api"
@ -15,11 +18,21 @@ import (
_ "go.uber.org/automaxprocs"
)
type flags struct {
version bool
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go listenForTermination(func() { cancel() })
flags := getFlags()
if flags.version == true {
fmt.Printf("API version: %v\n", version)
os.Exit(0)
}
apiConfig, errMap := config.NewAPIConfig()
if errMap != nil {
logger.Log.Error("API ENV error", "error", errMap.ToStringMap())
@ -59,3 +72,33 @@ func listenForTermination(do func()) {
<-sigChan
do()
}
func getFlags() *flags {
flags := &flags{}
flag.Usage = func() {
fmt.Println(
`Usage:
consumer [flags]
Flags:
-version version information
-h, -help Display this help message`)
}
flag.BoolVar(&flags.version, "version", false, "print version information")
flag.Parse()
if args := flag.Args(); len(args) > 0 {
if len(args) == 1 {
fmt.Printf("unexpected argument: %s\n\n", args[0])
} else {
fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
}
flag.CommandLine.Usage()
os.Exit(1)
}
return flags
}

3
cmd/api/version.go Normal file
View File

@ -0,0 +1,3 @@
package main
const version = "0.2.0"

View File

@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/consumer ./cmd/consumer
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o consumer ./cmd/consumer/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o consumer ./cmd/consumer
FROM gcr.io/distroless/static-debian12:latest

View File

@ -20,42 +20,24 @@ import (
_ "go.uber.org/automaxprocs"
)
type flags struct {
version bool
cursorOption types.ConsumerCursor
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go listenForTermination(func() { cancel() })
flag.Usage = func() {
fmt.Println(
`Usage:
consumer [flags]
Flags:
-h, -help Display this help message
-cursor string Specify the starting point for data consumption (default: last-consumed)
Options:
last-consumed: Resume from the last processed data in storage
first-stream: Start from the beginning of the firehose
current-stream: Start from the current position in the firehose stream`)
flags := getFlags()
if flags.version == true {
fmt.Printf("Consumer version: %v\n", version)
os.Exit(0)
}
var cursorOption types.ConsumerCursor
flag.Var(&cursorOption, "cursor", "")
flag.Parse()
if args := flag.Args(); len(args) > 0 {
if len(args) == 1 {
fmt.Printf("unexpected argument: %s\n\n", args[0])
} else {
fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
}
flag.CommandLine.Usage()
os.Exit(1)
}
if cursorOption == "" {
_ = cursorOption.Set("")
if flags.cursorOption == "" {
_ = flags.cursorOption.Set("")
}
consumerConfig, errMap := config.NewConsumerConfig()
@ -89,7 +71,7 @@ Flags:
ctx,
postCollection,
"wss://bsky.network",
cursorOption,
flags.cursorOption,
consumerConfig.PostMaxDate, // Save only posts created before PostMaxDate
10*time.Second, // Save consumed data to MongoDB every 10 seconds
)
@ -121,3 +103,39 @@ func listenForTermination(do func()) {
<-sigChan
do()
}
func getFlags() *flags {
flags := &flags{}
flag.Usage = func() {
fmt.Println(
`Usage:
consumer [flags]
Flags:
-version version information
-h, -help Display this help message
-cursor string Specify the starting point for data consumption (default: last-consumed)
Options:
last-consumed: Resume from the last processed data in storage
first-stream: Start from the beginning of the firehose
current-stream: Start from the current position in the firehose stream`)
}
flag.BoolVar(&flags.version, "version", false, "print version information")
flag.Var(&flags.cursorOption, "cursor", "Specify the starting point for data consumption")
flag.Parse()
if args := flag.Args(); len(args) > 0 {
if len(args) == 1 {
fmt.Printf("unexpected argument: %s\n\n", args[0])
} else {
fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
}
flag.CommandLine.Usage()
os.Exit(1)
}
return flags
}

3
cmd/consumer/version.go Normal file
View File

@ -0,0 +1,3 @@
package main
const version = "0.1.0"

View File

@ -6,7 +6,7 @@ COPY go.mod go.sum ./
COPY ../../pkg ./pkg
COPY ../../cmd/feedgen/az ./cmd/feedgen/az
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o feedgen ./cmd/feedgen/az/main.go
RUN CGO_ENABLED=0 go build -ldflags "-s -w" -o feedgen ./cmd/feedgen/az
FROM gcr.io/distroless/static-debian12:latest

View File

@ -20,41 +20,24 @@ import (
_ "go.uber.org/automaxprocs"
)
type flags struct {
version bool
cursorOption types.GeneratorCursor
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go listenForTermination(func() { cancel() })
flag.Usage = func() {
fmt.Println(
`Usage:
feedgen-az [flags]
Flags:
-h, -help Display this help message
-cursor string Specify the starting point for feed data generation (default: last-generated)
Options:
last-generated: Resume from the last generated data in storage
first-post: Start from the beginning of the posts`)
flags := getFlags()
if flags.version == true {
fmt.Printf("Feedgen Az version: %v\n", version)
os.Exit(0)
}
var cursorOption types.GeneratorCursor
flag.Var(&cursorOption, "cursor", "")
flag.Parse()
if args := flag.Args(); len(args) > 0 {
if len(args) == 1 {
fmt.Printf("unexpected argument: %s\n\n", args[0])
} else {
fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
}
flag.CommandLine.Usage()
os.Exit(1)
}
if cursorOption == "" {
_ = cursorOption.Set("")
if flags.cursorOption == "" {
_ = flags.cursorOption.Set("")
}
feedGenAzConfig, errMap := config.NewFeedGenAzConfig()
@ -89,7 +72,7 @@ Flags:
feedGeneratorAz := feedgenAz.NewGenerator(postCollection, feedAzCollection)
startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, cursorOption)
startCrons(ctx, feedGenAzConfig, feedGeneratorAz, feedAzCollection, flags.cursorOption)
logger.Log.Info("Cron jobs started")
<-ctx.Done()
@ -139,3 +122,38 @@ func listenForTermination(do func()) {
<-sigChan
do()
}
func getFlags() *flags {
flags := &flags{}
flag.Usage = func() {
fmt.Println(
`Usage:
feedgen-az [flags]
Flags:
-version version information
-h, -help Display this help message
-cursor string Specify the starting point for feed data generation (default: last-generated)
Options:
last-generated: Resume from the last generated data in storage
first-post: Start from the beginning of the posts`)
}
flag.BoolVar(&flags.version, "version", false, "print version information")
flag.Var(&flags.cursorOption, "cursor", "Specify the starting point for feed data generation")
flag.Parse()
if args := flag.Args(); len(args) > 0 {
if len(args) == 1 {
fmt.Printf("unexpected argument: %s\n\n", args[0])
} else {
fmt.Printf("unexpected arguments: %v\n\n", strings.Join(args, ", "))
}
flag.CommandLine.Usage()
os.Exit(1)
}
return flags
}

View File

@ -0,0 +1,3 @@
package main
const version = "0.1.0"

1
go.mod
View File

@ -21,6 +21,7 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/gocql/gocql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect

2
go.sum
View File

@ -46,6 +46,8 @@ github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/aykhans/bsky-feedgen/pkg/api/handler"
"github.com/aykhans/bsky-feedgen/pkg/api/middleware"
"github.com/aykhans/bsky-feedgen/pkg/config"
"github.com/aykhans/bsky-feedgen/pkg/feed"
"github.com/aykhans/bsky-feedgen/pkg/logger"
@ -23,14 +24,19 @@ func Run(
}
feedHandler := handler.NewFeedHandler(feeds, apiConfig.FeedgenPublisherDID)
authMiddleware := middleware.NewAuth(apiConfig.ServiceDID)
mux := http.NewServeMux()
mux.HandleFunc("GET /.well-known/did.json", baseHandler.GetWellKnownDIDDoc)
mux.HandleFunc("GET /xrpc/app.bsky.feed.describeFeedGenerator", feedHandler.DescribeFeeds)
mux.HandleFunc(
mux.Handle(
"GET /xrpc/app.bsky.feed.getFeedSkeleton",
feedHandler.GetFeedSkeleton,
authMiddleware.JWTAuthMiddleware(http.HandlerFunc(feedHandler.GetFeedSkeleton)),
)
mux.HandleFunc("GET /{feed}/users", feedHandler.GetAllUsers)
mux.HandleFunc("GET /{feed}/users/valid/", feedHandler.GetValidUsers)
mux.HandleFunc("GET /{feed}/users/invalid/", feedHandler.GetInvalidUsers)
httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", apiConfig.APIPort),

View File

@ -10,6 +10,7 @@ import (
"github.com/aykhans/bsky-feedgen/pkg/api/middleware"
"github.com/aykhans/bsky-feedgen/pkg/api/response"
"github.com/aykhans/bsky-feedgen/pkg/feed"
generatorAz "github.com/aykhans/bsky-feedgen/pkg/generator/az"
"github.com/aykhans/bsky-feedgen/pkg/types"
"github.com/aykhans/bsky-feedgen/pkg/utils"
"github.com/bluesky-social/indigo/api/bsky"
@ -50,7 +51,7 @@ func (handler *FeedHandler) DescribeFeeds(w http.ResponseWriter, r *http.Request
}
func (handler *FeedHandler) GetFeedSkeleton(w http.ResponseWriter, r *http.Request) {
userDID, _ := r.Context().Value(middleware.UserDIDKey).(string)
userDID, _ := middleware.GetValue[string](r, middleware.UserDIDKey)
feedQuery := r.URL.Query().Get("feed")
if feedQuery == "" {
@ -99,3 +100,46 @@ func (handler *FeedHandler) GetFeedSkeleton(w http.ResponseWriter, r *http.Reque
Cursor: newCursor,
})
}
func (handler *FeedHandler) GetValidUsers(w http.ResponseWriter, r *http.Request) {
feed := r.PathValue("feed")
validUsers := make([]string, 0)
switch feed {
case "AzPulse":
validUsers = generatorAz.Users.GetValidUsers()
}
response.JSON(w, 200, response.M{
"feed": feed,
"users": validUsers,
})
}
func (handler *FeedHandler) GetInvalidUsers(w http.ResponseWriter, r *http.Request) {
feed := r.PathValue("feed")
invalidUsers := make([]string, 0)
switch feed {
case "AzPulse":
invalidUsers = generatorAz.Users.GetInvalidUsers()
}
response.JSON(w, 200, response.M{
"feed": feed,
"users": invalidUsers,
})
}
func (handler *FeedHandler) GetAllUsers(w http.ResponseWriter, r *http.Request) {
feed := r.PathValue("feed")
responseData := response.M{"feed": feed}
switch feed {
case "AzPulse":
responseData["valid_users"] = generatorAz.Users.GetValidUsers()
responseData["invalid_users"] = generatorAz.Users.GetInvalidUsers()
}
response.JSON(w, 200, responseData)
}

View File

@ -2,12 +2,73 @@ package middleware
import (
"context"
"crypto"
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/golang-jwt/jwt/v5"
"github.com/whyrusleeping/go-did"
)
const UserDIDKey ContextKey = "user_did"
func JWTAuthMiddleware(next http.Handler) http.Handler {
const (
authorizationHeaderName = "Authorization"
authorizationHeaderValuePrefix = "Bearer "
)
// Global (or dependency-injected) DID resolver with caching.
var didResolver *identity.CacheDirectory
func init() {
baseDir := identity.BaseDirectory{}
// Configure cache with appropriate TTLs.
// Capacity 0 means unlimited cache size.
// hitTTL: 24 hours for successful resolutions.
// errTTL: 5 minutes for failed resolutions.
// invalidHandleTTL: also 5 minutes for invalid handles.
resolver := identity.NewCacheDirectory(
&baseDir,
0, // Unlimited capacity
24*time.Hour, // hitTTL
5*time.Minute, // errTTL
5*time.Minute, // invalidHandleTTL
)
didResolver = &resolver
}
type AuthorizationError struct {
Message string
Err error
}
func (e *AuthorizationError) Error() string {
if e.Err != nil {
return fmt.Sprintf("%s: %v", e.Message, e.Err)
}
return e.Message
}
func (e *AuthorizationError) Unwrap() error {
return e.Err
}
type Auth struct {
serviceDID *did.DID
}
func NewAuth(serviceDID *did.DID) *Auth {
return &Auth{serviceDID}
}
func (auth *Auth) JWTAuthMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
@ -16,8 +77,100 @@ func JWTAuthMiddleware(next http.Handler) http.Handler {
return
}
// TODO: Add auth verification
ctx := context.WithValue(r.Context(), UserDIDKey, "")
userDID, _ := auth.validateAuth(r.Context(), r)
ctx := context.WithValue(r.Context(), UserDIDKey, userDID)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
// getDIDSigningKey resolves a DID and extracts its public signing key.
// It leverages indigo's identity package which handles multibase decoding and key parsing.
func (auth *Auth) getDIDSigningKey(ctx context.Context, did string) (crypto.PublicKey, error) {
atID, err := syntax.ParseAtIdentifier(did)
if err != nil {
return nil, fmt.Errorf("invalid DID syntax: %w", err)
}
// Use Lookup for bi-directional verification (handle -> DID -> handle).
// The `Lookup` method returns an `Identity` struct which contains `PublicKey()` method
// to get the signing key.
identity, err := didResolver.Lookup(ctx, *atID)
if err != nil {
return nil, fmt.Errorf("DID resolution failed for %s: %w", did, err)
}
if identity == nil || identity.DID.String() == "" {
return nil, fmt.Errorf("DID resolution returned empty identity for %s", did)
}
publicKey, err := identity.PublicKey()
if err != nil {
return nil, fmt.Errorf("failed to get signing key for DID %s: %w", did, err)
}
return publicKey, nil
}
// ValidateAuth validates the authorization header and returns the requester's DID.
func (auth *Auth) validateAuth(ctx context.Context, r *http.Request) (string, error) {
authHeader := r.Header.Get(authorizationHeaderName)
if authHeader == "" {
return "", &AuthorizationError{Message: "Authorization header is missing"}
}
if !strings.HasPrefix(authHeader, authorizationHeaderValuePrefix) {
return "", &AuthorizationError{Message: "Invalid authorization header format"}
}
jwtString := strings.TrimPrefix(authHeader, authorizationHeaderValuePrefix)
jwtString = strings.TrimSpace(jwtString)
claims := jwt.RegisteredClaims{}
keyFunc := func(token *jwt.Token) (any, error) {
regClaims, ok := token.Claims.(*jwt.RegisteredClaims)
if !ok {
return nil, fmt.Errorf("invalid JWT claims type")
}
issuerDID := regClaims.Issuer
if issuerDID == "" {
return nil, fmt.Errorf("JWT 'iss' claim is missing")
}
publicKey, err := auth.getDIDSigningKey(ctx, issuerDID)
if err != nil {
return nil, fmt.Errorf("failed to get signing key for DID %s: %w", issuerDID, err)
}
return publicKey, nil
}
token, err := jwt.ParseWithClaims(jwtString, &claims, keyFunc)
if err != nil {
if errors.Is(err, jwt.ErrTokenSignatureInvalid) {
return "", &AuthorizationError{Message: "Invalid signature", Err: err}
}
if errors.Is(err, jwt.ErrTokenExpired) {
return "", &AuthorizationError{Message: "Token expired", Err: err}
}
if errors.Is(err, jwt.ErrTokenNotValidYet) {
return "", &AuthorizationError{Message: "Token not valid yet", Err: err}
}
if errors.Is(err, jwt.ErrTokenMalformed) {
return "", &AuthorizationError{Message: "Malformed token", Err: err}
}
return "", &AuthorizationError{Message: "Failed to parse or validate JWT", Err: err}
}
if !token.Valid {
return "", &AuthorizationError{Message: "Token is invalid"}
}
if slices.Contains(claims.Audience, auth.serviceDID.String()) {
return "", &AuthorizationError{Message: fmt.Sprintf("Invalid audience (expected %s)", auth.serviceDID)}
}
// Return the issuer's DID.
return claims.Issuer, nil
}

View File

@ -1,3 +1,19 @@
package middleware
import (
"net/http"
"github.com/aykhans/bsky-feedgen/pkg/types"
)
type ContextKey string
func GetValue[T any](r *http.Request, key ContextKey) (T, error) {
value, ok := r.Context().Value(key).(T)
if ok == false {
var zero T
return zero, types.ErrNotfound
}
return value, nil
}

View File

@ -0,0 +1,89 @@
package middleware
// copied from https://gist.github.com/bnewbold/bc9b97c9b281295da1fa47c03b0b3c69
import (
"crypto"
"errors"
atcrypto "github.com/bluesky-social/indigo/atproto/crypto"
"github.com/golang-jwt/jwt/v5"
)
var (
SigningMethodES256K *SigningMethodAtproto
SigningMethodES256 *SigningMethodAtproto
)
type SigningMethodAtproto struct {
alg string
hash crypto.Hash
toOutSig toOutSig
sigLen int
}
type toOutSig func(sig []byte) []byte
func init() {
SigningMethodES256K = &SigningMethodAtproto{
alg: "ES256K",
hash: crypto.SHA256,
toOutSig: toES256K,
sigLen: 64,
}
jwt.RegisterSigningMethod(SigningMethodES256K.Alg(), func() jwt.SigningMethod {
return SigningMethodES256K
})
SigningMethodES256 = &SigningMethodAtproto{
alg: "ES256",
hash: crypto.SHA256,
toOutSig: toES256,
sigLen: 64,
}
jwt.RegisterSigningMethod(SigningMethodES256.Alg(), func() jwt.SigningMethod {
return SigningMethodES256
})
}
// Errors returned on different problems.
var (
ErrWrongKeyFormat = errors.New("wrong key type")
ErrBadSignature = errors.New("bad signature")
ErrVerification = errors.New("signature verification failed")
ErrFailedSigning = errors.New("failed generating signature")
ErrHashUnavailable = errors.New("hasher unavailable")
)
func (sm *SigningMethodAtproto) Verify(signingString string, sig []byte, key any) error {
pub, ok := key.(atcrypto.PublicKey)
if !ok {
return ErrWrongKeyFormat
}
if !sm.hash.Available() {
return ErrHashUnavailable
}
if len(sig) != sm.sigLen {
return ErrBadSignature
}
return pub.HashAndVerifyLenient([]byte(signingString), sig)
}
func (sm *SigningMethodAtproto) Sign(signingString string, key any) ([]byte, error) {
// TODO: implement signatures
return nil, ErrFailedSigning
}
func (sm *SigningMethodAtproto) Alg() string {
return sm.alg
}
func toES256K(sig []byte) []byte {
return sig[:64]
}
func toES256(sig []byte) []byte {
return sig[:64]
}

View File

@ -1,5 +1,9 @@
package consumer
// This file contains code for consuming and processing the Bluesky firehose event stream.
// Most of this implementation is copied and inspired from the original source at:
// https://github.com/bluesky-social/indigo/blob/main/cmd/beemo/firehose_consumer.go
import (
"bytes"
"context"

View File

@ -39,7 +39,7 @@ func (f *FeedAz) Describe(_ context.Context) bsky.FeedDescribeFeedGenerator_Feed
func (f *FeedAz) GetPage(
ctx context.Context,
_ string,
_ string, // user did
limit int64,
cursor string,
) ([]*bsky.FeedDefs_SkeletonFeedPost, *string, error) {

View File

@ -114,7 +114,7 @@ func (generator *Generator) IsValid(post *collections.Post) bool {
return false
}
if isValidUser := users.IsValid(post.DID); isValidUser != nil {
if isValidUser := Users.IsValid(post.DID); isValidUser != nil {
return *isValidUser
}

View File

@ -2,7 +2,7 @@ package az
import "github.com/aykhans/bsky-feedgen/pkg/generator"
var users = generator.Users{
var Users = generator.Users{
// Invalid
"did:plc:5zww7zorx2ajw7hqrhuix3ba": false,
"did:plc:c4vhz47h566t2ntgd7gtawen": false,
@ -14,6 +14,7 @@ var users = generator.Users{
"did:plc:5vwjnzaibnwscbbcvkzhy57v": false,
"did:plc:6mfp3coadoobuvlg6w2avw6x": false,
"did:plc:lm2uhaoqoe6yo76oeihndfyi": false,
"did:plc:vizwdor43adw3277u2kkrssd": false,
// Valid
"did:plc:jbt4qi6psd7rutwzedtecsq7": true,

View File

@ -4,6 +4,15 @@ import "github.com/aykhans/bsky-feedgen/pkg/utils"
type Users map[string]bool
// IsValid checks if a given DID exists in the Users map and returns its validity status.
//
// Parameters:
//
// did: The Decentralized Identifier string to check
//
// Returns:
// - *bool: A pointer to the validity status if the DID exists in the map
// - nil: If the DID does not exist in the map
func (u Users) IsValid(did string) *bool {
isValid, ok := u[did]
if ok == false {
@ -12,3 +21,49 @@ func (u Users) IsValid(did string) *bool {
return utils.ToPtr(isValid)
}
// GetValidUsers returns a slice of DIDs that are marked as valid in the Users map.
//
// Returns:
// - []string: A slice of valid DIDs, limited by the specified parameters
func (u Users) GetValidUsers() []string {
validUsers := make([]string, 0)
for did, isValid := range u {
if isValid {
validUsers = append(validUsers, did)
}
}
return validUsers
}
// GetInvalidUsers returns a slice of DIDs that are marked as invalid in the Users map.
//
// Returns:
// - []string: A slice of invalid DIDs, limited by the specified parameters
func (u Users) GetInvalidUsers() []string {
invalidUsers := make([]string, 0)
for did, isValid := range u {
if !isValid {
invalidUsers = append(invalidUsers, did)
}
}
return invalidUsers
}
// GetAll returns a slice of all DIDs in the Users map, regardless of validity status.
//
// Returns:
// - []string: A slice containing all DIDs in the map
func (u Users) GetAll() []string {
allUsers := make([]string, 0, len(u))
for did := range u {
allUsers = append(allUsers, did)
}
return allUsers
}

View File

@ -4,4 +4,5 @@ import "errors"
var (
ErrInternal = errors.New("internal error")
ErrNotfound = errors.New("not found")
)