33 commits
00dc7bb
add fread connection support
ben-schwen Nov 8, 2025
78bce0e
fix testnum
ben-schwen Nov 8, 2025
0afd468
make linterse happy
ben-schwen Nov 8, 2025
fa79c8c
make linters even more happy
ben-schwen Nov 8, 2025
9590e22
remove read bytes %d since this can overflow
ben-schwen Nov 8, 2025
58b3386
add coverage
ben-schwen Nov 8, 2025
995d2dc
be fully experimental API compliant
ben-schwen Nov 8, 2025
3866b6d
more coverage
ben-schwen Nov 8, 2025
8294c6f
update error message for nrow and mmap
ben-schwen Nov 8, 2025
1b7cec7
add wording changes
ben-schwen Nov 10, 2025
9b3c387
add connections guard
ben-schwen Nov 10, 2025
3da8943
add strerrors
ben-schwen Nov 10, 2025
f6f9ed3
add errno lib
ben-schwen Nov 10, 2025
5a98e62
add reopen_connection generic
ben-schwen Nov 10, 2025
d76c3a5
close con on exit
ben-schwen Nov 10, 2025
d520cd4
adjust doc
ben-schwen Nov 10, 2025
c3f7cf6
update conncection info
ben-schwen Nov 10, 2025
4235a5c
reopen connection
ben-schwen Nov 10, 2025
e37b0ee
change modes
ben-schwen Nov 10, 2025
2bcfc6c
update docs
ben-schwen Nov 10, 2025
2e67cc2
add nocov
ben-schwen Nov 10, 2025
4383ae2
use R_ExecWithCleanup to clean up on errors
ben-schwen Nov 10, 2025
5e91780
add test for consuming before fread
ben-schwen Nov 11, 2025
5182c0c
use factory pattern
ben-schwen Nov 11, 2025
441c557
add aliases for S3 methods
ben-schwen Nov 11, 2025
63fa168
Merge branch 'master' into fread_connections
ben-schwen Nov 11, 2025
a609fda
capture print in test
ben-schwen Nov 11, 2025
daabbb7
fix namespace
ben-schwen Nov 11, 2025
1a98f38
rename to binary_reopener
ben-schwen Nov 18, 2025
5eef830
More #ifdef wrapping for connections API
aitap Nov 19, 2025
0c6eff5
R_FINITE will always be true for a size_t argument
aitap Nov 19, 2025
236bc5c
Fail when nrow_limit exceeds SIZE_MAX
aitap Nov 19, 2025
6f4d90f
Use translateChar() for native encoding string
aitap Nov 19, 2025
1 change: 1 addition & 0 deletions NAMESPACE
Expand Up @@ -20,6 +20,7 @@ export(rbindlist)
export(fifelse)
export(fcase)
export(fread)
export(binary_reopener)
export(fwrite)
export(foverlaps)
export(shift)
Expand Down
5 changes: 4 additions & 1 deletion NEWS.md
Expand Up @@ -296,7 +296,10 @@ See [#2611](https://github.com/Rdatatable/data.table/issues/2611) for details. T
# user system elapsed
# 0.028 0.000 0.005
```
20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation.

20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation.

21. `fread()` can now read from connections directly by spilling to a temporary file first, [#561](https://github.com/Rdatatable/data.table/issues/561). For the best throughput, point `tmpdir=` (or the global temp directory) to fast storage such as an SSD or a RAM disk. Thanks to Chris Neff for the report and @ben-schwen for the implementation.
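
The spill step described in item 21 can be pictured with base R alone. The following is only a sketch of the idea, not the C routine this PR adds: `spill_to_tempfile()` and `chunk_size` are hypothetical names, and it assumes the connection is either unopened or already open in binary mode.

```r
# Hypothetical helper: copy whatever is left on a connection into a temp file.
spill_to_tempfile = function(con, tmpdir = tempdir(), chunk_size = 65536L) {
  if (!isOpen(con)) {
    open(con, "rb")                    # binary mode so readBin() is allowed
    on.exit(close(con), add = TRUE)    # only close what we opened ourselves
  }
  tmp = tempfile(tmpdir = tmpdir)
  out = file(tmp, "wb")
  on.exit(close(out), add = TRUE)
  total = 0                            # double, so large sizes do not overflow
  repeat {
    chunk = readBin(con, what = "raw", n = chunk_size)
    if (!length(chunk)) break          # zero-length read means end of stream
    writeBin(chunk, out)
    total = total + length(chunk)
  }
  list(path = tmp, bytes = total)
}

# Usage: spill a small CSV, then parse the temp file like any other file.
src = tempfile(fileext = ".csv")
writeLines(c("a,b", "1,2", "3,4"), src)
src_size = file.size(src)
res = spill_to_tempfile(file(src))
spilled = read.csv(res$path)
unlink(c(src, res$path))
```

Because the parser only ever sees a regular file, the choice of `tmpdir` determines how fast the extra copy is, which is why the NEWS entry recommends fast storage.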

### BUG FIXES

Expand Down
94 changes: 92 additions & 2 deletions R/fread.R
@@ -1,3 +1,39 @@
# nocov start
# S3 generic that returns a function to open connections in binary mode
binary_reopener = function(con, ...) {
UseMethod("binary_reopener")
}

binary_reopener.default = function(con, ...) {
con_class = class1(con)
stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class)
}

binary_reopener.file = function(con, ...) {
function(description) file(description, "rb", ...)
}

binary_reopener.gzfile = function(con, ...) {
function(description) gzfile(description, "rb", ...)
}

binary_reopener.bzfile = function(con, ...) {
function(description) bzfile(description, "rb", ...)
}

binary_reopener.url = function(con, ...) {
function(description) url(description, "rb", ...)
}

binary_reopener.unz = function(con, ...) {
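# unz() takes (description, filename, open); summary(con)$description is "<zipfile>:<member>", so split at the last colon
function(description) unz(sub(":[^:]*$", "", description), sub("^.*:", "", description), "rb", ...)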
}

binary_reopener.pipe = function(con, ...) {
function(description) pipe(description, "rb", ...)
}
# nocov end

fread = function(
input="", file=NULL, text=NULL, cmd=NULL, sep="auto", sep2="auto", dec="auto", quote="\"", nrows=Inf, header="auto",
na.strings=getOption("datatable.na.strings","NA"), stringsAsFactors=FALSE, verbose=getOption("datatable.verbose",FALSE),
Expand Down Expand Up @@ -55,7 +91,16 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
input = text
}
}
else if (is.null(cmd)) {
# Check if input is a connection; if so, it is spilled to a temporary file below
input_is_con = FALSE
if (!missing(input) && inherits(input, "connection")) {
input_is_con = TRUE
} else if (!is.null(file) && inherits(file, "connection")) {
input = file
input_is_con = TRUE
file = NULL
}
if (!input_is_con && is.null(cmd) && is.null(text)) {
if (!is.character(input) || length(input)!=1L) {
stopf("input= must be a single character string containing a file name, a system command containing at least one space, a URL starting 'http[s]://', 'ftp[s]://' or 'file://', or, the input data itself containing at least one \\n or \\r")
}
Expand All @@ -81,6 +126,51 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
}
file = tmpFile
}
connection_spill_info = NULL
if (input_is_con) {
if (verbose) {
catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into a temporary file... ", toString(class(input)))
flush.console()
}
spill_started.at = proc.time()
con_open = isOpen(input)

needs_reopen = FALSE
if (con_open) {
con_summary = summary(input)
binary_modes = c("rb", "r+b")
if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE
}

close_con = NULL

if (needs_reopen) {
close(input)
input = binary_reopener(input)(con_summary$description)
close_con = input
} else if (!con_open) {
open(input, "rb")
close_con = input
}
if (!is.null(close_con)) on.exit(close(close_con), add=TRUE)
tmpFile = tempfile(tmpdir=tmpdir)
on.exit(unlink(tmpFile), add=TRUE)
bytes_copied = .Call(CspillConnectionToFile, input, tmpFile, as.numeric(nrows))
spill_elapsed = (proc.time() - spill_started.at)[["elapsed"]]

if (bytes_copied == 0) {
warningf("Connection has size 0. Returning a NULL %s.", if (data.table) 'data.table' else 'data.frame')
return(if (data.table) data.table(NULL) else data.frame(NULL))
}

if (verbose) {
catf("done in %s\n", timetaken(spill_started.at))
flush.console()
}
connection_spill_info = c(spill_elapsed, bytes_copied)
input = tmpFile
file = tmpFile
}
if (!is.null(file)) {
if (!is.character(file) || length(file)!=1L)
stopf("file= must be a single character string containing a filename, or URL starting 'http[s]://', 'ftp[s]://' or 'file://'")
Expand Down Expand Up @@ -293,7 +383,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC")
tz="UTC"
}
ans = .Call(CfreadR,input,identical(input,file),sep,dec,quote,header,nrows,skip,na.strings,strip.white,blank.lines.skip,comment.char,
fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC")
fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC",connection_spill_info)
if (!length(ans)) return(null.data.table()) # test 1743.308 drops all columns
nr = length(ans[[1L]])
require_bit64_if_needed(ans)
Expand Down
24 changes: 21 additions & 3 deletions inst/tests/tests.Rraw
Expand Up @@ -2737,11 +2737,13 @@ if (test_bit64) {
# getwd() has been set by test.data.table() to the location of this tests.Rraw file. Test files should be in the same directory.
if (test_R.utils) {
f = testDir("ch11b.dat.bz2") # http://www.stats.ox.ac.uk/pub/datasets/csb/ch11b.dat
test(900.1, fread(f, logical01=FALSE), as.data.table(read.table(f)))
test(900.1, DT<-fread(f, logical01=FALSE), as.data.table(read.table(f)))
test(900.15, fread(file(f), logical01=FALSE), DT)
test(900.2, fread(f, logical01=TRUE), as.data.table(read.table(f))[,V5:=as.logical(V5)])

f = testDir("1206FUT.txt.bz2") # a CRLF line ending file (DOS)
test(901.1, DT<-fread(f,strip.white=FALSE), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)))))
test(901.15, fread(file(f), strip.white=FALSE), DT)
test(901.2, DT<-fread(f), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)),strip.white=TRUE)))
}

Expand Down Expand Up @@ -6654,8 +6656,10 @@ if (test_bit64 && test_R.utils) {
ZBJBLOAJAQI = c("LHCYS AYE ZLEMYA IFU HEI JG FEYE", "", ""),
JKCRUUBAVQ = c("", ".\\YAPCNXJ\\004570_850034_757\\VWBZSS_848482_600874_487_PEKT-6-KQTVIL-7_30\\IRVQT\\HUZWLBSJYHZ\\XFWPXQ-WSPJHC-00-0770000855383.KKZ", "")
)
test(1449.1, fread(testDir("quoted_multiline.csv.bz2"))[c(1L, 43:44), c(1L, 22:24)], DT)
test(1449.2, fread(testDir("quoted_multiline.csv.bz2"), integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)])
f = testDir("quoted_multiline.csv.bz2")
test(1449.1, fread(f)[c(1L, 43:44), c(1L, 22:24)], DT)
test(1449.15, fread(file(f))[c(1L, 43:44), c(1L, 22:24)], DT)
test(1449.2, fread(f, integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)])
}

# Fix for #927
Expand Down Expand Up @@ -21858,3 +21862,17 @@ test(2344.04, key(DT[, .(V4 = c("b", "a"), V2, V5 = c("y", "x"), V1)]), c("V1",

# fread with quotes and single column #7366
test(2345, fread('"this_that"\n"2025-01-01 00:00:01"'), data.table(this_that = as.POSIXct("2025-01-01 00:00:01", tz="UTC")))

# fread supports connections #561
f = testDir("russellCRLF.csv")
test(2346.1, fread(file=file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile")
test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L))
test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5))
test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE))
# test with open connection consuming part of the connection before fread
con = file(f, "rb")
test(2346.5, {readLines(con, n=3); fread(con)}, fread(f, skip=3L))
close(con)
file.create(f <- tempfile())
test(2346.6, fread(file(f)), data.table(), warning="Connection has size 0.")
unlink(f)
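
Test 2346.5 relies on a connection keeping its read position: lines consumed before the call are not seen again. A base R sketch of that behaviour, using a made-up five-line file and `readBin()` as a stand-in for the spill step (it does not call `fread()`):

```r
# Made-up input: three lines to discard, then a small CSV.
f = tempfile(fileext = ".csv")
writeLines(c("skip1", "skip2", "skip3", "x,y", "1,2"), f)

con = file(f, "rb")
consumed = readLines(con, n = 3)                      # advances the read position
rest = readBin(con, what = "raw", n = file.size(f))   # only the unread bytes remain
close(con)

remaining = read.csv(text = rawToChar(rest))          # header is now "x,y"
unlink(f)
```

This is why the test compares against `fread(f, skip=3L)` rather than `fread(f)`.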
40 changes: 40 additions & 0 deletions man/connection_opener.Rd
@@ -0,0 +1,40 @@
\name{binary_reopener}
\alias{binary_reopener}
\alias{binary_reopener.default}
\alias{binary_reopener.file}
\alias{binary_reopener.gzfile}
\alias{binary_reopener.bzfile}
\alias{binary_reopener.url}
\alias{binary_reopener.unz}
\alias{binary_reopener.pipe}
\title{ Create a function to open connections in binary mode }
\description{
S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods.
}
\usage{
binary_reopener(con, ...)
}
\arguments{
\item{con}{ A connection object. }
\item{...}{ Additional arguments passed to the connection constructor. }
}
\details{
Returns a function that accepts a description argument and opens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections.

To support custom connection types with \code{fread}, define a method for your connection class that returns an opener function.
}
\value{
A function that accepts a description argument and returns a connection object opened in binary read mode.
}
\examples{
\dontrun{
# Define a method for a custom connection class
binary_reopener.my_con = function(con, ...) {
function(description) my_con(description, mode = "rb", ...)
}
}
}
\seealso{
\code{\link{fread}}
}
\keyword{ data }
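
A self-contained sketch of how a reopener factory combines with the mode check in `R/fread.R`. It uses base R only; `reopener()` and `ensure_binary()` are hypothetical stand-ins written for illustration, not the exported `binary_reopener()` or any internal data.table function.

```r
# Hypothetical generic mirroring the factory pattern: each method returns
# a function that opens a fresh connection in binary read mode.
reopener = function(con, ...) UseMethod("reopener")
reopener.default = function(con, ...) {
  stop("no reopener for connection class '", class(con)[1L], "'")
}
reopener.file = function(con, ...) {
  function(description) file(description, "rb", ...)
}

# Return a connection that is open in binary read mode, reopening if needed.
ensure_binary = function(con) {
  if (!isOpen(con)) {
    open(con, "rb")
    return(con)
  }
  info = summary(con)
  if (info$mode %in% c("rb", "r+b")) return(con)
  make = reopener(con)      # dispatch on the class while the object is still valid
  close(con)                # a text-mode connection cannot be read as raw bytes
  make(info$description)
}

# Usage: a connection opened in text mode comes back in binary mode.
f = tempfile()
writeLines("a,b", f)
txt = file(f, "r")
bin = ensure_binary(txt)
mode_after = summary(bin)$mode
close(bin)
unlink(f)
```

In this sketch a reopened connection starts again from the beginning of the source, so any position reached on the text-mode connection is lost.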
3 changes: 2 additions & 1 deletion src/data.table.h
Expand Up @@ -361,7 +361,8 @@ SEXP setcharvec(SEXP, SEXP, SEXP);
SEXP chmatch_R(SEXP, SEXP, SEXP);
SEXP chmatchdup_R(SEXP, SEXP, SEXP);
SEXP chin_R(SEXP, SEXP);
SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP spillConnectionToFile(SEXP, SEXP, SEXP);
SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP setlistelt(SEXP, SEXP, SEXP);
Expand Down
16 changes: 13 additions & 3 deletions src/fread.c
Expand Up @@ -1575,9 +1575,16 @@ int freadMain(freadMainArgs _args)
CloseHandle(hFile); // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa366537(v=vs.85).aspx
if (mmp == NULL) {
#endif
int nbit = 8 * sizeof(char*); // #nocov
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, // # nocov
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov
// # nocov start
int nbit = 8 * sizeof(char*);
if (nrowLimit < INT64_MAX) {
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. Since you specified nrows=%"PRId64", try wrapping the file in a connection: fread(file('filename'), nrows=%"PRId64")."),
filesize_to_str(fileSize), nbit, nrowLimit, nrowLimit);
} else {
STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit,
nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available"));
}
// # nocov end
}
sof = (const char*) mmp;
if (verbose) DTPRINT(_(" Memory mapped ok\n"));
Expand Down Expand Up @@ -2971,7 +2978,10 @@ int freadMain(freadMainArgs _args)

if (verbose) {
DTPRINT("=============================\n"); // # notranslate
tTot = tTot + (args.connectionSpillActive ? args.connectionSpillSeconds : 0.0);
if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s
if (args.connectionSpillActive)
DTPRINT(_("%8.3fs (%3.0f%%) Spill connection to tempfile (%.3fGiB)\n"), args.connectionSpillSeconds, 100.0 * args.connectionSpillSeconds / tTot, args.connectionSpillBytes / (1024.0 * 1024.0 * 1024.0));
DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024));
DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot);
DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? "'\\n'" : "'%c'"), sep); // # notranslate
Expand Down