Commit 438c6a5f authored by Loïck Bonniot's avatar Loïck Bonniot

[p] Add common package and update gRPC api

- common package now contains a waitingGroup, a useful struct to deal
  with synchronization
- updated version of gRPC api should be more efficient for signature
  initialization (untested)
parent 5966dad3
......@@ -31,6 +31,7 @@ Unit tests:
- "go test -coverprofile dfssp_user.part -v dfss/dfssp/user"
- "go test -coverprofile dfssp_contract.part -v dfss/dfssp/contract"
- "go test -coverprofile dfssp_templates.part -v dfss/dfssp/templates"
- "go test -coverprofile dfssp_common.part -v dfss/dfssp/common"
- "go test -coverprofile dfssd.part -v dfss/dfssd"
- "go test -coverprofile dfssc_common.part -v dfss/dfssc/common"
- "go test -coverprofile dfssc_security.part -v dfss/dfssc/security"
......
This diff is collapsed.
......@@ -7,8 +7,8 @@ service Platform {
rpc Auth(AuthRequest) returns (RegisteredUser) {}
rpc Unregister(Empty) returns (ErrorCode) {}
rpc PostContract(PostContractRequest) returns (ErrorCode) {}
rpc JoinSignature(JoinSignatureRequest) returns (ErrorCode) {}
rpc ReadySign(ReadySignRequest) returns (ErrorCode) {}
rpc JoinSignature(JoinSignatureRequest) returns (stream UserConnected) {}
rpc ReadySign(ReadySignRequest) returns (LaunchSignature) {} // Warning, LaunchSignature can be emitted with a very high delay
}
// RegisterRequest message contains the client's email adress and his
......@@ -70,7 +70,26 @@ message JoinSignatureRequest {
uint32 port = 2;
}
// UserConnected is emitted by the platform to the client to announce a new client connection
message UserConnected {
ErrorCode errorCode = 1;
string contractUuid = 2;
message User {
bytes KeyHash = 1;
string email = 2;
string ip = 3;
uint32 port = 4;
}
}
// ReadySignRequest contains the contract unique identitier that is ready to be signed
message ReadySignRequest {
string contractUuid = 1;
}
// LaunchSignature is emitted by the platform when every signers are ready
message LaunchSignature {
ErrorCode errorCode = 1;
string signatureUuid = 2;
repeated string keyHash = 3;
}
package common
import (
"container/list"
"sync"
)
type waitingGroup struct {
channels *list.List
oldMessages []interface{}
mutex sync.Mutex
}
// WaitingGroupMap is a synchronisation tool for goroutines.
// It enables several goroutines to wait for other goroutines in a specific "room".
//
// After joining a room, a goroutine can broadcast or wait for events.
// To avoid memory leaks, always call Unjoin when leaving a goroutine.
//
// See group_test.go for some examples.
type WaitingGroupMap struct {
data map[string]*waitingGroup
mutex sync.Mutex
}
// NewWaitingGroupMap returns a ready to use WaitingGroupMap.
func NewWaitingGroupMap() *WaitingGroupMap {
return &WaitingGroupMap{
data: make(map[string]*waitingGroup),
}
}
// Join permits the current goroutine to join a room.
// It returns the listenning channel and a slice containing messages already sent by other members of the room.
// The room is automatically created if unknown.
func (g *WaitingGroupMap) Join(room string) (listen chan interface{}, oldMessages []interface{}) {
// Check if the current waiting group knows this room
g.mutex.Lock()
_, present := g.data[room]
if !present {
g.data[room] = &waitingGroup{
channels: list.New(),
oldMessages: make([]interface{}, 0),
}
}
g.mutex.Unlock()
g.data[room].mutex.Lock()
listen = make(chan interface{}, 100)
g.data[room].channels.PushBack(listen)
oldMessages = g.data[room].oldMessages
g.data[room].mutex.Unlock()
return
}
// Unjoin remove the given chan from the current room, freeing memory if needed.
// If there is nobody remaining in the room, it is destroyed by calling Close.
func (g *WaitingGroupMap) Unjoin(room string, i chan interface{}) {
g.data[room].mutex.Lock()
for e := g.data[room].channels.Front(); e != nil; e = e.Next() {
if e.Value == i {
g.data[room].channels.Remove(e) // Remove element from list
break
}
}
if g.data[room].channels.Len() == 0 {
g.Close(room)
return
}
g.data[room].mutex.Unlock()
}
// Broadcast emits a message to every member of the room, including the sender.
func (g *WaitingGroupMap) Broadcast(room string, value interface{}) {
g.data[room].mutex.Lock()
g.data[room].oldMessages = append(g.data[room].oldMessages, value)
g.data[room].mutex.Unlock()
for e := g.data[room].channels.Front(); e != nil; e = e.Next() {
e.Value.(chan interface{}) <- value
}
}
// Close removes the room from the current WaitingGroupMap, closing all opened channels,
// and clearing oldMessages.
func (g *WaitingGroupMap) Close(room string) {
for e := g.data[room].channels.Front(); e != nil; e = e.Next() {
close(e.Value.(chan interface{}))
}
delete(g.data, room)
}
// CloseAll clears every available room in the current WaitingGroupMap.
func (g *WaitingGroupMap) CloseAll() {
for k := range g.data {
g.Close(k)
}
}
package common
import (
"sync"
"testing"
"time"
)
func TestWaitingGroup(t *testing.T) {
nb := 60
w := NewWaitingGroupMap()
waitGroup := &sync.WaitGroup{} // test only
waitGroup.Add(nb)
// Spawn nb emitters waiting for (nb-1) other emitters
for i := 0; i < nb; i++ {
go func(i int) {
// Add some virtual latency
time.Sleep(time.Duration(i) * time.Millisecond)
// Join the waitingGroupMap
myChan, nbs := w.Join("A")
w.Broadcast("A", i)
// Wait for other msg
for m := range myChan {
nbs = append(nbs, m)
if len(nbs) == nb {
break
}
}
// Free the waitingGroupMap
w.Unjoin("A", myChan)
waitGroup.Done()
}(i)
}
waitGroup.Wait() // test only, wait for test to fully happen
}
func TestCloseWaitingGroup(t *testing.T) {
w := NewWaitingGroupMap()
waitGroup := &sync.WaitGroup{} // test only
waitGroup.Add(1)
go func() {
myChan, _ := w.Join("A")
for _ = range myChan {
t.Fatal("Should not be here")
}
// No need to call Unjoin here: if we do, we will try to unjoin a unknown room
waitGroup.Done()
}()
time.Sleep(10 * time.Millisecond)
w.CloseAll()
waitGroup.Wait()
}
......@@ -60,15 +60,15 @@ func (s *platformServer) PostContract(ctx context.Context, in *api.PostContractR
// JoinSignature handler
//
// Handle incoming JoinSignatureRequest messages
func (s *platformServer) JoinSignature(ctx context.Context, in *api.JoinSignatureRequest) (*api.ErrorCode, error) {
func (s *platformServer) JoinSignature(in *api.JoinSignatureRequest, stream api.Platform_JoinSignatureServer) error {
// TODO
return nil, nil
return nil
}
// ReadySign handler
//
// Handle incoming ReadySignRequest messages
func (s *platformServer) ReadySign(ctx context.Context, in *api.ReadySignRequest) (*api.ErrorCode, error) {
func (s *platformServer) ReadySign(ctx context.Context, in *api.ReadySignRequest) (*api.LaunchSignature, error) {
// TODO
return nil, nil
}
......
......@@ -52,15 +52,15 @@ func (s *mockServer) PostContract(ctx context.Context, in *api.PostContractReque
// JoinSignature handler
//
// Handle incoming JoinSignatureRequest messages
func (s *mockServer) JoinSignature(ctx context.Context, in *api.JoinSignatureRequest) (*api.ErrorCode, error) {
func (s *mockServer) JoinSignature(in *api.JoinSignatureRequest, stream api.Platform_JoinSignatureServer) error {
// TODO
return nil, nil
return nil
}
// ReadySign handler
//
// Handle incoming ReadySignRequest messages
func (s *mockServer) ReadySign(ctx context.Context, in *api.ReadySignRequest) (*api.ErrorCode, error) {
func (s *mockServer) ReadySign(ctx context.Context, in *api.ReadySignRequest) (*api.LaunchSignature, error) {
// TODO
return nil, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment