From 70c4951bdb441f8c7ae1c1927f505fc043b80db3 Mon Sep 17 00:00:00 2001 From: zhenyus Date: Sun, 11 May 2025 22:33:47 +0800 Subject: [PATCH] feat: add event cleanup configuration and functionality Signed-off-by: zhenyus --- .../configs/configs.example.yaml | 6 ++- apps/gitea-webhook-ambassador/main.go | 42 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/apps/gitea-webhook-ambassador/configs/configs.example.yaml b/apps/gitea-webhook-ambassador/configs/configs.example.yaml index 55ff13e6..734cacc8 100644 --- a/apps/gitea-webhook-ambassador/configs/configs.example.yaml +++ b/apps/gitea-webhook-ambassador/configs/configs.example.yaml @@ -47,4 +47,8 @@ worker: poolSize: 10 queueSize: 100 maxRetries: 3 - retryBackoff: 1 \ No newline at end of file + retryBackoff: 1 + +eventCleanup: + interval: 3600 + expireAfter: 7200 \ No newline at end of file diff --git a/apps/gitea-webhook-ambassador/main.go b/apps/gitea-webhook-ambassador/main.go index 7124add9..1c14578c 100644 --- a/apps/gitea-webhook-ambassador/main.go +++ b/apps/gitea-webhook-ambassador/main.go @@ -53,6 +53,11 @@ type Configuration struct { MaxRetries int `yaml:"maxRetries" default:"3" validate:"gte=0"` RetryBackoff int `yaml:"retryBackoff" default:"1" validate:"gt=0"` // seconds } `yaml:"worker"` + + EventCleanup struct { + Interval int `yaml:"interval" default:"3600"` // seconds + ExpireAfter int `yaml:"expireAfter" default:"7200"` // seconds + } `yaml:"eventCleanup"` } // ProjectConfig represents the configuration for a specific repository @@ -150,6 +155,9 @@ func main() { // Setup config file watcher for auto-reload setupConfigWatcher(*configFile) + // Start event cleanup goroutine + go cleanupEvents() + // Configure HTTP client with timeout configMutex.RLock() httpClient = &http.Client{ @@ -299,6 +307,12 @@ func loadConfig(file string) error { if newConfig.Worker.RetryBackoff == 0 { newConfig.Worker.RetryBackoff = 1 } + if newConfig.EventCleanup.Interval == 0 { + newConfig.EventCleanup.Interval = 3600 + } + if newConfig.EventCleanup.ExpireAfter == 0 { + newConfig.EventCleanup.ExpireAfter = 7200 + } // Handle legacy configuration format (where Projects is map[string]string) // This is to maintain backward compatibility with existing configs @@ -525,11 +539,7 @@ func handleWebhook(w http.ResponseWriter, r *http.Request) { } // Store in processed events with a TTL (we'll use a goroutine to remove after 1 hour) - processedEvents.Store(eventID, true) - go func(key string) { - time.Sleep(1 * time.Hour) - processedEvents.Delete(key) - }(eventID) + processedEvents.Store(eventID, time.Now()) // Check if we have a Jenkins job mapping for this repository configMutex.RLock() @@ -742,3 +752,25 @@ func logError(format string, v ...interface{}) { // Error level logs are always shown logger.Printf("[ERROR] "+format, v...) } + +func cleanupEvents() { + for { + configMutex.RLock() + interval := time.Duration(config.EventCleanup.Interval) * time.Second + expireAfter := time.Duration(config.EventCleanup.ExpireAfter) * time.Second + configMutex.RUnlock() + + time.Sleep(interval) + + now := time.Now() + processedEvents.Range(func(key, value interface{}) bool { + if timestamp, ok := value.(time.Time); ok { + if now.Sub(timestamp) > expireAfter { + processedEvents.Delete(key) + logDebug("Cleaned up expired event: %v", key) + } + } + return true + }) + } +}