Thursday, June 12, 2025

Channels: when does the program flow exit from the infinite for { select }? – Getting Help


I need to implement the observer pattern with gRPC.
There is a server that broadcasts messages to clients,
and the clients then reply with an acknowledgement.

Using a bidirectional stream:
rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);

Here the server sends CommandMessage to the clients,
and the clients reply with AcknowMessage.

The server has a function to broadcast a message when required.
The function also watches for ack messages or a timeout and returns
a map of results like map[clientId]status.

So I asked ChatGPT and got the following code, and the program works well.
Even when an acknowledge signal arrives after the timeout, it is not taken into account.
The code:


type client struct {
	id     string
	stream pb.CommandService_CommandStreamServer
	ackCh  chan *pb.Acknowledgement
}

type server struct {
	pb.UnimplementedCommandServiceServer
	mu      sync.Mutex
	clients map[string]*client
}

func newServer() *server {
	return &server{
		clients: make(map[string]*client),
	}
}

// When a client starts, it calls CommandStream to subscribe
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
	p, _ := peer.FromContext(stream.Context())
	// Client ID taken from the peer address
	clientID := p.Addr.String()
	//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())

	c := &client{
		id:     clientID,
		stream: stream,
		ackCh:  make(chan *pb.Acknowledgement),
	}

	s.mu.Lock()
	s.clients[clientID] = c
	s.mu.Unlock()
	log.Printf("Client connected: %s", clientID)

	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", clientID)
	}()

	// Forward each ack response into the client channel
	for {
		in, err := stream.Recv() // Wait for a reply
		if err != nil {
			return err
		}
		if ack := in.GetAck(); ack != nil {
			//log.Printf("Received ACK from %s: %+v", clientID, ack)
			log.Printf("Receive cmd: %s", ack.CommandId)
			c.ackCh <- ack
		}
	}
}


func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()

	commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
	log.Printf("Broadcast command: %v", commandID)

	cmd := &pb.CommandMessage{
		Payload: &pb.CommandMessage_Command{
			Command: &pb.Command{
				Id:       commandID,
				Name:     name,
				JsonArgs: jsonArgs,
			},
		},
	}

	results := make(map[string]string)
	var wg sync.WaitGroup
	mu := sync.Mutex{} // Protects the results map

	for _, c := range s.clients {
		wg.Add(1)

		go func(cl *client) {
			defer wg.Done()

			// Send command
			if err := c.stream.Send(cmd); err != nil {
				log.Printf("Error sending to %s: %v", c.id, err)
				mu.Lock()
				results[cl.id] = "send_failed"
				mu.Unlock()
				return
			}

			// Wait for ack or timeout
			ackCh := make(chan string, 1)
			start := time.Now().UnixNano()
			fmt.Printf(" Wait for ack or timeout\n")
			reads := 0

			go func() {

				for {

					select {
					case ack := <-c.ackCh:
						fmt.Printf("  read channel after: %d\n", time.Now().UnixNano()-start)
						fmt.Printf("               cmdId: %s\n", ack.CommandId)
						reads++
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):
						fmt.Printf("  timeout after: %d\n", time.Now().UnixNano()-start)
						ackCh <- "timeout"

					}
				}

			}()

			//fmt.Printf("many reads: %d\n", reads) // Always prints 0, why?

			status := <-ackCh
			mu.Lock()
			results[cl.id] = status
			mu.Unlock()
		}(c)
	}

	wg.Wait()
	return results
}

But inside the broadcast function there is a piece of code that I can't understand:

			go func() {

				for {
					reads++
					select {
					case ack := <-c.ackCh:
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):
						ackCh <- "timeout"
					}
				}

			}()
			fmt.Printf("many reads: %d\n", reads) // Always prints 0, why?

Why is the read counter always 0?
How does the flow escape from the infinite loop without a return?
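
To look at the second question in isolation, here is a small standalone sketch with no gRPC in it. The names firstStatus and leakedGoroutines and the durations are made up for the demo. It copies only the shape of the loop above: an endless for { select } sending into a channel with a buffer of 1, while the caller receives a single value. It suggests that the loop may not escape at all: the caller simply stops waiting after the first value, and the inner goroutine stays alive. If that is right, the counter probably prints 0 because the Printf runs right after the go statement, before the new goroutine has been scheduled, and reading reads there without synchronization is a data race anyway.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// firstStatus mimics the inner goroutine from the question: an endless
// for/select that sends into a channel with a buffer of 1, while the
// caller receives exactly one value and moves on.
func firstStatus() string {
	acks := make(chan string) // nothing is ever sent here in this demo
	out := make(chan string, 1)
	go func() {
		for {
			select {
			case s := <-acks:
				out <- s
			case <-time.After(5 * time.Millisecond):
				// Blocks for good once the buffer is full and nobody receives.
				out <- "timeout"
			}
		}
	}()
	return <-out // the caller returns here; the goroutine above does not
}

// leakedGoroutines reports how many extra goroutines exist after one call.
func leakedGoroutines() int {
	before := runtime.NumGoroutine()
	firstStatus()
	time.Sleep(50 * time.Millisecond) // let the loop run a few more rounds
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println("goroutines still alive after return:", leakedGoroutines())
}
```

In the real server such a leftover goroutine would still be receiving from c.ackCh, so it could take an ack that belongs to a later command.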

Based on my knowledge of channels a return is missing, but the code works well.
And if I add a return, the for loop is executed only once, and sometimes
I get previous ack messages that arrived after the timeout.
That is because an ack for a previous command that arrived after the timeout is written to the channel, and the write operation blocks until the channel is read:
the sending goroutine blocks until another goroutine is ready to receive from that channel.
I always need to discard late-arriving messages and get only the last one.

Is there a better way to write this?
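
One possible direction, as a minimal sketch and not a drop-in fix: wait in the calling goroutine itself (no inner goroutine), return as soon as there is a result, and compare the ack's command ID with the ID of the command that was just sent, so late acks for earlier commands are dropped. The ack struct, waitForAck and demoTimeout are hypothetical stand-ins for this sketch. In the real server the channel would carry *pb.Acknowledgement and the ID would be the commandID built in BroadcastCommand.

```go
package main

import (
	"fmt"
	"time"
)

// ack is a hypothetical stand-in for the generated pb.Acknowledgement type.
type ack struct {
	CommandID string
	Status    string
}

// demoTimeout is an arbitrary value chosen for this sketch.
const demoTimeout = 100 * time.Millisecond

// waitForAck returns the status of the first ack whose CommandID matches
// wantID, or "timeout" if none arrives within d. Acks carrying any other
// command ID (for example, late replies to an earlier broadcast) are dropped.
func waitForAck(acks <-chan ack, wantID string, d time.Duration) string {
	timer := time.NewTimer(d) // one deadline for the whole wait
	defer timer.Stop()
	for {
		select {
		case a := <-acks:
			if a.CommandID == wantID {
				return a.Status // return ends the loop and the function
			}
			// Stale ack: ignore it and keep waiting.
		case <-timer.C:
			return "timeout"
		}
	}
}

func main() {
	acks := make(chan ack, 2)
	acks <- ack{CommandID: "cmd-1", Status: "late"} // left over from an earlier command
	acks <- ack{CommandID: "cmd-2", Status: "ok"}

	fmt.Println(waitForAck(acks, "cmd-2", demoTimeout)) // prints "ok"
	fmt.Println(waitForAck(acks, "cmd-3", demoTimeout)) // prints "timeout"
}
```

A single timer is used instead of time.After inside the loop so that a stale ack does not restart the 500 ms window on every iteration.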
