您的位置:新葡亰496net > 奥门新萄京娱乐场 > 新葡亰496net:技术学习,memcache遍布式和要注意的

新葡亰496net:技术学习,memcache遍布式和要注意的

发布时间:2019-07-21 20:33编辑:奥门新萄京娱乐场浏览(196)

      用一了段时间NSQ还是很稳定的。除了稳定,还有一个特别值的说的就是部署非常简单。总想写点什么推荐给大家使用nsq来做一些东西。但是就是因为他太简单易用,文档也比较简单易懂。一直不知道要写啥!!!!!

    Docker 技术学习

    Memcache的分布式介绍

    memcached虽然称为“分布式”缓存服务器,但服务器端并没有“分布式”功能。服务器端仅包括内存存储功能,其实现非常简单。至于memcached的分布式,则是完全由客户端程序库实现的。这种分布式是memcached的最大特点。

    前言

    本篇文章主要介绍的是SpringBoot整合Netty以及使用Protobuf进行数据传输的相关内容。Protobuf会简单的介绍下用法,至于Netty在之前的文章中已经简单的介绍过了,这里就不再过多细说了。

    Docker是什么?

      nsq官网: 

    • Docker 简介

    • 什么是容器 :

    • 新葡亰496net:技术学习,memcache遍布式和要注意的难点。是一种虚拟化的方案,与传统的虚拟机不同,传统的虚拟机是将一台或者多台独立的机器虚拟于独立的硬件之上,而容器是直接运行在操作系统内核之上的内核空间,容器虚拟化也被称为操作系统虚拟化。

    • 操作系统级别的虚拟化

    • 只能运行相同或者相似内核的操作系统

    • 依赖与Linux内核特性:Namespace 和Cgroups(Control Group)

    • 容器技术优点 :

    Memcached的分布式是什么意思?

    这里多次使用了“分布式”这个词,但并未做详细解释。现在开始简单地介绍一下其原理,各个客户端的实现基本相同。

    下面假设memcached服务器有node1~node3三台,应用程序要保存键名为“tokyo”“kanagawa”“chiba”“saitama”“gunma”的数据。

    首先向memcached中添加“tokyo”。将“tokyo”传给客户端程序库后,客户端实现的算法就会根据“键”来决定保存数据的memcached服务器。服务器选定后,即命令它保存“tokyo”及其值。

    同样,“kanagawa”“chiba”“saitama”“gunma”都是先选择服务器再保存。接下来获取保存的数据。获取时也要将要获取的键“tokyo”传递给函数库。函数库通过与数据保存时相同的算法,根据“键”选择服务器。使用的算法相同,就能选中与保存时相同的服务器,然后发送get命令。只要数据没有因为某些原因被删除,就能获得保存的值。

    这样,将不同的键保存到不同的服务器上,就实现了memcached的分布式。 memcached服务器增多后,键就会分散,即使一台memcached服务器发生故障无法连接,也不会影响其他的缓存,系统依然能继续运行。

    Protobuf

    简单来说,Docker平台是关于使用容器来使得创建、部署、运营变得更简单的一切相关内容。容器可以让开发者将一个应用程序打包,里面包含了必须的部分,如应用程序依赖的库和其他元素,以包的形式发布出去。通过将app和相关的元素添加到容器内,开发者确保这个apps能够运行在任何Linux机器上,不管机器有怎样的默认配置,或者这个机器与编写测试代码的机器有多大区别。这点对开发者而言很有用,因为它使得整个生命周期内对app操作变得很容易。

      新葡亰496net 1

    新葡亰496net 2image.png

    Php memcache实现分布式:

    我们PHP的PECL中的Memcache扩展能够有效的解决Memcache的分布式问题,主要的接口就是 addServer() 函数,具体关于addServer()函数的实现可以参考该扩展源代码。那么现在就存在第二个问题,就是说无法同步数据,可以理解为MySQL中Master/Slave的机制,就是说如果我们有多台的Memcache服务器,使用addServer函数的话,每个服务器存储的数据都是唯一的,也就是说每个memcached服务器上存储的数据不是统一的,而是各自保存了不通的数据。

    介绍

    protocolbuffer(以下简称PB)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了多种语言的实现:java、c#、c 、go 和python,每一种实现都包含了相应语言的编译器以及库文件。由于它是一种二进制的格式,比使用 xml进行数据交换快许多。可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。

    官方地址:

    docker-logo

      为了容灾需要对nsqd多机器部属,有了Docker后,快速扩还是很方便的。

    1.虚拟机对内存和CPU的损耗相当大

    php memcache分布式系统的存在的问题

    在Memcache的实际使用中,遇到的最严重的问题,就是在增减服务器的时候,会导致大范围的缓存丢失,从而可能会引导数据库的性能瓶颈,为了避免出现这种情况,请先看Consistent hashing算法,中文的介绍可以参考这里,通过存取时选定服务器算法的改变,来实现。

    新葡亰496net,修改PHP的Memcache扩展memcache.c的源代码中的

    "memcache.hash_strategy" = standard

    "memcache.hash_strategy" = consistent

    重新编译,这时候就是使用Consistent hashing算法来寻找服务器存取数据了。

    有效测试数据表明,使用Consistent hashing可以极大的改善增删Memcache时缓存大范围丢失的情况。
    NonConsistentHash: 92% of lookups changed after adding a target to the existing 10
    NonConsistentHash: 90% of lookups changed after removing 1 of 10 targets
    ConsistentHash: 6% of lookups changed after adding a target to the existing 10
    ConsistentHash: 9% of lookups changed after removing 1 of 10 targets

    使用

    这里的使用就只介绍Java相关的使用。
    首先我们需要建立一个proto文件,在该文件定义我们需要传输的文件。
    例如我们需要定义一个用户的信息,包含的字段主要有编号、名称、年龄。
    新葡亰496net:技术学习,memcache遍布式和要注意的难点。那么该protobuf文件的格式如下:
    :这里使用的是proto3,相关的注释我已写了,这里便不再过多讲述了。需要注意一点的是proto文件和生成的Java文件名称不能一致!

    syntax = "proto3";
    // 生成的包名
    option java_package="com.pancm.protobuf";
    //生成的java名
    option java_outer_classname = "UserInfo";
    
    message UserMsg {  
    
         // ID  
         int32 id = 1;  
    
        // 姓名  
         string name = 2;  
    
        // 年龄  
          int32 age = 3;  
    
         // 状态 
         int32 state = 4;  
    } 
    

    创建好该文件之后,我们把该文件和protoc.exe(生成Java文件的软件)放到E盘目录下的protobuf文件夹下,然后再到该目录的dos界面下输入:protoc.exe --java_out=文件绝对路径名称
    例如:

    protoc.exe --java_out=E:protobuf User.proto
    

    输入完之后,回车即可在同级目录看到已经生成好的Java文件,然后将该文件放到项目中该文件指定的路径下即可。

    注:生成protobuf的文件软件和测试的protobuf文件我也整合到该项目中了,可以直接获取的。

    Java文件生成好之后,我们再来看怎么使用。
    这里我就直接贴代码了,并且将注释写在代码中,应该更容易理解些吧。。。
    代码示例:

         // 按照定义的数据结构,创建一个对象  
            UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
            userInfo.setId(1);
            userInfo.setName("xuwujing");
            userInfo.setAge(18);
            UserInfo.UserMsg userMsg = userInfo.build();  
            // 将数据写到输出流 
            ByteArrayOutputStream output = new ByteArrayOutputStream();  
            userMsg.writeTo(output);  
            // 将数据序列化后发送 
            byte[] byteArray = output.toByteArray();  
            // 接收到流并读取
            ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
            // 反序列化  
            UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
            System.out.println("id:"   userInfo2.getId());  
            System.out.println("name:"   userInfo2.getName());  
            System.out.println("age:"   userInfo2.getAge());  
    

    注:这里说明一点,因为protobuf是通过二进制进行传输,所以需要注意下相应的编码。还有使用protobuf也需要注意一下一次传输的最大字节长度。

    输出结果:

    id:1
    name:xuwujing
    age:18
    

    新葡亰496net 3

      部署完后我会用go和c#写一些代码方便大家学习。

    2.Docker磁盘占用空间更少,虚拟机需要包含完整的操作系统。

    php memcache具体实现

    基础环境
    其实基于PHP扩展的Memcache客户端实际上早已经实现,而且非常稳定。先解释一些名词,Memcache是danga.com的一个开源项目,可以类比于MySQL这样的服务,而PHP扩展的Memcache实际上是连接Memcache的方式。

    首先,进行Memcache被安装具体可查看:
    Linux下的Memcache安装:
    Windows下的Memcache安装:
    其次,进行PHP扩展的安装,官方地址是
    最后,启动Memcache服务,比如这样

    /usr/local/bin/memcached -d -p 11213 -u root -m 10 -c 1024 -t 8 -P /tmp/memcached.pid
    /usr/local/bin/memcached -d -p 11214 -u root -m 10 -c 1024 -t 8 -P /tmp/memcached.pid
    /usr/local/bin/memcached -d -p 11215 -u root -m 10 -c 1024 -t 8 -P /tmp/memcached.pid

    启动三个只使用10M内存以方便测试。

    分布式部署
    PHP的PECL扩展中的memcache实际上在2.0.0的版本中就已经实现多服务器支持,现在都已经2.2.5了。请看如下代码

    $memcache = new Memcache;
    $memcache->addServer('localhost', 11213);
    $memcache->addServer('localhost', 11214);
    $memcache->addServer('localhost', 11215);
    $memStats = $memcache->getExtendedStats();
    print_r($memStats);

    通过上例就已经实现Memcache的分布式部署,是不是非常简单。

    版权声明:本文为博主原创文章,未经博主允许不得转载。


    SpringBoot整合Netty

    说明:如果想直接获取工程那么可以直接跳到底部,通过链接下载工程代码。

    Docker是一种类似虚拟机的东西,但它不会创建一个完整的操作系统,它让应用程序具有这个优势,应用程序运行在一个系统上就如同它运行在Linux内核上一样。那样的话,应用程序只用添加不在宿主机上的东西而不是一整个OS。这就意味着,apps更小,比那些依赖系统的apps性能也更好。它也有其他一些好处。

     

    3.容器技术,只需要包含应用及其依赖的库。

    开发准备

    环境要求
    JDK::1.8
    Netty::4.0或以上(不包括5)
    Protobuf:3.0或以上

    如果对Netty不熟的话,可以看看我之前写的一些文章。大神请无视~。~
    地址:

    首先还是Maven的相关依赖:

    <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>1.8</java.version>
            <netty.version>4.1.22.Final</netty.version>
            <protobuf.version>3.5.1</protobuf.version>
            <springboot>1.5.9.RELEASE</springboot>
            <fastjson>1.2.41</fastjson>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
    
        <dependencies>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <version>${springboot}</version>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <version>${springboot}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <version>${springboot}</version>
                <optional>true</optional>
            </dependency>
    
    
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>${netty.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>${protobuf.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson}</version>
            </dependency>
    
    
        <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency> 
    </dependencies>
    

    添加了相应的maven依赖之后,配置文件这块暂时没有什么可以添加的,因为暂时就一个监听的端口而已。

    对开发者和系统管理员而言,Docker是一个分布式应用的开放平台。它为基础设施不可知的CaaS模型提供了一个集成的套件。有了Docker,IT运营团队能够对基础设施资源和基本应用程序内容提供保护,管理,而开发者可以以一种自助的形式编译和发布他们的应用。

      准备工作:

    • 什么是Docker ?

    • 将应用程序自动部署到容器的开源引擎

    • Go语言开源引擎,Github 地址

    • 2013年初 dotCloud(现在的Docker公司) 基于Apache 2.0 开源授权协议发行

    • 应用程序部署引擎,目标就是提供一个轻量快速的引擎。

    • Docker 的目标

    • 提供简单轻量的建模方式 :docker非常容易上手,用户只需要几分钟就可以把自己的程序docker化,大多数的docker容器只需要不到1s就可以运行起来。

    • 职责的逻辑分离:使用docker开发人员只要关心容器中运行的应用程序,而运维人员只需要关心如何管理容器 ,Docker的设计目的就是加强开发人员写代码的开发环境与应用程序运行和部署的环境的一致性。

    • 快速高效的开发生命周期: 缩短代码到开发、测试、部署上线运行的周期。让你的应用程序具备可一致性。在容器中开发,以容器的模式交付和分发。

    • 鼓励使用面向服务的架构: docker推荐一个容器只运行一个应用程序或者进程,这样就形成了分布式的应用程序模型,应用程序或者服务就可以表示为一些了内部互相的应用程序。高内聚、低耦合,单一任务。分布式应用程序是的扩展或者活调试应用程序变得简单。避免在同一服务器上部署不同服务时可能带来的不同服务之间的不同影响, 这样在运行过程中出现问题比较容易定位问题所在。

    • Docker的使用场景

    • 使用docker容器开发、测试、部署服务。docker本身比较轻量化,开发人员可以可以开发、分享容器,容器可以在开发环境中创建,然后提交给测试和进入生产环境。

    • 创建隔离的运行环境.

    • 搭建测试环境:开发者利用docker在本地搭建测试环境。搭建用来测程序在不同系统下的兼容性,

    • 构建多用户的平台级服务基础设施

    • 提供软件级服务应用程序

    • 高性能、超大规模的宿主机部署。 目的巨大部分的公有云服务都提供了对docker的支持。

    • Docker 的基本组成

    • Docker Client 客户端:

    • Docker Daemon 守护进程

    • Docker Image 镜像

    • Docker Container 容器

    • Dcoker Registry 仓库

      Docker 客户端/守护进程 , 是C/S 架构程序, 客户端想守护进程 发送请求,客户端对服务器端的访问既可以是通过本地也可以通过远程,即 本地/远程 。Docker的CS架构 如下图:

    代码编写

    代码模块主要分为服务端和客户端。
    主要实现的业务逻辑:
    服务端启动成功之后,客户端也启动成功,这时服务端会发送一条protobuf格式的信息给客户端,然后客户端给予相应的应答。客户端与服务端连接成功之后,客户端每个一段时间会发送心跳指令给服务端,告诉服务端该客户端还存过中,如果客户端没有在指定的时间发送信息,服务端会关闭与该客户端的连接。当客户端无法连接到服务端之后,会每隔一段时间去尝试重连,只到重连成功!

    关键的优势

      》两台服务器:192.168.0.49; 192.168.0.105.

    新葡亰496net 4image.png

    服务端

    首先是编写服务端的启动类,相应的注释在代码中写得很详细了,这里也不再过多讲述了。不过需要注意的是,在之前的我写的Netty文章中,是通过main方法直接启动服务端,因此是直接new一个对象的。而在和SpringBoot整合之后,我们需要将Netty交给springBoot去管理,所以这里就用了相应的注解。
    代码如下:

    @Service("nettyServer")
    public class NettyServer {
        private static final int port = 9876; // 设置服务端端口
        private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
        private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接
        private static ServerBootstrap b = new ServerBootstrap();
    
        @Autowired
        private NettyServerFilter nettyServerFilter;
    
    
        public void run() {
            try {
                b.group(boss, work);
                b.channel(NioServerSocketChannel.class);
                b.childHandler(nettyServerFilter); // 设置过滤器
                // 服务器绑定端口监听
                ChannelFuture f = b.bind(port).sync();
                System.out.println("服务端启动成功,端口是:"   port);
                // 监听服务器关闭监听
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 关闭EventLoopGroup,释放掉所有资源包括创建的线程
                work.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
    }
    

    服务端主类编写完毕之后,我们再来设置下相应的过滤条件。
    这里需要继承Netty中ChannelInitializer类,然后重写initChannel该方法,进行添加相应的设置,如心跳超时设置,传输协议设置,以及相应的业务实现类。
    代码如下:

        @Component
         public class NettyServerFilter extends ChannelInitializer<SocketChannel> {
    
        @Autowired
        private NettyServerHandler nettyServerHandler;
    
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline ph = ch.pipeline();
    
             //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
             ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
             // 解码和编码,应和客户端一致
             //传输的协议 Protobuf
             ph.addLast(new ProtobufVarint32FrameDecoder());
             ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
             ph.addLast(new ProtobufVarint32LengthFieldPrepender());
             ph.addLast(new ProtobufEncoder());
    
             //业务逻辑实现类
             ph.addLast("nettyServerHandler", nettyServerHandler);
           }
         }
    

    服务相关的设置的代码写完之后,我们再来编写主要的业务代码。
    使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapterSimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。
    继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。
    而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。
    所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。
    客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。

    代码如下:

    @Service("nettyServerHandler")
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /** 空闲次数 */
        private int idle_count = 1;
        /** 发送次数 */
        private int count = 1;
    
    
        /**
         * 建立连接时,发送一条消息
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("连接的客户端地址:"   ctx.channel().remoteAddress());
            UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                    .build();
            ctx.writeAndFlush(userMsg);
            super.channelActive(ctx);
        }
    
        /**
         * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭;
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
            if (obj instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) obj;
                if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令
                    System.out.println("已经5秒没有接收到客户端的信息了");
                    if (idle_count > 1) {
                        System.out.println("关闭这个不活跃的channel");
                        ctx.channel().close();
                    }
                    idle_count  ;
                }
            } else {
                super.userEventTriggered(ctx, obj);
            }
        }
    
        /**
         * 业务逻辑处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("第"   count   "次"   ",服务端接受的消息:"   msg);
            try {
                // 如果是protobuf类型的数据
              if (msg instanceof UserMsg) {
                    UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                    if (userState.getState() == 1) {
                        System.out.println("客户端业务处理成功!");
                    } else if(userState.getState() == 2){
                        System.out.println("接受到客户端发送的心跳!");
                    }else{
                        System.out.println("未知命令!");
                    }
                } else {
                    System.out.println("未知数据!"   msg);
                    return;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ReferenceCountUtil.release(msg);
            }
            count  ;
        }
    
        /**
         * 异常处理
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
    代码如下:

    @SpringBootApplication
    public class NettyServerApp {
    
        public static void main(String[] args) {
            // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
            ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
            NettyServer nettyServer = context.getBean(NettyServer.class);
            nettyServer.run();
        }
    
    }
    

    到这里服务端相应的代码就编写完毕了。

    开源

      》需要在两台机器上安装好Docker

    Docker image 镜像:镜像是容器的基石,容器基于镜像启动和运行,镜像好比容器的源代码,保存了基于容器的各种条件,docker镜像是有个层叠的只读文件系统, 最低端是一个引导文件系统bootfs (好比linux的引导文件系统),docker用户几乎不可能跟引导文件有交互,到一个容器启动后,将会被已到内容中而引导文件系统将会被卸载。docker镜像的第二层是rootfs,root文件系统。 docker将这样的文件系统称为镜像。一个镜像可以放到另一个镜像的顶部,位于下面的镜像称为父镜像。

    客户端

    客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
    首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
    主要实现的代码逻辑如下:

        public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
            ChannelFuture f = null;
            try {
                if (bootstrap != null) {
                    bootstrap.group(eventLoopGroup);
                    bootstrap.channel(NioSocketChannel.class);
                    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                    bootstrap.handler(nettyClientFilter);
                    bootstrap.remoteAddress(host, port);
                    f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                        final EventLoop eventLoop = futureListener.channel().eventLoop();
                        if (!futureListener.isSuccess()) {
                            System.out.println("与服务端断开连接!在10s之后准备尝试重连!");
                            eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                        }
                    });
                    if(initFalg){
                        System.out.println("Netty客户端启动成功!");
                        initFalg=false;
                    }
                    // 阻塞
                    f.channel().closeFuture().sync();
                }
            } catch (Exception e) {
                System.out.println("客户端连接失败!" e.getMessage());
            }
        }
    

    注:监听器这块的实现用的是JDK1.8的写法。

    客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
    改动的代码如下:

        ChannelPipeline ph = ch.pipeline();
            /*
             * 解码和编码,应和服务端一致
             * */
            //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式
            ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); 
    

    客户端的业务代码逻辑。
    主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
    这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
    废话就不多说了,代码如下:

        @Service("nettyClientHandler")
        @ChannelHandler.Sharable
        public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        @Autowired
        private NettyClient nettyClient;
    
        /** 循环次数 */
        private int fcount = 1;
    
        /**
         * 建立连接时
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("建立连接时:"   new Date());
            ctx.fireChannelActive();
        }
    
        /**
         * 关闭连接时
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("关闭连接时:"   new Date());
            final EventLoop eventLoop = ctx.channel().eventLoop();
            nettyClient.doConnect(new Bootstrap(), eventLoop);
            super.channelInactive(ctx);
        }
    
        /**
         * 心跳请求处理 每4秒发送一次心跳请求;
         * 
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
            System.out.println("循环请求的时间:"   new Date()   ",次数"   fcount);
            if (obj instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) obj;
                if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令
                    UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                    ctx.channel().writeAndFlush(userState);
                    fcount  ;
                }
            }
        }
    
        /**
         * 业务逻辑处理
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 如果不是protobuf类型的数据
            if (!(msg instanceof UserMsg)) {
                System.out.println("未知数据!"   msg);
                return;
            }
            try {
    
                // 得到protobuf的数据
                UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
                // 进行相应的业务处理。。。
                // 这里就从简了,只是打印而已
                System.out.println(
                        "客户端接受到的用户信息。编号:"   userMsg.getId()   ",姓名:"   userMsg.getName()   ",年龄:"   userMsg.getAge());
    
                // 这里返回一个已经接受到数据的状态
                UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
                ctx.writeAndFlush(userState);
                System.out.println("成功发送给服务端!");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                ReferenceCountUtil.release(msg);
            }
         }
        }
    

    那么到这里客户端的代码也编写完毕了。

    Docker的另一个关键因素是,它是完全开源的。这就意味着任何人都能给平台提出意见,当Docker本身不包含他们需要的特性时,可以适配并扩展它以满足他们自己的需求。所有这些使得对开发者和系统管理者而言,Docker是非常方便的选择。

      》两台机器上镜像的拉取 

    新葡亰496net 5image.png

    功能测试

    首先启动服务端,然后再启动客户端。
    我们来看看结果是否如上述所说。

    服务端输出结果:

    服务端启动成功,端口是:9876
    连接的客户端地址:/127.0.0.1:53319
    第1次,服务端接受的消息:state: 1
    
    客户端业务处理成功!
    第2次,服务端接受的消息:state: 2
    
    接受到客户端发送的心跳!
    第3次,服务端接受的消息:state: 2
    
    接受到客户端发送的心跳!
    第4次,服务端接受的消息:state: 2
    
    接受到客户端发送的心跳!
    

    客户端输入结果:

    Netty客户端启动成功!
    建立连接时:Mon Jul 16 23:31:58 CST 2018
    客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
    成功发送给服务端!
    循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1
    循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2
    循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3
    循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4
    

    通过打印信息可以看出如上述所说。

    接下来我们再来看看客户端是否能够实现重连。
    先启动客户端,再启动服务端。

    客户端输入结果:

    Netty客户端启动成功!
    与服务端断开连接!在10s之后准备尝试重连!
    客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
    建立连接时:Mon Jul 16 23:41:33 CST 2018
    客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18
    成功发送给服务端!
    循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1
    循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2
    循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3
    

    服务端输出结果:

    服务端启动成功,端口是:9876
    连接的客户端地址:/127.0.0.1:53492
    第1次,服务端接受的消息:state: 1
    
    客户端业务处理成功!
    第2次,服务端接受的消息:state: 2
    
    接受到客户端发送的心跳!
    第3次,服务端接受的消息:state: 2
    
    接受到客户端发送的心跳!
    第4次,服务端接受的消息:state: 2
    

    结果也如上述所说!

    低开销

    docker pull nsqio/nsq
    

    Docker Container 容器:通过镜像来启动:docker的容器是docker的执行单元 ,容器中可以运行客户的一个或者多个进程,如果说镜像是docker安装包的构建和打包阶段,那么容器则是启动和执行阶段。启动和执行阶段写时负责(copy on write)

    其它

    关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
    SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:

    对了,也有不使用springBoot整合的Netty项目工程地址:

    原创不易,如果感觉不错,希望给个推荐!您的支持是我写作的最大动力!
    版权声明:
    作者:虚无境
    博客园出处:
    CSDN出处:    
    个人博客出处:

    因为开发者不需要提供一个真实的虚拟环境一直到硬件级别,通过只创建运行时必要的库和OS部件,可以降低系统的开销。

      我们在105上启动lookup, nsqd和客户端都需要连接这个lookup。  

    新葡亰496net 6image.png

    新葡亰496net 7

    docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
    

    Docker Registry 仓库:docker 用仓库来保存用户构建的镜像,仓库分为公用和私有。docker公司自己通过了一个共有仓库Docker Hub ,

    敏捷

    新葡亰496net 8

    新葡亰496net 9image.png

    思路上,Docker是依赖速度和简单性来创建的。这也是它变得如此流行的部分原因。开发者现在能够非常简单地将软件和依赖元素打包到一个容器中。他们能使用任何编程语言,任何版本、任何工具,因为它们是一起打包到一个容器中,因而,事实上标准化了所有元素,同时也并未丢弃任何东西。

      

    Docker 在线演示地址:

    可移植性

      在105和49上启动nsqd, lookup的地址要写105

    Docker以一种全新方式使得应用程序容器变得完全可移植。开发者现在能够分发app从开发到测试到制作而又不破坏代码。机器环境的差异不会对打包进容器中的东西造成任何影响。制作中不需要改变app,这点对IT运营团队而言很有帮助,因为现在他们能够在数据中心之间移动apps而又避免供应商依赖。

    docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.105     --lookupd-tcp-address=192.168.0.105:4160
    
    docker run --name nsqd -p 4150:4150 -p 4151:4151     nsqio/nsq /nsqd     --broadcast-address=192.168.0.49     --lookupd-tcp-address=192.168.0.105:4160
    

    控制

     

    当apps在生命周期中移动时,Docker提供了对apps最大程度的控制,因为环境是标准化的。这也使得很容易回答在整个过程中有关安全性、管理型、和规模方面的问题。针对特定项目,IT团队可以定制需要的控制和灵活性级别来保持服务级别,性能和合规性。

     新葡亰496net 10

    它是怎么被创造的以及它是如何出现的?

     

    过去apps以一种非常不同的方式开发。有许多私人数据中心运行买来的程序,数据中心由庞大的代码库控制,每年都要升级一次。随着云计算的开发和应用,一切都改变了。现在世界上的公司都依赖于软件去与他们的客户联系,软件选项变得越来越定制化。

      到了这一步就可以写代码发送和接收信息了。但是还有一个管理系统需要启动一下。nsqadmin 

    随着软件变得更加复杂,服务,依赖性,基础设施,对获取app的终端状态提出了很多挑战。这就是Docker的来源。

    docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=192.168.0.105:4161
    

    新葡亰496net 11

    新葡亰496net 12

    在2013年,Docker被研发出来作为一种编译、分发、运行应用程序的方式,在所有使用容器的地方。软件容器是一种软件的标准单元,它不受代码和包含在代码内部的依赖关系的影响。这使得开发者和系统管理员能够处理在不同基础设施和不同环境下移动软件而无需任何修改的需求。

     

    在2013年3月13日PyCon Lightning Talk – The future of Linux Containers大会上,Docker发布。Moby Dock,Docker吉祥物,几个月后被创造出来。同年9月,Docker和RedHat宣布联盟,引入了Fedora/RHEL兼容性。在2014年1月公司获得了1500万美元的B轮融资,同年8月Docker Engine1.2发布。2014年9月他们获得了4000万美元的C轮融资,在2014年12月31日,Docker已经拥有了1亿的容器下载量。在2015年4月,他们获得了9500万美元的D轮融资,拥有了3亿的容器下载量。

       用浏览器看一下管理端:  和 192.168.0.49。其他的你可以点开看看。

    它是怎么工作的?

    新葡亰496net 13

    Docker是一种容器即服务。为了理解它是怎么工作的,首先要了解一个Linux容器是什么。

      我用go语言 简单写一个发送信息的例子:

    新葡亰496net 14

      go使用的库是 go-nsq 地址  : github.com/nsqio/go-nsq

    Linux容器

      

    在一个通常的虚拟化环境中,虚拟机在系统管理程序(例如Xen, Hyper-V)的帮助下运行于物理机器之上。容器运行于操作系统内核之上的用户空间。每个容器都有自身队列的用户空间,在一个主机上可以运行多个不同容器。通过使用两种Linux内核特性:命名空间和控制组,一个主机中的容器能够彼此独立。

    func main() {
        config := nsq.NewConfig()
        // 随便给哪个ip发都可以
        //w1, _ := nsq.NewProducer("192.168.0.105:4150", config)
        w1, _ := nsq.NewProducer("192.168.0.49:4150", config)
    
        err1 := w1.Ping()
        if err1 != nil {
            log.Fatal("should not be able to ping after Stop()")
            return
        }
        defer w1.Stop()
        topicName := "publishtest"
        msgCount := 2
        for i := 1; i < msgCount; i   {
            err1 := w1.Publish(topicName, []byte("测试测试publis test case"))
            if err1 != nil {
                log.Fatal("error")
            }
        }
    }
    

    在Linux中有六个命名空间,它们允许容器包拥有自己的网络接口,IP地址,等等。容器使用的资源由控制组来管理,它可以允许你限制容器能够使用的CPU和存储资源数量。

      可以尝试给49和105都发送一次试试。再看一下我们的管理页面:

    Docker

      publishtest被ip105和49都发送过。但是还没有channel:

    Docker是一种容器引擎,它使用了Linux内核特性,使容器运行于OS之上,在容器中自动化app部署。它提供了一个轻量级环境来运行app代码,目的是创建一个更有效的工作流,当你在整个软件生命周期中移动app时。它运行于一个客户端-服务器架构。Docker Daemon负责与容器相关的所有动作,这个守护程序通过代理或REST APIs从Docker客户端获取命令。

    新葡亰496net 15

    容器从镜像中编译,这些镜像可以配置包含apps,作为创建容器的一个模板。它们组织成一层,镜像的每个变化会添加到它上面的一层。Docker镜像存储于Docker注册机中,开发者使用公用或私用的注册机去编译并在团队中共享镜像。Docker-hosted的注册机服务被称为DockerHub(Docker公共仓库),允许你从中心位置节点上传下载镜像。

     

    一旦你有了镜像,你可以创建一个容器,它是镜像的一个可写层。镜像通知Docker容器包含什么内容,当容器加载后要运行什么进程和配置数据。一旦容器开始运行,你就可以管理它,与app交互,当你结束的时候停止并移除容器。这就使得运行app变得很容易,无需修改代码。

     客户端golang代码

    新葡亰496net 16

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "log"
        "os"
        "os/signal"
        "strconv"
        "time"
        "sync"
    )
    
    func main() {
    
        topicName := "publishtest"
        msgCount := 2
        for i := 0; i < msgCount; i   {
            //time.Sleep(time.Millisecond * 20)
            go readMessage(topicName, i)
        }
    
        //cleanup := make(chan os.Signal, 1)
        cleanup := make(chan os.Signal)
        signal.Notify(cleanup, os.Interrupt)
        fmt.Println("server is running....")
    
        quit := make(chan bool)
        go func() {
    
            select {
                case <- cleanup:
                    fmt.Println("Received an interrupt , stoping service ...")
                    for _, ele := range consumers {
                        ele.StopChan <- 1
                        ele.Stop()
                    }
                    quit <- true
            }
        }()
        <-quit
        fmt.Println("Shutdown server....")
    }
    
    type ConsumerHandle struct {
        q       *nsq.Consumer
        msgGood int
    }
    
    var consumers []*nsq.Consumer = make([]*nsq.Consumer, 0)
    var mux *sync.Mutex = &sync.Mutex{}
    
    func (h *ConsumerHandle) HandleMessage(message *nsq.Message) error {
        msg := string(message.Body)   "  "   strconv.Itoa(h.msgGood)
        fmt.Println(msg)
    
        return nil
    }
    
    func readMessage(topicName string, msgCount int) {
    
        defer func() {
            if err := recover(); err != nil {
                fmt.Println("error: ", err)
            }
        }()
    
        config := nsq.NewConfig()
        config.MaxInFlight = 1000
        config.MaxBackoffDuration = 500 * time.Second
    
        //q, _ := nsq.NewConsumer(topicName, "ch"   strconv.Itoa(msgCount), config)
        //q, _ := nsq.NewConsumer(topicName, "ch"   strconv.Itoa(msgCount)   "#ephemeral", config)
        q, _ := nsq.NewConsumer(topicName, "ch" strconv.Itoa(msgCount), config)
    
        h := &ConsumerHandle{q: q, msgGood: msgCount}
        q.AddHandler(h)
    
        err := q.ConnectToNSQLookupd("192.168.0.105:4161")
        //err := q.ConnectToNSQDs([]string{"192.168.0.105:4161"})
        //err := q.ConnectToNSQD("192.168.0.49:4150")
        //err := q.ConnectToNSQD("192.168.0.105:4415")
        if err != nil {
            fmt.Println("conect nsqd error")
            log.Println(err)
        }
        mux.Lock()
        consumers = append(consumers, q)
        mux.Unlock()
        <-q.StopChan
        fmt.Println("end....")
    }
    

    为什么开发者会在乎?

     

    Docker在整个开发周期中对开发者很有帮助。它使得你可以在装有app和服务的本地容器上进行开发,然后集成到一个持续集成和部署工作流中。本质上,它使得开发者的开发过程更加简单。它特别有用处,基于以下原因:

     

    更容易容量扩展

     

    Docker使得保持工作负载高可移植性很容易。容器可以运行在开发者的本地主机,也能运行于物理或虚拟机器或云服务中。这就使得管理工作负载更简单,你可以近乎实时地用它来按比例增加或拆除app和服务。

      运行一下,会启动两个终端:

    更高的密度和更多工作负载

      用我们的发送代码发送信息,再看我们的客户端

    相对基于系统管理程序的虚拟机,Docker是一个轻量级和很划算的选择,能适应高密度环境。对小型和中级部署也很有用,在这类型的部署中你想要充分利用你已经拥有的资源。

      新葡亰496net 17

     

      c# 使用的库为NsqSharp.Core 地址为:

      

    新葡亰496net 18

     

      简单客户端代码为:

     

    class Program
        {
            static void Main()
            {
                // Create a new Consumer for each topic/channel
                var consumerCount = 2;
                var listC = new  List<Consumer>();
                for (var i = 0; i < consumerCount; i  )
                {
                    var consumer = new Consumer("publishtest", $"channel{i}" );
                    consumer.ChangeMaxInFlight(2500);
                    consumer.AddHandler(new MessageHandler());
                    consumer.ConnectToNsqLookupd("192.168.0.105:4161");
                    listC.Add(consumer);
                }
    
    
                var exitEvent = new ManualResetEvent(false);
    
                Console.CancelKeyPress  = (sender, eventArgs) => {
                    eventArgs.Cancel = true;
                    listC.ForEach(x => x.Stop());
                    exitEvent.Set();
                };
    
                exitEvent.WaitOne();
    
    
            }
        }
    
        public class MessageHandler : IHandler
        {
            /// <summary>Handles a message.</summary>
            public void HandleMessage(IMessage message)
            {
                string msg = Encoding.UTF8.GetString(message.Body);
                Console.WriteLine(msg);
            }
    
            /// <summary>
            /// Called when a message has exceeded the specified <see cref="Config.MaxAttempts"/>.
            /// </summary>
            /// <param name="message">The failed message.</param>
            public void LogFailedMessage(IMessage message)
            {
                // Log failed messages
            }
        }
    

     

    本文由新葡亰496net发布于奥门新萄京娱乐场,转载请注明出处:新葡亰496net:技术学习,memcache遍布式和要注意的

    关键词: