Skip to content

Commit 6f2bfa6

Browse files
author
Vincent Landgraf
committed
implements exchangeable filesystem library
1 parent 34d65c7 commit 6f2bfa6

File tree

5 files changed

+45
-18
lines changed

5 files changed

+45
-18
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ maildir conventions.
1818
var Queue = require('file-queue').Queue,
1919
queue = new Queue('.', callback);
2020

21+
### Dealing with many files and common filesystem errors
22+
23+
If you deal with lots of files EMFILE errors (too many open files errors)
24+
can occur. Issacs wrote the `graceful-fs` package to deal with these errors.
25+
To use it simply pass the filesystem library that you prefer:
26+
27+
var queue = new Queue({
28+
path: 'tmp',
29+
fs: require('graceful-fs')
30+
}, done);
31+
2132
## Pushing and popping messages from the queue
2233

2334
Popping a message can be done at any time. If the queue doesn't contain an item at the

lib/maildir.js

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
var util = require('util'),
44
events = require('events'),
5-
fs = require('fs'),
65
os = require('os'),
76
crypto = require('crypto'),
87
async = require('async'),
@@ -16,14 +15,15 @@ function Maildir(root) {
1615
this.dirPaths = [path.resolve(path.join(root, 'tmp')),
1716
path.resolve(path.join(root, 'new')),
1817
path.resolve(path.join(root, 'cur'))];
18+
this.fs = require('fs');
1919
this.pushed = 0;
2020
}
2121

2222
util.inherits(Maildir, events.EventEmitter);
2323

2424
// Finds the length of the queue (list of files in new)
2525
Maildir.prototype.length = function(callback) {
26-
fs.readdir(this.dirPaths[NEW], function(err, files) {
26+
this.fs.readdir(this.dirPaths[NEW], function(err, files) {
2727
if (err) { callback(err); }
2828
else { callback(null, files.length); }
2929
});
@@ -49,13 +49,13 @@ Maildir.prototype.generateUniqueName = function(callback) {
4949
Maildir.prototype.create = function(persistent, cb) {
5050
var that = this;
5151
async.each(this.dirPaths, function(path, callback) {
52-
fs.exists(path, function(exists) {
52+
that.fs.exists(path, function(exists) {
5353
if (exists) { callback(); }
54-
else { fs.mkdir(path, callback); }
54+
else { that.fs.mkdir(path, callback); }
5555
});
5656
}, function() {
5757
if (persistent) {
58-
that.watcher = fs.watch(that.dirPaths[NEW], {}, function(err, messages) {
58+
that.watcher = that.fs.watch(that.dirPaths[NEW], {}, function(err, messages) {
5959
that.emit('new', [messages]);
6060
});
6161
}
@@ -78,23 +78,23 @@ Maildir.prototype.newFile = function(data, callback) {
7878
else {
7979
var tmpPath = path.join(that.dirPaths[TMP], uniqueName),
8080
newPath = path.join(that.dirPaths[NEW], uniqueName);
81-
fs.writeFile(tmpPath, data, function(err) {
81+
that.fs.writeFile(tmpPath, data, function(err) {
8282
if (err) { callback(err); }
83-
else { fs.rename(tmpPath, newPath, callback); }
83+
else { that.fs.rename(tmpPath, newPath, callback); }
8484
});
8585
}
8686
});
8787
};
8888

8989
// Lists all messages in the new folder
9090
Maildir.prototype.listNew = function(callback) {
91-
fs.readdir(this.dirPaths[NEW], callback);
91+
this.fs.readdir(this.dirPaths[NEW], callback);
9292
};
9393

9494
// Clears all messages from all folders
9595
Maildir.prototype.clear = function(callback) {
9696
var that = this;
97-
async.map(this.dirPaths, fs.readdir, function(err, results) {
97+
async.map(this.dirPaths, that.fs.readdir, function(err, results) {
9898
if (err) { callback(err); }
9999
else {
100100
var unlinks = [], i, fn, len = that.dirPaths.length,
@@ -107,29 +107,30 @@ Maildir.prototype.clear = function(callback) {
107107
fn = pushDir(that.dirPaths[i]);
108108
results[i].forEach(fn);
109109
}
110-
async.each(unlinks, fs.unlink, callback);
110+
async.each(unlinks, that.fs.unlink, callback);
111111
}
112112
});
113113
};
114114

115115
// Processes one message from the queue (if possible)
116116
Maildir.prototype.process = function(message, callback) {
117117
var newPath = path.join(this.dirPaths[NEW], message),
118-
curPath = path.join(this.dirPaths[CUR], message);
118+
curPath = path.join(this.dirPaths[CUR], message),
119+
that = this;
119120

120-
fs.rename(newPath, curPath, function(err) {
121+
this.fs.rename(newPath, curPath, function(err) {
121122
// if message could not be moved, another process probably already works
122123
// on it, so we try to pop again, but we try further on the list
123124
if (err) { callback(err); }
124125
else {
125-
fs.readFile(curPath, function(err, data) {
126+
that.fs.readFile(curPath, function(err, data) {
126127
if (err) { callback(err); }
127128
else {
128129
callback(null, data,
129130
// commit function
130-
function(cb) { fs.unlink(curPath, cb); },
131+
function(cb) { that.fs.unlink(curPath, cb); },
131132
// rollback function
132-
function(cb) { fs.rename(curPath, newPath, cb); }
133+
function(cb) { that.fs.rename(curPath, newPath, cb); }
133134
);
134135
}
135136
});

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "file-queue",
3-
"version": "0.2.1",
3+
"version": "0.3.0",
44
"description": "A file system based queue (implemented using maildir)",
55
"main": "queue.js",
66
"scripts": {
@@ -13,7 +13,8 @@
1313
"devDependencies": {
1414
"mocha": "1.13.0",
1515
"jshint": "*",
16-
"istanbul": "0.2.6"
16+
"istanbul": "0.2.6",
17+
"graceful-fs": "*"
1718
},
1819
"keywords": [
1920
"queue",

queue.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ function Queue(options, cb) {
1717
this.maildir = new Maildir(path);
1818
this.laterPop = [];
1919

20+
// determine if different fs access library is used
21+
if (typeof options.fs !== 'undefined') {
22+
this.maildir.fs = options.fs;
23+
} else {
24+
this.maildir.fs = require('fs');
25+
}
26+
2027
// be notified, when new messages are available
2128
this.maildir.on('new', function(messages) {
2229
var callback = that.laterPop.shift();

test/queue.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@ describe('Queue', function() {
77
var queue;
88

99
beforeEach(function(done) {
10-
queue = new Queue('tmp', done);
10+
if (Math.random() > 0.5) {
11+
queue = new Queue('tmp', done);
12+
} else {
13+
queue = new Queue({
14+
path: 'tmp',
15+
fs: require('graceful-fs')
16+
}, done);
17+
}
1118
});
1219

1320
afterEach(function(done) {

0 commit comments

Comments
 (0)