chatcontroller.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package controller
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gin-gonic/gin"
  6. "github.com/gorilla/websocket"
  7. "net/http"
  8. "strconv"
  9. "time"
  10. "trading-go/model"
  11. "trading-go/response"
  12. )
  13. var UP = websocket.Upgrader{
  14. ReadBufferSize: 1024,
  15. WriteBufferSize: 1024,
  16. }
  17. type client struct {
  18. conn *websocket.Conn
  19. pip chan *model.Message
  20. }
  21. var Conns map[uint]*client
  22. func init() {
  23. Conns = make(map[uint]*client)
  24. }
  25. // 发送消息
  26. func send(conn *websocket.Conn, uid uint) {
  27. pip := Conns[uid].pip
  28. defer func() {
  29. delete(Conns, uid)
  30. }()
  31. for {
  32. data := <-pip
  33. err := data.Save()
  34. if err != nil {
  35. fmt.Println(err.Error())
  36. break
  37. }
  38. msg, err := json.Marshal(data)
  39. if err != nil {
  40. fmt.Println("link closed")
  41. break
  42. }
  43. if data.Time == 1 {
  44. err = conn.WriteMessage(data.MsgType, msg)
  45. if err != nil {
  46. fmt.Println(err.Error())
  47. break
  48. }
  49. }
  50. }
  51. }
  52. // 接收消息
  53. func reception(conn *websocket.Conn, uid uint) {
  54. defer func() {
  55. delete(Conns, uid)
  56. }()
  57. for {
  58. var msg model.Message
  59. err := conn.ReadJSON(&msg)
  60. if err != nil {
  61. fmt.Println(err.Error())
  62. break
  63. }
  64. err = msg.Save()
  65. if err != nil {
  66. fmt.Println(err.Error())
  67. break
  68. }
  69. if v, ok := Conns[msg.To]; ok {
  70. v.pip <- &msg
  71. } else {
  72. }
  73. }
  74. }
  75. func broadcast(data model.Message) {
  76. for _, conn := range Conns {
  77. msg, err := json.Marshal(data)
  78. if err != nil {
  79. fmt.Println(err.Error())
  80. break
  81. }
  82. err = conn.conn.WriteMessage(data.MsgType, msg)
  83. if err != nil {
  84. fmt.Println(err.Error())
  85. break
  86. }
  87. }
  88. }
  89. func heartBeat(conn *websocket.Conn, uid uint) {
  90. defer func() {
  91. delete(Conns, uid)
  92. }()
  93. for {
  94. msg := model.Message{
  95. MsgType: 4,
  96. From: 0,
  97. To: uid,
  98. Time: uint(time.Now().Unix()),
  99. Content: "alive",
  100. }
  101. data, err := json.Marshal(msg)
  102. if err != nil {
  103. fmt.Println(err.Error())
  104. break
  105. }
  106. err = conn.WriteMessage(websocket.TextMessage, data)
  107. if err != nil {
  108. fmt.Println(err.Error())
  109. break
  110. }
  111. time.Sleep(time.Second * 5)
  112. }
  113. }
  114. func Chat(w http.ResponseWriter, rq *http.Request, uid uint) {
  115. // 升级为websocket
  116. conn, err := UP.Upgrade(w, rq, nil)
  117. if err != nil {
  118. response.Fail(w, err.Error(), http.StatusUpgradeRequired)
  119. return
  120. }
  121. pip := make(chan *model.Message, 1024)
  122. client := client{
  123. conn: conn,
  124. pip: pip,
  125. }
  126. Conns[uid] = &client
  127. go send(conn, uid)
  128. go reception(conn, uid)
  129. go heartBeat(conn, uid)
  130. response.Success(w, "success", nil)
  131. }
  132. // LinkToServer
  133. // @Tags 聊天模块
  134. // @Summary 与服务端进行websocket连接,请使用postman测试
  135. // @Success 200 {object} response.Response
  136. // @Router /chat [get]
  137. func LinkToServer(c *gin.Context) {
  138. uid, err := strconv.Atoi(c.Query("uid"))
  139. if err != nil {
  140. response.Fail(c.Writer, "failed", 500)
  141. return
  142. }
  143. Chat(c.Writer, c.Request, uint(uid))
  144. }
  145. // GetMsgFromPaged
  146. // @Tags 聊天模块
  147. // @Summary 获取未过期且来源为特定用户的聊天记录
  148. // @Success 200 {object} response.Response
  149. // @Param page path int true "页数"
  150. // @Param pageSize path int true "一页的大小"
  151. // @Param uid query string true "用户id"
  152. // @Router /chat/from/{page}/{pageSize} [get]
  153. func GetMsgFromPaged(c *gin.Context) {
  154. id := c.Query("uid")
  155. p := c.Param("page")
  156. pS := c.Param("pageSize")
  157. page, err := strconv.Atoi(p)
  158. pageSize, err := strconv.Atoi(pS)
  159. if err != nil {
  160. response.Fail(c.Writer, err.Error(), 500)
  161. return
  162. }
  163. uid, err := strconv.ParseUint(id, 10, 64)
  164. if err != nil {
  165. msg := err.Error()
  166. response.Fail(c.Writer, msg, 500)
  167. return
  168. }
  169. rsp, err := model.Message{}.GetFrom(uint(uid), page, pageSize)
  170. if err != nil {
  171. response.Fail(c.Writer, err.Error(), 500)
  172. return
  173. }
  174. response.Success(c.Writer, "success", rsp)
  175. }
  176. // GetMsgToPaged
  177. // @Tags 聊天模块
  178. // @Summary 获取未过期且目标为特定用户的聊天记录
  179. // @Param page path int true "页数"
  180. // @Param pageSize path int true "一页的大小"
  181. // @Param uid query string true "用户id"
  182. // @Success 200 {object} response.Response
  183. // @Router /chat/to/{page}/{pageSize} [get]
  184. func GetMsgToPaged(c *gin.Context) {
  185. id := c.Query("uid")
  186. p := c.Param("page")
  187. pS := c.Param("pageSize")
  188. page, err := strconv.Atoi(p)
  189. pageSize, err := strconv.Atoi(pS)
  190. if err != nil {
  191. response.Fail(c.Writer, err.Error(), 500)
  192. return
  193. }
  194. uid, err := strconv.ParseUint(id, 10, 64)
  195. if err != nil {
  196. msg := err.Error()
  197. response.Fail(c.Writer, msg, 500)
  198. return
  199. }
  200. rsp, err := model.Message{}.GetTo(uint(uid), page, pageSize)
  201. if err != nil {
  202. response.Fail(c.Writer, err.Error(), 500)
  203. return
  204. }
  205. response.Success(c.Writer, "success", rsp)
  206. }