From a5e1acb6ce0be21a41717b4465aad05eac5072e0 Mon Sep 17 00:00:00 2001 From: Martino Facchin Date: Tue, 12 Dec 2023 16:33:07 +0100 Subject: [PATCH 1/4] Fix format for NOTIFICATION packet and remove log --- example/sample_mprpc.go | 11 ------ proxy/main.go | 74 +++-------------------------------------- rpc/server.go | 48 ++++++++++++++------------ 3 files changed, 31 insertions(+), 102 deletions(-) diff --git a/example/sample_mprpc.go b/example/sample_mprpc.go index b0c36a5..ec51e99 100644 --- a/example/sample_mprpc.go +++ b/example/sample_mprpc.go @@ -13,7 +13,6 @@ import ( type Resolver map[string]reflect.Value func (self Resolver) Resolve(name string, arguments []reflect.Value) (reflect.Value, error) { - //fmt.Println("resolving ", name) return self[name], nil } @@ -22,12 +21,10 @@ func (self Resolver) Functions() []string { for el := range self { functions = append(functions, el) } - fmt.Println(functions) return functions } func echo(test string) (string, fmt.Stringer) { - fmt.Println(test) return "Hello, " + test, nil } @@ -37,7 +34,6 @@ func whoami() (string, fmt.Stringer) { } func add(a, b uint) (uint, fmt.Stringer) { - fmt.Println("calling add on ", a, " and ", b) return a + b, nil } @@ -47,20 +43,15 @@ func serialportListener(serport *os.File) { n, err := serport.Read(data) if err != nil { - fmt.Println(err) continue } - fmt.Println("got data on serial port") - data = data[:n] - fmt.Println(data) conn, err := net.Dial("tcp", "m4-proxy:5001") client := rpc.NewSession(conn, true) xerr := client.Send("tty", data) if xerr != nil { - fmt.Println(xerr) continue } } @@ -69,12 +60,10 @@ func serialportListener(serport *os.File) { var serport *os.File func tty(test []reflect.Value) fmt.Stringer { - fmt.Println("tty called: ", test) var temp []byte for _, elem := range test { temp = append(temp, byte(elem.Int())) } - //fmt.Println(temp) serport.Write(temp) return nil } diff --git a/proxy/main.go b/proxy/main.go index 3e0e7f5..b575ede 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -24,44 +24,15 @@ const ( type Resolver map[string]reflect.Value func handleConnection(c net.Conn, chardev *os.File, resp chan []byte) { - - fmt.Printf("Serving %s\n", c.RemoteAddr().String()) - data, _, err := msgpack.Unpack(c) if err != nil { fmt.Println(err) return } - fmt.Println(data) var buf bytes.Buffer msgpack.Pack(&buf, data.Interface()) - - /* - msgId := _req[1] - msgName := _req[2] - msgArgs := _req[3] - - rawdata := make([]byte, 5) - rawdata[0] = byte(msgType.Int()) - rawdata[1] = byte(msgId.Int()) - rawdata[2] = byte(msgId.Int()) - rawdata[3] = byte(msgId.Int()) - rawdata[4] = byte(msgId.Int()) - rawdata = append(rawdata, msgName.Bytes()...) - - something := msgArgs.Addr().Bytes() - - fmt.Println(something) - rawdata = append(rawdata, something...) - - fmt.Println(data) - fmt.Println(rawdata) - */ - - fmt.Println(buf) - chardev.Write(buf.Bytes()) msgType := buf.Bytes()[1] @@ -69,18 +40,14 @@ func handleConnection(c net.Conn, chardev *os.File, resp chan []byte) { if msgType == REQUEST { // wait to be unlocked by the other reading goroutine // TODO: add timeout handling - fmt.Println("wait for response") select { case response := <-resp: //chardev.Read(response) - fmt.Println("return response to client") c.Write(response) case <-time.After(1 * time.Second): c.Write(nil) } } - fmt.Println("done") - if msgType == NOTIFICATION { // fire and forget } @@ -91,9 +58,6 @@ func handleConnection(c net.Conn, chardev *os.File, resp chan []byte) { func chardevListener(chardev *os.File, resp chan []byte) { for { - - fmt.Println("charDevListener") - data := make([]byte, 1024) response := make([]byte, 1024) @@ -102,36 +66,16 @@ func chardevListener(chardev *os.File, resp chan []byte) { data = data[:n] if err != nil { - fmt.Println(err) continue } - fmt.Println("chardev.Read returned") - if n <= 0 { continue } - - fmt.Println("got data from chardev") - fmt.Println(data) - start := 0 for { - - fmt.Println("unpacker loop") - copy_data := data[start:] - message, n, err := msgpack.UnpackReflected(bytes.NewReader(copy_data)) - start += n - - fmt.Printf("%d bytes consumed\n", n) - fmt.Printf("%v\n", message) - fmt.Println(message) - if err != nil { - fmt.Printf("Err: %v\n", err) - } - if err == io.EOF { break } @@ -142,12 +86,8 @@ func chardevListener(chardev *os.File, resp chan []byte) { } msgType := _req[0].Int() - fmt.Printf("MsgType: %d\n", msgType) - - fmt.Println("before response") if msgType == RESPONSE { - fmt.Println("got response and continue") // unlock thread waiting on handleConnection resp <- copy_data[:n] continue @@ -163,23 +103,21 @@ func chardevListener(chardev *os.File, resp chan []byte) { msgFunction = _req[1] } - fmt.Println("before serving") method := string(msgFunction.Bytes()) port := functionToPort(method) - fmt.Println("Serving function ", method, " to port ", port) - // REQUEST or NOTIFICATION conn, err := net.Dial("tcp", port) + if err != nil { + continue + } + _, err = conn.Write(copy_data[:n]) if err != nil { fmt.Println(err) continue } - conn.Write(copy_data[:n]) if msgType == REQUEST { - fmt.Println("ask for a response") - var to_send []byte i := 0 for { @@ -191,12 +129,10 @@ func chardevListener(chardev *os.File, resp chan []byte) { break } } - fmt.Println("sending ", to_send[:i]) chardev.Write(to_send[:i]) } if msgType == NOTIFICATION { - fmt.Println("got a notification") // fire and forget } @@ -206,7 +142,6 @@ func chardevListener(chardev *os.File, resp chan []byte) { } func (self Resolver) Resolve(name string, arguments []reflect.Value) (reflect.Value, error) { - fmt.Println("resolving ", name) return self[name], nil } @@ -215,7 +150,6 @@ func (self Resolver) Functions() []string { for el := range self { functions = append(functions, el) } - fmt.Println(functions) return functions } diff --git a/rpc/server.go b/rpc/server.go index 43ee5e5..1a3a352 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -25,7 +25,7 @@ func (self *Server) Register() { conn, _ := net.Dial("tcp", "m4-proxy:5000") client := NewSession(conn, true) - client.Send("register", self.port, self.resolver.Functions()) + client.Call("register", self.port, self.resolver.Functions()) } // Goes into the event loop to get ready to serve. @@ -229,26 +229,31 @@ func HandleRPCRequest(req reflect.Value) (int, string, []reflect.Value, int, err if !ok { break } - if len(_req) != 4 { - fmt.Println(len(_req)) + index := 0 + if len(_req) > 4 && len(_req) < 3 { break } - msgType := _req[0] + msgType := _req[index] typeOk := msgType.Kind() == reflect.Int || msgType.Kind() == reflect.Int8 || msgType.Kind() == reflect.Int16 || msgType.Kind() == reflect.Int32 || msgType.Kind() == reflect.Int64 if !typeOk { fmt.Println("msgType") fmt.Println(msgType) break } - msgId := _req[1] - idOk := msgId.Kind() == reflect.Int || msgId.Kind() == reflect.Int8 || msgId.Kind() == reflect.Int16 || msgId.Kind() == reflect.Int32 || msgId.Kind() == reflect.Int64 || msgId.Kind() == reflect.Uint32 - if !idOk { - fmt.Println("msgId") - fmt.Println(msgId) - fmt.Println(msgId.Kind()) - break + index++ + var msgId reflect.Value + if msgType.Int() == REQUEST { + msgId = _req[index] + idOk := msgId.Kind() == reflect.Int || msgId.Kind() == reflect.Int8 || msgId.Kind() == reflect.Int16 || msgId.Kind() == reflect.Int32 || msgId.Kind() == reflect.Int64 || msgId.Kind() == reflect.Uint32 + if !idOk { + fmt.Println("msgId") + fmt.Println(msgId) + fmt.Println(msgId.Kind()) + break + } + index++ } - _funcName := _req[2] + _funcName := _req[index] funcOk := _funcName.Kind() == reflect.Array || _funcName.Kind() == reflect.Slice if !funcOk { fmt.Println("_funcName") @@ -261,27 +266,29 @@ func HandleRPCRequest(req reflect.Value) (int, string, []reflect.Value, int, err break } if msgType.Int() != REQUEST && msgType.Int() != NOTIFICATION { - fmt.Println(msgType.Int()) break } - _arguments := _req[3] + index++ + _arguments := _req[index] var arguments []reflect.Value if _arguments.Kind() == reflect.Array || _arguments.Kind() == reflect.Slice { - elemType := _req[3].Type().Elem() + elemType := _req[index].Type().Elem() _elemType := elemType - ok := _elemType.Kind() == reflect.Uint || _elemType.Kind() == reflect.Uint8 || _elemType.Kind() == reflect.Uint16 || _elemType.Kind() == reflect.Uint32 || _elemType.Kind() == reflect.Uint64 || _elemType.Kind() == reflect.Uintptr + ok := isUintType(_elemType) if !ok || _elemType.Kind() != reflect.Uint8 { arguments, ok = _arguments.Interface().([]reflect.Value) } else { - arguments = []reflect.Value{reflect.ValueOf(string(_req[3].Interface().([]byte)))} + arguments = []reflect.Value{reflect.ValueOf(string(_req[index].Interface().([]byte)))} } } else { - arguments = []reflect.Value{_req[3]} + arguments = []reflect.Value{_req[index]} } - if msgId.Kind() == reflect.Uint || msgId.Kind() == reflect.Uint32 { + if isUintType(msgId) { return int(msgId.Uint()), string(funcName), arguments, int(msgType.Int()), nil - } else { + } else if isIntType(msgId) { return int(msgId.Int()), string(funcName), arguments, int(msgType.Int()), nil + } else { + return int(0), string(funcName), arguments, int(msgType.Int()), nil } } return 0, "", nil, 0, errors.New("Invalid message format") @@ -290,7 +297,6 @@ func HandleRPCRequest(req reflect.Value) (int, string, []reflect.Value, int, err // This is a low-level function that is not supposed to be called directly // by the user. Change this if the MessagePack protocol is updated. func SendResponseMessage(writer io.Writer, msgId int, value reflect.Value) error { - fmt.Println("sending response") _, err := writer.Write([]byte{0x94}) if err != nil { From 519218cb0571d4f940d0de64c4458e6a26f6af45 Mon Sep 17 00:00:00 2001 From: Martino Facchin Date: Tue, 12 Dec 2023 16:35:13 +0100 Subject: [PATCH 2/4] Add led RPC function for ping-pong demo --- example/sample_mprpc.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/example/sample_mprpc.go b/example/sample_mprpc.go index ec51e99..b1fa416 100644 --- a/example/sample_mprpc.go +++ b/example/sample_mprpc.go @@ -68,13 +68,24 @@ func tty(test []reflect.Value) fmt.Stringer { return nil } +func led(status uint) fmt.Stringer { + conn, _ := net.Dial("tcp", "m4-proxy:5001") + client := rpc.NewSession(conn, true) + xerr := client.Send("led", status) + if xerr != nil { + fmt.Println(xerr) + } + return nil +} + func main() { + exec.Command("stty", "-F", "/dev/ttyGS0", "raw") serport, _ = os.OpenFile("/dev/ttyGS0", os.O_RDWR, 0) go serialportListener(serport) - res := Resolver{"echo": reflect.ValueOf(echo), "add": reflect.ValueOf(add), "tty": reflect.ValueOf(tty), "whoami": reflect.ValueOf(whoami)} + res := Resolver{"echo": reflect.ValueOf(echo), "add": reflect.ValueOf(add), "tty": reflect.ValueOf(tty), "whoami": reflect.ValueOf(whoami), "led": reflect.ValueOf(led)} serv := rpc.NewServer(res, true, nil, 5002) l, _ := net.Listen("tcp", ":5002") From e5446cecd7a7bfc571617763fd930aa6c0f070cf Mon Sep 17 00:00:00 2001 From: Martino Facchin Date: Fri, 12 Jan 2024 09:12:57 +0100 Subject: [PATCH 3/4] handle local connection (no containers) --- example/sample_mprpc.go | 16 ++++++++++++++-- rpc/server.go | 13 ++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/example/sample_mprpc.go b/example/sample_mprpc.go index b1fa416..d1d8504 100644 --- a/example/sample_mprpc.go +++ b/example/sample_mprpc.go @@ -37,6 +37,8 @@ func add(a, b uint) (uint, fmt.Stringer) { return a + b, nil } +var proxyname string = "m4-proxy" + func serialportListener(serport *os.File) { for { data := make([]byte, 1024) @@ -48,7 +50,7 @@ func serialportListener(serport *os.File) { data = data[:n] - conn, err := net.Dial("tcp", "m4-proxy:5001") + conn, err := net.Dial("tcp", proxyname+":5001") client := rpc.NewSession(conn, true) xerr := client.Send("tty", data) if xerr != nil { @@ -69,7 +71,7 @@ func tty(test []reflect.Value) fmt.Stringer { } func led(status uint) fmt.Stringer { - conn, _ := net.Dial("tcp", "m4-proxy:5001") + conn, _ := net.Dial("tcp", proxyname+":5001") client := rpc.NewSession(conn, true) xerr := client.Send("led", status) if xerr != nil { @@ -83,6 +85,16 @@ func main() { exec.Command("stty", "-F", "/dev/ttyGS0", "raw") serport, _ = os.OpenFile("/dev/ttyGS0", os.O_RDWR, 0) + // if Proxy is started outside a container, m4-proxy will be localhost + // ping m4-proxy, if failed then m4-proxy is localhost + // if ping success, then m4-proxy is m4-proxy + if err := exec.Command("ping", "-c", "1", proxyname).Run(); err != nil { + fmt.Println("Proxy is localhost") + proxyname = "localhost" + } + + // serialportListener listens to the serial port and forwards the data to the proxy + go serialportListener(serport) res := Resolver{"echo": reflect.ValueOf(echo), "add": reflect.ValueOf(add), "tty": reflect.ValueOf(tty), "whoami": reflect.ValueOf(whoami), "led": reflect.ValueOf(led)} diff --git a/rpc/server.go b/rpc/server.go index 1a3a352..e2b8d3c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -7,6 +7,7 @@ import ( "log" "net" "os" + "os/exec" "reflect" msgpack "github.com/facchinm/msgpack-go" @@ -21,8 +22,18 @@ type Server struct { port uint } +var proxyname string = "m4-proxy" + func (self *Server) Register() { - conn, _ := net.Dial("tcp", "m4-proxy:5000") + + // if Proxy is started outside a container, m4-proxy will be localhost + // ping m4-proxy, if failed then m4-proxy is localhost + // if ping success, then m4-proxy is m4-proxy + if err := exec.Command("ping", "-c", "1", proxyname).Run(); err != nil { + fmt.Println("Proxy is localhost") + proxyname = "localhost" + } + conn, _ := net.Dial("tcp", proxyname+":5000") client := NewSession(conn, true) client.Call("register", self.port, self.resolver.Functions()) From 1bdf98d28f7a228dd5c96d395085e0e96268b968 Mon Sep 17 00:00:00 2001 From: Alexander Entinger Date: Wed, 17 Jan 2024 10:55:32 +0100 Subject: [PATCH 4/4] Fix: provide a human readable error message in case the serial interface can not be opened. --- proxy/main.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proxy/main.go b/proxy/main.go index b575ede..678f996 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -172,6 +172,10 @@ func main() { functions = make(map[string]int) chardev, err := os.OpenFile("/dev/x8h7_ui", os.O_RDWR, 0) + if (err != nil) { + fmt.Println(err) + return + } chardev_reader_chan := make(chan []byte, 1024)