762 lines
21 KiB
Go
Raw Normal View History

2018-06-21 10:49:35 -04:00
// mgo - MongoDB driver for Go
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package mgo
import (
"crypto/md5"
"encoding/hex"
"errors"
"hash"
"io"
"os"
"sync"
"time"
"gopkg.in/mgo.v2/bson"
)
type GridFS struct {
Files *Collection
Chunks *Collection
}
type gfsFileMode int
const (
gfsClosed gfsFileMode = 0
gfsReading gfsFileMode = 1
gfsWriting gfsFileMode = 2
)
type GridFile struct {
m sync.Mutex
c sync.Cond
gfs *GridFS
mode gfsFileMode
err error
chunk int
offset int64
wpending int
wbuf []byte
wsum hash.Hash
rbuf []byte
rcache *gfsCachedChunk
doc gfsFile
}
type gfsFile struct {
Id interface{} "_id"
ChunkSize int "chunkSize"
UploadDate time.Time "uploadDate"
Length int64 ",minsize"
MD5 string
Filename string ",omitempty"
ContentType string "contentType,omitempty"
Metadata *bson.Raw ",omitempty"
}
type gfsChunk struct {
Id interface{} "_id"
FilesId interface{} "files_id"
N int
Data []byte
}
type gfsCachedChunk struct {
wait sync.Mutex
n int
data []byte
err error
}
func newGridFS(db *Database, prefix string) *GridFS {
return &GridFS{db.C(prefix + ".files"), db.C(prefix + ".chunks")}
}
func (gfs *GridFS) newFile() *GridFile {
file := &GridFile{gfs: gfs}
file.c.L = &file.m
//runtime.SetFinalizer(file, finalizeFile)
return file
}
func finalizeFile(file *GridFile) {
file.Close()
}
// Create creates a new file with the provided name in the GridFS. If the file
// name already exists, a new version will be inserted with an up-to-date
// uploadDate that will cause it to be atomically visible to the Open and
// OpenId methods. If the file name is not important, an empty name may be
// provided and the file Id used instead.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// A simple example inserting a new file:
//
// func check(err error) {
// if err != nil {
// panic(err.String())
// }
// }
// file, err := db.GridFS("fs").Create("myfile.txt")
// check(err)
// n, err := file.Write([]byte("Hello world!"))
// check(err)
// err = file.Close()
// check(err)
// fmt.Printf("%d bytes written\n", n)
//
// The io.Writer interface is implemented by *GridFile and may be used to
// help on the file creation. For example:
//
// file, err := db.GridFS("fs").Create("myfile.txt")
// check(err)
// messages, err := os.Open("/var/log/messages")
// check(err)
// defer messages.Close()
// err = io.Copy(file, messages)
// check(err)
// err = file.Close()
// check(err)
//
func (gfs *GridFS) Create(name string) (file *GridFile, err error) {
file = gfs.newFile()
file.mode = gfsWriting
file.wsum = md5.New()
file.doc = gfsFile{Id: bson.NewObjectId(), ChunkSize: 255 * 1024, Filename: name}
return
}
// OpenId returns the file with the provided id, for reading.
// If the file isn't found, err will be set to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
// func check(err error) {
// if err != nil {
// panic(err.String())
// }
// }
// file, err := db.GridFS("fs").OpenId(objid)
// check(err)
// b := make([]byte, 8192)
// n, err := file.Read(b)
// check(err)
// fmt.Println(string(b))
// check(err)
// err = file.Close()
// check(err)
// fmt.Printf("%d bytes read\n", n)
//
// The io.Reader interface is implemented by *GridFile and may be used to
// deal with it. As an example, the following snippet will dump the whole
// file into the standard output:
//
// file, err := db.GridFS("fs").OpenId(objid)
// check(err)
// err = io.Copy(os.Stdout, file)
// check(err)
// err = file.Close()
// check(err)
//
func (gfs *GridFS) OpenId(id interface{}) (file *GridFile, err error) {
var doc gfsFile
err = gfs.Files.Find(bson.M{"_id": id}).One(&doc)
if err != nil {
return
}
file = gfs.newFile()
file.mode = gfsReading
file.doc = doc
return
}
// Open returns the most recently uploaded file with the provided
// name, for reading. If the file isn't found, err will be set
// to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
// file, err := db.GridFS("fs").Open("myfile.txt")
// check(err)
// b := make([]byte, 8192)
// n, err := file.Read(b)
// check(err)
// fmt.Println(string(b))
// check(err)
// err = file.Close()
// check(err)
// fmt.Printf("%d bytes read\n", n)
//
// The io.Reader interface is implemented by *GridFile and may be used to
// deal with it. As an example, the following snippet will dump the whole
// file into the standard output:
//
// file, err := db.GridFS("fs").Open("myfile.txt")
// check(err)
// err = io.Copy(os.Stdout, file)
// check(err)
// err = file.Close()
// check(err)
//
func (gfs *GridFS) Open(name string) (file *GridFile, err error) {
var doc gfsFile
err = gfs.Files.Find(bson.M{"filename": name}).Sort("-uploadDate").One(&doc)
if err != nil {
return
}
file = gfs.newFile()
file.mode = gfsReading
file.doc = doc
return
}
// OpenNext opens the next file from iter for reading, sets *file to it,
// and returns true on the success case. If no more documents are available
// on iter or an error occurred, *file is set to nil and the result is false.
// Errors will be available via iter.Err().
//
// The iter parameter must be an iterator on the GridFS files collection.
// Using the GridFS.Find method is an easy way to obtain such an iterator,
// but any iterator on the collection will work.
//
// If the provided *file is non-nil, OpenNext will close it before attempting
// to iterate to the next element. This means that in a loop one only
// has to worry about closing files when breaking out of the loop early
// (break, return, or panic).
//
// For example:
//
// gfs := db.GridFS("fs")
// query := gfs.Find(nil).Sort("filename")
// iter := query.Iter()
// var f *mgo.GridFile
// for gfs.OpenNext(iter, &f) {
// fmt.Printf("Filename: %s\n", f.Name())
// }
// if iter.Close() != nil {
// panic(iter.Close())
// }
//
func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
if *file != nil {
// Ignoring the error here shouldn't be a big deal
// as we're reading the file and the loop iteration
// for this file is finished.
_ = (*file).Close()
}
var doc gfsFile
if !iter.Next(&doc) {
*file = nil
return false
}
f := gfs.newFile()
f.mode = gfsReading
f.doc = doc
*file = f
return true
}
// Find runs query on GridFS's files collection and returns
// the resulting Query.
//
// This logic:
//
// gfs := db.GridFS("fs")
// iter := gfs.Find(nil).Iter()
//
// Is equivalent to:
//
// files := db.C("fs" + ".files")
// iter := files.Find(nil).Iter()
//
func (gfs *GridFS) Find(query interface{}) *Query {
return gfs.Files.Find(query)
}
// RemoveId deletes the file with the provided id from the GridFS.
func (gfs *GridFS) RemoveId(id interface{}) error {
err := gfs.Files.Remove(bson.M{"_id": id})
if err != nil {
return err
}
_, err = gfs.Chunks.RemoveAll(bson.D{{"files_id", id}})
return err
}
type gfsDocId struct {
Id interface{} "_id"
}
// Remove deletes all files with the provided name from the GridFS.
func (gfs *GridFS) Remove(name string) (err error) {
iter := gfs.Files.Find(bson.M{"filename": name}).Select(bson.M{"_id": 1}).Iter()
var doc gfsDocId
for iter.Next(&doc) {
if e := gfs.RemoveId(doc.Id); e != nil {
err = e
}
}
if err == nil {
err = iter.Close()
}
return err
}
func (file *GridFile) assertMode(mode gfsFileMode) {
switch file.mode {
case mode:
return
case gfsWriting:
panic("GridFile is open for writing")
case gfsReading:
panic("GridFile is open for reading")
case gfsClosed:
panic("GridFile is closed")
default:
panic("internal error: missing GridFile mode")
}
}
// SetChunkSize sets size of saved chunks. Once the file is written to, it
// will be split in blocks of that size and each block saved into an
// independent chunk document. The default chunk size is 255kb.
//
// It is a runtime error to call this function once the file has started
// being written to.
func (file *GridFile) SetChunkSize(bytes int) {
file.assertMode(gfsWriting)
debugf("GridFile %p: setting chunk size to %d", file, bytes)
file.m.Lock()
file.doc.ChunkSize = bytes
file.m.Unlock()
}
// Id returns the current file Id.
func (file *GridFile) Id() interface{} {
return file.doc.Id
}
// SetId changes the current file Id.
//
// It is a runtime error to call this function once the file has started
// being written to, or when the file is not open for writing.
func (file *GridFile) SetId(id interface{}) {
file.assertMode(gfsWriting)
file.m.Lock()
file.doc.Id = id
file.m.Unlock()
}
// Name returns the optional file name. An empty string will be returned
// in case it is unset.
func (file *GridFile) Name() string {
return file.doc.Filename
}
// SetName changes the optional file name. An empty string may be used to
// unset it.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetName(name string) {
file.assertMode(gfsWriting)
file.m.Lock()
file.doc.Filename = name
file.m.Unlock()
}
// ContentType returns the optional file content type. An empty string will be
// returned in case it is unset.
func (file *GridFile) ContentType() string {
return file.doc.ContentType
}
// ContentType changes the optional file content type. An empty string may be
// used to unset it.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetContentType(ctype string) {
file.assertMode(gfsWriting)
file.m.Lock()
file.doc.ContentType = ctype
file.m.Unlock()
}
// GetMeta unmarshals the optional "metadata" field associated with the
// file into the result parameter. The meaning of keys under that field
// is user-defined. For example:
//
// result := struct{ INode int }{}
// err = file.GetMeta(&result)
// if err != nil {
// panic(err.String())
// }
// fmt.Printf("inode: %d\n", result.INode)
//
func (file *GridFile) GetMeta(result interface{}) (err error) {
file.m.Lock()
if file.doc.Metadata != nil {
err = bson.Unmarshal(file.doc.Metadata.Data, result)
}
file.m.Unlock()
return
}
// SetMeta changes the optional "metadata" field associated with the
// file. The meaning of keys under that field is user-defined.
// For example:
//
// file.SetMeta(bson.M{"inode": inode})
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetMeta(metadata interface{}) {
file.assertMode(gfsWriting)
data, err := bson.Marshal(metadata)
file.m.Lock()
if err != nil && file.err == nil {
file.err = err
} else {
file.doc.Metadata = &bson.Raw{Data: data}
}
file.m.Unlock()
}
// Size returns the file size in bytes.
func (file *GridFile) Size() (bytes int64) {
file.m.Lock()
bytes = file.doc.Length
file.m.Unlock()
return
}
// MD5 returns the file MD5 as a hex-encoded string.
func (file *GridFile) MD5() (md5 string) {
return file.doc.MD5
}
// UploadDate returns the file upload time.
func (file *GridFile) UploadDate() time.Time {
return file.doc.UploadDate
}
// SetUploadDate changes the file upload time.
//
// It is a runtime error to call this function when the file is not open
// for writing.
func (file *GridFile) SetUploadDate(t time.Time) {
file.assertMode(gfsWriting)
file.m.Lock()
file.doc.UploadDate = t
file.m.Unlock()
}
// Close flushes any pending changes in case the file is being written
// to, waits for any background operations to finish, and closes the file.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
func (file *GridFile) Close() (err error) {
file.m.Lock()
defer file.m.Unlock()
if file.mode == gfsWriting {
if len(file.wbuf) > 0 && file.err == nil {
file.insertChunk(file.wbuf)
file.wbuf = file.wbuf[0:0]
}
file.completeWrite()
} else if file.mode == gfsReading && file.rcache != nil {
file.rcache.wait.Lock()
file.rcache = nil
}
file.mode = gfsClosed
debugf("GridFile %p: closed", file)
return file.err
}
func (file *GridFile) completeWrite() {
for file.wpending > 0 {
debugf("GridFile %p: waiting for %d pending chunks to complete file write", file, file.wpending)
file.c.Wait()
}
if file.err == nil {
hexsum := hex.EncodeToString(file.wsum.Sum(nil))
if file.doc.UploadDate.IsZero() {
file.doc.UploadDate = bson.Now()
}
file.doc.MD5 = hexsum
file.err = file.gfs.Files.Insert(file.doc)
}
if file.err != nil {
file.gfs.Chunks.RemoveAll(bson.D{{"files_id", file.doc.Id}})
}
if file.err == nil {
index := Index{
Key: []string{"files_id", "n"},
Unique: true,
}
file.err = file.gfs.Chunks.EnsureIndex(index)
}
}
// Abort cancels an in-progress write, preventing the file from being
// automically created and ensuring previously written chunks are
// removed when the file is closed.
//
// It is a runtime error to call Abort when the file was not opened
// for writing.
func (file *GridFile) Abort() {
if file.mode != gfsWriting {
panic("file.Abort must be called on file opened for writing")
}
file.err = errors.New("write aborted")
}
// Write writes the provided data to the file and returns the
// number of bytes written and an error in case something
// wrong happened.
//
// The file will internally cache the data so that all but the last
// chunk sent to the database have the size defined by SetChunkSize.
// This also means that errors may be deferred until a future call
// to Write or Close.
//
// The parameters and behavior of this function turn the file
// into an io.Writer.
func (file *GridFile) Write(data []byte) (n int, err error) {
file.assertMode(gfsWriting)
file.m.Lock()
debugf("GridFile %p: writing %d bytes", file, len(data))
defer file.m.Unlock()
if file.err != nil {
return 0, file.err
}
n = len(data)
file.doc.Length += int64(n)
chunkSize := file.doc.ChunkSize
if len(file.wbuf)+len(data) < chunkSize {
file.wbuf = append(file.wbuf, data...)
return
}
// First, flush file.wbuf complementing with data.
if len(file.wbuf) > 0 {
missing := chunkSize - len(file.wbuf)
if missing > len(data) {
missing = len(data)
}
file.wbuf = append(file.wbuf, data[:missing]...)
data = data[missing:]
file.insertChunk(file.wbuf)
file.wbuf = file.wbuf[0:0]
}
// Then, flush all chunks from data without copying.
for len(data) > chunkSize {
size := chunkSize
if size > len(data) {
size = len(data)
}
file.insertChunk(data[:size])
data = data[size:]
}
// And append the rest for a future call.
file.wbuf = append(file.wbuf, data...)
return n, file.err
}
func (file *GridFile) insertChunk(data []byte) {
n := file.chunk
file.chunk++
debugf("GridFile %p: adding to checksum: %q", file, string(data))
file.wsum.Write(data)
for file.doc.ChunkSize*file.wpending >= 1024*1024 {
// Hold on.. we got a MB pending.
file.c.Wait()
if file.err != nil {
return
}
}
file.wpending++
debugf("GridFile %p: inserting chunk %d with %d bytes", file, n, len(data))
// We may not own the memory of data, so rather than
// simply copying it, we'll marshal the document ahead of time.
data, err := bson.Marshal(gfsChunk{bson.NewObjectId(), file.doc.Id, n, data})
if err != nil {
file.err = err
return
}
go func() {
err := file.gfs.Chunks.Insert(bson.Raw{Data: data})
file.m.Lock()
file.wpending--
if err != nil && file.err == nil {
file.err = err
}
file.c.Broadcast()
file.m.Unlock()
}()
}
// Seek sets the offset for the next Read or Write on file to
// offset, interpreted according to whence: 0 means relative to
// the origin of the file, 1 means relative to the current offset,
// and 2 means relative to the end. It returns the new offset and
// an error, if any.
func (file *GridFile) Seek(offset int64, whence int) (pos int64, err error) {
file.m.Lock()
debugf("GridFile %p: seeking for %s (whence=%d)", file, offset, whence)
defer file.m.Unlock()
switch whence {
case os.SEEK_SET:
case os.SEEK_CUR:
offset += file.offset
case os.SEEK_END:
offset += file.doc.Length
default:
panic("unsupported whence value")
}
if offset > file.doc.Length {
return file.offset, errors.New("seek past end of file")
}
if offset == file.doc.Length {
// If we're seeking to the end of the file,
// no need to read anything. This enables
// a client to find the size of the file using only the
// io.ReadSeeker interface with low overhead.
file.offset = offset
return file.offset, nil
}
chunk := int(offset / int64(file.doc.ChunkSize))
if chunk+1 == file.chunk && offset >= file.offset {
file.rbuf = file.rbuf[int(offset-file.offset):]
file.offset = offset
return file.offset, nil
}
file.offset = offset
file.chunk = chunk
file.rbuf = nil
file.rbuf, err = file.getChunk()
if err == nil {
file.rbuf = file.rbuf[int(file.offset-int64(chunk)*int64(file.doc.ChunkSize)):]
}
return file.offset, err
}
// Read reads into b the next available data from the file and
// returns the number of bytes written and an error in case
// something wrong happened. At the end of the file, n will
// be zero and err will be set to io.EOF.
//
// The parameters and behavior of this function turn the file
// into an io.Reader.
func (file *GridFile) Read(b []byte) (n int, err error) {
file.assertMode(gfsReading)
file.m.Lock()
debugf("GridFile %p: reading at offset %d into buffer of length %d", file, file.offset, len(b))
defer file.m.Unlock()
if file.offset == file.doc.Length {
return 0, io.EOF
}
for err == nil {
i := copy(b, file.rbuf)
n += i
file.offset += int64(i)
file.rbuf = file.rbuf[i:]
if i == len(b) || file.offset == file.doc.Length {
break
}
b = b[i:]
file.rbuf, err = file.getChunk()
}
return n, err
}
func (file *GridFile) getChunk() (data []byte, err error) {
cache := file.rcache
file.rcache = nil
if cache != nil && cache.n == file.chunk {
debugf("GridFile %p: Getting chunk %d from cache", file, file.chunk)
cache.wait.Lock()
data, err = cache.data, cache.err
} else {
debugf("GridFile %p: Fetching chunk %d", file, file.chunk)
var doc gfsChunk
err = file.gfs.Chunks.Find(bson.D{{"files_id", file.doc.Id}, {"n", file.chunk}}).One(&doc)
data = doc.Data
}
file.chunk++
if int64(file.chunk)*int64(file.doc.ChunkSize) < file.doc.Length {
// Read the next one in background.
cache = &gfsCachedChunk{n: file.chunk}
cache.wait.Lock()
debugf("GridFile %p: Scheduling chunk %d for background caching", file, file.chunk)
// Clone the session to avoid having it closed in between.
chunks := file.gfs.Chunks
session := chunks.Database.Session.Clone()
go func(id interface{}, n int) {
defer session.Close()
chunks = chunks.With(session)
var doc gfsChunk
cache.err = chunks.Find(bson.D{{"files_id", id}, {"n", n}}).One(&doc)
cache.data = doc.Data
cache.wait.Unlock()
}(file.doc.Id, file.chunk)
file.rcache = cache
}
debugf("Returning err: %#v", err)
return
}