api-call-system-#10 (#13)

Co-authored-by: Fenpaws <soxx@fenpa.ws>
Reviewed-on: anthrove/e621-to-graph#13
Reviewed-by: SoXX <fenpaws@noreply.localhost>
Co-authored-by: Lennard Brinkhaus <lennard.brinkhaus@noreply.localhost>
Co-committed-by: Lennard Brinkhaus <lennard.brinkhaus@noreply.localhost>
This commit is contained in:
Lennard Brinkhaus 2023-07-17 08:10:13 +00:00 committed by SoXX
parent f4b73034b8
commit 31bd0cf639
15 changed files with 441 additions and 138 deletions

View File

@ -6,12 +6,14 @@ import (
"e621_to_neo4j/e621"
"e621_to_neo4j/services"
"fmt"
log "github.com/sirupsen/logrus"
"net/http"
)
// UserHandler is the handler for the user API
func UserHandler(ctx context.Context, graphConnection database.GraphConnection, e621Client *e621.Client) func(response http.ResponseWriter, request *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Request")
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintf(w, "Only POST requests are allowed")

View File

@ -12,6 +12,10 @@ type neo4jConnection struct {
driver neo4j.DriverWithContext
}
func NewNeo4JConnection() database.GraphConnection {
return &neo4jConnection{}
}
func (c *neo4jConnection) CheckUserToPostLink(ctx context.Context, e621PostID int64, e621UserID int64) (bool, error) {
return RelationshipCheckUserToPost(ctx, c.driver, e621PostID, e621UserID)
}
@ -57,10 +61,6 @@ func (c *neo4jConnection) Connect(ctx context.Context, endpoint string, username
return nil
}
func NewNeo4JConnection() database.GraphConnection {
return &neo4jConnection{}
}
func useConsoleLogger(level neo4j.LogLevel) func(config *config.Config) {
return func(config *config.Config) {
config.Log = neo4j.ConsoleLogger(level)

View File

@ -1,10 +1,6 @@
package e621
import (
"e621_to_neo4j/utils"
"golang.org/x/time/rate"
"net/http"
)
import "golang.org/x/time/rate"
const (
baseURL = "https://e621.net"
@ -14,18 +10,16 @@ const (
type Client struct {
apiKey string
username string
client *http.Client
limiter *rate.Limiter
queue *utils.Queue
scheduler *Scheduler
}
// NewClient creates a new e621 API client.
func NewClient(apiKey string, username string) *Client {
scheduler := NewScheduler()
scheduler.SetLimiter(rate.NewLimiter(1, 2))
return &Client{
apiKey: apiKey,
username: username,
client: &http.Client{},
limiter: rate.NewLimiter(1, 2),
queue: &utils.Queue{},
scheduler: scheduler,
}
}

View File

@ -1,39 +1,14 @@
package e621
import (
"context"
"e621_to_neo4j/e621/models"
"fmt"
log "github.com/sirupsen/logrus"
)
// GetFavorites retrieves all favorites from the e621 API.
func (c *Client) GetFavorites(user models.E621User) ([]models.Post, error) {
page := 1
var allFavorites []models.Post
var URIPath string
for {
URIPath = fmt.Sprintf("favorites.json?user_id=%d&limit=%d&page=%d", user.ID, 320, page)
log.WithFields(log.Fields{
"id": user.ID,
"fav_page": page,
"uri": URIPath,
}).Debug("Requesting API for favorites")
favorite, err := ExecuteGetAPIRequest[models.PostResponseWrapper](c, URIPath)
if err != nil {
log.Printf(err.Error())
}
// Append the fetched posts to the result slice
allFavorites = append(allFavorites, favorite.Posts...)
// If no more posts are returned, return the accumulated favorites
if len(favorite.Posts) == 0 {
return allFavorites, nil
}
// Update the last post ID for the next page request
page += 1
}
func (c *Client) GetFavorites(_ context.Context, user models.E621User, page int64) func() (models.PostResponseWrapper, error) {
URIPath := fmt.Sprintf("favorites.json?user_id=%d&limit=%d&page=%d", user.ID, 320, page)
e621Task := NewE621ApiTask[models.PostResponseWrapper](URIPath)
return Schedule[models.PostResponseWrapper](c.scheduler, e621Task, c.username, c.apiKey)
}

View File

@ -1,45 +1,39 @@
package e621
import (
"context"
"encoding/json"
"e621_to_neo4j/utils"
"fmt"
"io"
log "github.com/sirupsen/logrus"
"net/http"
)
func ExecuteGetAPIRequest[dataType any](c *Client, URIPath string) (*dataType, error) {
func ExecuteGetAPIRequest(schedulerTask utils.SchedulerTask) {
var err error
c.limiter.Wait(context.Background())
url := fmt.Sprintf("%s/%s", baseURL, URIPath)
log.Debug("executing scheduler task")
url := fmt.Sprintf("%s/%s", baseURL, schedulerTask.UriPath())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
schedulerTask.SendError(err)
return
}
req.Header.Set("User-Agent", "FavGetter (by Selloo)")
req.Header.Add("Accept", "application/json")
req.SetBasicAuth(c.username, c.apiKey)
req.SetBasicAuth(schedulerTask.BasicAuth())
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
client := http.Client{}
body, err := io.ReadAll(resp.Body)
resp, err := client.Do(req)
if err != nil {
return nil, err
schedulerTask.SendStatusCode(resp.StatusCode)
return
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to retrieve posts: %s", resp.Status)
schedulerTask.SendStatusCode(resp.StatusCode)
return
}
var r dataType
err = json.Unmarshal(body, &r)
if err != nil {
return nil, err
}
return &r, nil
schedulerTask.SendResponse(resp)
}

61
e621/scheduler.go Normal file
View File

@ -0,0 +1,61 @@
package e621
import (
"context"
"e621_to_neo4j/utils"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
type Scheduler struct {
queue utils.Queue
limiter *rate.Limiter
}
func NewScheduler() *Scheduler {
scheduler := &Scheduler{
queue: utils.NewQueue(),
limiter: nil,
}
go scheduler.StartExecutionHandler()
return scheduler
}
func Schedule[T utils.DataType](s *Scheduler, t utils.Task[T], username string, apiKey string) func() (T, error) {
channel := make(chan utils.DataResponse[T])
schedulerTask := NewSchedulerTaskImpl[T](t, channel, username, apiKey)
log.Debug("Psuh task")
err := s.queue.Push(schedulerTask)
log.Debug("Element pushed")
if err != nil {
return func() (T, error) {
var nil T
return nil, err
}
}
return func() (T, error) {
data := <-channel
return data.Data, data.Error
}
}
func (s *Scheduler) SetLimiter(limiter *rate.Limiter) {
s.limiter = limiter
}
func (s *Scheduler) StartExecutionHandler() {
for {
if s.limiter != nil {
s.limiter.Wait(context.Background())
}
s.queue.WaitForElement()
log.Debug("element found")
task, err := s.queue.Pop()
if err != nil {
continue
}
ExecuteGetAPIRequest(task)
}
}

42
e621/schedulertask.go Normal file
View File

@ -0,0 +1,42 @@
package e621
import (
"e621_to_neo4j/utils"
"net/http"
)
type schedulerTaskImpl[T utils.DataType] struct {
task utils.Task[T]
channel chan utils.DataResponse[T]
username string
apiKey string
}
func (s schedulerTaskImpl[T]) BasicAuth() (string, string) {
return s.username, s.apiKey
}
func NewSchedulerTaskImpl[T utils.DataType](task utils.Task[T], channel chan utils.DataResponse[T], username string, apiKey string) utils.SchedulerTask {
return &schedulerTaskImpl[T]{
task: task,
channel: channel,
username: username,
apiKey: apiKey,
}
}
func (s schedulerTaskImpl[T]) SendError(err error) {
s.channel <- s.task.HandleError(err)
}
func (s schedulerTaskImpl[T]) UriPath() string {
return s.task.UriPath()
}
func (s schedulerTaskImpl[T]) SendStatusCode(statusCode int) {
s.channel <- s.task.HandleStatusCode(statusCode)
}
func (s schedulerTaskImpl[T]) SendResponse(response *http.Response) {
s.channel <- s.task.HandleResponse(response)
}

56
e621/task.go Normal file
View File

@ -0,0 +1,56 @@
package e621
import (
"e621_to_neo4j/utils"
"encoding/json"
"net/http"
)
type e621APITask[T utils.DataType] struct {
uri string
}
func (e e621APITask[T]) UriPath() string {
return e.uri
}
func (e e621APITask[T]) HandleStatusCode(statusCode int) utils.DataResponse[T] {
var err error
switch statusCode {
case 421:
err = utils.RateLimitReachedError{}
case 424:
err = utils.InvalidParametersError{}
case 520:
err = utils.OriginConnectionTimeOutError{}
case 522:
err = utils.OriginConnectionTimeOutError{}
case 524:
err = utils.OriginConnectionTimeOutError{}
case 525:
err = utils.SSLHandshakeFailedError{}
default:
err = utils.StatusCodesToError(statusCode)
}
return utils.DataResponse[T]{Error: err}
}
func (e e621APITask[T]) HandleResponse(responseData *http.Response) utils.DataResponse[T] {
var data T
err := json.NewDecoder(responseData.Body).Decode(&data)
defer responseData.Body.Close()
if err != nil {
return utils.DataResponse[T]{Error: err}
}
return utils.DataResponse[T]{Data: data}
}
func (e e621APITask[T]) HandleError(error error) utils.DataResponse[T] {
return utils.DataResponse[T]{Error: error}
}
func NewE621ApiTask[T utils.DataType](uri string) utils.Task[T] {
return &e621APITask[T]{
uri: uri,
}
}

View File

@ -3,19 +3,11 @@ package e621
import (
"e621_to_neo4j/e621/models"
"fmt"
log "github.com/sirupsen/logrus"
)
// GetUserInfo retrieves the users information from e621 API.
func (c *Client) GetUserInfo(username string) (models.E621User, error) {
func (c *Client) GetUserInfo(username string) func() (models.E621User, error) {
URIPath := fmt.Sprintf("users/%s.json", username)
log.WithFields(log.Fields{
"username": username,
"uri": URIPath,
}).Debug("Requesting API for user details")
user, err := ExecuteGetAPIRequest[models.E621User](c, URIPath)
if err != nil {
return models.E621User{}, err
}
return *user, nil
e621Task := NewE621ApiTask[models.E621User](URIPath)
return Schedule[models.E621User](c.scheduler, e621Task, c.username, c.apiKey)
}

1
runMemgraphDev.cmd Normal file
View File

@ -0,0 +1 @@
docker run -it -p 7687:7687 -p 7444:7444 -p 3000:3000 -v mg_lib:/var/lib/memgraph memgraph/memgraph-platform

View File

@ -12,9 +12,13 @@ import (
func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e621Client e621.Client, username string) error {
var err error
var page int64 = 1
var allFavorites []models.Post
e621User, err := e621Client.GetUserInfo(username)
AwaitE621User := e621Client.GetUserInfo(username)
e621User, err := AwaitE621User()
if err != nil {
log.Info(err)
return err
}
@ -42,21 +46,47 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e
"id": e621User.ID,
}).Info("Getting favorites for user")
start := time.Now()
userFavorites, err := e621Client.GetFavorites(e621User)
for {
AwaitE621Favorites := e621Client.GetFavorites(ctx, e621User, page)
log.WithFields(log.Fields{
"id": e621User.ID,
"fav_page": page,
}).Debug("Requesting API for favorites")
favoritesPage, err := AwaitE621Favorites()
if err != nil {
log.Printf(err.Error())
}
// Append the fetched posts to the result slice
allFavorites = append(allFavorites, favoritesPage.Posts...)
// If no more posts are returned, return the accumulated favorites
if len(favoritesPage.Posts) == 0 {
break
}
// Update the last post ID for the next page request
page += 1
}
if err != nil {
log.Fatal(err)
}
log.WithFields(log.Fields{
"user": e621User.Name,
"id": e621User.ID,
"post_amount": len(userFavorites),
"post_amount": len(allFavorites),
"scrape_time": time.Since(start),
}).Info("Getting favorites for user")
startUploadPosts := time.Now()
// Uploads all Tags, Posts as Nodes to Neo4j
for i, post := range userFavorites {
for i, post := range allFavorites {
if exists, err := graphConnection.CheckUserToPostLink(ctx, post.ID, e621User.ID); err == nil && exists {
log.WithFields(log.Fields{
"user": e621User.Name,
@ -77,7 +107,7 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e
"user": e621User.Name,
"id": e621User.ID,
"post_number": i,
"post_amount": len(userFavorites),
"post_amount": len(allFavorites),
"post_id": post.ID,
"upload_time": time.Since(start),
}).Debug("Uploading post")
@ -122,7 +152,7 @@ func ScrapeUser(ctx context.Context, graphConnection database.GraphConnection, e
"user": e621User.Name,
"id": e621User.ID,
"post_number": i,
"post_amount": len(userFavorites),
"post_amount": len(allFavorites),
"post_id": post.ID,
"upload_time": time.Since(start),
}).Debug("Making relationship")
@ -258,7 +288,7 @@ func uploadCopyrightTagRelationship(ctx context.Context, graphConnection databas
if err != nil {
return err
}
// log.Printf("Created PostToTagRelationship for post: %d to copyrigh tag: %s", post.ID, copyrightTag)
// log.Printf("Created PostToTagRelationship for post: %d to copyright tag: %s", post.ID, copyrightTag)
}
return nil

105
utils/error.go Normal file
View File

@ -0,0 +1,105 @@
package utils
import "fmt"
func StatusCodesToError(statusCode int) error {
var err error
switch statusCode {
case 403:
err = AccessDeniedError{}
case 404:
err = NotFoundError{}
case 412:
err = PreconditionFailedError{}
case 421:
err = RateLimitReachedError{}
case 424:
err = InvalidParametersError{}
case 500:
err = InternalServerError{}
case 502:
err = BadGatewayError{}
case 503:
err = ServiceUnavailableError{}
default:
err = fmt.Errorf("unhandels status code: %d", statusCode)
}
return err
}
type AccessDeniedError struct {
}
func (_ AccessDeniedError) Error() string {
return "access denied"
}
type NotFoundError struct {
}
func (_ NotFoundError) Error() string {
return "not found"
}
type PreconditionFailedError struct {
}
func (_ PreconditionFailedError) Error() string {
return "precondition failed"
}
type RateLimitReachedError struct {
}
func (_ RateLimitReachedError) Error() string {
return "rate limit reached"
}
type InvalidParametersError struct {
}
func (_ InvalidParametersError) Error() string {
return "invalide parameters"
}
type InternalServerError struct {
}
func (_ InternalServerError) Error() string {
return "internal server error"
}
type BadGatewayError struct {
}
func (_ BadGatewayError) Error() string {
return "bad gateway"
}
type ServiceUnavailableError struct {
}
func (_ ServiceUnavailableError) Error() string {
return "service unavailable"
}
type UnknownError struct {
}
func (_ UnknownError) Error() string {
return "unknown error"
}
type OriginConnectionTimeOutError struct {
}
func (_ OriginConnectionTimeOutError) Error() string {
return "origin connection time-out"
}
type SSLHandshakeFailedError struct {
}
func (_ SSLHandshakeFailedError) Error() string {
return "ssl handshake failed"
}

View File

@ -2,32 +2,43 @@ package utils
import (
"errors"
log "github.com/sirupsen/logrus"
)
type Task struct {
URIPath string `json:"url,omitempty" :"url"`
Methode string `json:"method,omitempty" :"method"`
Channel chan any `:"channel"`
}
type Queue struct {
elements []Task
tasks []SchedulerTask
notifyChannel chan bool
}
func (queue *Queue) Pop() (Task, error) {
if len(queue.elements) == 0 {
return Task{}, errors.New("try to remove an element of a empty queue")
func NewQueue() Queue {
return Queue{
notifyChannel: make(chan bool),
}
task := queue.elements[0]
queue.elements = queue.elements[1:]
}
// WaitForElement need to be called everytime before Popping an Element!
// Also, it is required to have this function called in a separate go-routine because Push use the NotifyChannel in the
// routine. So the thread waits, till WaitForElement pulls the Item from the channel
// FIXME this should be fixed in future. require more discussion what is the best way.
func (queue *Queue) WaitForElement() {
log.Debug("waiting for element")
_ = <-queue.notifyChannel
}
func (queue *Queue) Pop() (SchedulerTask, error) {
if len(queue.tasks) == 0 {
return nil, errors.New("try to remove an element of a empty queue")
}
task := queue.tasks[0]
queue.tasks = queue.tasks[1:]
return task, nil
}
func (queue *Queue) Push(task Task) error {
empty := Task{}
if task == empty {
func (queue *Queue) Push(task SchedulerTask) error {
if task == nil {
return errors.New("try to add task but task is empty")
}
queue.elements = append(queue.elements, task)
queue.tasks = append(queue.tasks, task)
queue.notifyChannel <- true
return nil
}

View File

@ -1,45 +1,65 @@
package utils
import (
"net/http"
"reflect"
"testing"
)
type schedulerTaskImplDummy struct {
}
func (s schedulerTaskImplDummy) BasicAuth() (string, string) {
return "", ""
}
func (s schedulerTaskImplDummy) UriPath() string {
return ""
}
func (s schedulerTaskImplDummy) SendError(_ error) {
return
}
func (s schedulerTaskImplDummy) SendStatusCode(_ int) {
return
}
func (s schedulerTaskImplDummy) SendResponse(_ *http.Response) {
return
}
func TestQueue_Pop(t *testing.T) {
type fields struct {
elements []Task
elements []SchedulerTask
}
tests := []struct {
name string
fields fields
want Task
want SchedulerTask
wantErr bool
}{
{
name: "Pop element of empty list",
fields: fields{},
want: Task{},
want: nil,
wantErr: true,
},
{
name: "Pop element of a filled list with three elements",
fields: fields{elements: []Task{
{URIPath: "https://e621.net0....", Methode: "GET", Channel: nil},
{URIPath: "https://e621.net1....", Methode: "GET", Channel: nil},
{URIPath: "https://e621.net2....", Methode: "GET", Channel: nil},
fields: fields{elements: []SchedulerTask{
schedulerTaskImplDummy{},
schedulerTaskImplDummy{},
schedulerTaskImplDummy{},
}},
want: Task{
URIPath: "https://e621.net0....",
Methode: "GET",
Channel: nil,
},
want: schedulerTaskImplDummy{},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
queue := &Queue{
elements: tt.fields.elements,
tasks: tt.fields.elements,
}
got, err := queue.Pop()
if (err != nil) != tt.wantErr {
@ -55,42 +75,32 @@ func TestQueue_Pop(t *testing.T) {
func TestQueue_Push(t *testing.T) {
t.Run("Push tasks to empty queue", func(t *testing.T) {
queue := Queue{elements: []Task{}}
task := Task{
URIPath: "http://e621.net0....",
Methode: "GET",
Channel: nil,
}
queue := Queue{tasks: []SchedulerTask{}, notifyChannel: make(chan bool)}
go queue.WaitForElement()
task := schedulerTaskImplDummy{}
err := queue.Push(task)
if err != nil {
t.Errorf("Push() error = %v", err)
}
if len(queue.elements) != 1 {
if len(queue.tasks) != 1 {
t.Errorf("Push() error = queue is not one")
}
if queue.elements[0] != task {
if queue.tasks[0] != task {
t.Errorf("Push() error = wrong queue task in queue")
}
})
t.Run("Push tasks to filled queue", func(t *testing.T) {
queue := Queue{elements: []Task{{
URIPath: "http://e621.net0....",
Methode: "GET",
Channel: nil,
}}}
task := Task{
URIPath: "http://e621.net1....",
Methode: "GET",
Channel: nil,
}
queue := Queue{tasks: []SchedulerTask{schedulerTaskImplDummy{}}, notifyChannel: make(chan bool)}
go queue.WaitForElement()
task := schedulerTaskImplDummy{}
err := queue.Push(task)
if err != nil {
t.Errorf("Push() error = %v", err)
}
if len(queue.elements) != 2 {
if len(queue.tasks) != 2 {
t.Errorf("Push() error = queue is not two")
}
if queue.elements[1] != task {
if queue.tasks[1] != task {
t.Errorf("Push() error = wrong queue task in queue")
}
})

30
utils/scheduler.go Normal file
View File

@ -0,0 +1,30 @@
package utils
import (
"e621_to_neo4j/e621/models"
"net/http"
)
type DataResponse[T DataType] struct {
Data T
Error error
}
type DataType interface {
models.E621User | models.PostResponseWrapper
}
type Task[T DataType] interface {
UriPath() string
HandleError(error error) DataResponse[T]
HandleStatusCode(statusCode int) DataResponse[T]
HandleResponse(responseData *http.Response) DataResponse[T]
}
type SchedulerTask interface {
UriPath() string
SendStatusCode(statusCode int)
SendError(err error)
SendResponse(response *http.Response)
BasicAuth() (string, string)
}