中易网

Java网络编程中怎样用管道流

答案:5  悬赏:40  
解决时间 2021-03-29 11:46
Java网络编程中怎样用管道流
最佳答案
管道流可以实现两个线程之间,二进制数据的传输。
  管道流就像一条管道,一端输入数据,别一端则输出数据。通常要分别用两个不同的线程来控制它们。
  使用方法如下:
  [html] view plaincopy
  import java.io.IOException;
  import java.io.PipedInputStream;
  import java.io.PipedOutputStream;
  
  public class PipedInputStreamTest {
  
  public static void main(String[] args) {
  //管道输出流
  PipedOutputStream out = new PipedOutputStream();
  //管道输入流
  PipedInputStream in = null;
  try {
  //连接两个管道流。或者调用connect(Piped..);方法也可以
  in = new PipedInputStream(out);
  Thread read = new Thread(new Read(in));
  Thread write = new Thread(new Write(out));
  //启动线程
  read.start();
  write.start();
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  }
  
  class Write implements Runnable {
  PipedOutputStream pos = null;
  
  public Write(PipedOutputStream pos) {
  this.pos = pos;
  }
  
  public void run() {
  try {
  System.out.println("程序将在3秒后写入数据,请稍等。。。");
  Thread.sleep(3000);
  pos.write("wangzhihong".getBytes());
  pos.flush();
  } catch (IOException e) {
  e.printStackTrace();
  } catch (InterruptedException e) {
  e.printStackTrace();
  } finally {
  try {
  if (pos != null) {
  pos.close();
  }
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  }
  }
  
  class Read implements Runnable {
  PipedInputStream pis = null;
  
  public Read(PipedInputStream pis) {
  this.pis = pis;
  }
  
  public void run() {
  byte[] buf = new byte[1024];
  try {
  pis.read(buf);
  System.out.println(new String(buf));
  } catch (IOException e) {
  e.printStackTrace();
  } finally {
  try {
  if (pis != null) {
  pis.close();
  }
  } catch (IOException e) {
  e.printStackTrace();
  }
  }
  }
  }
全部回答
Server端
------------------
import java.io.*;
import java.net.*;
import java.util.Scanner;
public class Server {
public static int PORT = 1007;
private ServerSocket server;
public Server() throws IOException {
server = new ServerSocket(PORT);
}
public static void main(String[] args) throws Exception {
System.out.println("--Server--");
Server server = new Server();
server.accept();
}
public void accept() throws IOException {
Socket client = server.accept();//等待客户端的连接
getInfo(client.getInputStream());//启动等待消息线程
toInfo(client.getOutputStream());//启动发送消息线程
}
//等待客户端发送消息
public void getInfo(InputStream in) throws IOException {
final Scanner sc = new Scanner(in);//获取客户端的输入流
new Thread() {
@Override
public void run() {
while(true) {
if(sc.hasNextLine()) {//如果客户端有发送消息过来
System.out.println("Client:" + sc.nextLine());//打印客户端的消息
}
}
}
}.start();
}
//发送消息到客户端
public void toInfo(OutputStream out) throws IOException {
final PrintWriter pw = new PrintWriter(out, true);//获取客户端的输出流,自动清空缓存的内容
final Scanner sc = new Scanner(System.in);//获取控制台的标准输入流,从控制台输入数据
new Thread() {
@Override
public void run() {
while(true) {
pw.println(sc.nextLine());//将输入的数据发送给客户端
}
}
}.start();
}
}
Client端
------------------------
import java.io.*;
import java.net.Socket;
import java.util.Scanner;
public class Client {
public static void main(String[] args) throws Exception, IOException {
System.out.println("--Client--");
Client client = new Client();
client.connection("localhost", Server.PORT);
}
public void connection(String host, int port) throws IOException {
Socket client = new Socket(host, port);
getInfo(client.getInputStream());
toInfo(client.getOutputStream());
}
public void getInfo(InputStream in) throws IOException {
final Scanner sc = new Scanner(in);
new Thread() {
@Override
public void run() {
while(true) {
if(sc.hasNextLine()) {
System.out.println("Server:" + sc.nextLine());
}
}
}
}.start();
}
public void toInfo(OutputStream out) throws IOException {
final PrintWriter pw = new PrintWriter(out, true);
final Scanner sc = new Scanner(System.in);
new Thread() {
@Override
public void run() {
while(true) {
pw.println(sc.nextLine());
}
}
}.start();
}
}
管道流就像一条条水管只是有些水管没有水而已!网路流就像外来水管裏面有水!通过编程对接将网路流连接到IO流或然後将它打印出来或做其它处理。
一起学习,管道流满搞的,用着比socket的stream麻烦
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.util.*;
import java.nio.charset.*;
public class TCP extends Thread{
private SocketChannel channel;
private ServerSocket serverSocket;
private ServerSocketChannel serverSocketChannel;
private ByteBuffer readBuffer;
private ByteBuffer sendBuffer;
private Boolean isAccept=false;
private boolean isConnect=false;
private Thread accept;
private Thread connect;

public TCP(int port,String addr) {
try {
readBuffer=ByteBuffer.allocate(1024);
serverSocketChannel=ServerSocketChannel.open();
serverSocket=serverSocketChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(port));
channel=SocketChannel.open();
channel.connect(new InetSocketAddress(InetAddress.getByName(addr),port));
accept=new Thread(){
public void run(){
Selector selector;
try {
selector=Selector.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
isAccept=false;
selectors(selector);
} catch (ClosedChannelException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
}
};
connect=new Thread(){
public void run(){
try{
Selector selector;
selector=Selector.open();
channel.configureBlocking(false);
channel.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ);
isConnect=false;
selectors(selector);
}catch (Exception ex){
ex.printStackTrace();
}
}
};
} catch (IOException ex) {
ex.printStackTrace();
System.out.println("d1");
}catch(Exception ex){
System.out.println("d2");
}
}
private void service(){
if(isConnect){
connect.start();
}
if(isAccept){
accept.start();
}
}
private void selectors(Selector selector){
try {
while (selector.select()>0){
Set readyKeys=selector.selectedKeys();
Iterator it=readyKeys.iterator();
while(it.hasNext()){
SelectionKey key=null;
key=it.next();
it.remove();
if(key.isAcceptable()){
//System.out.println("isAcceptable");
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
SocketChannel socketChannel=ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(key.isReadable()){
synchronized(readBuffer){
// System.out.println("isReadable");
ByteBuffer buffer=ByteBuffer.allocate(1024);
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(buffer);
readBuffer=buffer;
}
}
if(key.isWritable()){
synchronized(sendBuffer){
// System.out.println("isWritable");
SocketChannel channel=(SocketChannel)key.channel();
if(sendBuffer!=null)
channel.write(sendBuffer);
}
}

}
try {
sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
} catch (ClosedChannelException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void send(ByteBuffer buff){
this.sendBuffer=buff;
}
public ByteBuffer get(){
return readBuffer;
}
public void accpet(){
isAccept=true;
}
public void connect(){
isConnect=true;
}
public void run(){
while (true){
service();
}
}
}
一起学习,管道流满搞的,用着比socket的stream麻烦
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.util.*;
import java.nio.charset.*;
public class TCP extends Thread{
private SocketChannel channel;
private ServerSocket serverSocket;
private ServerSocketChannel serverSocketChannel;
private ByteBuffer readBuffer;
private ByteBuffer sendBuffer;
private Boolean isAccept=false;
private boolean isConnect=false;
private Thread accept;
private Thread connect;

public TCP(int port,String addr) {
try {
readBuffer=ByteBuffer.allocate(1024);
serverSocketChannel=ServerSocketChannel.open();
serverSocket=serverSocketChannel.socket();
serverSocket.setReuseAddress(true);
serverSocket.bind(new InetSocketAddress(port));
channel=SocketChannel.open();
channel.connect(new InetSocketAddress(InetAddress.getByName(addr),port));
accept=new Thread(){
public void run(){
Selector selector;
try {
selector=Selector.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
isAccept=false;
selectors(selector);
} catch (ClosedChannelException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
}
};
connect=new Thread(){
public void run(){
try{
Selector selector;
selector=Selector.open();
channel.configureBlocking(false);
channel.register(selector,SelectionKey.OP_WRITE|SelectionKey.OP_READ);
isConnect=false;
selectors(selector);
}catch (Exception ex){
ex.printStackTrace();
}
}
};
} catch (IOException ex) {
ex.printStackTrace();
System.out.println("d1");
}catch(Exception ex){
System.out.println("d2");
}
}
private void service(){
if(isConnect){
connect.start();
}
if(isAccept){
accept.start();
}
}
private void selectors(Selector selector){
try {
while (selector.select()>0){
Set readyKeys=selector.selectedKeys();
Iterator<SelectionKey> it=readyKeys.iterator();
while(it.hasNext()){
SelectionKey key=null;
key=it.next();
it.remove();
if(key.isAcceptable()){
//System.out.println("isAcceptable");
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
SocketChannel socketChannel=ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if(key.isReadable()){
synchronized(readBuffer){
// System.out.println("isReadable");
ByteBuffer buffer=ByteBuffer.allocate(1024);
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(buffer);
readBuffer=buffer;
}
}
if(key.isWritable()){
synchronized(sendBuffer){
// System.out.println("isWritable");
SocketChannel channel=(SocketChannel)key.channel();
if(sendBuffer!=null)
channel.write(sendBuffer);
}
}

}
try {
sleep(1);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
} catch (ClosedChannelException ex) {
ex.printStackTrace();
} catch (IOException ex) {
ex.printStackTrace();
}
}
public void send(ByteBuffer buff){
this.sendBuffer=buff;
}
public ByteBuffer get(){
return readBuffer;
}
public void accpet(){
isAccept=true;
}
public void connect(){
isConnect=true;
}
public void run(){
while (true){
service();
}
}
}
我要举报
如以上问答内容为低俗、色情、不良、暴力、侵权、涉及违法等信息,可以点下面链接进行举报!
大家都在看
崇左驮卢镇农业银行的行号是什么?
封丘县房屋拆迁补偿标准是什么?
泰安阳光小镇里的淘乐堡一次能玩多久?
广州市体育馆天河体育中心武术器材在哪能买到
贵州省贵阳市修文县扎佐镇邮编是什么?
四川省广元市朝天区花石乡金花村邮编是什么?
轩和茶庄怎么去啊,我要去那办事
CBB微商是传销吗?
2011年5月底买多美滋金装金盾奶粉3段,发现配
火车票,昆明到罗平
让男人上瘾一辈子的女人是怎样的
关于澳门的青年旅社
中国国债可以到哪里去买到啊?
请问,为什么我们的历史教科书里没有朱载堉?
iphone5充电宝10元能用么
推荐资讯
简单的几套孕期瑜伽 轻松缓解腰背酸痛(图文
5岁孩子晚上很难入睡,到底是什么原因啊?
常熟市酒吧咨客招聘
怀孕什么时候补钙呢?好担心胎宝宝营养跟不上
算了吧散了吧什么歌
原平范中怎么样近年我在那里报了名
烟台ktv沙发、练歌房沙发、歌厅沙发、酒吧沙
宝宝经常吐奶怎么回事如何防吐奶
广东省广州市海珠区沥窖付四约北街邮编是什么
衣帽间在主卧和主卫生间中间好不好?
室内墙潮,墙面的涂料起皮了,想重新装修一下
恋之欲室在哪能看
手机登qq时,显示手机磁盘不足,清理后重新登
刺客的套装怎么选啊?