I need to implement the observer pattern with gRPC.
There is a server that broadcasts messages to clients,
and the clients then reply with an acknowledgement.
I am using a bidirectional stream:
rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);
Here the server sends CommandMessage to the clients,
and the clients reply with AcknowMessage.
The server has a function to broadcast a message when required.
The function also waits for ack messages or a timeout and returns
a map of results like map[clientId]status.
So I asked ChatGPT and got the following code, and the program works well.
Even when an acknowledgement arrives after the timeout, it is not taken into account.
The code:
type client struct {
	id     string
	stream pb.CommandService_CommandStreamServer
	ackCh  chan *pb.Acknowledgement
}

type server struct {
	pb.UnimplementedCommandServiceServer
	mu      sync.Mutex
	clients map[string]*client
}

func newServer() *server {
	return &server{
		clients: make(map[string]*client),
	}
}

// When a client starts, it calls CommandStream to subscribe
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
	p, _ := peer.FromContext(stream.Context())
	// client ID taken from the peer IP address
	clientID := p.Addr.String()
	//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())
	c := &client{
		id:     clientID,
		stream: stream,
		ackCh:  make(chan *pb.Acknowledgement),
	}
	s.mu.Lock()
	s.clients[clientID] = c
	s.mu.Unlock()
	log.Printf("Client connected: %s", clientID)
	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", clientID)
	}()
	// Load the ack response into the client channel
	for {
		in, err := stream.Recv() // Wait for response
		if err != nil {
			return err
		}
		if ack := in.GetAck(); ack != nil {
			//log.Printf("Received ACK from %s: %+v", clientID, ack)
			log.Printf("Receive cmd: %s", ack.CommandId)
			c.ackCh <- ack
		}
	}
}

func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
	log.Printf("Broadcast command: %v", commandID)
	cmd := &pb.CommandMessage{
		Payload: &pb.CommandMessage_Command{
			Command: &pb.Command{
				Id:       commandID,
				Name:     name,
				JsonArgs: jsonArgs,
			},
		},
	}
	results := make(map[string]string)
	var wg sync.WaitGroup
	mu := sync.Mutex{} // Protect results map
	for _, c := range s.clients {
		wg.Add(1)
		go func(cl *client) {
			defer wg.Done()
			// Send command
			if err := c.stream.Send(cmd); err != nil {
				log.Printf("Error sending to %s: %v", c.id, err)
				mu.Lock()
				results[cl.id] = "send_failed"
				mu.Unlock()
				return
			}
			// Wait for ack or timeout
			ackCh := make(chan string, 1)
			start := time.Now().UnixNano()
			fmt.Printf(" Wait for ack or timeout \n")
			reads := 0
			go func() {
				for {
					select {
					case ack := <-c.ackCh:
						fmt.Printf(" read channel after: %d \n", time.Now().UnixNano()-start)
						fmt.Printf(" cmdId: %s \n", ack.CommandId)
						reads++
						ackCh <- ack.Status
					case <-time.After(500 * time.Millisecond):
						fmt.Printf(" timeout after: %d \n", time.Now().UnixNano()-start)
						ackCh <- "timeout"
					}
				}
			}()
			//fmt.Printf("many reads: %d \n", reads) // Always prints 0, why??
			status := <-ackCh
			mu.Lock()
			results[cl.id] = status
			mu.Unlock()
		}(c)
	}
	wg.Wait()
	return results
}
However, inside the broadcast function there is a piece of code that I can't understand:
go func() {
	for {
		reads++
		select {
		case ack := <-c.ackCh:
			ackCh <- ack.Status
		case <-time.After(500 * time.Millisecond):
			ackCh <- "timeout"
		}
	}
}()
fmt.Printf("many reads: %d \n", reads) // Always prints 0, why??
Why is the reads counter always 0?
How does the flow escape from the infinite loop without a return?
Based on my knowledge of channels a return is missing, but the code works well.
And if I add a return, the for loop is executed only once, and sometimes
I get earlier ack messages that arrived after the timeout,
because acks for an earlier command that arrive after the timeout are written to a channel, and the write operation blocks until the channel is read.
The sending goroutine blocks until another goroutine is ready to receive from that channel.
I always need to discard late-arriving messages and get just the last one.
Is there a better way to write this?