diff --git a/.gitignore b/.gitignore index 6143e53..3dc29a8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,22 +1,36 @@ -# Compiled class file -*.class +# kdiff3 ignore +*.orig -# Log file -*.log +# maven ignore +target/ + +# eclipse ignore +.settings/ +.project +.classpath -# BlueJ files -*.ctxt +# idea ignore +.idea/ +*.ipr +*.iml +*.iws + +# temp ignore +*.log +*.cache +*.diff +*.patch +*.tmp -# Mobile Tools for Java (J2ME) -.mtj.tmp/ +# system ignore +.DS_Store +Thumbs.db -# Package Files # -*.jar -*.war -*.ear -*.zip -*.tar.gz -*.rar +# package ignore (optional) +# *.jar +# *.war +# *.zip +# *.tar +# *.tar.gz -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* +*.hprof \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..b9ede07 --- /dev/null +++ b/pom.xml @@ -0,0 +1,82 @@ + + 4.0.0 + + study.com + deepjava + 1.0-SNAPSHOT + jar + + deepjava + http://maven.apache.org + + + UTF-8 + + + + + junit + junit + 4.12 + test + + + + com.google.guava + guava + 21.0 + + + + org.projectlombok + lombok + 1.16.10 + + + + org.slf4j + log4j-over-slf4j + 1.7.5 + + + org.slf4j + slf4j-api + 1.7.5 + + + + org.slf4j + jcl-over-slf4j + 1.7.5 + + + + ch.qos.logback + logback-classic + 1.0.13 + + + + io.netty + netty-all + 4.1.20.Final + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + diff --git a/src/main/java/study/com/java8/function/Predicates.java b/src/main/java/study/com/java8/function/Predicates.java new file mode 100644 index 0000000..5f58570 --- /dev/null +++ b/src/main/java/study/com/java8/function/Predicates.java @@ -0,0 +1,46 @@ +package study.com.java8.function; + +import com.google.common.base.Strings; +import study.com.java8.lambda.Person; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; + +/** + * Created by runlibo.li on 2018/1/22. + * + * @author runlibo.li + */ +public class Predicates { + + public static void main(String[] args) { + Predicate predicate = (s) -> s.length() > 0; + System.out.println(predicate.test("foo")); + System.out.println(predicate.negate().test("foo")); + + Predicate nonNull = Objects::nonNull; + System.out.println(nonNull.test(false)); + + Predicate isEmpty = Strings::isNullOrEmpty; + System.out.println(isEmpty.test("")); + System.out.println(isEmpty.test(null)); + + Function toInteger = Integer::valueOf; + Function backToString = toInteger.andThen(String::valueOf); + System.out.println(backToString.apply("123")); + + Function backInteger = toInteger.compose(String::valueOf); + System.out.println(backInteger.apply(123)); + + Supplier personSupplier = Person::new; + System.out.println(personSupplier.get()); + + Consumer personConsumer = System.out::println; + personConsumer.accept(new Person("Li", "Bo")); + + } + +} diff --git a/src/main/java/study/com/java8/lambda/LambdaTest.java b/src/main/java/study/com/java8/lambda/LambdaTest.java new file mode 100644 index 0000000..80cef6b --- /dev/null +++ b/src/main/java/study/com/java8/lambda/LambdaTest.java @@ -0,0 +1,17 @@ +package study.com.java8.lambda; + + +/** + * Created by runlibo.li on 2018/1/22. + * + * @author runlibo.li + */ +public class LambdaTest { + + public static void main(String[] args) { + PersonFactory personPersonFactory = Person::new; + Person person = personPersonFactory.create("Li", "Bo"); + System.out.println(person); + } + +} diff --git a/src/main/java/study/com/java8/lambda/Person.java b/src/main/java/study/com/java8/lambda/Person.java new file mode 100644 index 0000000..ebaecc0 --- /dev/null +++ b/src/main/java/study/com/java8/lambda/Person.java @@ -0,0 +1,28 @@ +package study.com.java8.lambda; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * Created by runlibo.li on 2018/1/22. + * + * @author runlibo.li + */ +@Getter +@Setter +@ToString +public class Person { + String firstName; + + String lastName; + + public Person() { + } + + public Person(String firstName, String lastName) { + this.firstName = firstName; + this.lastName = lastName; + } +} + diff --git a/src/main/java/study/com/java8/lambda/PersonFactory.java b/src/main/java/study/com/java8/lambda/PersonFactory.java new file mode 100644 index 0000000..90f1e17 --- /dev/null +++ b/src/main/java/study/com/java8/lambda/PersonFactory.java @@ -0,0 +1,16 @@ +package study.com.java8.lambda; + +/** + * Created by runlibo.li on 2018/1/22. + * + * @author runlibo.li + */ +@FunctionalInterface +public interface PersonFactory { + + T create(String firstName, String lastName); + + default String descrision() { + return "default"; + } +} diff --git a/src/main/java/study/com/java8/stream/StreamTest.java b/src/main/java/study/com/java8/stream/StreamTest.java new file mode 100644 index 0000000..3083238 --- /dev/null +++ b/src/main/java/study/com/java8/stream/StreamTest.java @@ -0,0 +1,53 @@ +package study.com.java8.stream; + +import com.google.common.collect.Lists; + +import java.io.*; +import java.net.ServerSocket; +import java.util.*; +import java.util.stream.Stream; + +/** + * Created by runlibo.li on 2018/1/22. + * + * @author runlibo.li + */ +public class StreamTest { + + public static List getList() { + return Lists.newArrayList("aaa1", "aaa2", "bbb1", "ccc1", "ddd1", "ddd2"); + } + + public static void testCreate() { + Stream integerStream = Stream.of(1, 2, 3, 4); + Stream stringStream = Stream.of("stream"); + + Stream.generate(Math::random).limit(10).forEach(System.out::println); + + Stream.iterate(1, item -> item + 1).limit(10).forEach(System.out::println); + } + + public static void testFilter() { + StreamTest.getList().stream().filter(s -> s.contains("1")).forEach(System.out::println); + System.out.println(); + + StreamTest.getList().stream().filter(s -> s.contains("1")).sorted(Comparator.reverseOrder()).forEach(System.out::println); + System.out.println(); + + StreamTest.getList().stream().map(String::toUpperCase).forEach(System.out::println); + System.out.println(); + } + + + public static void testMatch() { + System.out.println(StreamTest.getList().stream().anyMatch(s -> s.startsWith("1"))); + System.out.println(StreamTest.getList().stream().allMatch(s -> s.startsWith("1"))); + System.out.println(StreamTest.getList().stream().noneMatch(s -> s.startsWith("1"))); + System.out.println(StreamTest.getList().stream().count()); + } + + public static void main(String[] args) throws FileNotFoundException { + //testFilter(); + //testMatch(); + } +} diff --git a/src/main/java/study/com/netty/started/bytebuf/ByteBufTest.java b/src/main/java/study/com/netty/started/bytebuf/ByteBufTest.java new file mode 100644 index 0000000..9d0a7e8 --- /dev/null +++ b/src/main/java/study/com/netty/started/bytebuf/ByteBufTest.java @@ -0,0 +1,41 @@ +package study.com.netty.started.bytebuf; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.channel.socket.oio.OioServerSocketChannel; + +import java.net.InetSocketAddress; + +/** + * Created by runlibo.li on 2018/4/4. + * + * @author runlibo.li + */ +public class ByteBufTest { + + public static void main(String[] args) { + EventLoopGroup group = new OioEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(group).channel(OioServerSocketChannel.class) + .localAddress(new InetSocketAddress(8888)); + + } +} + + + + + + + + + + + + + + + diff --git a/src/main/java/study/com/netty/started/chat/SimpleChatClient.java b/src/main/java/study/com/netty/started/chat/SimpleChatClient.java new file mode 100644 index 0000000..8a99700 --- /dev/null +++ b/src/main/java/study/com/netty/started/chat/SimpleChatClient.java @@ -0,0 +1,59 @@ +package study.com.netty.started.chat; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.extern.slf4j.Slf4j; + +import java.io.BufferedReader; +import java.io.InputStreamReader; + +/** + * Created by runlibo.li on 2018/8/15. + * + * @author runlibo.li + */ +@Slf4j +public class SimpleChatClient { + + + public static void main(String[] args) { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + + try { + Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("frame", new LineBasedFrameDecoder(8192)); + pipeline.addLast("decoder", new StringDecoder()); + pipeline.addLast("encoder", new StringEncoder()); + pipeline.addLast("handler", new SimpleChatClientHandler()); + } + }); + Channel channel = bootstrap.connect("127.0.0.1", 9999).sync().channel(); + log.info("client:{} connect server:{}", channel.localAddress(), channel.remoteAddress()); + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + while (true) { + channel.writeAndFlush(reader.readLine() + "\n"); + } + } catch (Exception e) { + log.error("", e); + } finally { + eventLoopGroup.shutdownGracefully(); + } + } + + +} diff --git a/src/main/java/study/com/netty/started/chat/SimpleChatClientHandler.java b/src/main/java/study/com/netty/started/chat/SimpleChatClientHandler.java new file mode 100644 index 0000000..23e8eef --- /dev/null +++ b/src/main/java/study/com/netty/started/chat/SimpleChatClientHandler.java @@ -0,0 +1,18 @@ +package study.com.netty.started.chat; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/8/15. + * + * @author runlibo.li + */ +@Slf4j +public class SimpleChatClientHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { + log.info(msg); + } +} diff --git a/src/main/java/study/com/netty/started/chat/SimpleChatServer.java b/src/main/java/study/com/netty/started/chat/SimpleChatServer.java new file mode 100644 index 0000000..2bbafe2 --- /dev/null +++ b/src/main/java/study/com/netty/started/chat/SimpleChatServer.java @@ -0,0 +1,55 @@ +package study.com.netty.started.chat; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/8/14. + * + * @author runlibo.li + */ +@Slf4j +public class SimpleChatServer { + + public static void main(String[] args) { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + //接收到一个连接之后初始化pipeline 初始化完成之后会注册channel,然后变成active状态 + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("framer", new LineBasedFrameDecoder(8192)); + pipeline.addLast("decode", new StringDecoder()); + pipeline.addLast("encode", new StringEncoder()); + pipeline.addLast("handler", new SimpleChatServerHandler()); + } + }) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + log.info("SimpleChatServer启动"); + ChannelFuture future = bootstrap.bind(9999).sync(); + + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("", e); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + log.info("SimpleChatServer关闭"); + } + } +} diff --git a/src/main/java/study/com/netty/started/chat/SimpleChatServerHandler.java b/src/main/java/study/com/netty/started/chat/SimpleChatServerHandler.java new file mode 100644 index 0000000..2e99e5c --- /dev/null +++ b/src/main/java/study/com/netty/started/chat/SimpleChatServerHandler.java @@ -0,0 +1,92 @@ +package study.com.netty.started.chat; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/8/14. + * + * @author runlibo.li + */ +@Slf4j +public class SimpleChatServerHandler extends SimpleChannelInboundHandler { + + /** + * 实现广播 + */ + private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel incoming = ctx.channel(); + channels.add(incoming); + log.info("{} handlerAdded", incoming.remoteAddress()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + Channel incoming = ctx.channel(); + channels.remove(incoming); + log.info("{} handlerRemove", incoming.remoteAddress()); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + log.info("{} channelRegistered", ctx.channel().remoteAddress()); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + log.info("{} channelUnregistered", ctx.channel().remoteAddress()); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + log.info("{} channelReadComplete", ctx.channel().remoteAddress()); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + log.info("{} channelWritabilityChanged", ctx.channel().remoteAddress()); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Channel incoming = ctx.channel(); + log.info("{} channelActive", incoming.remoteAddress()); + channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 上线\n"); + } + + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel incoming = ctx.channel(); + log.info("{} channelInactive", incoming.remoteAddress()); + channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 下线\n"); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("{} exceptionCaught", ctx.channel().remoteAddress(), cause); + ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { + Channel incoming = ctx.channel(); + log.info("{} channelRead0 receive: {}", incoming.remoteAddress(), msg); + channels.forEach((channel) -> { + if (channel != incoming) { + channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + msg + "\n"); + } else { + channel.writeAndFlush("[you]" + msg + "\n"); + } + }); + } +} diff --git a/src/main/java/study/com/netty/started/discard/DiscardServer.java b/src/main/java/study/com/netty/started/discard/DiscardServer.java new file mode 100644 index 0000000..c206349 --- /dev/null +++ b/src/main/java/study/com/netty/started/discard/DiscardServer.java @@ -0,0 +1,49 @@ +package study.com.netty.started.discard; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +/** + * Created by runlibo.li on 2018/8/12. + * + * @author runlibo.li + */ +@Slf4j +public class DiscardServer { + + public static void main(String[] args) { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new DiscardServerHandler()); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.TCP_NODELAY, true); + + ChannelFuture future = bootstrap.bind(9999).sync(); + + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + log.error("", e); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + +} diff --git a/src/main/java/study/com/netty/started/discard/DiscardServerHandler.java b/src/main/java/study/com/netty/started/discard/DiscardServerHandler.java new file mode 100644 index 0000000..d06e3ea --- /dev/null +++ b/src/main/java/study/com/netty/started/discard/DiscardServerHandler.java @@ -0,0 +1,27 @@ +package study.com.netty.started.discard; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/8/12. + * + * @author runlibo.li + */ +@Slf4j +public class DiscardServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + //丢弃到收到的数据,不做任何处理 + ((ByteBuf) msg).release(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("receive exception", cause); + ctx.close(); + } +} diff --git a/src/main/java/study/com/netty/started/echo/EchoClient.java b/src/main/java/study/com/netty/started/echo/EchoClient.java new file mode 100644 index 0000000..2d28dcc --- /dev/null +++ b/src/main/java/study/com/netty/started/echo/EchoClient.java @@ -0,0 +1,43 @@ +package study.com.netty.started.echo; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; + +/** + * Created by runlibo.li on 2018/6/24. + * + * @author runlibo.li + */ +@Slf4j +public class EchoClient { + + public static void main(String[] args) { + NioEventLoopGroup group = new NioEventLoopGroup(); + EchoClientHandler handler = new EchoClientHandler(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .remoteAddress(new InetSocketAddress(9999)) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(handler); + } + }); + try { + ChannelFuture future = bootstrap.connect().sync(); + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("", e); + } finally { + group.shutdownGracefully(); + } + } +} diff --git a/src/main/java/study/com/netty/started/echo/EchoClientHandler.java b/src/main/java/study/com/netty/started/echo/EchoClientHandler.java new file mode 100644 index 0000000..13ffd9a --- /dev/null +++ b/src/main/java/study/com/netty/started/echo/EchoClientHandler.java @@ -0,0 +1,38 @@ +package study.com.netty.started.echo; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; + +/** + * Created by runlibo.li on 2018/6/24. + * + * @author runlibo.li + */ +@Slf4j +@ChannelHandler.Sharable +public class EchoClientHandler extends SimpleChannelInboundHandler { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("channelActive"); + ctx.writeAndFlush(Unpooled.copiedBuffer("Good good study!", CharsetUtil.UTF_8)); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + log.error("client receive:{}", msg.toString(Charset.forName("UTF-8"))); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("", cause); + ctx.close(); + } +} diff --git a/src/main/java/study/com/netty/started/echo/EchoServer.java b/src/main/java/study/com/netty/started/echo/EchoServer.java new file mode 100644 index 0000000..45d178c --- /dev/null +++ b/src/main/java/study/com/netty/started/echo/EchoServer.java @@ -0,0 +1,43 @@ +package study.com.netty.started.echo; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/6/24. + * + * @author runlibo.li + */ +@Slf4j +public class EchoServer { + + public static void main(String[] args) { + NioEventLoopGroup group = new NioEventLoopGroup(); + EchoServerHandler serverHandler = new EchoServerHandler(); + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(group) + .channel(NioServerSocketChannel.class) + .localAddress(9999) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(serverHandler); + } + }); + try { + //sync方法阻塞等待直到操作完成,和await方法的区别是它会抛出异常 + ChannelFuture future = serverBootstrap.bind().sync(); + //获取到closeFuture对象,阻塞当前线程直到它完成 + future.channel().closeFuture().sync(); + } catch (Exception e) { + log.error("", e); + } finally { + group.shutdownGracefully(); + } + } +} diff --git a/src/main/java/study/com/netty/started/echo/EchoServerHandler.java b/src/main/java/study/com/netty/started/echo/EchoServerHandler.java new file mode 100644 index 0000000..0d32bd8 --- /dev/null +++ b/src/main/java/study/com/netty/started/echo/EchoServerHandler.java @@ -0,0 +1,78 @@ +package study.com.netty.started.echo; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; + +/** + * Created by runlibo.li on 2018/6/24. + * + * @author runlibo.li + */ +@ChannelHandler.Sharable +@Slf4j +public class EchoServerHandler implements ChannelInboundHandler { + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("channel channelRead"); + ByteBuf byteBuf = (ByteBuf) msg; + log.info("server receive:{}", byteBuf.toString(Charset.forName("UTF-8"))); + ctx.write(byteBuf); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + log.info("channel channelReadComplete"); + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("channel exceptionCaught", cause); + ctx.close(); + } +} diff --git a/src/main/java/study/com/netty/started/heartbeat/AcceptorIdleStateTrigger.java b/src/main/java/study/com/netty/started/heartbeat/AcceptorIdleStateTrigger.java new file mode 100644 index 0000000..4c12cef --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/AcceptorIdleStateTrigger.java @@ -0,0 +1,28 @@ +package study.com.netty.started.heartbeat; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; + +/** + * Created by runlibo.li on 2018/8/17. + * + * @author runlibo.li + */ +@ChannelHandler.Sharable +public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter { + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleState state = ((IdleStateEvent) evt).state(); + if (state == IdleState.READER_IDLE) { + throw new IllegalStateException("idle exception"); + } else { + super.userEventTriggered(ctx, evt); + } + } + } +} diff --git a/src/main/java/study/com/netty/started/heartbeat/ChannelHandlerHolder.java b/src/main/java/study/com/netty/started/heartbeat/ChannelHandlerHolder.java new file mode 100644 index 0000000..c81602e --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/ChannelHandlerHolder.java @@ -0,0 +1,17 @@ +package study.com.netty.started.heartbeat; + +import io.netty.channel.ChannelHandler; + +/** + * Created by runlibo.li on 2018/8/19. + *

