package plug import ( "context" "google.golang.org/protobuf/types/known/timestamppb" "log" "git.dragse.it/anthrove/otter-space-sdk/pkg/graph" "git.dragse.it/anthrove/otter-space-sdk/pkg/models" gRPC "git.dragse.it/anthrove/plug-sdk/pkg/grpc" gonanoid "github.com/matoous/go-nanoid/v2" ) type server struct { gRPC.UnimplementedPlugConnectorServer ctx map[string]context.CancelFunc database graph.OtterSpace taskExecutionFunction TaskExecution } func NewGrpcServer(database graph.OtterSpace, taskExecutionFunction TaskExecution) gRPC.PlugConnectorServer { return &server{ ctx: make(map[string]context.CancelFunc), database: database, taskExecutionFunction: taskExecutionFunction, } } func (s *server) TaskStart(ctx context.Context, creation *gRPC.PlugTaskCreation) (*gRPC.PlugTaskStatus, error) { var anthroveUser models.AnthroveUser var plugTaskState gRPC.PlugTaskStatus var err error id, err := gonanoid.New() if err != nil { return nil, err } plugTaskState.TaskId = id plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING anthroveUser.UserID = models.AnthroveUserID(creation.UserId) // gRPC closes the context after the call ended. So the whole scrapping stopped without waiting // by using this method we assign a new context to each new request we get. // This can be used for example to close the context with the given id ctx, cancel := context.WithCancel(context.Background()) s.ctx[id] = cancel go func() { // FIXME: better implement this methode, works for now but needs refactoring err := s.taskExecutionFunction(ctx, s.database, creation.UserSourceName, anthroveUser, creation.DeepScrape, func() { s.removeTask(id) }) if err != nil { log.Print(err) } }() return &plugTaskState, nil } func (s *server) TaskStatus(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { var plugTaskState gRPC.PlugTaskStatus _, found := s.ctx[task.TaskId] plugTaskState.TaskId = task.TaskId plugTaskState.TaskState = gRPC.PlugTaskState_RUNNING if !found { plugTaskState.TaskState = gRPC.PlugTaskState_UNKNOWN } return &plugTaskState, nil } func (s *server) TaskCancel(_ context.Context, task *gRPC.PlugTask) (*gRPC.PlugTaskStatus, error) { var plugTaskState gRPC.PlugTaskStatus plugTaskState.TaskState = gRPC.PlugTaskState_STOPPED plugTaskState.TaskId = task.TaskId s.removeTask(task.TaskId) return &plugTaskState, nil } func (s *server) removeTask(taskID string) { fn, exists := s.ctx[taskID] if !exists { return } fn() delete(s.ctx, taskID) } func (s *server) Ping(_ context.Context, request *gRPC.PingRequest) (*gRPC.PongResponse, error) { response := gRPC.PongResponse{ Message: request.Message, Timestamp: timestamppb.Now(), } return &response, nil }