node的可读流基于事件
可读流之流动模式,这种流动模式会有一个"开关",每次当"开关"开启的时候,流动模式起作用,如果将这个"开关"设置成暂停的话,那么,这个可读流将不会去读取文件,直到将这个"开关"重新置为流动。
读取文件流程
读取文件内容的流程,主要为:
- 打开文件,打开文件成功,将触发open事件,如果打开失败,触发error事件和close事件,将文件关闭。
- 开始读取文件中的内容,监听data事件,数据处于流动状态,可通过修改开关的状态来暂停读取。
- 每次读取到的内容放入缓存中,并通过data事件将数据发布出去。
- 当文件中的内容读取完毕之后,将文件关闭。
这一系列动作都是基于事件来进行操作的,而node中的事件我们都知道是一种发布订阅模式来实现的。
下面我们来看一看,node是如何使用可读流来读取文件中的内容?
node 可读流参数
首先我们通过fs模块来创建一个可读流,可读流接受两个参数:
- 第一个参数是要读取的文件地址,在这里指明你要读取哪个文件。
- 第二个参数是可选项,这个参数是一个对象,用来指定可读流的一些具体的参数。
如下几个参数我们来一一说明:
- highWaterMark:设置高水位线,这个参数主要用于在读取文件时,可读流会将文件中的内容读取到缓存当中,而这里我们需要创建一个buffer来缓存这些数据,所以这个参数是用来设置buffer的大小,如果不对这个参数进行设置的话,可读流默认的配置64k。
- flags:这个参数主要用于设置文件的执行模式,比如说我们具体的操作适用于读取文件还是写入文件等这些操作。如果是写入文件的话那我们,使用的是w。如果是读取文件的话那这个操作符就应该是r。
下面这张表格就说明了不同的符号代表不同含义:
符号
含义
r
读文件,文件不存在报错
r+
读取并写入,文件不存在报错
rs
同步读取文件并忽略缓存
w
写入文件,不存在则创建,存在则清空
wx
排它写入文件
w+
读取并写入文件,不存在则创建,存在则清空
wx+
和w+类似,排他方式打开
a
追加写入
ax
与a类似,排他方式写入
a+
读取并追加写入,不存在则创建
ax+
作用与a+类似,但是以排他方式打开文件
- autoClose:这个参数主要用于,对文件的关闭的一些控制。如果文件再打开的过程或者其他操作的过程中出现了错误的情况下,需要将文件进行关闭。那这个参数是设置文件是否自动关闭的功能。
- encoding:node中用buffer来读取文件操作的东西二进制数据。这些数据展现出来的话我们是一堆乱码,所以需要,要我们对这个数据指定一个具体的编码格式。然后将会对这些数据进行编码转化,这样转化出来的数据就是我们能看懂的数据。
- starts:这个参数主要用于指定从什么位置开始读取文件中的内容,默认的话是从零开始。
- ends:这个参数主要用于指定定具体要读取文件多长的数据,这里需要说明一下,这个参数是包括本身的位置,也就是所谓的包前和包后。
下面我们来看看可读流具体例子:
let fs = require("fs"); let rs = fs.createReadStream("./a.js", { highWaterMark: 3, encoding: "utf8", autoClose: true, start: 0, end: 9 }); rs.on("open", () => {console.log("open");}); rs.on("close", () => {console.log("close");}); rs.on("data", data => { console.log(data); rs.pause();//暂停读取 此时流动模式为暂停模式 }); setInterval(() => { rs.resume();//重新设置为流动模式,开始读取数据 }, 1000); rs.on("end", () => { console.log("end"); }); rs.on("error", err => { console.log(err); });
手写可读流第一步
上面我们说过,node可读流是基于node的核心模块事件来完成的,所以在实现我们自己的可读流时需要继承events模块,代码如下:
let fs = require('fs'); let EventEmitter = require('events'); class ReadStream extends EventEmitter { }
继承了EventEmitter类,我们就可以使用EventEmitter类中的各个方法,并且同样是采用发布订阅的模式了处理事件。
第二步:处理可读流配置的参数
上面我们提到,node中创建可读流时可以对这个流配置具体的参数,比如
let rs = fs.createReadStream("./a.js", { highWaterMark: 3, encoding: "utf8", autoClose: true, start: 0, end: 9 });
那么对于这些参数,我们自己实现的可读流类也需要对这些参数进行处理,那么这些参数该如何进行处理呢?
constructor(path, options = {}) { super(); this.path = path; //指定要读取的文件地址 this.highWaterMark = options.highWaterMark || 64 * 1024; this.autoClose = options.autoClose || true; //是否自动关闭文件 this.start = options.start || 0; //从文件哪个位置开始读取 this.pos = this.start; // pos会随着读取的位置改变 this.end = options.end || null; // null表示没传递 this.encoding = options.encoding || null;// buffer编码 this.flags = options.flags || 'r'; this.flowing = null; // 模式开关 this.buffer = Buffer.alloc(this.highWaterMark);// 根据设置创建一个buffer存储读出来的数 this.open(); }
通常配置的原则是以用户配置的参数为准,如果用户没有对这个参数进行设置的话,就采用默认的配置。
实现可读流第三步:打开文件
这里原理是使用node模块fs中的open方法。首先我们来回顾下fs.open()方法的使用。
fs.open(filename,flags,[mode],callback); //实例 fs.open('./1,txt','r',function(err,fd){});
这里需要说明下,回调函数callback中有2个参数:
- 第一个是error,node中异步回调都会返回的一个参数,用来说明具体的错误信息
- 第二个参数是fd,是文件描述符,用来标识文件,等价于open函数的第一个参数
好了,现在我们来看看我们自己的可读流的open方法该如何实现吧:
open() { fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型) if (err) { if (this.autoClose) { // 如果需要自动关闭则去关闭文件 this.destroy(); // 销毁(关闭文件,触发关闭事件) } this.emit('error', err); // 如果有错误触发error事件 return; } this.fd = fd; // 保存文件描述符 this.emit('open', this.fd); // 触发文件的打开的方法 }); }
从代码上我们可以看出:
fs.open函数是异步函数,也就是说callback是异步执行的,在成功打开文件的情况下,fd这个属性也是异步获取到的,这点需要注意。
另外重要的一点是,如果在打开文件发生错误时,则表明打开文件失败,那么此时就需要将文件关闭。
实现可读流第四步:读取文件内容
上面我们详细说过,可读流自身定义了一个"开关",当我们要读取文件中的内容的时候,我们需要将这个"开关"打开,那么node可读流本身是如何来打开这个"开关"的呢?
监听data事件
node可读流通过监听data事件来实现这个"开关"的开启:
rs.on("data", data => { console.log(data); });
当用户监听data事件的时候,"开关"开启,不停的从文件中读取内容。那么node是怎么监听data事件的呢?
答案就是 事件模块的newListener
这是因为node可读流是基于事件的,而事件中,服务器就可以通过newListener事件监听到从用户这边过来的所有事件,每个事件都有对应的类型,当用户监听的是data事件的时候,我们就可以获取到,然后就可以去读取文件中的内容了,那我们自己的可读流该如何实现呢?
// 监听newListener事件,看是否监听了data事件,如果监听了data事件的话,就开始启动流动模式,读取文件中的内容 this.on("newListener", type => { if (type === "data") { // 开启流动模式,开始读取文件中的内容 this.flowing = true; this.read(); } });
好了,知道了这个"开关"是如何打开的,那么这个时候就到了真正读取文件中内容的关键时候了,先上代码先:
read() { // 第一次读取文件的话,有可能文件是还没有打开的,此时this.fd可能还没有值 if (typeof this.fd !== "number") { // 如果此时文件还是没有打开的话,就触发一次open事件,这样文件就真的打开了,然后再读取 return this.once("open", () => this.read()); } // 具体每次读取多少个字符,需要进行计算,因为最后一次读取倒的可能比highWaterMark小 let howMuchRead = this.end "data", srr); // 当读取到到内容长度和设置的highWaterMark一致的话,并且还是流动模式的话,就继续读取 if ((byteRead === this.highWaterMark) && this.flowing) { return this.read(); } // 没有更多的内容了,此时表示文件中的内容已经读取完毕 if (byteRead < this.highWaterMark) { // 读取完成,发布end方法,并关闭文件 this.emit("end"); this.destory(); } }); }
这里我们特别要注意的是:
- 文件是否已经打开,是否获取到fd,如果没有打开的话,则再次触发open方法
- 分批次读取文件内容,每次读取的内容是变化的,所以位置和偏移量是要动态计算的
- 控制读取停止的条件。
实现可读流第五步:关闭文件
好了,到现在,基础的读取工作已经完成,那么就需要将文件关闭了,上面的open和read方法里面都调用了一个方法:destory,没错,这个就是关闭文件的方法,好了,那么我们来看看这个方法该如何实现吧
destory() { if (typeof this.fd !== "number") { // 发布close事件 return this.emit("close"); } // 将文件关闭,发布close事件 fs.close(this.fd, () => { this.emit("close"); }); }
当然这块的原理就是调用fs模块的close方法啦。
实现可读流第六步:暂停和恢复
既然都说了,node可读流有一个神奇的"开关",就像大坝的阀门一样,可以控制水的流动,同样也可以控制水的暂停啦。当然在node可读流中的暂停是停止对文件的读取,恢复就是将开关打开,继续读取文件内容,那么这两个分别对应的方法就是pause()和resume()方法。
那么我们自己的可读流类里面该如何实现这两个方法的功能呢?非常简单:
我们在定义类的私有属性的时候,定义了这样一个属性flowing,当它的值为true时表示开关打开,反之关闭。
pause() { this.flowing = false;// 将流动模式设置成暂停模式,不会读取文件 } resume() { this.flowing = true;//将模式设置成流动模式,可以读取文件 this.read();// 重新开始读取文件 }
好了,关于node可读流的实现我们就写到这里,快快敲起代码,动手实现一个你自己的可读流吧!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
- 2024罗志祥《舞狀元》[FLAC/MP3][1G]
- 张美玲侯俊辉1999-福建情歌对唱[南方][WAV+CUE]
- 江希文.1994-伝说少女(饿狼伝说动画原声大碟)【嘉音】【WAV+CUE】
- 黄思婷2020-风中泪[豪记][WAV+CUE]
- 刘韵.1998-DENON.MASTERSONIC系列【EMI百代】【WAV+CUE】
- 群星.2024-你的谎言也动听影视原声带【韶愔音乐】【FLAC分轨】
- 群星.2003-难忘的影视金曲·港台篇【正大国际】【WAV+CUE】
- 试音天碟《原音HQCD》风林 [WAV+CUE][1.1G]
- 李思思《喜欢你》头版限量编号24K金碟[低速原抓WAV+CUE][1.1G]
- 王杰《这场游戏那场梦》 台湾华纳首版 [WAV+CUE][1G]
- 群星2005-《影视红声》2CD香港首版[WAV+CUE]
- 群星2017《聆听中国.风华国乐》试音碟[WAV+CUE]
- 群星2016-《环球词选.潘源良》环球[WAV+CUE]
- 张惠妹《爱的力量》10年情歌最精选 2CD[WAV+CUE][1.1G]
- 群星2009《LOVE TV情歌精选VOL.2》香港首版[WAV+CUE][1.1G]