+ * 客户端的ChannelHandler集合,由子类实现,这样做的好处: + * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers + * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便 + * 地获取所有的handlers + * + * @author runlibo.li + */ +public interface ChannelHandlerHolder { + ChannelHandler[] handlers(); +} diff --git a/src/main/java/study/com/netty/started/heartbeat/ConnectionWatchdog.java b/src/main/java/study/com/netty/started/heartbeat/ConnectionWatchdog.java new file mode 100644 index 0000000..5e93a71 --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/ConnectionWatchdog.java @@ -0,0 +1,96 @@ +package study.com.netty.started.heartbeat; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/8/17. + *

+ * 重连检测狗,当发现当前链路关闭之后,进行重连 + * + * @author runlibo.li + */ +@Slf4j +@ChannelHandler.Sharable +public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder { + + private final Bootstrap bootstrap; + private final String host; + private final int port; + private final Timer timer; + private final int retryTimes; + private volatile boolean reconnect = true; + private int currentTimes = 0; + + + public ConnectionWatchdog(Bootstrap bootstrap, String host, int port, Timer timer, boolean reconnect, int retryTimes) { + this.bootstrap = bootstrap; + this.host = host; + this.port = port; + this.timer = timer; + this.reconnect = reconnect; + this.retryTimes = retryTimes; + } + + + /** + * channel链路active的时候,将其重试次数置为0 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (currentTimes != 0) { + log.info("reconnect {} success, retryTimes:{}", ctx.channel().remoteAddress(), currentTimes); + } + currentTimes = 0; + ctx.fireChannelActive(); + } + + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.info("server {} inactive", ctx.channel().remoteAddress()); + if (reconnect) { + if (currentTimes < retryTimes) { + currentTimes++; + //重连的时间间隔会越来越长 + int timeout = 2 << currentTimes; + timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); + } + } + + super.channelInactive(ctx); + } + + @Override + public void run(Timeout timeout) throws Exception { + ChannelFuture future = null; + //这里应该主要为了同步变量,并没有并发的问题 + //bootstrap已经初始化好了,只需要将handler填入 + synchronized (bootstrap) { + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(handlers()); + } + }); + future = bootstrap.connect(host, port); + } + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + log.info("reconnect {} success", future.channel().remoteAddress()); + } else { + log.info("reconnect {} fail", future.channel().remoteAddress()); + future.channel().pipeline().fireChannelInactive(); + } + } + }); + } +} diff --git a/src/main/java/study/com/netty/started/heartbeat/ConnectorIdleStateTrigger.java b/src/main/java/study/com/netty/started/heartbeat/ConnectorIdleStateTrigger.java new file mode 100644 index 0000000..c9eaf38 --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/ConnectorIdleStateTrigger.java @@ -0,0 +1,33 @@ +package study.com.netty.started.heartbeat; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; + +/** + * Created by runlibo.li on 2018/8/19. + * + * @author runlibo.li + */ +@ChannelHandler.Sharable +public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter { + + private static final ByteBuf HEALTH_CHECK = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("healthcheck", CharsetUtil.UTF_8)); + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleState idleState = ((IdleStateEvent) evt).state(); + if (idleState == IdleState.WRITER_IDLE) { + ctx.writeAndFlush(HEALTH_CHECK.duplicate()); + } + } else { + super.userEventTriggered(ctx, evt); + } + } +} diff --git a/src/main/java/study/com/netty/started/heartbeat/HeartBeatClient.java b/src/main/java/study/com/netty/started/heartbeat/HeartBeatClient.java new file mode 100644 index 0000000..fb6b573 --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/HeartBeatClient.java @@ -0,0 +1,63 @@ +package study.com.netty.started.heartbeat; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.HashedWheelTimer; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/8/19. + * + * @author runlibo.li + */ +@Slf4j +public class HeartBeatClient { + + private final HashedWheelTimer timer = new HashedWheelTimer(); + private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger(); + + public void connect(String host, int port) { + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class); + ConnectionWatchdog watchdog = new ConnectionWatchdog(bootstrap, host, port, timer, true, 20) { + @Override + public ChannelHandler[] handlers() { + return new ChannelHandler[]{ + this, + new IdleStateHandler(0, 1, 0, TimeUnit.SECONDS), + idleStateTrigger + }; + } + }; + + synchronized (bootstrap) { + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(watchdog.handlers()); + } + }); + } + try { + Channel channel = bootstrap.connect(host, port).sync().channel(); + log.info("client:{} connect server:{}", channel.localAddress(), channel.remoteAddress()); + } catch (Exception e) { + log.error("connect server error, ", e); + } + } + + + public static void main(String[] args) { + new HeartBeatClient().connect("127.0.0.1", 9998); + } + + +} diff --git a/src/main/java/study/com/netty/started/heartbeat/HeartBeatServer.java b/src/main/java/study/com/netty/started/heartbeat/HeartBeatServer.java new file mode 100644 index 0000000..a1b01ff --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/HeartBeatServer.java @@ -0,0 +1,59 @@ +package study.com.netty.started.heartbeat; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/8/16. + * + * @author runlibo.li + */ +@Slf4j +public class HeartBeatServer { + + public static void main(String[] args) { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger(); + HeartBeatServerHandler serverHandler = new HeartBeatServerHandler(); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("idle", new IdleStateHandler(2, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast("idleState", idleStateTrigger); + pipeline.addLast("decode", new StringDecoder()); + pipeline.addLast("encode", new StringEncoder()); + pipeline.addLast("heartbeat", serverHandler); + } + }) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + log.info("HeartBeatServer启动"); + Channel channel = bootstrap.bind(9998).sync().channel(); + channel.closeFuture().sync(); + } catch (Exception e) { + log.error("", e); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + log.info("HeartBeatServer关闭"); + } + } + + +} diff --git a/src/main/java/study/com/netty/started/heartbeat/HeartBeatServerHandler.java b/src/main/java/study/com/netty/started/heartbeat/HeartBeatServerHandler.java new file mode 100644 index 0000000..ea5d780 --- /dev/null +++ b/src/main/java/study/com/netty/started/heartbeat/HeartBeatServerHandler.java @@ -0,0 +1,27 @@ +package study.com.netty.started.heartbeat; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * Created by runlibo.li on 2018/8/17. + * + * @author runlibo.li + */ +@Slf4j +@ChannelHandler.Sharable +public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("{} send msg {}", ctx.channel().remoteAddress(), msg); + } + + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("{} ", ctx.channel().remoteAddress(), cause); + ctx.close(); + } +} diff --git a/src/main/java/study/com/netty/started/transfer/NettyOioServer.java b/src/main/java/study/com/netty/started/transfer/NettyOioServer.java new file mode 100644 index 0000000..ef16927 --- /dev/null +++ b/src/main/java/study/com/netty/started/transfer/NettyOioServer.java @@ -0,0 +1,10 @@ +package study.com.netty.started.transfer; + +/** + * Created by runlibo.li on 2018/4/3. + * + * @author runlibo.li + */ +public class NettyOioServer { + +} diff --git a/src/main/java/study/com/nio/aio/AbstractSession.java b/src/main/java/study/com/nio/aio/AbstractSession.java new file mode 100644 index 0000000..a7321a5 --- /dev/null +++ b/src/main/java/study/com/nio/aio/AbstractSession.java @@ -0,0 +1,35 @@ +package study.com.nio.aio; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; + +/** + * Created by runlibo.li on 2018/4/26. + * + * @author runlibo.li + */ +@Getter +@Slf4j +public class AbstractSession { + + protected AsynchronousSocketChannel clientChannel; + + protected AsynchronousServerSocketChannel serverChannel; + + AbstractSession(SessionConfig sessionConfig) { + this.clientChannel = sessionConfig.getClientChannel(); + this.serverChannel = sessionConfig.getServerSocketChannel(); + } + + public void close() { + try { + clientChannel.close(); + } catch (IOException e) { + log.error("ClientChannel close error", e); + } + } +} diff --git a/src/main/java/study/com/nio/aio/AcceptCompletionHandler.java b/src/main/java/study/com/nio/aio/AcceptCompletionHandler.java new file mode 100644 index 0000000..8e5ccbc --- /dev/null +++ b/src/main/java/study/com/nio/aio/AcceptCompletionHandler.java @@ -0,0 +1,49 @@ +package study.com.nio.aio; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +/** + * Created by runlibo.li on 2018/4/24. + * + * @author runlibo.li + */ +@Slf4j +public final class AcceptCompletionHandler implements CompletionHandler { + + @Override + public void completed(AsynchronousSocketChannel client, AsynchronousServerSocketChannel serverSocketChannel) { + try { + log.info("Incoming connection from:{}", client.getRemoteAddress()); + SessionConfig sessionConfig = SessionConfig.builder() + .serverSocketChannel(serverSocketChannel) + .clientChannel(client).build(); + AioReadSession session = new AioReadSession(sessionConfig, 1024); + session.start(); + } catch (IOException e) { + log.error("Accept error", e); + } finally { + pendingAccept(serverSocketChannel); + } + } + + + @Override + public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) { + log.error("Accept fail", exc); + pendingAccept(attachment); + } + + + private void pendingAccept(AsynchronousServerSocketChannel serverSocketChannel) { + if (serverSocketChannel.isOpen()) { + serverSocketChannel.accept(serverSocketChannel, new AcceptCompletionHandler()); + } else { + throw new IllegalStateException("connection has been closed"); + } + } +} diff --git a/src/main/java/study/com/nio/aio/AioReadSession.java b/src/main/java/study/com/nio/aio/AioReadSession.java new file mode 100644 index 0000000..6feebee --- /dev/null +++ b/src/main/java/study/com/nio/aio/AioReadSession.java @@ -0,0 +1,41 @@ +package study.com.nio.aio; + +import java.nio.ByteBuffer; + +/** + * Created by runlibo.li on 2018/4/24. + * + * @author runlibo.li + */ +public class AioReadSession extends AbstractSession { + + private ByteBuffer readBuffer; + + private ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(); + + public AioReadSession(SessionConfig sessionConfig, int bufSize) { + super(sessionConfig); + this.readBuffer = ByteBuffer.allocate(bufSize); + } + + public void start() { + pendingRead(); + } + + public void pendingRead() { + if (clientChannel.isOpen()) { + clientChannel.read(readBuffer, this, this.readCompletionHandler); + } else { + throw new IllegalStateException("ClientChannel has been closed"); + } + } + + + public String getReadContent() { + readBuffer.flip(); + byte[] bytes = new byte[readBuffer.remaining()]; + readBuffer.get(bytes); + readBuffer.clear(); + return new String(bytes); + } +} diff --git a/src/main/java/study/com/nio/aio/AioServerOnhAndler.java b/src/main/java/study/com/nio/aio/AioServerOnhAndler.java new file mode 100644 index 0000000..178357c --- /dev/null +++ b/src/main/java/study/com/nio/aio/AioServerOnhAndler.java @@ -0,0 +1,28 @@ +package study.com.nio.aio; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousServerSocketChannel; + +/** + * Created by runlibo.li on 2018/4/23. + * + * @author runlibo.li + */ +@Slf4j +public class AioServerOnHandler { + + public static void main(String[] args) throws IOException, InterruptedException { + try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) { + serverSocketChannel.bind(new InetSocketAddress(9999)); + log.info("Waiting for connections"); + serverSocketChannel.accept(serverSocketChannel, new AcceptCompletionHandler()); + + Thread.currentThread().join(); + } + } +} + + diff --git a/src/main/java/study/com/nio/aio/AioWriteSession.java b/src/main/java/study/com/nio/aio/AioWriteSession.java new file mode 100644 index 0000000..f08cda4 --- /dev/null +++ b/src/main/java/study/com/nio/aio/AioWriteSession.java @@ -0,0 +1,55 @@ +package study.com.nio.aio; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +/** + * Created by runlibo.li on 2018/4/26. + * + * @author runlibo.li + */ +public class AioWriteSession extends AbstractSession { + + private final Queue writeQueue = new LinkedList<>(); + + private WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(); + + public AioWriteSession(SessionConfig sessionConfig) { + super(sessionConfig); + } + + public void write(ByteBuffer writeBuffer) { + boolean needWrite = false; + synchronized (writeQueue) { + needWrite = writeQueue.isEmpty(); + writeQueue.offer(writeBuffer); + } + if (needWrite) { + pendingWrite(writeBuffer); + } + } + + public void tryWrite(){ + ByteBuffer writeBuffer = null; + synchronized (writeQueue) { + writeBuffer = writeQueue.peek(); + if (writeBuffer == null || !writeBuffer.hasRemaining()){ + writeQueue.poll(); + writeBuffer = writeQueue.peek(); + } + } + + if (writeBuffer != null) { + pendingWrite(writeBuffer); + } + } + + private void pendingWrite(ByteBuffer writeBuffer) { + if (clientChannel.isOpen()) { + clientChannel.write(writeBuffer, this, this.writeCompletionHandler); + } else { + throw new IllegalStateException("ClientChannel has been closed"); + } + } +} diff --git a/src/main/java/study/com/nio/aio/ReadCompletionHandler.java b/src/main/java/study/com/nio/aio/ReadCompletionHandler.java new file mode 100644 index 0000000..ae6adec --- /dev/null +++ b/src/main/java/study/com/nio/aio/ReadCompletionHandler.java @@ -0,0 +1,43 @@ +package study.com.nio.aio; + +import com.google.common.base.Charsets; +import lombok.extern.slf4j.Slf4j; + +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +/** + * Created by runlibo.li on 2018/4/24. + * + * @author runlibo.li + */ +@Slf4j +public final class ReadCompletionHandler implements CompletionHandler { + @Override + public void completed(Integer result, AioReadSession readSession) { + if (result < 0){ + log.info("Client closed"); + readSession.close(); + return; + } + if (result > 0){ + String receiveContent = readSession.getReadContent(); + log.info(receiveContent); + readSession.pendingRead(); + + SessionConfig sessionConfig = SessionConfig.builder() + .clientChannel(readSession.getClientChannel()) + .serverSocketChannel(readSession.getServerChannel()) + .build(); + AioWriteSession writeSession = new AioWriteSession(sessionConfig); + ByteBuffer writeBuffer = ByteBuffer.wrap("receive succ".getBytes(Charsets.UTF_8)); + writeSession.write(writeBuffer); + } + + } + + @Override + public void failed(Throwable exc, AioReadSession session) { + log.error("read failed", exc); + session.close(); + } +} diff --git a/src/main/java/study/com/nio/aio/SessionConfig.java b/src/main/java/study/com/nio/aio/SessionConfig.java new file mode 100644 index 0000000..0e59510 --- /dev/null +++ b/src/main/java/study/com/nio/aio/SessionConfig.java @@ -0,0 +1,22 @@ +package study.com.nio.aio; + +import lombok.Builder; +import lombok.Getter; + +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; + +/** + * Created by runlibo.li on 2018/4/26. + * + * @author runlibo.li + */ +@Getter +@Builder +public class SessionConfig { + + private AsynchronousServerSocketChannel serverSocketChannel; + + private AsynchronousSocketChannel clientChannel; + +} diff --git a/src/main/java/study/com/nio/aio/WriteCompletionHandler.java b/src/main/java/study/com/nio/aio/WriteCompletionHandler.java new file mode 100644 index 0000000..03e8cb1 --- /dev/null +++ b/src/main/java/study/com/nio/aio/WriteCompletionHandler.java @@ -0,0 +1,25 @@ +package study.com.nio.aio; + +import lombok.extern.slf4j.Slf4j; + +import java.nio.channels.CompletionHandler; + +/** + * Created by runlibo.li on 2018/4/26. + * + * @author runlibo.li + */ +@Slf4j +public class WriteCompletionHandler implements CompletionHandler { + @Override + public void completed(Integer result, AioWriteSession writeSession) { + log.debug("to {} write {} bytes", writeSession.serverChannel); + writeSession.tryWrite(); + } + + @Override + public void failed(Throwable exc, AioWriteSession writeSession) { + log.error("write failed", exc); + writeSession.close(); + } +} diff --git a/src/main/java/study/com/nio/buffer/BufferTest.java b/src/main/java/study/com/nio/buffer/BufferTest.java new file mode 100644 index 0000000..784fd5f --- /dev/null +++ b/src/main/java/study/com/nio/buffer/BufferTest.java @@ -0,0 +1,130 @@ +package study.com.nio.buffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.IntBuffer; + +/** + * Created by runlibo.li on 2018/1/30. + * + * @author runlibo.li + */ +public class BufferTest { + + private static void testProperties() { + CharBuffer charBuffer = CharBuffer.allocate(10); + showBuffer(charBuffer); + + charBuffer.put("abc"); + showBuffer(charBuffer); + + //flip在写入后读取buffer之前进行调用,它是将limit设成写状态下的position,position设为0 + charBuffer.flip(); + showBuffer(charBuffer); + + char c = charBuffer.get(); + showBuffer(charBuffer); + + //clear是将position设为0,limit设置成capacity的大小 + charBuffer.clear(); + showBuffer(charBuffer); + + charBuffer.position(1); + showBuffer(charBuffer); + } + + private static void testCompact() { + CharBuffer charBuffer = CharBuffer.allocate(10); + charBuffer.put("def"); + showBuffer(charBuffer); + + charBuffer.flip(); + showBuffer(charBuffer); + + char[] arr = new char[2]; + charBuffer.get(arr); + showBuffer(charBuffer); + + //compact将读取过程中还没有读到的那部分内容挪到buffer的头部,这时position是没有读到内容的长度大小, + // limit成了capacity的大小,便于后续的写入,可以认为buffer是写状态 + charBuffer.compact(); + charBuffer.put("ghlmn"); + charBuffer.flip(); + showBuffer(charBuffer); + } + + private static void testDuplicate() { + CharBuffer charBuffer = CharBuffer.allocate(10); + charBuffer.put("abcde"); + //duplicate用于复制缓冲区,最终两个缓冲区实际上指向的是同一个内部数组,只是分别管理各自的属性 + CharBuffer charBuffer1 = charBuffer.duplicate(); + charBuffer1.clear(); + charBuffer1.put("other1"); + + showBuffer(charBuffer); + showBuffer(charBuffer1); + } + + + private static void testElementView() { + ByteBuffer byteBuffer = ByteBuffer.allocate(12); + byteBuffer.put(new byte[]{0x00, 0x00, 0x00, 0x42}); + byteBuffer.position(0); + //转换成视图缓冲区,是以position开头,limit结尾来创建新的视图缓冲区的 + IntBuffer intBuffer = byteBuffer.asIntBuffer(); + + int i = intBuffer.get(); + System.out.println(i); + System.out.println(Integer.toHexString(i)); + } + + + private static void testPutAndGetElement() { + ByteBuffer buffer = ByteBuffer.allocate(12); + buffer.putInt(0x1234abcd); + buffer.position(0); + byte b1 = buffer.get(); + byte b2 = buffer.get(); + byte b3 = buffer.get(); + byte b4 = buffer.get(); + + //java中默认是以大端的方式来存放元素的 + System.out.println(Integer.toHexString(b1 & 0xff)); + System.out.println(Integer.toHexString(b2 & 0xff)); + System.out.println(Integer.toHexString(b3 & 0xff)); + System.out.println(Integer.toHexString(b4 & 0xff)); + + buffer.position(0); + int bigEndian = buffer.getInt(); + System.out.println(Integer.toHexString(bigEndian)); + + buffer.rewind(); + int littleEndian = buffer.order(ByteOrder.LITTLE_ENDIAN).getInt(); + System.out.println(Integer.toHexString(littleEndian)); + } + + + private static void showBuffer(CharBuffer buffer) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < buffer.limit(); ++i) { + char c = buffer.get(i); + if (c == 0) { + c = '.'; + } + sb.append(c); + } + System.out.printf("position=%d, limit=%d, capacity=%d, content=%s\n", + buffer.position(), buffer.limit(), buffer.capacity(), sb.toString()); + } + + + public static void main(String[] args) { + testProperties(); + testCompact(); + testDuplicate(); + testElementView(); + testPutAndGetElement(); + } + +} diff --git a/src/main/java/study/com/nio/channel/ChannelTest.java b/src/main/java/study/com/nio/channel/ChannelTest.java new file mode 100644 index 0000000..e0aa558 --- /dev/null +++ b/src/main/java/study/com/nio/channel/ChannelTest.java @@ -0,0 +1,58 @@ +package study.com.nio.channel; + + +import java.io.*; +import java.nio.ByteBuffer; +import java.nio.channels.*; +/** + * Created by runlibo.li on 2018/1/24. + * + * @author runlibo.li + */ +public class ChannelTest { + + private static void testNewChannel() throws IOException { + ReadableByteChannel readableByteChannel = Channels.newChannel(System.in); + WritableByteChannel writableByteChannel = Channels.newChannel(System.out); + ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + + //注意这里buffer的remaining为0的时候,read会返回0 + while (readableByteChannel.read(byteBuffer) != -1) { + byteBuffer.flip(); + while (byteBuffer.hasRemaining()) { + writableByteChannel.write(byteBuffer); + } + //在使用完buffer需要再次使用时应该清除读取的状态信息 + byteBuffer.clear(); + } + } + + + private static void testFileChannelCopy() throws IOException { + String pathSrc = "./test.txt"; + String pathDes = "./copy.txt"; + + RandomAccessFile source = new RandomAccessFile(pathSrc, "r"); + RandomAccessFile dest = new RandomAccessFile(pathDes, "rw"); + + FileChannel srcChannel = source.getChannel(); + FileChannel desChannel = dest.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + + while (srcChannel.read(buffer) != -1) { + buffer.flip(); + while (buffer.hasRemaining()) { + desChannel.write(buffer); + } + buffer.clear(); + } + srcChannel.close(); + desChannel.close(); + } + + + public static void main(String[] args) throws IOException { + //testNewChannel(); + testFileChannelCopy(); + } +} diff --git a/src/main/java/study/com/nio/socket/BlockingChannelClient.java b/src/main/java/study/com/nio/socket/BlockingChannelClient.java new file mode 100644 index 0000000..01f1d46 --- /dev/null +++ b/src/main/java/study/com/nio/socket/BlockingChannelClient.java @@ -0,0 +1,41 @@ +package study.com.nio.socket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; + +/** + * Created by runlibo.li on 2018/1/31. + * + * @author runlibo.li + */ +public class BlockingChannelClient { + + public static void main(String[] args) throws IOException { + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.connect(new InetSocketAddress(6666)); + + ReadableByteChannel inputChannel = Channels.newChannel(System.in); + ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + while (inputChannel.read(byteBuffer) != -1) { + byteBuffer.flip(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + if (new String(bytes).startsWith("exit")){ + socketChannel.close(); + break; + } + + byteBuffer.position(0); + while (byteBuffer.hasRemaining()) { + socketChannel.write(byteBuffer); + } + byteBuffer.clear(); + } + inputChannel.close(); + socketChannel.close(); + } +} diff --git a/src/main/java/study/com/nio/socket/BlockingChannelServer.java b/src/main/java/study/com/nio/socket/BlockingChannelServer.java new file mode 100644 index 0000000..8582319 --- /dev/null +++ b/src/main/java/study/com/nio/socket/BlockingChannelServer.java @@ -0,0 +1,35 @@ +package study.com.nio.socket; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * Created by runlibo.li on 2018/1/31. + * + * @author runlibo.li + */ +@Slf4j +public class BlockingChannelServer { + public static void main(String[] args) throws IOException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.bind(new InetSocketAddress(6666)); + SocketChannel cli = serverSocketChannel.accept(); + + ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + while (cli.read(byteBuffer) != -1) { + byteBuffer.flip(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + log.info(new String(bytes)); + byteBuffer.clear(); + } + + serverSocketChannel.close(); + cli.close(); + } +} diff --git a/src/main/java/study/com/nio/socket/NonBlockingChannelClient.java b/src/main/java/study/com/nio/socket/NonBlockingChannelClient.java new file mode 100644 index 0000000..2cf1980 --- /dev/null +++ b/src/main/java/study/com/nio/socket/NonBlockingChannelClient.java @@ -0,0 +1,44 @@ +package study.com.nio.socket; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/1/31. + * + * @author runlibo.li + */ +@Slf4j +public class NonBlockingChannelClient { + + public static void main(String[] args) throws IOException, InterruptedException { + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(7777)); + while (!socketChannel.finishConnect()) { + log.info("connect has not finished, wait..."); + TimeUnit.SECONDS.sleep(1); + } + + ReadableByteChannel inputChannel = Channels.newChannel(System.in); + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (inputChannel.read(buffer) != -1) { + buffer.flip(); + while (buffer.hasRemaining()) { + socketChannel.write(buffer); + } + buffer.clear(); + } + + inputChannel.close(); + socketChannel.close(); + } + +} diff --git a/src/main/java/study/com/nio/socket/NonBlockingChannelServer.java b/src/main/java/study/com/nio/socket/NonBlockingChannelServer.java new file mode 100644 index 0000000..a29644d --- /dev/null +++ b/src/main/java/study/com/nio/socket/NonBlockingChannelServer.java @@ -0,0 +1,44 @@ +package study.com.nio.socket; + + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/1/31. + * + * @author runlibo.li + */ +@Slf4j +public class NonBlockingChannelServer { + public static void main(String[] args) throws IOException, InterruptedException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + serverSocketChannel.bind(new InetSocketAddress(7777)); + + SocketChannel cli = null; + while ((cli = serverSocketChannel.accept()) == null) { + TimeUnit.SECONDS.sleep(1); + log.info("non-blocking model no connection"); + } + log.info("accept connection from:{}", cli.getRemoteAddress()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + while (cli.read(buffer) != -1) { + buffer.flip(); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + log.info("{}", new String(bytes)); + buffer.clear(); + } + cli.close(); + serverSocketChannel.close(); + } + +} diff --git a/src/main/java/study/com/nio/socket/SelectorClient.java b/src/main/java/study/com/nio/socket/SelectorClient.java new file mode 100644 index 0000000..450aa05 --- /dev/null +++ b/src/main/java/study/com/nio/socket/SelectorClient.java @@ -0,0 +1,80 @@ +package study.com.nio.socket; + +import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Scanner; +import java.util.concurrent.TimeUnit; + +/** + * Created by runlibo.li on 2018/2/1. + * + * @author runlibo.li + */ +@Slf4j +public class SelectorClient { + + public static void main(String[] args) throws IOException, InterruptedException { + SocketChannel socketChannel = null; + Scanner scanner = new Scanner(System.in); + + try { + socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(9999)); + while (!socketChannel.finishConnect()) { + log.info("connect has not finished, wait..."); + TimeUnit.SECONDS.sleep(1); + } + + ByteBuffer writeBuffer = ByteBuffer.allocate(1024); + ByteBuffer header = ByteBuffer.allocate(4); + ByteBuffer body = ByteBuffer.allocate(1024); + + ByteBuffer[] readBuffer = new ByteBuffer[]{header, body}; + while (true) { + String inputStr = scanner.nextLine(); + if (Strings.isNullOrEmpty(inputStr)) { + continue; + } + if (inputStr.startsWith("exit")) { + break; + } + + byte[] bytes = inputStr.getBytes("UTF-8"); + writeBuffer.put(bytes).flip(); + while (writeBuffer.hasRemaining()) { + socketChannel.write(writeBuffer); + } + writeBuffer.clear(); + + long res = 0; + while ((res = socketChannel.read(readBuffer)) == 0) { + } + if (res == -1) { + break; + } + + header.flip(); + body.flip(); + byte[] receiveBytes = new byte[body.remaining()]; + body.get(receiveBytes); + log.info("client receive, header:{}, body:{}", header.getInt(), new String(receiveBytes, "UTF-8")); + header.clear(); + body.clear(); + } + } catch (Exception e) { + log.error("", e); + throw e; + } finally { + if (socketChannel != null) { + socketChannel.close(); + } + scanner.close(); + } + } +} diff --git a/src/main/java/study/com/nio/socket/SelectorServer.java b/src/main/java/study/com/nio/socket/SelectorServer.java new file mode 100644 index 0000000..cc9e8e9 --- /dev/null +++ b/src/main/java/study/com/nio/socket/SelectorServer.java @@ -0,0 +1,123 @@ +package study.com.nio.socket; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.Iterator; + +/** + * Created by runlibo.li on 2018/2/1. + * + * @author runlibo.li + */ +@Slf4j +public class SelectorServer { + public static void main(String[] args) throws IOException { + ServerSocketChannel serverSocketChannel = null; + Selector selector = null; + + try { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.bind(new InetSocketAddress(9999)); + serverSocketChannel.configureBlocking(false); + + selector = Selector.open(); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + + while (selector.select() != 0) { + Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + SelectionKey selectionKey = iterator.next(); + if (selectionKey.isConnectable()) { + log.info("Connectable"); + } else if (selectionKey.isAcceptable()) { + log.info("Acceptable"); + handleAccept(selectionKey); + } else if (selectionKey.isReadable()) { + log.info("Readable"); + handleRead(selectionKey); + } else if (selectionKey.isWritable()) { + log.info("Writable"); + handleWrite(selectionKey); + } + + iterator.remove(); + } + } + } catch (IOException e) { + log.error("", e); + throw e; + } finally { + if (serverSocketChannel != null) { + serverSocketChannel.close(); + } + + if (selector != null) { + selector.close(); + } + } + } + + + private static void handleAccept(SelectionKey key) throws IOException { + SocketChannel clientChannel = null; + try { + clientChannel = ((ServerSocketChannel) key.channel()).accept(); + clientChannel.configureBlocking(false); + clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); + } catch (IOException e) { + log.error("", e); + key.cancel(); + if (clientChannel != null) { + clientChannel.close(); + } + } + } + + private static void handleRead(SelectionKey key) throws IOException { + SocketChannel clientChannel = (SocketChannel) key.channel(); + try { + ByteBuffer buffer = (ByteBuffer) key.attachment(); + if (clientChannel.read(buffer) != -1) { + buffer.flip(); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + + log.info("server receive: {}", new String(bytes, "UTF-8")); + buffer.rewind(); + key.interestOps(SelectionKey.OP_WRITE); + } else { + clientChannel.close(); + key.cancel(); + } + } catch (IOException e) { + log.error("", e); + key.cancel(); + clientChannel.close(); + } + } + + private static void handleWrite(SelectionKey key) throws IOException { + SocketChannel clientChannel = (SocketChannel) key.channel(); + ByteBuffer header = ByteBuffer.allocate(4); + header.putInt(0); + header.flip(); + ByteBuffer body = (ByteBuffer) key.attachment(); + ByteBuffer[] content = new ByteBuffer[]{header, body}; + + try { + clientChannel.write(content); + } catch (IOException e) { + log.error("", e); + key.cancel(); + clientChannel.close(); + } + + body.clear(); + key.interestOps(SelectionKey.OP_READ); + } + +} diff --git a/src/main/java/study/com/oom/DirectMemoryOOM.java b/src/main/java/study/com/oom/DirectMemoryOOM.java new file mode 100644 index 0000000..4570f76 --- /dev/null +++ b/src/main/java/study/com/oom/DirectMemoryOOM.java @@ -0,0 +1,26 @@ +package study.com.oom; + +import sun.misc.Unsafe; + +import java.lang.reflect.Field; + +/** + * Created by runlibo.li on 2018/1/10. + * + * @author runlibo.li + */ +public class DirectMemoryOOM { + private static final int MB = 1 * 1024 * 1024; + + /** + * -XX:MaxDirectMemorySize=10M + */ + public static void main(String[] args) throws IllegalAccessException { + Field field = Unsafe.class.getDeclaredFields()[0]; + field.setAccessible(true); + Unsafe unsafe = (Unsafe) field.get(null); + while (true) { + unsafe.allocateMemory(MB); + } + } +} diff --git a/src/main/java/study/com/oom/HeapOOM.java b/src/main/java/study/com/oom/HeapOOM.java new file mode 100644 index 0000000..33e150f --- /dev/null +++ b/src/main/java/study/com/oom/HeapOOM.java @@ -0,0 +1,26 @@ +package study.com.oom; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Created by runlibo.li on 2018/1/9. + * + * @author runlibo.li + */ +public class HeapOOM { + + static class OOMObject { + } + + /** + * -Xms10M -Xmx10M -XX:+HeapDumpOnOutOfMemoryError + */ + public static void main(String[] args) { + List list = Lists.newArrayList(); + while (true) { + list.add(new OOMObject()); + } + } +} diff --git a/src/main/java/study/com/oom/RuntimeConstanntPoolOOM.java b/src/main/java/study/com/oom/RuntimeConstanntPoolOOM.java new file mode 100644 index 0000000..65c57fc --- /dev/null +++ b/src/main/java/study/com/oom/RuntimeConstanntPoolOOM.java @@ -0,0 +1,37 @@ +package study.com.oom; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Created by runlibo.li on 2018/1/10. + * + * @author runlibo.li + */ +public class RuntimeConstanntPoolOOM { + + /** + * -XX:PermSize=10M -XX:MaxPermSize=10M + * 在jdk7之后这两个参数已经不再支持,没有了长久代 + * 对运行时期生成的常量,常量池中也不再存储对象实例,而是存储对象引用,对象实例存储在堆中 + * 可以通过String.intern()方法,JDK1.7考虑下面例子: + * String str1 = new StringBuilder("计算机").append("软件").toString(); + * System.out.println(str1.intern() == str1); true + * String str2 = new String("a"); + * System.out.println(str2.intern() == str2); false + * String str3= new String("a") + "b"; + * System.out.println(str3.intern() == str3); true + * String str4= new String("ma") + "in"; + * System.out.println(str4.intern() == str4); false + *

+ * 下面方法当生成常量的大小超过堆的限制时会抛出oom的异常 + */ + public static void main(String[] args) { + List list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++).intern()); + } + } +} diff --git a/src/main/java/study/com/oom/StackOOM.java b/src/main/java/study/com/oom/StackOOM.java new file mode 100644 index 0000000..06eceb0 --- /dev/null +++ b/src/main/java/study/com/oom/StackOOM.java @@ -0,0 +1,26 @@ +package study.com.oom; + + +/** + * Created by runlibo.li on 2018/1/10. + * + * @author runlibo.li + */ +public class StackOOM { + + private Long stackLength = 1L; + + public void stackLeak() { + ++stackLength; + stackLeak(); + } + + + /** + * -Xss128k + */ + public static void main(String[] args) { + StackOOM stackOOM = new StackOOM(); + stackOOM.stackLeak(); + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..2587770 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L [%t] - %m%n + + + + + + + \ No newline at end of file diff --git a/test.txt b/test.txt new file mode 100644 index 0000000..22a9eee --- /dev/null +++ b/test.txt @@ -0,0 +1,3 @@ +This is a text file. +abcdefghijklmnopqrstuvwxyz +end~ \ No newline at end of file