This repository has been archived on 2024-07-22. You can view files and clone it, but cannot push or open issues or pull requests.
plug-sdk/pkg/plug/grpc.go

104 lines
2.8 KiB
Go

package plug
import (
"context"
"google.golang.org/protobuf/types/known/timestamppb"
"log"
"git.dragse.it/anthrove/otter-space-sdk/pkg/graph"
"git.dragse.it/anthrove/otter-space-sdk/pkg/models"
gRPC "git.dragse.it/anthrove/plug-sdk/pkg/grpc"
gonanoid "github.com/matoous/go-nanoid/v2"
)
type server struct {
gRPC.UnimplementedPlugConnectorServer
ctx map[string]context.CancelFunc
database graph.OtterSpace
taskExecutionFunction TaskExecution
}
func NewGrpcServer(database graph.OtterSpace, taskExecutionFunction TaskExecution) gRPC.PlugConnectorServer {
return &server{
ctx: make(map[string]context.CancelFunc),
database: database,
taskExecutionFunction: taskExecutionFunction,
}
}
func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) {
var anthroveUser models.AnthroveUser
var plugTaskState gRPC.PlugTaskStatus
var err error
id, err := gonanoid.New()
if err != nil {
return nil, err
}
plugTaskState.TaskId = id
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
anthroveUser.UserID = models.AnthroveUserID(creation.UserId)
// gRPC closes the context after the call ended. So the whole scrapping stopped without waiting
// by using this method we assign a new context to each new request we get.
// This can be used for example to close the context with the given id
ctx, cancel := context.WithCancel(context.Background())
s.ctx[id] = cancel
go func() {
// FIXME: better implement this methode, works for now but needs refactoring
err := s.taskExecutionFunction(ctx, s.database, creation.UserSourceName, anthroveUser, creation.DeepScrape, func() {
s.removeTask(id)
})
if err != nil {
log.Print(err)
}
}()
return &plugTaskState, nil
}
func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
var plugTaskState gRPC.PlugTaskStatus
_, found := s.ctx[task.TaskId]
plugTaskState.TaskId = task.TaskId
plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING
if !found {
plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN
}
return &plugTaskState, nil
}
func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) {
var plugTaskState gRPC.PlugTaskStatus
plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED
plugTaskState.TaskId = task.TaskId
s.removeTask(task.TaskId)
return &plugTaskState, nil
}
func (s *server) removeTask(taskID string) {
fn, exists := s.ctx[taskID]
if !exists {
return
}
fn()
delete(s.ctx, taskID)
}
func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) {
response := gRPC.PongResponse{
Message: request.Message,
Timestamp: timestamppb.Now(),
}
return &response, nil
}