fix: list activities

This commit is contained in:
Steven 2023-12-20 08:01:27 +08:00
parent d4c7de3916
commit bec2c15ac9
9 changed files with 126 additions and 127 deletions

View File

@ -4,12 +4,12 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"github.com/labstack/echo/v4" "github.com/labstack/echo/v4"
"github.com/mssola/useragent" "github.com/mssola/useragent"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"github.com/yourselfhosted/slash/internal/util"
"github.com/yourselfhosted/slash/server/metric" "github.com/yourselfhosted/slash/server/metric"
"github.com/yourselfhosted/slash/store" "github.com/yourselfhosted/slash/store"
) )
@ -38,13 +38,13 @@ type AnalysisData struct {
func (s *APIV1Service) registerAnalyticsRoutes(g *echo.Group) { func (s *APIV1Service) registerAnalyticsRoutes(g *echo.Group) {
g.GET("/shortcut/:shortcutId/analytics", func(c echo.Context) error { g.GET("/shortcut/:shortcutId/analytics", func(c echo.Context) error {
ctx := c.Request().Context() ctx := c.Request().Context()
shortcutID, err := strconv.Atoi(c.Param("shortcutId")) shortcutID, err := util.ConvertStringToInt32(c.Param("shortcutId"))
if err != nil { if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("shortcut id is not a number: %s", c.Param("shortcutId"))).SetInternal(err) return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("shortcut id is not a number: %s", c.Param("shortcutId"))).SetInternal(err)
} }
activities, err := s.Store.ListActivities(ctx, &store.FindActivity{ activities, err := s.Store.ListActivities(ctx, &store.FindActivity{
Type: store.ActivityShortcutView, Type: store.ActivityShortcutView,
Where: []string{fmt.Sprintf("json_extract(payload, '$.shortcutId') = %d", shortcutID)}, PayloadShortcutID: &shortcutID,
}) })
if err != nil { if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get activities, err: %s", err)).SetInternal(err) return echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get activities, err: %s", err)).SetInternal(err)

View File

@ -320,7 +320,7 @@ func (s *APIV1Service) composeShortcut(ctx context.Context, shortcut *Shortcut)
activityList, err := s.Store.ListActivities(ctx, &store.FindActivity{ activityList, err := s.Store.ListActivities(ctx, &store.FindActivity{
Type: store.ActivityShortcutView, Type: store.ActivityShortcutView,
Level: store.ActivityInfo, Level: store.ActivityInfo,
Where: []string{fmt.Sprintf("json_extract(payload, '$.shortcutId') = %d", shortcut.ID)}, PayloadShortcutID: &shortcut.ID,
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Failed to list activities") return nil, errors.Wrap(err, "Failed to list activities")

View File

@ -275,7 +275,7 @@ func (s *APIV2Service) GetShortcutAnalytics(ctx context.Context, request *apiv2p
activities, err := s.Store.ListActivities(ctx, &store.FindActivity{ activities, err := s.Store.ListActivities(ctx, &store.FindActivity{
Type: store.ActivityShortcutView, Type: store.ActivityShortcutView,
Where: []string{fmt.Sprintf("json_extract(payload, '$.shortcutId') = %d", shortcut.Id)}, PayloadShortcutID: &shortcut.Id,
}) })
if err != nil { if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get activities, err: %v", err) return nil, status.Errorf(codes.Internal, "failed to get activities, err: %v", err)
@ -406,7 +406,7 @@ func (s *APIV2Service) convertShortcutFromStorepb(ctx context.Context, shortcut
activityList, err := s.Store.ListActivities(ctx, &store.FindActivity{ activityList, err := s.Store.ListActivities(ctx, &store.FindActivity{
Type: store.ActivityShortcutView, Type: store.ActivityShortcutView,
Level: store.ActivityInfo, Level: store.ActivityInfo,
Where: []string{fmt.Sprintf("json_extract(payload, '$.shortcutId') = %d", composedShortcut.Id)}, PayloadShortcutID: &composedShortcut.Id,
}) })
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Failed to list activities") return nil, errors.Wrap(err, "Failed to list activities")

View File

@ -60,8 +60,6 @@ func NewServer(ctx context.Context, profile *profile.Profile, store *store.Store
`"status":${status},"error":"${error}"}` + "\n", `"status":${status},"error":"${error}"}` + "\n",
})) }))
e.Use(middleware.Gzip())
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
Skipper: grpcRequestSkipper, Skipper: grpcRequestSkipper,
AllowOrigins: []string{"*"}, AllowOrigins: []string{"*"},

View File

@ -58,7 +58,7 @@ type Activity struct {
type FindActivity struct { type FindActivity struct {
Type ActivityType Type ActivityType
Level ActivityLevel Level ActivityLevel
Where []string PayloadShortcutID *int32
} }
func (s *Store) CreateActivity(ctx context.Context, create *Activity) (*Activity, error) { func (s *Store) CreateActivity(ctx context.Context, create *Activity) (*Activity, error) {

View File

@ -2,6 +2,7 @@ package postgres
import ( import (
"context" "context"
"fmt"
"strings" "strings"
"github.com/yourselfhosted/slash/store" "github.com/yourselfhosted/slash/store"
@ -42,8 +43,8 @@ func (d *DB) ListActivities(ctx context.Context, find *store.FindActivity) ([]*s
if find.Level != "" { if find.Level != "" {
where, args = append(where, "level = "+placeholder(len(args)+1)), append(args, find.Level.String()) where, args = append(where, "level = "+placeholder(len(args)+1)), append(args, find.Level.String())
} }
if find.Where != nil { if find.PayloadShortcutID != nil {
where = append(where, find.Where...) where, args = append(where, fmt.Sprintf("CAST(payload as JSON)->>'shortcutId' = %s", placeholder(len(args)+1))), append(args, *find.PayloadShortcutID)
} }
query := ` query := `

View File

@ -5,9 +5,9 @@ import (
"embed" "embed"
"fmt" "fmt"
"io/fs" "io/fs"
"os"
"regexp" "regexp"
"sort" "sort"
"strings"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -15,50 +15,88 @@ import (
"github.com/yourselfhosted/slash/store" "github.com/yourselfhosted/slash/store"
) )
const (
latestSchemaFileName = "LATEST.sql"
)
//go:embed migration //go:embed migration
var migrationFS embed.FS var migrationFS embed.FS
//go:embed seed //go:embed seed
var seedFS embed.FS var seedFS embed.FS
// Migrate applies the latest schema to the database.
func (d *DB) Migrate(ctx context.Context) error { func (d *DB) Migrate(ctx context.Context) error {
currentVersion := version.GetCurrentVersion(d.profile.Mode) if d.profile.IsDev() {
if d.profile.Mode == "prod" { return d.nonProdMigrate(ctx)
_, err := os.Stat(d.profile.DSN) }
if err != nil {
// If db file not exists, we should create a new one with latest schema. return d.prodMigrate(ctx)
if errors.Is(err, os.ErrNotExist) { }
if err := d.applyLatestSchema(ctx); err != nil {
return errors.Wrap(err, "failed to apply latest schema") func (d *DB) nonProdMigrate(ctx context.Context) error {
rows, err := d.db.QueryContext(ctx, "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';")
if err != nil {
return errors.Errorf("failed to query database tables: %s", err)
}
if rows.Err() != nil {
return errors.Errorf("failed to query database tables: %s", err)
}
defer rows.Close()
var tables []string
for rows.Next() {
var table string
err := rows.Scan(&table)
if err != nil {
return errors.Errorf("failed to scan table name: %s", err)
}
tables = append(tables, table)
}
if len(tables) != 0 {
return nil
}
println("no tables in the database. start migration")
buf, err := migrationFS.ReadFile("migration/dev/" + latestSchemaFileName)
if err != nil {
return errors.Errorf("failed to read latest schema file: %s", err)
}
stmt := string(buf)
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Errorf("failed to exec SQL %s: %s", stmt, err)
}
// In demo mode, we should seed the database.
if d.profile.Mode == "demo" {
if err := d.Seed(ctx); err != nil {
return errors.Wrap(err, "failed to seed")
}
}
return nil
}
func (d *DB) prodMigrate(ctx context.Context) error {
currentVersion := version.GetCurrentVersion(d.profile.Mode)
migrationHistoryList, err := d.ListMigrationHistories(ctx, &store.FindMigrationHistory{})
// If there is no migration history, we should apply the latest schema.
if err != nil || len(migrationHistoryList) == 0 {
buf, err := migrationFS.ReadFile("migration/prod/" + latestSchemaFileName)
if err != nil {
return errors.Errorf("failed to read latest schema file: %s", err)
}
stmt := string(buf)
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Errorf("failed to exec SQL %s: %s", stmt, err)
} }
// Upsert the newest version to migration_history.
if _, err := d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{ if _, err := d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{
Version: currentVersion, Version: currentVersion,
}); err != nil { }); err != nil {
return errors.Wrap(err, "failed to upsert migration history") return errors.Wrap(err, "failed to upsert migration history")
} }
} else {
return errors.Wrap(err, "failed to get db file stat")
}
} else {
// If db file exists, we should check if we need to migrate the database.
migrationHistoryList, err := d.ListMigrationHistories(ctx, &store.FindMigrationHistory{})
if err != nil {
return errors.Wrap(err, "failed to find migration history")
}
// If no migration history, we should apply the latest version migration and upsert the migration history.
if len(migrationHistoryList) == 0 {
minorVersion := version.GetMinorVersion(currentVersion)
if err := d.applyMigrationForMinorVersion(ctx, minorVersion); err != nil {
return errors.Wrapf(err, "failed to apply version %s migration", minorVersion)
}
_, err := d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{
Version: currentVersion,
})
if err != nil {
return errors.Wrap(err, "failed to upsert migration history")
}
return nil return nil
} }
@ -68,11 +106,12 @@ func (d *DB) Migrate(ctx context.Context) error {
} }
sort.Sort(version.SortVersion(migrationHistoryVersionList)) sort.Sort(version.SortVersion(migrationHistoryVersionList))
latestMigrationHistoryVersion := migrationHistoryVersionList[len(migrationHistoryVersionList)-1] latestMigrationHistoryVersion := migrationHistoryVersionList[len(migrationHistoryVersionList)-1]
if !version.IsVersionGreaterThan(version.GetSchemaVersion(currentVersion), latestMigrationHistoryVersion) {
return nil
}
if version.IsVersionGreaterThan(version.GetSchemaVersion(currentVersion), latestMigrationHistoryVersion) {
minorVersionList := getMinorVersionList()
println("start migrate") println("start migrate")
for _, minorVersion := range minorVersionList { for _, minorVersion := range getMinorVersionList() {
normalizedVersion := minorVersion + ".0" normalizedVersion := minorVersion + ".0"
if version.IsVersionGreaterThan(normalizedVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(currentVersion, normalizedVersion) { if version.IsVersionGreaterThan(normalizedVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(currentVersion, normalizedVersion) {
println("applying migration for", normalizedVersion) println("applying migration for", normalizedVersion)
@ -82,74 +121,35 @@ func (d *DB) Migrate(ctx context.Context) error {
} }
} }
println("end migrate") println("end migrate")
}
}
} else {
// In non-prod mode, we should always migrate the database.
if _, err := os.Stat(d.profile.DSN); errors.Is(err, os.ErrNotExist) {
if err := d.applyLatestSchema(ctx); err != nil {
return errors.Wrap(err, "failed to apply latest schema")
}
// In demo mode, we should seed the database.
if d.profile.Mode == "demo" {
if err := d.Seed(ctx); err != nil {
return errors.Wrap(err, "failed to seed")
}
}
}
}
return nil
}
const (
latestSchemaFileName = "LATEST.sql"
)
func (d *DB) applyLatestSchema(ctx context.Context) error {
schemaMode := "dev"
if d.profile.Mode == "prod" {
schemaMode = "prod"
}
latestSchemaPath := fmt.Sprintf("migration/%s/%s", schemaMode, latestSchemaFileName)
buf, err := migrationFS.ReadFile(latestSchemaPath)
if err != nil {
return errors.Wrapf(err, "failed to read latest schema %q", latestSchemaPath)
}
stmt := string(buf)
if err := d.execute(ctx, stmt); err != nil {
return errors.Wrapf(err, "migrate error: %s", stmt)
}
return nil return nil
} }
func (d *DB) applyMigrationForMinorVersion(ctx context.Context, minorVersion string) error { func (d *DB) applyMigrationForMinorVersion(ctx context.Context, minorVersion string) error {
filenames, err := fs.Glob(migrationFS, fmt.Sprintf("%s/%s/*.sql", "migration/prod", minorVersion)) filenames, err := fs.Glob(migrationFS, fmt.Sprintf("migration/prod/%s/*.sql", minorVersion))
if err != nil { if err != nil {
return errors.Wrap(err, "failed to read ddl files") return errors.Wrap(err, "failed to read ddl files")
} }
sort.Strings(filenames) sort.Strings(filenames)
migrationStmt := ""
// Loop over all migration files and execute them in order. // Loop over all migration files and execute them in order.
for _, filename := range filenames { for _, filename := range filenames {
buf, err := migrationFS.ReadFile(filename) buf, err := migrationFS.ReadFile(filename)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to read minor version migration file, filename=%s", filename) return errors.Wrapf(err, "failed to read minor version migration file, filename=%s", filename)
} }
stmt := string(buf) for _, stmt := range strings.Split(string(buf), ";") {
migrationStmt += stmt if strings.TrimSpace(stmt) == "" {
if err := d.execute(ctx, stmt); err != nil { continue
}
if _, err := d.db.ExecContext(ctx, stmt); err != nil {
return errors.Wrapf(err, "migrate error: %s", stmt) return errors.Wrapf(err, "migrate error: %s", stmt)
} }
} }
}
// Upsert the newest version to migration_history. // Upsert the newest version to migration_history.
version := minorVersion + ".0" version := minorVersion + ".0"
if _, err = d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{ if _, err = d.UpsertMigrationHistory(ctx, &store.UpsertMigrationHistory{Version: version}); err != nil {
Version: version,
}); err != nil {
return errors.Wrapf(err, "failed to upsert migration history with version: %s", version) return errors.Wrapf(err, "failed to upsert migration history with version: %s", version)
} }

View File

@ -118,27 +118,27 @@ func (d *DB) UpdateShortcut(ctx context.Context, update *store.UpdateShortcut) (
func (d *DB) ListShortcuts(ctx context.Context, find *store.FindShortcut) ([]*storepb.Shortcut, error) { func (d *DB) ListShortcuts(ctx context.Context, find *store.FindShortcut) ([]*storepb.Shortcut, error) {
where, args := []string{"1 = 1"}, []any{} where, args := []string{"1 = 1"}, []any{}
if v := find.ID; v != nil { if v := find.ID; v != nil {
where, args = append(where, fmt.Sprintf("id = $%d", len(args)+1)), append(args, *v) where, args = append(where, fmt.Sprintf("id = %s", placeholder(len(args)+1))), append(args, *v)
} }
if v := find.CreatorID; v != nil { if v := find.CreatorID; v != nil {
where, args = append(where, fmt.Sprintf("creator_id = $%d", len(args)+1)), append(args, *v) where, args = append(where, fmt.Sprintf("creator_id = %s", placeholder(len(args)+1))), append(args, *v)
} }
if v := find.RowStatus; v != nil { if v := find.RowStatus; v != nil {
where, args = append(where, fmt.Sprintf("row_status = $%d", len(args)+1)), append(args, *v) where, args = append(where, fmt.Sprintf("row_status = %s", placeholder(len(args)+1))), append(args, *v)
} }
if v := find.Name; v != nil { if v := find.Name; v != nil {
where, args = append(where, fmt.Sprintf("name = $%d", len(args)+1)), append(args, *v) where, args = append(where, fmt.Sprintf("name = %s", placeholder(len(args)+1))), append(args, *v)
} }
if v := find.VisibilityList; len(v) != 0 { if v := find.VisibilityList; len(v) != 0 {
list := []string{} list := []string{}
for _, visibility := range v { for _, visibility := range v {
list = append(list, fmt.Sprintf("$%d", len(args)+1)) list = append(list, placeholder(len(args)+1))
args = append(args, visibility) args = append(args, visibility)
} }
where = append(where, fmt.Sprintf("visibility IN (%s)", strings.Join(list, ","))) where = append(where, fmt.Sprintf("visibility IN (%s)", strings.Join(list, ",")))
} }
if v := find.Tag; v != nil { if v := find.Tag; v != nil {
where, args = append(where, fmt.Sprintf("tag LIKE $%d", len(args)+1)), append(args, "%"+*v+"%") where, args = append(where, fmt.Sprintf("tag LIKE %s", placeholder(len(args)+1))), append(args, "%"+*v+"%")
} }
rows, err := d.db.QueryContext(ctx, fmt.Sprintf(` rows, err := d.db.QueryContext(ctx, fmt.Sprintf(`

View File

@ -42,8 +42,8 @@ func (d *DB) ListActivities(ctx context.Context, find *store.FindActivity) ([]*s
if find.Level != "" { if find.Level != "" {
where, args = append(where, "level = ?"), append(args, find.Level.String()) where, args = append(where, "level = ?"), append(args, find.Level.String())
} }
if find.Where != nil { if find.PayloadShortcutID != nil {
where = append(where, find.Where...) where, args = append(where, "json_extract(payload, '$.shortcutId') = ?"), append(args, *find.PayloadShortcutID)
} }
query := ` query := `