2024-02-20 18:11:34 +00:00
|
|
|
package plug
|
|
|
|
|
|
|
|
import (
	"context"
	"log"
	"sync"

	"git.dragse.it/anthrove/otter-space-sdk/v2/pkg/database"
	"git.dragse.it/anthrove/otter-space-sdk/v2/pkg/models"
	"google.golang.org/protobuf/types/known/timestamppb"

	gRPC "git.dragse.it/anthrove/plug-sdk/pkg/grpc"
	gonanoid "github.com/matoous/go-nanoid/v2"
)
|
|
|
|
|
|
|
|
type server struct {
|
|
|
|
gRPC.UnimplementedPlugConnectorServer
|
2024-02-20 20:12:49 +00:00
|
|
|
ctx map[string]context.CancelFunc
|
2024-07-06 22:26:24 +00:00
|
|
|
database database.OtterSpace
|
2024-02-20 20:12:49 +00:00
|
|
|
taskExecutionFunction TaskExecution
|
2024-07-08 12:56:17 +00:00
|
|
|
sendMessageExecution SendMessageExecution
|
2024-02-20 18:11:34 +00:00
|
|
|
}
|
|
|
|
|
2024-07-08 12:56:17 +00:00
|
|
|
func NewGrpcServer(database database.OtterSpace, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution) gRPC.PlugConnectorServer {
|
2024-02-20 18:11:34 +00:00
|
|
|
return &server{
|
2024-02-20 20:12:49 +00:00
|
|
|
ctx: make(map[string]context.CancelFunc),
|
|
|
|
database: database,
|
|
|
|
taskExecutionFunction: taskExecutionFunction,
|
2024-07-08 12:56:17 +00:00
|
|
|
sendMessageExecution: sendMessageExecution,
|
2024-02-20 18:11:34 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
|
2024-07-06 22:26:24 +00:00
|
|
|
var anthroveUser models.User
|
2024-02-20 18:11:34 +00:00
|
|
|
var plugTaskState gRPC.PlugTaskStatus
|
|
|
|
var err error
|
|
|
|
|
|
|
|
id, err := gonanoid.New()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
plugTaskState.TaskId = id
|
|
|
|
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
|
|
|
|
|
2024-07-06 22:26:24 +00:00
|
|
|
anthroveUser.ID = models.AnthroveUserID(creation.UserId)
|
2024-02-20 18:11:34 +00:00
|
|
|
|
|
|
|
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
|
|
|
|
// by using this method we assign a new context to each new request we get.
|
|
|
|
// This can be used for example to close the context with the given id
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
s.ctx[id] = cancel
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
// FIXME: better implement this methode, works for now but needs refactoring
|
2024-07-08 12:56:17 +00:00
|
|
|
err := s.taskExecutionFunction(ctx, s.database, creation.UserSourceName, anthroveUser, creation.DeepScrape, creation.ApiKey, func() {
|
2024-02-20 18:11:34 +00:00
|
|
|
s.removeTask(id)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
log.Print(err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return &plugTaskState, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
|
|
|
var plugTaskState gRPC.PlugTaskStatus
|
|
|
|
|
|
|
|
_, found := s.ctx[task.TaskId]
|
|
|
|
plugTaskState.TaskId = task.TaskId
|
|
|
|
|
|
|
|
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
|
|
|
|
|
|
|
|
if !found {
|
|
|
|
plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN
|
|
|
|
}
|
|
|
|
return &plugTaskState, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
|
|
|
|
var plugTaskState gRPC.PlugTaskStatus
|
|
|
|
|
|
|
|
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
|
|
|
|
plugTaskState.TaskId = task.TaskId
|
|
|
|
|
|
|
|
s.removeTask(task.TaskId)
|
|
|
|
|
|
|
|
return &plugTaskState, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *server) removeTask(taskID string) {
|
|
|
|
fn, exists := s.ctx[taskID]
|
|
|
|
if !exists {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
fn()
|
|
|
|
delete(s.ctx, taskID)
|
|
|
|
}
|
2024-03-05 20:40:42 +00:00
|
|
|
|
|
|
|
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
|
|
|
|
response := gRPC.PongResponse{
|
|
|
|
Message: request.Message,
|
|
|
|
Timestamp: timestamppb.Now(),
|
|
|
|
}
|
|
|
|
return &response, nil
|
|
|
|
}
|
2024-07-08 12:56:17 +00:00
|
|
|
|
|
|
|
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
|
|
|
|
messageResponse := gRPC.SendMessageResponse{Success: true}
|
|
|
|
|
|
|
|
err := s.sendMessageExecution(ctx, request.UserId, request.Message)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &messageResponse, nil
|
|
|
|
}
|