admin 发表于 2017-12-1 15:51:06

Netty实现服务器推Push(Android推送)

简介消息推送一般的思路就是:
1.轮询(Pull)客户端定时的去询问服务器是否有新消息需要下发;确点很明显Android后台不停的访问网络费电还浪费流量。
2.推送(Push)服务端有新消息立即发送给客户端,这就没有时间的延迟,消息及时到达。
当时需求过来之后就首先考虑的这两个,开发的角度Pull实现起来简单省事,但从用户来说省电和省流量才是主要的,所以最后选用Push。客户端与服务端使用长连接,客户端定时向服务端发送心跳包维持长连接。
那么这里会有一个问题,心跳包的周期多少才合理?由于移动无线网络的特点,推送服务的心跳周期并不能设置的太长,否则长连接会被释放,造成频繁的客户端重连,但是也不能设置太短,否则在当前缺乏统一心跳框架的机制下很容易导致信令风暴(例如微信心跳信令风暴问题)。具体的心跳周期并没有统一的标准,180S也许是个不错的选择,微信为300S。
更多关于Netty推送要点:Netty系列之Netty百万级推送服务设计要点
首先来看看本次推送的效果:
发送推送:
http://img.blog.csdn.net/20170412211729874?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvamVmZmxlbw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast
点击推送:
http://img.blog.csdn.net/20170412211744622?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvamVmZmxlbw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast
一、添加依赖最好使用Netty4.0,5.0仍然在测试阶段,stackoverflow有人说在android上有bug
服务端IDEA中Maven配置:
<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.0.0.Final</version>    <scope>compile</scope></dependency>
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
客户端AndridStdio中配置:
compile 'io.netty:netty-all:4.0.0.Final'
[*]1
二、自定义消息体Message
public class Message implements Serializable{    private static final long serialVersionUID = 1L;    private byte type;    private PushMsg msg;    //getter,setter
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
MessageType
public enum MessageType {    CONNECT_REQ((byte)1), CONNECT_SUCCESS((byte)2), CONNECT_FAIL((byte)3),    HEARTBEAT_REQ((byte)4), HEARTBEAT_RESP((byte)5), MSG_PUSH((byte)6);    private byte value;    private MessageType(byte value){      this.value = value;    }    public byte getValue() {      return value;    }    public void setValue(byte value) {      this.value = value;    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
PushMessage
public class PushMsg implements Serializable{    private static final long serialVersionUID = 2L;    public String author_name;    public String date;    public String thumbnail_pic_s;//图片链接    public String title;    public String url;//详情链接
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
在这里你可能会遇到一个异常,java.io.InvalidClassException,为什么会产生,怎么解决,详情看:http://blog.csdn.net/jeffleo/article/details/70147889
三、服务端1. 握手验证握手验证,这一步主要进行节点ID有效性校检,节点重复登录校检,Ip合法性校检,我在这实现了ip黑名单功能,当有恶意ip对服务端进行有害的操作时,可以将该ip拉黑,使得该ip连接认证失败
public class ConnectHandler extends SimpleChannelInboundHandler<Message> {    //增加黑名单功能    private String[] blackIps = {"192.168.199.201"};    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      Message message = (Message) msg;      //如果是连接信息,判断是否是黑名单ip      if(message != null && message.getType() == MessageType.CONNECT_REQ.getValue()){            Message response = null;            boolean ok = true;            for (String ip : blackIps) {                InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();                if(address.getHostName().equals(ip)){                  ok = false;                }            }            response = ok ? buildMessage((byte)MessageType.CONNECT_SUCCESS.getValue()):                  buildMessage((byte) MessageType.CONNECT_FAIL.getValue());            ctx.writeAndFlush(response);      }else{            ctx.fireChannelRead(message);      }    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {      ctx.close();      ctx.fireExceptionCaught(cause);    }    private Message buildMessage(byte result){      Message msg = new Message();      msg.setType(result);      return msg;    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
2. 心跳检测这里心跳策略使用Ping-Pong机制,当链路处于空闲时,客户端主动发送Ping消息给服务端,服务端接受到Ping消息后,发送Pong应答给客户端,如果客户端连续发送N条Ping消息都没有接受到服务端返回的Pong消息,说明链路已经挂死或者对方处于异常,客户端关闭连接,间隔周期T发起重练(链断重连)
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message>{    //加入到在线列表,只有在线用户才可以实时推送    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {      ChanneList.channels.add(ctx.channel());    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {      //如果是心跳包ping,则返回pong      if(message != null && message.getType() == MessageType.HEARTBEAT_REQ.getValue()){            Message response = buildMessage(MessageType.HEARTBEAT_RESP.getValue());            ctx.writeAndFlush(response);      }else{            ctx.fireChannelRead(message);      }    }    private Message buildMessage(byte result){      Message msg = new Message();      msg.setType(result);      return msg;    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
当客户端和服务端握手成功后,把该channels放入在线列表,只有在线列表channel才可以实时推送
3. 服务端启动public class PushServer {    public void bind() throws Exception{      EventLoopGroup bossGroup = new NioEventLoopGroup();      EventLoopGroup workGroup = new NioEventLoopGroup();      ServerBootstrap bs = new ServerBootstrap();      bs.group(bossGroup, workGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 1000)                .option(ChannelOption.SO_KEEPALIVE, true)                .childHandler(new ChannelInitializer<SocketChannel>() {                  @Override                  protected void initChannel(SocketChannel channel) throws Exception {                        ChannelPipeline p = channel.pipeline();                        p.addLast(new ObjectEncoder());                        p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));                        //心跳超时                        p.addLast(new ReadTimeoutHandler(100));                        p.addLast(new ConnectHandler());                        p.addLast(new HeartBeatHandler());                  }                });      bs.bind(8000).sync();      System.out.println("com.liu.nettypushtest.server 8000 start....");    }    //消息推送    public void push(){      List<Channel> channels = ChanneList.channels;      System.out.println("push 消息 + " + channels.size());      Message message = new Message();      message.setType(MessageType.MSG_PUSH.getValue());      PushMsg pushMsg = new PushMsg();      pushMsg.setAuthor_name("中新社");      pushMsg.setDate("2017-04-12 13:51");      pushMsg.setThumbnail_pic_s("http:\\/\\/05.imgmini.eastday.com\\/mobile\\/20170412\\/20170412135121_ff0cae3d2601191a77afa948a8424142_1_mwpm_03200403.jpeg");      pushMsg.setTitle("法国安娜思托保健品进军亚洲市场");      pushMsg.setUrl("http:\\/\\/mini.eastday.com\\/mobile\\/170412135121788.html");      message.setMsg(pushMsg);      for (Channel channel : channels){            channel.writeAndFlush(message);      }    }    public static void main(String[] args) throws Exception{      PushServer pushServer = new PushServer();      pushServer.bind();    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
[*]43
[*]44
[*]45
[*]46
[*]47
[*]48
[*]49
心跳超时的实现是采用了ReadTimeOutHandler机制
四、客户端1. 握手认证public class ConnectHandler extends SimpleChannelInboundHandler<Message> {    //三次握手成功,发送登录验证    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {      Message message = new Message();      message.setType(MessageType.CONNECT_REQ.getValue());      ctx.writeAndFlush(message);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      System.out.println("read");      Message message = (Message) msg;      //登录验证失败      if(message != null && message.getType() == MessageType.CONNECT_FAIL.getValue()){            ctx.close();      }else if(message.getType() == MessageType.CONNECT_SUCCESS.getValue()){//登录验证成功            System.out.println("login is ok....");            ctx.fireChannelRead(message);      }else{            ctx.fireChannelRead(message);      }    }    @Override    protected void channelRead0(ChannelHandlerContext ctx,Message message) throws Exception {    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {      ctx.close();      ctx.fireExceptionCaught(cause);    }    private Message buildMessage(byte result){      Message msg = new Message();      msg.setType(result);      return msg;    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
[*]43
2. 心跳检测public class HeartBeatHandler extends SimpleChannelInboundHandler<Message>{    private volatile ScheduledFuture<?> heartBeat;    @Override    protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {      //如果是心跳包      if(message != null && message.getType() == MessageType.CONNECT_SUCCESS.getValue()){            //50秒钟发一个心跳            heartBeat = ctx.executor().scheduleAtFixedRate(                  new HeartBeatTask(ctx), 0, 50000, TimeUnit.MILLISECONDS);      }else if(message != null &&                message.getType() == MessageType.HEARTBEAT_RESP.getValue()){            System.out.println("Client reciver heart beat message : ----> " + message);      }else{            //编码好的Message传递给下一个Handler            ctx.fireChannelRead(message);      }    }    private Message buildMessage(byte result){      Message msg = new Message();      msg.setType(result);      return msg;    }    //心跳包发送任务    private class HeartBeatTask implements Runnable{      private ChannelHandlerContext ctx;      public HeartBeatTask(ChannelHandlerContext ctx) {            this.ctx = ctx;      }      public void run() {            Message message = buildHeartMessage();            System.out.println("Client send heart beat message : ----> " + message);            ctx.writeAndFlush(message);      }      private Message buildHeartMessage(){            Message message = new Message();            message.setType(MessageType.HEARTBEAT_REQ.getValue());            return message;      }    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
[*]43
[*]44
[*]45
[*]46
[*]47
[*]48
50s发送一个心跳包
3. 接受推送包public class PushMsgHandler extends SimpleChannelInboundHandler<Message>{    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      Message message = (Message) msg;      PushMsg pushMsg = message.getMsg();      System.out.println(pushMsg);      if(message.getType() == MessageType.MSG_PUSH.getValue()){            NotificationManager manager = (NotificationManager) MainApplication.getContext().getSystemService(MainApplication.getContext().NOTIFICATION_SERVICE);            Notification.Builder builder = new Notification.Builder(MainApplication.getContext());            //创建消息,设置点击效果            Intent i = new Intent(MainApplication.getContext(), NewsDetailActivity.class);            NewsDetailList newsDetail = new NewsDetailList();            newsDetail.author_name = pushMsg.getAuthor_name();            newsDetail.date = pushMsg.getDate();            newsDetail.thumbnail_pic_s = pushMsg.getThumbnail_pic_s();            newsDetail.title = pushMsg.getTitle();            newsDetail.url = pushMsg.getUrl();            i.putExtra(Config.NEWS_DATA, newsDetail);            PendingIntent intent = PendingIntent.getActivity(MainApplication.getContext(), 0,                  i, PendingIntent.FLAG_ONE_SHOT);            builder.setContentTitle(pushMsg.getTitle())//设置通知栏标题                  .setContentText(pushMsg.getAuthor_name()) //设置通知栏显示内容                  .setTicker("ok社区") //通知首次出现在通知栏,带上升动画效果的                  .setWhen(System.currentTimeMillis())//通知产生的时间,会在通知信息里显示,一般是系统获取到的时间                  .setContentIntent(intent)                  .setAutoCancel(true)//设置这个标志当用户单击面板就可以让通知将自动取消                  //.setOngoing(false)//ture,设置他为一个正在进行的通知。他们通常是用来表示一个后台任务,用户积极参与(如播放音乐)或以某种方式正在等待,因此占用设备(如一个文件下载,同步操作,主动网络连接)                  .setDefaults(Notification.DEFAULT_VIBRATE)//向通知添加声音、闪灯和振动效果的最简单、最一致的方式是使用当前的用户默认设置,使用defaults属性,可以组合                            //Notification.DEFAULT_ALLNotification.DEFAULT_SOUND 添加声音 // requires VIBRATE permission                  .setSmallIcon(R.mipmap.logo);//设置通知小ICON            manager.notify(1, builder.build());      }    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
4. 客户端启动和断错重连public class PushClient {    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);    //换掉ip    private String host = "";    private int port = 8000;    public void connect() throws Exception {      try{            EventLoopGroup group = new NioEventLoopGroup();            Bootstrap bs = new Bootstrap();            bs.group(group)                  .channel(NioSocketChannel.class)                  .option(ChannelOption.SO_KEEPALIVE, true)                  .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel channel) throws Exception {                            ChannelPipeline p = channel.pipeline();                            p.addLast(new IdleStateHandler(20, 10, 0));                            p.addLast(new ObjectEncoder());                            p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));                            p.addLast(new ReadTimeoutHandler(100));                            p.addLast(new ConnectHandler());                            p.addLast(new HeartBeatHandler());                            p.addLast(new PushMsgHandler());                        }                  });            System.out.println("开始连接");            ChannelFuture future = bs.connect(new InetSocketAddress(host, port)).sync();            future.channel().closeFuture().sync();//这一步会阻塞住            System.out.println("关闭后");      } finally {            //断错重连            executor.execute(new Runnable() {                public void run() {                  System.out.println("Client 尝试重新连接-->>>>>>");                  //等待InterVAl时间,重连                  try {                        TimeUnit.SECONDS.sleep(5);                        //发起重连                        connect();                  } catch (Exception e) {                        e.printStackTrace();                  }                }            });      }    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
[*]16
[*]17
[*]18
[*]19
[*]20
[*]21
[*]22
[*]23
[*]24
[*]25
[*]26
[*]27
[*]28
[*]29
[*]30
[*]31
[*]32
[*]33
[*]34
[*]35
[*]36
[*]37
[*]38
[*]39
[*]40
[*]41
[*]42
[*]43
[*]44
[*]45
[*]46
[*]47
[*]48
[*]49
[*]50
5. 在Android中,把PushClient的启动作为服务,使其在后台也能接受推送public class PushMsgHandler extends SimpleChannelInboundHandler<Message>{    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {      Message message = (Message) msg;      if(message.getType() == MessageType.MSG_PUSH.getValue()){            System.out.println(message.getMsg());      }    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {    }}
[*]1
[*]2
[*]3
[*]4
[*]5
[*]6
[*]7
[*]8
[*]9
[*]10
[*]11
[*]12
[*]13
[*]14
[*]15
五、如何进行推送?
[*]目前我想到的是连接一个专门用于推送的客户端,当该客户端将消息发送到服务端,并且type为PUSH_REQ,说明有推送的请求,判断为该type,则调用PushServer 的 push操作,这样就能完成推送
[*]在网页上填写推送的消息,然后推送到服务端,这个暂时不知道该怎么实现,有知道的同学请指教


来自:http://blog.csdn.net/jeffleo/article/details/70147719


页: [1]
查看完整版本: Netty实现服务器推Push(Android推送)