// Package scheduler queues e621 API tasks and hands them, one after another,
// to GetAPIRequest, optionally throttled by a rate limiter.
package scheduler

import (
	"context"
	"reflect"
	"sync"

	"git.dragse.it/anthrove/e621-to-graph/pkg/e621"
	"git.dragse.it/anthrove/e621-to-graph/pkg/util/queue"
	log "github.com/sirupsen/logrus"
	"golang.org/x/time/rate"
)

// Scheduler holds the queue of pending tasks and the optional rate limiter
// that the execution handler consults before taking the next task.
//
// A Scheduler must not be copied after first use because it contains a mutex.
type Scheduler struct {
	queue queue.Queue

	// mu guards limiter, which is written by SetLimiter and read by the
	// execution handler goroutine started in NewScheduler.
	mu      sync.RWMutex
	limiter *rate.Limiter
}

// NewScheduler creates a Scheduler with an empty queue and no rate limiter,
// and starts StartExecutionHandler in its own goroutine. That goroutine loops
// forever; there is no way to stop it from here.
func NewScheduler() *Scheduler {
	scheduler := &Scheduler{
		queue:   queue.NewQueue(),
		limiter: nil,
	}
	go scheduler.StartExecutionHandler()
	return scheduler
}

// Schedule wraps t in a scheduler task, pushes it onto the queue of s and
// returns a function that yields the task's result.
//
// If pushing onto the queue fails, the returned function immediately returns
// the zero value of T together with the push error. Otherwise the returned
// function blocks until a response arrives on the task's channel and returns
// its Data and Error fields.
func Schedule[T e621.DataType](s *Scheduler, t e621.Task[T], username string, apiKey string) func() (T, error) {
	// NOTE(review): the channel is unbuffered, so whoever sends the response
	// presumably blocks until the returned function is called — confirm
	// against the sender before relying on fire-and-forget usage.
	channel := make(chan e621.DataResponse[T])
	schedulerTask := NewSchedulerTaskImpl[T](t, channel, username, apiKey)

	// Computed once and reused for both log entries.
	taskType := reflect.TypeOf(*new(T))

	log.WithFields(log.Fields{
		"type": taskType,
	}).Debug("scheduler: pushing task to queue")

	if err := s.queue.Push(schedulerTask); err != nil {
		return func() (T, error) {
			var zero T
			return zero, err
		}
	}

	log.WithFields(log.Fields{
		"type": taskType,
	}).Trace("scheduler: pushed task to queue")

	return func() (T, error) {
		log.Trace("scheduler: getting data from channel")
		data := <-channel
		return data.Data, data.Error
	}
}

// SetLimiter installs limiter as the rate limiter consulted by the execution
// handler. Passing nil removes any previously installed limiter, so tasks are
// taken from the queue without throttling.
func (s *Scheduler) SetLimiter(limiter *rate.Limiter) {
	if limiter == nil {
		// Calling methods on a nil *rate.Limiter would panic, so log without
		// the limiter's details.
		log.Debug("scheduler: removing limiter")
	} else {
		log.WithFields(log.Fields{
			"burst":  limiter.Burst(),
			"limit":  limiter.Limit(),
			"tokens": limiter.Tokens(),
		}).Debug("scheduler: setting limiter")
	}

	s.mu.Lock()
	s.limiter = limiter
	s.mu.Unlock()
}

// currentLimiter returns the limiter currently installed, or nil if none is.
func (s *Scheduler) currentLimiter() *rate.Limiter {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.limiter
}

// StartExecutionHandler runs the scheduling loop and never returns. On each
// iteration it waits on the rate limiter (if one is set), waits for the queue
// to contain an element, pops it and passes it to GetAPIRequest.
//
// NewScheduler already starts this loop in a goroutine.
func (s *Scheduler) StartExecutionHandler() {
	for {
		if limiter := s.currentLimiter(); limiter != nil {
			// A failed wait is reported but does not stop the loop; the task
			// is still processed, as it was when the error was ignored.
			if err := limiter.Wait(context.Background()); err != nil {
				log.WithError(err).Warn("scheduler: waiting for rate limiter failed")
			}
		}

		s.queue.WaitForElement()
		log.Trace("scheduler: element found")

		task, err := s.queue.Pop()
		if err != nil {
			log.WithError(err).Debug("scheduler: could not pop task from queue")
			continue
		}
		GetAPIRequest(task)
	}
}