package plug

import (
	"context"
	"log"
	"sync"

	"git.dragse.it/anthrove/otter-space-sdk/v2/pkg/database"
	"git.dragse.it/anthrove/otter-space-sdk/v2/pkg/models"
	"google.golang.org/protobuf/types/known/timestamppb"

	gRPC "git.dragse.it/anthrove/plug-sdk/v2/pkg/grpc"
	gonanoid "github.com/matoous/go-nanoid/v2"
)

// server is the gRPC.PlugConnectorServer implementation of this package. It
// tracks the cancel function of every task it started and delegates the actual
// work to the execution callbacks supplied to NewGrpcServer.
type server struct {
	gRPC.UnimplementedPlugConnectorServer

	// mu guards ctx. The map is touched by RPC handlers and by the completion
	// callback handed to the task goroutine, which can run concurrently.
	mu sync.Mutex
	// ctx maps a task ID to the cancel function of that task's context.
	ctx map[string]context.CancelFunc

	database              database.OtterSpace
	taskExecutionFunction TaskExecution
	sendMessageExecution  SendMessageExecution
	getMessageExecution   GetMessageExecution
}

// NewGrpcServer returns a gRPC.PlugConnectorServer that stores the given
// database handle and execution callbacks and starts with no tracked tasks.
func NewGrpcServer(database database.OtterSpace, taskExecutionFunction TaskExecution, sendMessageExecution SendMessageExecution, getMessageExecution GetMessageExecution) gRPC.PlugConnectorServer {
	return &server{
		ctx:                   make(map[string]context.CancelFunc),
		database:              database,
		taskExecutionFunction: taskExecutionFunction,
		sendMessageExecution:  sendMessageExecution,
		getMessageExecution:   getMessageExecution,
	}
}

// TaskStart generates a new task ID, registers a cancel function for it and
// runs the task execution function in a background goroutine. It returns the
// task ID with state RUNNING, or an error if no ID could be generated.
//
// The request context is intentionally unused, see the comment in the body.
func (s *server) TaskStart(_ context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
	id, err := gonanoid.New()
	if err != nil {
		return nil, err
	}

	var anthroveUser models.User
	anthroveUser.ID = models.AnthroveUserID(creation.UserId)

	// gRPC closes the request context once the call has ended, which would stop
	// the whole scraping run without waiting for it. Each task therefore gets
	// its own context, and its cancel function is stored under the task ID so
	// that the task can be cancelled later.
	taskCtx, cancel := context.WithCancel(context.Background())

	s.mu.Lock()
	s.ctx[id] = cancel
	s.mu.Unlock()

	go func() {
		// FIXME: implement this method properly; it works for now but needs refactoring.
		// NOTE(review): the task is only unregistered when the execution function
		// invokes the callback below (or via TaskCancel). If it can return without
		// calling it, the entry stays in the map and the task keeps being reported
		// as RUNNING - confirm against the TaskExecution contract.
		err := s.taskExecutionFunction(taskCtx, s.database, creation.UserSourceName, anthroveUser, creation.DeepScrape, creation.ApiKey, func() {
			s.removeTask(id)
		})
		if err != nil {
			log.Print(err)
		}
	}()

	return &gRPC.PlugTaskStatus{
		TaskId:    id,
		TaskState: gRPC.PlugTaskState_RUNNING,
	}, nil
}

// TaskStatus reports RUNNING when the given task ID is currently tracked and
// UNKNOWN otherwise. It never returns an error.
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	s.mu.Lock()
	_, found := s.ctx[task.TaskId]
	s.mu.Unlock()

	state := gRPC.PlugTaskState_RUNNING
	if !found {
		state = gRPC.PlugTaskState_UNKNOWN
	}

	return &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: state,
	}, nil
}

// TaskCancel cancels the context of the given task, if it is tracked, and
// stops tracking it. The returned state is always STOPPED, including for task
// IDs that are not tracked. It never returns an error.
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
	s.removeTask(task.TaskId)

	return &gRPC.PlugTaskStatus{
		TaskId:    task.TaskId,
		TaskState: gRPC.PlugTaskState_STOPPED,
	}, nil
}

// removeTask drops the task from the tracking map and cancels its context.
// It does nothing for an unknown task ID, so it is safe to call repeatedly.
func (s *server) removeTask(taskID string) {
	s.mu.Lock()
	cancel, exists := s.ctx[taskID]
	delete(s.ctx, taskID)
	s.mu.Unlock()

	if !exists {
		return
	}

	// Called after releasing the lock so the mutex is never held while running
	// code outside of this type.
	cancel()
}

// Ping echoes the request message back together with the current timestamp.
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
	return &gRPC.PongResponse{
		Message:   request.Message,
		Timestamp: timestamppb.Now(),
	}, nil
}

// SendMessage forwards the message to the send-message callback. It returns a
// response with Success set to true, or the callback's error unchanged.
func (s *server) SendMessage(ctx context.Context, request *gRPC.SendMessageRequest) (*gRPC.SendMessageResponse, error) {
	if err := s.sendMessageExecution(ctx, request.UserSourceId, request.UserSourceName, request.Message); err != nil {
		return nil, err
	}

	return &gRPC.SendMessageResponse{Success: true}, nil
}

// GetUserMessages fetches messages through the get-message callback and
// converts them into gRPC messages. Every returned message carries the
// source ID and source name of the request as its sender. The callback's
// error is returned unchanged.
func (s *server) GetUserMessages(ctx context.Context, request *gRPC.GetMessagesRequest) (*gRPC.GetMessagesResponse, error) {
	messages, err := s.getMessageExecution(ctx, request.UserSourceId, request.UserSourceName)
	if err != nil {
		return nil, err
	}

	messageResponse := gRPC.GetMessagesResponse{}
	for _, message := range messages {
		messageResponse.Messages = append(messageResponse.Messages, &gRPC.Message{
			FromUserSourceId:   request.UserSourceId,
			FromUserSourceName: request.UserSourceName,
			CreatedAt:          message.createdAt,
			Body:               message.body,
			Title:              message.title,
		})
	}

	return &messageResponse, nil
}