package worker import ( "sync" "time" "freeleaps.com/gitea-webhook-ambassador/internal/database" "freeleaps.com/gitea-webhook-ambassador/internal/jenkins" "freeleaps.com/gitea-webhook-ambassador/internal/logger" "github.com/panjf2000/ants/v2" ) // Pool represents a worker pool for processing Jenkins jobs type Pool struct { pool *ants.Pool jobQueue chan Job client *jenkins.Client db *database.DB maxRetries int retryDelay time.Duration } // Job represents a Jenkins job to be processed type Job struct { Name string Parameters jenkins.JobParameters EventID string RepositoryName string BranchName string CommitSHA string Attempts int } // Config holds the worker pool configuration type Config struct { PoolSize int QueueSize int MaxRetries int RetryBackoff time.Duration Client *jenkins.Client DB *database.DB } // Stats represents worker pool statistics type Stats struct { ActiveWorkers int QueueSize int } // processedEvents tracks processed webhook events for idempotency var processedEvents sync.Map // New creates a new worker pool func New(config Config) (*Pool, error) { pool, err := ants.NewPool(config.PoolSize, ants.WithNonblocking(true)) if err != nil { return nil, err } return &Pool{ pool: pool, jobQueue: make(chan Job, config.QueueSize), client: config.Client, db: config.DB, maxRetries: config.MaxRetries, retryDelay: config.RetryBackoff, }, nil } // Submit adds a job to the queue func (p *Pool) Submit(job Job) bool { // Check if we've already processed this event if _, exists := processedEvents.Load(job.EventID); exists { logger.Info("Skipping already processed event: %s", job.EventID) return true } // Store in processed events with a TTL processedEvents.Store(job.EventID, time.Now()) select { case p.jobQueue <- job: return true default: logger.Warn("Failed to queue job: queue full") return false } } // processQueue handles the job queue func (p *Pool) processQueue() { for job := range p.jobQueue { if err := p.pool.Submit(func() { p.processJob(job) }); err != nil { logger.Error("Failed to submit job: %v", err) } } } // processJob is the worker function that processes each job func (p *Pool) processJob(job Job) { err := p.client.TriggerJob(job.Name, job.Parameters) // Log the trigger attempt triggerLog := &database.TriggerLog{ RepositoryName: job.RepositoryName, BranchName: job.BranchName, CommitSHA: job.CommitSHA, JobName: job.Name, Status: "SUCCESS", } if err != nil { triggerLog.Status = "FAILED" triggerLog.ErrorMessage = err.Error() if job.Attempts < p.maxRetries { job.Attempts++ // Exponential backoff backoff := p.retryDelay << uint(job.Attempts-1) time.Sleep(backoff) select { case p.jobQueue <- job: logger.Info("Retrying job %s (attempt %d/%d) after %v", job.Name, job.Attempts, p.maxRetries, backoff) default: logger.Error("Failed to queue retry for job %s: queue full", job.Name) } } else { logger.Error("Job %s failed after %d attempts: %v", job.Name, job.Attempts, err) } } else { logger.Info("Successfully processed job %s for event %s", job.Name, job.EventID) } // Save trigger log to database if err := p.db.LogTrigger(triggerLog); err != nil { logger.Error("Failed to log trigger: %v", err) } } // Release releases the worker pool resources func (p *Pool) Release() { close(p.jobQueue) p.pool.Release() } // IsJenkinsConnected checks if Jenkins connection is working func (p *Pool) IsJenkinsConnected() bool { return p.client.IsConnected() } // GetStats returns the current worker pool statistics func (p *Pool) GetStats() Stats { running := p.pool.Running() return Stats{ ActiveWorkers: running, QueueSize: p.pool.Waiting(), } } // CleanupEvents removes expired events from the processedEvents map func CleanupEvents(expireAfter time.Duration) { for { time.Sleep(time.Hour) // Run cleanup every hour now := time.Now() processedEvents.Range(func(key, value interface{}) bool { if timestamp, ok := value.(time.Time); ok { if now.Sub(timestamp) > expireAfter { processedEvents.Delete(key) logger.Debug("Cleaned up expired event: %v", key) } } return true }) } }