diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 8caf77e54c0568453ef050d49b9d24760e771630..4ae394297a8d7d799d8304c3e1c4dfb5cc2f7be3 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -61,8 +61,8 @@ Integration tests: # Build binary - "cd $GOPATH/src/dfss/dfssd && go build -ldflags \"-r .\" -o dfssd" # Install binary - - "mv $GOPATH/src/dfss/dfssd/dfssd /bin/" - - "mv $GOPATH/src/dfss/dfssd/libqtdrv.ui.so.1 /lib/" + - "cp $GOPATH/src/dfss/dfssd/dfssd $GOPATH/bin/" + - "cp $GOPATH/src/dfss/dfssd/libqtdrv.ui.so.1 /lib/" # Start integration tests - "go test -v dfss/tests" diff --git a/dfssc/sign/protocol.go b/dfssc/sign/protocol.go index 7ca8ac6d3e60ceb4d869eab1084082d6bf4b3375..d0f944c7c40630f232ca88d2766f88a36b0877ad 100644 --- a/dfssc/sign/protocol.go +++ b/dfssc/sign/protocol.go @@ -23,8 +23,8 @@ func (m *SignatureManager) Sign() error { } m.makeSignersHashToIDMap() - m.cServerIface.incomingPromises = make(chan interface{}) - m.cServerIface.incomingSignatures = make(chan interface{}) + m.cServerIface.incomingPromises = make(chan interface{}, chanBufferSize) + m.cServerIface.incomingSignatures = make(chan interface{}, chanBufferSize) // Cooldown delay, let other clients wake-up their channels time.Sleep(time.Second) @@ -88,25 +88,30 @@ func (m *SignatureManager) makeSignersHashToIDMap() { } // promiseRound describes a promise round: reception and sending -// TODO better error management - this function should return `error` ! func (m *SignatureManager) promiseRound(pendingSet, sendSet []uint32, myID uint32) { // Reception of the due promises - // TODO this ctx needs a timeout ! for len(pendingSet) > 0 { - promise := (<-m.cServerIface.incomingPromises).(*cAPI.Promise) - senderID, exist := m.hashToID[fmt.Sprintf("%x", promise.Context.SenderKeyHash)] - if exist { - var err error - pendingSet, err = common.Remove(pendingSet, senderID) - if err != nil { - continue + select { + case promiseIface := <-m.cServerIface.incomingPromises: + promise := (promiseIface).(*cAPI.Promise) + senderID, exist := m.hashToID[fmt.Sprintf("%x", promise.Context.SenderKeyHash)] + if exist { + var err error + pendingSet, err = common.Remove(pendingSet, senderID) + if err != nil { + continue + } + m.archives.receivedPromises = append(m.archives.receivedPromises, promise) } - m.archives.receivedPromises = append(m.archives.receivedPromises, promise) + + case <-time.After(time.Minute): + // TODO contact TTP + return } } - c := make(chan *cAPI.Promise) + c := make(chan *cAPI.Promise, chanBufferSize) // Sending of due promises for _, id := range sendSet { go func(id uint32, m *SignatureManager) { diff --git a/dfssc/sign/signatures.go b/dfssc/sign/signatures.go index 0d5a0f85722597175ddee6f35202f7ed2ed6db54..c367b0f1c55f76ebf201b5ebc038ebbc8e70fffc 100644 --- a/dfssc/sign/signatures.go +++ b/dfssc/sign/signatures.go @@ -1,15 +1,16 @@ package sign import ( - "dfss/dfssc/common" "fmt" + "time" cAPI "dfss/dfssc/api" + "dfss/dfssc/common" ) // ExchangeAllSignatures creates and sends signatures to all the signers of the contract func (m *SignatureManager) ExchangeAllSignatures() error { - allReceived := make(chan error) + allReceived := make(chan error, chanBufferSize) go m.ReceiveAllSignatures(allReceived) myID, err := m.FindID() @@ -19,7 +20,7 @@ func (m *SignatureManager) ExchangeAllSignatures() error { // compute a set of all signers except me sendSet := common.GetAllButOne(m.sequence, myID) - errorChan := make(chan error) + errorChan := make(chan error, chanBufferSize) for _, id := range sendSet { go func(id uint32) { signature, err2 := m.CreateSignature(myID, id) @@ -71,13 +72,20 @@ func (m *SignatureManager) ReceiveAllSignatures(out chan error) { // compute a set of all signers except me pendingSet := common.GetAllButOne(m.sequence, myID) - // TODO this ctx needs a timeout ! for len(pendingSet) > 0 { - signature := (<-m.cServerIface.incomingSignatures).(*cAPI.Signature) - senderID, exist := m.hashToID[fmt.Sprintf("%x", signature.Context.SenderKeyHash)] - if exist { - pendingSet, _ = common.Remove(pendingSet, senderID) - m.archives.receivedSignatures = append(m.archives.receivedSignatures, signature) + select { + // Waiting for signatures from grpc handler + case signatureIface := <-m.cServerIface.incomingSignatures: + signature := (signatureIface).(*cAPI.Signature) + senderID, exist := m.hashToID[fmt.Sprintf("%x", signature.Context.SenderKeyHash)] + if exist { + pendingSet, _ = common.Remove(pendingSet, senderID) + m.archives.receivedSignatures = append(m.archives.receivedSignatures, signature) + } + + case <-time.After(time.Minute): + out <- fmt.Errorf("Signature reception timeout!") + return } } diff --git a/dfssc/sign/starter.go b/dfssc/sign/starter.go index 6c0cc7e6571ff65f153ef0713d87e3a4a4a7faa6..539d3fe75e0f569fb33f31e8e42ede4848084521 100644 --- a/dfssc/sign/starter.go +++ b/dfssc/sign/starter.go @@ -19,6 +19,9 @@ import ( "google.golang.org/grpc" ) +// Limit the buffer size of the channels +const chanBufferSize = 100 + // SignatureManager handles the signature of a contract. type SignatureManager struct { auth *security.AuthContainer diff --git a/tests/starters_test.go b/tests/starters_test.go index 971c332882c2f929accf836bbff97754fc173bcc..7337d6ef8603dd4f1c8ec71d63e2767f0dc95806 100644 --- a/tests/starters_test.go +++ b/tests/starters_test.go @@ -64,7 +64,7 @@ func startPlatform(tmpDir string) (platform, ttp, demo *exec.Cmd, stop func(), c err = ttp.Start() // Start demonstrator - demo = exec.Command(demoPath, "-p", "3000", "nogui") + demo = exec.Command(demoPath, "-p", "9099", "nogui") demo.Stdout = os.Stdout demo.Stderr = os.Stderr err = demo.Start() @@ -105,7 +105,7 @@ func createClient(tmpDir string, ca []byte, port int) (*exec.Cmd, error) { // Prepare the client command. // The last argument is up to you! - cmd := exec.Command(path, "-ca", caPath, "-cert", certPath, "-host", "127.0.0.1:"+testPort, "-key", keyPath, "-port", strconv.Itoa(port), "-v", "-d") + cmd := exec.Command(path, "-ca", caPath, "-cert", certPath, "-host", "127.0.0.1:"+testPort, "-key", keyPath, "-port", strconv.Itoa(port), "-v", "-d", "localhost:9099") return cmd, nil }