Commit e8867f36 authored by Richer Maximilien's avatar Richer Maximilien Committed by Loïck Bonniot

[c] Update channels with capacity and timeout

parent 5bb03e5d
Pipeline #645 failed with stage
......@@ -61,8 +61,8 @@ Integration tests:
# Build binary
- "cd $GOPATH/src/dfss/dfssd && go build -ldflags \"-r .\" -o dfssd"
# Install binary
- "mv $GOPATH/src/dfss/dfssd/dfssd /bin/"
- "mv $GOPATH/src/dfss/dfssd/libqtdrv.ui.so.1 /lib/"
- "cp $GOPATH/src/dfss/dfssd/dfssd $GOPATH/bin/"
- "cp $GOPATH/src/dfss/dfssd/libqtdrv.ui.so.1 /lib/"
# Start integration tests
- "go test -v dfss/tests"
......
......@@ -23,8 +23,8 @@ func (m *SignatureManager) Sign() error {
}
m.makeSignersHashToIDMap()
m.cServerIface.incomingPromises = make(chan interface{})
m.cServerIface.incomingSignatures = make(chan interface{})
m.cServerIface.incomingPromises = make(chan interface{}, chanBufferSize)
m.cServerIface.incomingSignatures = make(chan interface{}, chanBufferSize)
// Cooldown delay, let other clients wake-up their channels
time.Sleep(time.Second)
......@@ -88,25 +88,30 @@ func (m *SignatureManager) makeSignersHashToIDMap() {
}
// promiseRound describes a promise round: reception and sending
// TODO better error management - this function should return `error` !
func (m *SignatureManager) promiseRound(pendingSet, sendSet []uint32, myID uint32) {
// Reception of the due promises
// TODO this ctx needs a timeout !
for len(pendingSet) > 0 {
promise := (<-m.cServerIface.incomingPromises).(*cAPI.Promise)
senderID, exist := m.hashToID[fmt.Sprintf("%x", promise.Context.SenderKeyHash)]
if exist {
var err error
pendingSet, err = common.Remove(pendingSet, senderID)
if err != nil {
continue
select {
case promiseIface := <-m.cServerIface.incomingPromises:
promise := (promiseIface).(*cAPI.Promise)
senderID, exist := m.hashToID[fmt.Sprintf("%x", promise.Context.SenderKeyHash)]
if exist {
var err error
pendingSet, err = common.Remove(pendingSet, senderID)
if err != nil {
continue
}
m.archives.receivedPromises = append(m.archives.receivedPromises, promise)
}
m.archives.receivedPromises = append(m.archives.receivedPromises, promise)
case <-time.After(time.Minute):
// TODO contact TTP
return
}
}
c := make(chan *cAPI.Promise)
c := make(chan *cAPI.Promise, chanBufferSize)
// Sending of due promises
for _, id := range sendSet {
go func(id uint32, m *SignatureManager) {
......
package sign
import (
"dfss/dfssc/common"
"fmt"
"time"
cAPI "dfss/dfssc/api"
"dfss/dfssc/common"
)
// ExchangeAllSignatures creates and sends signatures to all the signers of the contract
func (m *SignatureManager) ExchangeAllSignatures() error {
allReceived := make(chan error)
allReceived := make(chan error, chanBufferSize)
go m.ReceiveAllSignatures(allReceived)
myID, err := m.FindID()
......@@ -19,7 +20,7 @@ func (m *SignatureManager) ExchangeAllSignatures() error {
// compute a set of all signers except me
sendSet := common.GetAllButOne(m.sequence, myID)
errorChan := make(chan error)
errorChan := make(chan error, chanBufferSize)
for _, id := range sendSet {
go func(id uint32) {
signature, err2 := m.CreateSignature(myID, id)
......@@ -71,13 +72,20 @@ func (m *SignatureManager) ReceiveAllSignatures(out chan error) {
// compute a set of all signers except me
pendingSet := common.GetAllButOne(m.sequence, myID)
// TODO this ctx needs a timeout !
for len(pendingSet) > 0 {
signature := (<-m.cServerIface.incomingSignatures).(*cAPI.Signature)
senderID, exist := m.hashToID[fmt.Sprintf("%x", signature.Context.SenderKeyHash)]
if exist {
pendingSet, _ = common.Remove(pendingSet, senderID)
m.archives.receivedSignatures = append(m.archives.receivedSignatures, signature)
select {
// Waiting for signatures from grpc handler
case signatureIface := <-m.cServerIface.incomingSignatures:
signature := (signatureIface).(*cAPI.Signature)
senderID, exist := m.hashToID[fmt.Sprintf("%x", signature.Context.SenderKeyHash)]
if exist {
pendingSet, _ = common.Remove(pendingSet, senderID)
m.archives.receivedSignatures = append(m.archives.receivedSignatures, signature)
}
case <-time.After(time.Minute):
out <- fmt.Errorf("Signature reception timeout!")
return
}
}
......
......@@ -19,6 +19,9 @@ import (
"google.golang.org/grpc"
)
// Limit the buffer size of the channels
const chanBufferSize = 100
// SignatureManager handles the signature of a contract.
type SignatureManager struct {
auth *security.AuthContainer
......
......@@ -64,7 +64,7 @@ func startPlatform(tmpDir string) (platform, ttp, demo *exec.Cmd, stop func(), c
err = ttp.Start()
// Start demonstrator
demo = exec.Command(demoPath, "-p", "3000", "nogui")
demo = exec.Command(demoPath, "-p", "9099", "nogui")
demo.Stdout = os.Stdout
demo.Stderr = os.Stderr
err = demo.Start()
......@@ -105,7 +105,7 @@ func createClient(tmpDir string, ca []byte, port int) (*exec.Cmd, error) {
// Prepare the client command.
// The last argument is up to you!
cmd := exec.Command(path, "-ca", caPath, "-cert", certPath, "-host", "127.0.0.1:"+testPort, "-key", keyPath, "-port", strconv.Itoa(port), "-v", "-d")
cmd := exec.Command(path, "-ca", caPath, "-cert", certPath, "-host", "127.0.0.1:"+testPort, "-key", keyPath, "-port", strconv.Itoa(port), "-v", "-d", "localhost:9099")
return cmd, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment