diff --git a/api/user.go b/api/user.go index 2c8d261..6d908de 100644 --- a/api/user.go +++ b/api/user.go @@ -6,12 +6,14 @@ import ( "e621_to_neo4j/e621" "e621_to_neo4j/services" "fmt" + log "github.com/sirupsen/logrus" "net/http" ) // UserHandler is the handler for the user API func UserHandler(ctx context.Context, graphConnection database.GraphConnection, e621Client *e621.Client) func(response http.ResponseWriter, request *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + log.Println("Request") if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) fmt.Fprintf(w, "Only POST requests are allowed") diff --git a/database/neo4j/impl.go b/database/neo4j/impl.go index ed587f2..c2035fa 100644 --- a/database/neo4j/impl.go +++ b/database/neo4j/impl.go @@ -12,6 +12,10 @@ type neo4jConnection struct { driver neo4j.DriverWithContext } +func NewNeo4JConnection() database.GraphConnection { + return &neo4jConnection{} +} + func (c *neo4jConnection) CheckUserToPostLink(ctx context.Context, e621PostID int64, e621UserID int64) (bool, error) { return RelationshipCheckUserToPost(ctx, c.driver, e621PostID, e621UserID) } @@ -57,10 +61,6 @@ func (c *neo4jConnection) Connect(ctx context.Context, endpoint string, username return nil } -func NewNeo4JConnection() database.GraphConnection { - return &neo4jConnection{} -} - func useConsoleLogger(level neo4j.LogLevel) func(config *config.Config) { return func(config *config.Config) { config.Log = neo4j.ConsoleLogger(level) diff --git a/e621/client.go b/e621/client.go index 8f647f6..4528118 100644 --- a/e621/client.go +++ b/e621/client.go @@ -1,10 +1,6 @@ package e621 -import ( - "e621_to_neo4j/utils" - "golang.org/x/time/rate" - "net/http" -) +import "golang.org/x/time/rate" const ( baseURL = "https://e621.net" @@ -12,20 +8,18 @@ const ( // Client represents the e621 API client. type Client struct { - apiKey string - username string - client *http.Client - limiter *rate.Limiter - queue *utils.Queue + apiKey string + username string + scheduler *Scheduler } // NewClient creates a new e621 API client. func NewClient(apiKey string, username string) *Client { + scheduler := NewScheduler() + scheduler.SetLimiter(rate.NewLimiter(1, 2)) return &Client{ - apiKey: apiKey, - username: username, - client: &http.Client{}, - limiter: rate.NewLimiter(1, 2), - queue: &utils.Queue{}, + apiKey: apiKey, + username: username, + scheduler: scheduler, } } diff --git a/e621/favorite.go b/e621/favorite.go index da7227d..76f34bf 100644 --- a/e621/favorite.go +++ b/e621/favorite.go @@ -1,39 +1,14 @@ package e621 import ( + "context" "e621_to_neo4j/e621/models" "fmt" - log "github.com/sirupsen/logrus" ) // GetFavorites retrieves all favorites from the e621 API. -func (c *Client) GetFavorites(user models.E621User) ([]models.Post, error) { - page := 1 - var allFavorites []models.Post - var URIPath string - - for { - URIPath = fmt.Sprintf("favorites.json?user_id=%d&limit=%d&page=%d", user.ID, 320, page) - log.WithFields(log.Fields{ - "id": user.ID, - "fav_page": page, - "uri": URIPath, - }).Debug("Requesting API for favorites") - favorite, err := ExecuteGetAPIRequest[models.PostResponseWrapper](c, URIPath) - if err != nil { - log.Printf(err.Error()) - } - - // Append the fetched posts to the result slice - allFavorites = append(allFavorites, favorite.Posts...) - - // If no more posts are returned, return the accumulated favorites - if len(favorite.Posts) == 0 { - return allFavorites, nil - } - - // Update the last post ID for the next page request - page += 1 - - } +func (c *Client) GetFavorites(_ context.Context, user models.E621User, page int64) func() (models.PostResponseWrapper, error) { + URIPath := fmt.Sprintf("favorites.json?user_id=%d&limit=%d&page=%d", user.ID, 320, page) + e621Task := NewE621ApiTask[models.PostResponseWrapper](URIPath) + return Schedule[models.PostResponseWrapper](c.scheduler, e621Task, c.username, c.apiKey) } diff --git a/e621/request.go b/e621/request.go index c43205b..147a9be 100644 --- a/e621/request.go +++ b/e621/request.go @@ -1,45 +1,39 @@ package e621 import ( - "context" - "encoding/json" + "e621_to_neo4j/utils" "fmt" - "io" + log "github.com/sirupsen/logrus" "net/http" ) -func ExecuteGetAPIRequest[dataType any](c *Client, URIPath string) (*dataType, error) { +func ExecuteGetAPIRequest(schedulerTask utils.SchedulerTask) { var err error - c.limiter.Wait(context.Background()) - url := fmt.Sprintf("%s/%s", baseURL, URIPath) + log.Debug("executing scheduler task") + + url := fmt.Sprintf("%s/%s", baseURL, schedulerTask.UriPath()) req, err := http.NewRequest("GET", url, nil) if err != nil { - return nil, err + schedulerTask.SendError(err) + return } req.Header.Set("User-Agent", "FavGetter (by Selloo)") req.Header.Add("Accept", "application/json") - req.SetBasicAuth(c.username, c.apiKey) + req.SetBasicAuth(schedulerTask.BasicAuth()) - resp, err := c.client.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() + client := http.Client{} - body, err := io.ReadAll(resp.Body) + resp, err := client.Do(req) if err != nil { - return nil, err + schedulerTask.SendStatusCode(resp.StatusCode) + return } if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("failed to retrieve posts: %s", resp.Status) + schedulerTask.SendStatusCode(resp.StatusCode) + return } - var r dataType - err = json.Unmarshal(body, &r) - if err != nil { - return nil, err - } - return &r, nil + schedulerTask.SendResponse(resp) } diff --git a/e621/scheduler.go b/e621/scheduler.go new file mode 100644 index 0000000..bb89fa6 --- /dev/null +++ b/e621/scheduler.go @@ -0,0 +1,61 @@ +package e621 + +import ( + "context" + "e621_to_neo4j/utils" + log "github.com/sirupsen/logrus" + "golang.org/x/time/rate" +) + +type Scheduler struct { + queue utils.Queue + limiter *rate.Limiter +} + +func NewScheduler() *Scheduler { + scheduler := &Scheduler{ + queue: utils.NewQueue(), + limiter: nil, + } + go scheduler.StartExecutionHandler() + return scheduler +} + +func Schedule[T utils.DataType](s *Scheduler, t utils.Task[T], username string, apiKey string) func() (T, error) { + channel := make(chan utils.DataResponse[T]) + schedulerTask := NewSchedulerTaskImpl[T](t, channel, username, apiKey) + log.Debug("Psuh task") + err := s.queue.Push(schedulerTask) + log.Debug("Element pushed") + + if err != nil { + return func() (T, error) { + var nil T + return nil, err + } + } + return func() (T, error) { + data := <-channel + return data.Data, data.Error + } +} + +func (s *Scheduler) SetLimiter(limiter *rate.Limiter) { + s.limiter = limiter +} + +func (s *Scheduler) StartExecutionHandler() { + for { + if s.limiter != nil { + s.limiter.Wait(context.Background()) + } + s.queue.WaitForElement() + log.Debug("element found") + task, err := s.queue.Pop() + if err != nil { + continue + } + ExecuteGetAPIRequest(task) + } + +} diff --git a/e621/schedulertask.go b/e621/schedulertask.go new file mode 100644 index 0000000..9cd6832 --- /dev/null +++ b/e621/schedulertask.go @@ -0,0 +1,42 @@ +package e621 + +import ( + "e621_to_neo4j/utils" + "net/http" +) + +type schedulerTaskImpl[T utils.DataType] struct { + task utils.Task[T] + channel chan utils.DataResponse[T] + username string + apiKey string +} + +func (s schedulerTaskImpl[T]) BasicAuth() (string, string) { + return s.username, s.apiKey +} + +func NewSchedulerTaskImpl[T utils.DataType](task utils.Task[T], channel chan utils.DataResponse[T], username string, apiKey string) utils.SchedulerTask { + return &schedulerTaskImpl[T]{ + task: task, + channel: channel, + username: username, + apiKey: apiKey, + } +} + +func (s schedulerTaskImpl[T]) SendError(err error) { + s.channel <- s.task.HandleError(err) +} + +func (s schedulerTaskImpl[T]) UriPath() string { + return s.task.UriPath() +} + +func (s schedulerTaskImpl[T]) SendStatusCode(statusCode int) { + s.channel <- s.task.HandleStatusCode(statusCode) +} + +func (s schedulerTaskImpl[T]) SendResponse(response *http.Response) { + s.channel <- s.task.HandleResponse(response) +} diff --git a/e621/task.go b/e621/task.go new file mode 100644 index 0000000..2f8397a --- /dev/null +++ b/e621/task.go @@ -0,0 +1,56 @@ +package e621 + +import ( + "e621_to_neo4j/utils" + "encoding/json" + "net/http" +) + +type e621APITask[T utils.DataType] struct { + uri string +} + +func (e e621APITask[T]) UriPath() string { + return e.uri +} + +func (e e621APITask[T]) HandleStatusCode(statusCode int) utils.DataResponse[T] { + var err error + switch statusCode { + case 421: + err = utils.RateLimitReachedError{} + case 424: + err = utils.InvalidParametersError{} + case 520: + err = utils.OriginConnectionTimeOutError{} + case 522: + err = utils.OriginConnectionTimeOutError{} + case 524: + err = utils.OriginConnectionTimeOutError{} + case 525: + err = utils.SSLHandshakeFailedError{} + default: + err = utils.StatusCodesToError(statusCode) + } + return utils.DataResponse[T]{Error: err} +} + +func (e e621APITask[T]) HandleResponse(responseData *http.Response) utils.DataResponse[T] { + var data T + err := json.NewDecoder(responseData.Body).Decode(&data) + defer responseData.Body.Close() + if err != nil { + return utils.DataResponse[T]{Error: err} + } + return utils.DataResponse[T]{Data: data} +} + +func (e e621APITask[T]) HandleError(error error) utils.DataResponse[T] { + return utils.DataResponse[T]{Error: error} +} + +func NewE621ApiTask[T utils.DataType](uri string) utils.Task[T] { + return &e621APITask[T]{ + uri: uri, + } +} diff --git a/e621/users.go b/e621/users.go index b431314..b99f2a9 100644 --- a/e621/users.go +++ b/e621/users.go @@ -3,19 +3,11 @@ package e621 import ( "e621_to_neo4j/e621/models" "fmt" - log "github.com/sirupsen/logrus" ) // GetUserInfo retrieves the users information from e621 API. -func (c *Client) GetUserInfo(username string) (models.E621User, error) { +func (c *Client) GetUserInfo(username string) func() (models.E621User, error) { URIPath := fmt.Sprintf("users/%s.json", username) - log.WithFields(log.Fields{ - "username": username, - "uri": URIPath, - }).Debug("Requesting API for user details") - user, err := ExecuteGetAPIRequest[models.E621User](c, URIPath) - if err != nil { - return models.E621User{}, err - } - return *user, nil + e621Task := NewE621ApiTask[models.E621User](URIPath) + return Schedule[models.E621User](c.scheduler, e621Task, c.username, c.apiKey) } diff --git a/runMemgraphDev.cmd b/runMemgraphDev.cmd new file mode 100644 index 0000000..4daae12 --- /dev/null +++ b/runMemgraphDev.cmd @@ -0,0 +1 @@ +docker run -it -p 7687:7687 -p 7444:7444 -p 3000:3000 -v mg_lib:/var/lib/memgraph memgraph/memgraph-platform \ No newline at end of file diff --git a/services/user.go b/services/manager.go similarity index 89% rename from services/user.go rename to services/manager.go index 9426441..84622ed 100644 --- a/services/user.go +++ b/services/manager.go @@ -12,9 +12,13 @@ import ( func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e621Client e621.Client, username string) error { var err error + var page int64 = 1 + var allFavorites []models.Post - e621User, err := e621Client.GetUserInfo(username) + AwaitE621User := e621Client.GetUserInfo(username) + e621User, err := AwaitE621User() if err != nil { + log.Info(err) return err } @@ -42,21 +46,47 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e "id": e621User.ID, }).Info("Getting favorites for user") start := time.Now() - userFavorites, err := e621Client.GetFavorites(e621User) + + for { + AwaitE621Favorites := e621Client.GetFavorites(ctx, e621User, page) + + log.WithFields(log.Fields{ + "id": e621User.ID, + "fav_page": page, + }).Debug("Requesting API for favorites") + + favoritesPage, err := AwaitE621Favorites() + if err != nil { + log.Printf(err.Error()) + } + + // Append the fetched posts to the result slice + allFavorites = append(allFavorites, favoritesPage.Posts...) + + // If no more posts are returned, return the accumulated favorites + if len(favoritesPage.Posts) == 0 { + break + } + + // Update the last post ID for the next page request + page += 1 + + } + if err != nil { log.Fatal(err) } log.WithFields(log.Fields{ "user": e621User.Name, "id": e621User.ID, - "post_amount": len(userFavorites), + "post_amount": len(allFavorites), "scrape_time": time.Since(start), }).Info("Getting favorites for user") startUploadPosts := time.Now() // Uploads all Tags, Posts as Nodes to Neo4j - for i, post := range userFavorites { + for i, post := range allFavorites { if exists, err := graphConnection.CheckUserToPostLink(ctx, post.ID, e621User.ID); err == nil && exists { log.WithFields(log.Fields{ "user": e621User.Name, @@ -77,7 +107,7 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e "user": e621User.Name, "id": e621User.ID, "post_number": i, - "post_amount": len(userFavorites), + "post_amount": len(allFavorites), "post_id": post.ID, "upload_time": time.Since(start), }).Debug("Uploading post") @@ -122,7 +152,7 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e "user": e621User.Name, "id": e621User.ID, "post_number": i, - "post_amount": len(userFavorites), + "post_amount": len(allFavorites), "post_id": post.ID, "upload_time": time.Since(start), }).Debug("Making relationship") @@ -258,7 +288,7 @@ func uploadCopyrightTagRelationship(ctx context.Context, graphConnection databas if err != nil { return err } - // log.Printf("Created PostToTagRelationship for post: %d to copyrigh tag: %s", post.ID, copyrightTag) + // log.Printf("Created PostToTagRelationship for post: %d to copyright tag: %s", post.ID, copyrightTag) } return nil diff --git a/utils/error.go b/utils/error.go new file mode 100644 index 0000000..784f81c --- /dev/null +++ b/utils/error.go @@ -0,0 +1,105 @@ +package utils + +import "fmt" + +func StatusCodesToError(statusCode int) error { + var err error + switch statusCode { + case 403: + err = AccessDeniedError{} + case 404: + err = NotFoundError{} + case 412: + err = PreconditionFailedError{} + case 421: + err = RateLimitReachedError{} + case 424: + err = InvalidParametersError{} + case 500: + err = InternalServerError{} + case 502: + err = BadGatewayError{} + case 503: + err = ServiceUnavailableError{} + default: + err = fmt.Errorf("unhandels status code: %d", statusCode) + } + return err +} + +type AccessDeniedError struct { +} + +func (_ AccessDeniedError) Error() string { + return "access denied" +} + +type NotFoundError struct { +} + +func (_ NotFoundError) Error() string { + return "not found" +} + +type PreconditionFailedError struct { +} + +func (_ PreconditionFailedError) Error() string { + return "precondition failed" +} + +type RateLimitReachedError struct { +} + +func (_ RateLimitReachedError) Error() string { + return "rate limit reached" +} + +type InvalidParametersError struct { +} + +func (_ InvalidParametersError) Error() string { + return "invalide parameters" +} + +type InternalServerError struct { +} + +func (_ InternalServerError) Error() string { + return "internal server error" +} + +type BadGatewayError struct { +} + +func (_ BadGatewayError) Error() string { + return "bad gateway" +} + +type ServiceUnavailableError struct { +} + +func (_ ServiceUnavailableError) Error() string { + return "service unavailable" +} + +type UnknownError struct { +} + +func (_ UnknownError) Error() string { + return "unknown error" +} + +type OriginConnectionTimeOutError struct { +} + +func (_ OriginConnectionTimeOutError) Error() string { + return "origin connection time-out" +} + +type SSLHandshakeFailedError struct { +} + +func (_ SSLHandshakeFailedError) Error() string { + return "ssl handshake failed" +} diff --git a/utils/queue.go b/utils/queue.go index a75e575..7b6f4f0 100644 --- a/utils/queue.go +++ b/utils/queue.go @@ -2,32 +2,43 @@ package utils import ( "errors" + log "github.com/sirupsen/logrus" ) -type Task struct { - URIPath string `json:"url,omitempty" :"url"` - Methode string `json:"method,omitempty" :"method"` - Channel chan any `:"channel"` -} - type Queue struct { - elements []Task + tasks []SchedulerTask + notifyChannel chan bool } -func (queue *Queue) Pop() (Task, error) { - if len(queue.elements) == 0 { - return Task{}, errors.New("try to remove an element of a empty queue") +func NewQueue() Queue { + return Queue{ + notifyChannel: make(chan bool), } - task := queue.elements[0] - queue.elements = queue.elements[1:] +} + +// WaitForElement need to be called everytime before Popping an Element! +// Also, it is required to have this function called in a separate go-routine because Push use the NotifyChannel in the +// routine. So the thread waits, till WaitForElement pulls the Item from the channel +// FIXME this should be fixed in future. require more discussion what is the best way. +func (queue *Queue) WaitForElement() { + log.Debug("waiting for element") + _ = <-queue.notifyChannel +} + +func (queue *Queue) Pop() (SchedulerTask, error) { + if len(queue.tasks) == 0 { + return nil, errors.New("try to remove an element of a empty queue") + } + task := queue.tasks[0] + queue.tasks = queue.tasks[1:] return task, nil } -func (queue *Queue) Push(task Task) error { - empty := Task{} - if task == empty { +func (queue *Queue) Push(task SchedulerTask) error { + if task == nil { return errors.New("try to add task but task is empty") } - queue.elements = append(queue.elements, task) + queue.tasks = append(queue.tasks, task) + queue.notifyChannel <- true return nil } diff --git a/utils/queue_test.go b/utils/queue_test.go index aaf1b43..aa323bf 100644 --- a/utils/queue_test.go +++ b/utils/queue_test.go @@ -1,45 +1,65 @@ package utils import ( + "net/http" "reflect" "testing" ) +type schedulerTaskImplDummy struct { +} + +func (s schedulerTaskImplDummy) BasicAuth() (string, string) { + return "", "" +} + +func (s schedulerTaskImplDummy) UriPath() string { + return "" +} + +func (s schedulerTaskImplDummy) SendError(_ error) { + return +} + +func (s schedulerTaskImplDummy) SendStatusCode(_ int) { + return +} + +func (s schedulerTaskImplDummy) SendResponse(_ *http.Response) { + return +} + func TestQueue_Pop(t *testing.T) { type fields struct { - elements []Task + elements []SchedulerTask } tests := []struct { name string fields fields - want Task + want SchedulerTask wantErr bool }{ { name: "Pop element of empty list", fields: fields{}, - want: Task{}, + want: nil, wantErr: true, }, { name: "Pop element of a filled list with three elements", - fields: fields{elements: []Task{ - {URIPath: "https://e621.net0....", Methode: "GET", Channel: nil}, - {URIPath: "https://e621.net1....", Methode: "GET", Channel: nil}, - {URIPath: "https://e621.net2....", Methode: "GET", Channel: nil}, + fields: fields{elements: []SchedulerTask{ + schedulerTaskImplDummy{}, + schedulerTaskImplDummy{}, + schedulerTaskImplDummy{}, }}, - want: Task{ - URIPath: "https://e621.net0....", - Methode: "GET", - Channel: nil, - }, + want: schedulerTaskImplDummy{}, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { queue := &Queue{ - elements: tt.fields.elements, + tasks: tt.fields.elements, } got, err := queue.Pop() if (err != nil) != tt.wantErr { @@ -55,42 +75,32 @@ func TestQueue_Pop(t *testing.T) { func TestQueue_Push(t *testing.T) { t.Run("Push tasks to empty queue", func(t *testing.T) { - queue := Queue{elements: []Task{}} - task := Task{ - URIPath: "http://e621.net0....", - Methode: "GET", - Channel: nil, - } + queue := Queue{tasks: []SchedulerTask{}, notifyChannel: make(chan bool)} + go queue.WaitForElement() + task := schedulerTaskImplDummy{} err := queue.Push(task) if err != nil { t.Errorf("Push() error = %v", err) } - if len(queue.elements) != 1 { + if len(queue.tasks) != 1 { t.Errorf("Push() error = queue is not one") } - if queue.elements[0] != task { + if queue.tasks[0] != task { t.Errorf("Push() error = wrong queue task in queue") } }) t.Run("Push tasks to filled queue", func(t *testing.T) { - queue := Queue{elements: []Task{{ - URIPath: "http://e621.net0....", - Methode: "GET", - Channel: nil, - }}} - task := Task{ - URIPath: "http://e621.net1....", - Methode: "GET", - Channel: nil, - } + queue := Queue{tasks: []SchedulerTask{schedulerTaskImplDummy{}}, notifyChannel: make(chan bool)} + go queue.WaitForElement() + task := schedulerTaskImplDummy{} err := queue.Push(task) if err != nil { t.Errorf("Push() error = %v", err) } - if len(queue.elements) != 2 { + if len(queue.tasks) != 2 { t.Errorf("Push() error = queue is not two") } - if queue.elements[1] != task { + if queue.tasks[1] != task { t.Errorf("Push() error = wrong queue task in queue") } }) diff --git a/utils/scheduler.go b/utils/scheduler.go new file mode 100644 index 0000000..b59692d --- /dev/null +++ b/utils/scheduler.go @@ -0,0 +1,30 @@ +package utils + +import ( + "e621_to_neo4j/e621/models" + "net/http" +) + +type DataResponse[T DataType] struct { + Data T + Error error +} + +type DataType interface { + models.E621User | models.PostResponseWrapper +} + +type Task[T DataType] interface { + UriPath() string + HandleError(error error) DataResponse[T] + HandleStatusCode(statusCode int) DataResponse[T] + HandleResponse(responseData *http.Response) DataResponse[T] +} + +type SchedulerTask interface { + UriPath() string + SendStatusCode(statusCode int) + SendError(err error) + SendResponse(response *http.Response) + BasicAuth() (string, string) +}