(* $Id: seqdb_fsys_ht.ml 16180 2008-01-18 20:57:28Z gerd $ *)

open Seqdb_fsys_types
open Seqdb_containers
open Printf

(* FIXME:
 * - DTOTSZ is not correct insofar as it does not include the space for
 *   kvseq management (keys, lengths etc.)
 *)

(* The hash index used by this module. It is built on top of a Kvseq
   container: HI.create / HI.access below take the kvseq plus the I/O
   object of the .idx file. *)
module HI = Autoindex(Kvseq)

(* Modification time of a file. Both cases carry the time as an int64
   (this file produces it with Int64.of_float (Unix.time())).
   write_file refreshes a `Mutable time and leaves an `Immutable one alone. *)
type mtime =
    [ `Mutable of int64
    | `Immutable of int64
    ]

(* Descriptor of an open file. The fields are mutable because the
   descriptor is updated in place whenever the inode is read or written. *)
type ht_file_descr =
    { mutable hf_open : bool;
        (* false once the descriptor was closed or the file was deleted;
           most operations fail on a closed descriptor *)
      mutable hf_acount : int;
        (* value of the activation counter of the file system object at
           the time the inode was last read; operations re-read the inode
           when it differs from the current counter *)
      mutable hf_ientry : Kvseq.entry option;
        (* the inode entry, None as long as no inode has been written *)
      mutable hf_writable : bool;
        (* whether open for writing *)
      mutable hf_name : string;
        (* file name, i.e. the inode key without its /I0 suffix *)
      mutable hf_isize : int;
        (* inode size: length in bytes of the inode value string *)
      mutable hf_fileid : int64;
        (* the file ID; 0L until write_inode assigns one *)
      mutable hf_lsize : int64;
        (* logical file size; never larger than the sum of the sizes
           listed in hf_data_pnt *)
      mutable hf_ftype : char;
        (* file type (stored in the inode as the character code) *)
      mutable hf_mtime : mtime;
        (* date of last modification *)
      mutable hf_data_pnt : (Kvseq.pointer * int64) list;
        (* pointers to data strings, each paired with the size of the
           data string, in file order *)
    }

(* Create a fresh descriptor filled with placeholder values. hf_acount is
   set to -1, which differs from every real activation count (the counter
   starts at 0 and is only incremented), so a descriptor that was never
   filled by read_inode is always considered stale. *)
let new_file_descr() =
  { hf_open = true;
    hf_acount = (-1);
    hf_ientry = None;
    hf_writable = false;
    hf_name = "";
    hf_isize = 0;
    hf_fileid = 0L;
    hf_lsize = 0L;
    hf_ftype = 'x';
    hf_mtime = `Mutable 0L;
    hf_data_pnt = [];
  }

(* Creation parameters of a file system (see create_filesys_files). *)
type params =
    { ht_inode_size : int;
        (* initial inode size in bytes; stored in superblock variable ISZ *)
      ht_table_size : int;
        (* size of the hash table, passed to HI.create as ~htsize *)
      ht_hash_algo : Seqdb_containers.Hash_algo.hash_algo;
        (* hash algorithm used for both the kvseq and the index *)
      ht_index_type : [ `Plain | `Stored_hashes ];
        (* determines the cell size of the index: 1 for `Plain,
           2 for `Stored_hashes (see cellsz_of_params) *)
      ht_have_dups : bool;
        (* stored in superblock variable HAVEDUPS as 1L or 0L *)
    }

(* Result type of the stats method of ht_file_system_t.
   NOTE(review): the code computing these numbers is not part of this
   section; the field meanings below are presumed from the names only -
   ht_table_total/ht_table_del: index cells in total / marked deleted,
   ht_data/ht_dead_data/ht_used_data: byte counts of the data file.
   TODO confirm against the stats implementation. *)
type stats =
    { ht_table_total : int;
      ht_table_del : int;
      ht_data : int64;
      ht_dead_data : int64;
      ht_used_data : int64;
    }

(* Runtime configuration of a file system object. *)
type fsys_config =
    { hindex_caching : bool;
        (* when false, the index is configured with ~auto_fadvise and
           fadvise_wontneed is issued for it on dispose *)
      data_caching : bool;
        (* same as hindex_caching, but for the data file *)
      sync_every : int;
        (* sync interval in seconds, passed to Kvseq as ~auto_sync;
           a negative value disables automatic syncing *)
      big_readahead : bool;
        (* when true, open_file_rd requests 1MB of readahead at the
           position of the inode *)
      fully_buffered_index : bool;
        (* passed through to HI.access *)
      (* From plugin API: *)
      mutable_mtime : bool;
        (* whether mtimes are handled as `Mutable (otherwise `Immutable) *)
      inode_relocatable : bool;
        (* when false, write_inode fails instead of moving an existing
           inode to a new, larger entry *)
      onsync : unit -> unit;
        (* callback invoked from the kvseq sync hook, after the index
           has been synced *)
      read_only_mode : bool;
        (* when true, operations needing the write lock fail, and the
           lock file is released right after the files are opened *)
    }

(* Argument type of the reindex method of ht_file_system_t. *)
type iterator_type = [`Data|`Index]

(* How the data iterator treats duplicate names when the file system has
   HAVEDUPS set: `Off iterates over everything, `Twopass makes a first
   pass remembering the last position of every name, `Indexcheck skips
   names that are not found in the index. *)
type dedup_mode = [ `Off | `Twopass | `Indexcheck ]

(* A file_system_iteration that additionally gives access to the kvseq
   entry of the current inode. *)
class type ['file_descr] x_file_system_iteration =
object
  inherit ['file_descr] file_system_iteration
  method current_entry : Kvseq.entry
end

(* Iterator whose start method yields the extended iteration above. *)
class type ['file_descr] x_file_system_iterator =
object
  method start : unit -> 'file_descr x_file_system_iteration
end
(* Public interface of the hash-table based file system: the generic
   file_system operations (inherited) plus configuration, checking,
   iteration and maintenance methods. *)
class type ht_file_system_t =
object
  inherit [ht_file_descr] file_system
  method get_config : fsys_config
  method configure : fsys_config -> unit
  method check : unit -> bool
  method willneed : unit -> unit
  method params : params
  method stats : stats
  method is_open : bool
  method iterator : string option -> dedup_mode -> ht_file_descr file_system_iterator
  method idx_iterator : unit -> ht_file_descr file_system_iterator
  method reindex : bool -> bool -> bool -> iterator_type -> params -> bool
  method create_same : string -> bool -> ht_file_system_t
  method rename_files : string -> unit
end

(* A directory of file systems: the directory name, the file system
   objects by name, and an optional descriptor of a lock file. *)
type ht_base =
    { ht_dir : string;
      ht_sys : (string, ht_file_system_t) Hashtbl.t;
      mutable ht_glock_opt : Unix.file_descr option
    }

exception Filesys_exists of string
exception Filesys_not_found of string

let sbsize = 4096
  (* A 4K superblock should be enough for now *)

(* Int64 arithmetic. Note the blank in ( *@ ): without it the lexer would
   see the start of a comment. *)
let ( +@ ) = Int64.add
let ( -@ ) = Int64.sub
let ( *@ ) = Int64.mul
let ( /@ ) = Int64.div

let ( ~> ) = Int64.of_int    (* ">" = "convert to the bigger type" *)
let ( ~< ) = Int64.to_int    (* "<" = "convert to the smaller type" *)

(* Checked variant of ( ~< ): raises Failure if n cannot be represented as
   an int. Both bounds are tested; Int64.to_int alone would silently wrap
   around for values below min_int as well as for values above max_int. *)
let ( ~<< ) =
  fun n ->
    if n > Int64.of_int max_int || n < Int64.of_int min_int then
      failwith "Integer too large";
    Int64.to_int n

(* Decode the 8 bytes at the beginning of s as an int64 *)
let read_int64 s =
  Rtypes.int64_of_int8 (Rtypes.read_int8 s 0)

(* Encode n as a string of 8 bytes *)
let write_int64 n =
  Rtypes.int8_as_string (Rtypes.int8_of_int64 n)

(* Decode a string as an array of int64 numbers, 8 bytes per element.
   The length of s must be a multiple of 8 (asserted). *)
let i64array_of_string s =
  let l = String.length s in
  assert(l mod 8 = 0);
  let a = Array.make (l/8) 0L in
  for k = 0 to l/8 - 1 do
    a.(k) <- read_int64 (String.sub s (k*8) 8)
  done;
  a

(* Encode the array a into the beginning of the string s, 8 bytes per
   element. s is modified in place and must be at least
   8 * Array.length a bytes long. *)
let blit_i64array_to_string a s =
  for k = 0 to Array.length a - 1 do
    String.blit (write_int64 a.(k)) 0 s (k*8) 8
  done

(* Compute the checksum of the first n array elements (except first el,
   which is the slot where the checksum itself is stored). For n <= 1 the
   result is 0L. Fails with an index error if n > Array.length a. *)
let compute_cksum a n =
  let s = ref 0L in
  for k = 1 to n-1 do
    s := (!s *@ 7L) +@ a.(k)
  done;
  !s

(* Keys of inodes and data strings: a name, a slash, I or D, a number *)
let key_regexp = Pcre.regexp "^[\x20-\xff]*/[ID][0-9]+$"

(* map the last element of l by calling f; all other elements are kept.
   The empty list is returned unchanged. Tail-recursive, so long pointer
   lists cannot overflow the stack. *)
let map_last f l =
  match List.rev l with
    | [] -> []
    | last_elem :: rev_init -> List.rev_append rev_init [ f last_elem ]

(* Return the last element of l, or raise Not_found if l is empty *)
let rec last l =
  match l with
    | [ x ] -> x
    | _ :: l' -> last l'
    | [] -> raise Not_found

(* Check that the key of the entry e looks like the key of a data string,
   i.e. that it ends with a slash, the letter D, and a possibly empty run
   of decimal digits. Raises Failure otherwise.
   Note that a key ending in /D without any digit is accepted here, which
   is more liberal than key_regexp. *)
let check_data_key e =
  let key = Kvseq.get_key e in
  (* Walk backwards over the trailing digits; the result is the index of
     the first non-digit, or -1 if the key consists of digits only *)
  let rec skip_digits k =
    if k >= 0 && key.[k] >= '0' && key.[k] <= '9' then
      skip_digits (k-1)
    else
      k in
  let k = skip_digits (String.length key - 1) in
  let ok = k >= 1 && key.[k] = 'D' && key.[k-1] = '/' in
  if not ok then
    failwith "filesys: bad data key found"

let lk_debug = false
  (* Whether to output debug messages on stderr about locking *)

(* Emit a debug message about a lockf operation on file (only if lk_debug).
   before says whether the message is printed before or after the call;
   cmd is the lockf command. *)
let lk_log before file cmd =
  if lk_debug then (
    fprintf stderr "LOCKING: %s process %d file %s cmd %s\n%!"
      (if before then "getting lock" else "got lock")
      (Unix.getpid())
      file
      (match cmd with
         | Unix.F_ULOCK -> "Unlocking"
         | Unix.F_LOCK  -> "Exclusive lock"
         | Unix.F_RLOCK -> "Shared lock"
         | _ -> "???"
      )
  )

(* Default for sync_every in newly created file systems *)
let default_syncdelta = 600

(* The cell size of the index for the index type selected in params *)
let cellsz_of_params params =
  match params.ht_index_type with
    | `Plain -> 1
    | `Stored_hashes -> 2

(* Apply the settings of fsys_config to the index hi *)
let configure_hi fsys_config hi =
  HI.configure
    ~flush_every:50
    ~auto_fadvise:(not fsys_config.hindex_caching)
    (* ~random_fadvise:true *)
    hi

(* Apply the settings of fsys_config to the kvseq kv. The sync hook of the
   kvseq is set up so that the index hi is synced first, and the
   user-supplied onsync callback is called afterwards. A negative
   sync_every turns automatic syncing off. *)
let configure_kv fsys_config kv hi =
  let auto_sync =
    if fsys_config.sync_every >= 0 then
      Some fsys_config.sync_every
    else
      None in
  Kvseq.configure
    ~flush_every:50
    ~auto_sync
    ~auto_fadvise:(not fsys_config.data_caching)
    ~onsync:(fun () -> HI.sync hi; fsys_config.onsync())
    kv

(* Return the names of the three files making up the file system fsysname
   in directory dir, as triple (lock file, data file, index file). *)
let filenames dir fsysname =
  let fn_lock = Filename.concat dir fsysname ^ ".lock" in
  let fn_kv   = Filename.concat dir fsysname ^ ".data" in
  let fn_hi   = Filename.concat dir fsysname ^ ".idx" in
  (fn_lock, fn_kv, fn_hi)

(* Create the files of a new file system and initialize them.

   - dir, fsysname: determine the file names (see filenames)
   - suffix: not used by this function; the parameter is kept so that
     existing callers remain valid
   - params: the creation parameters; they are written to the superblock
     variables ISZ and HAVEDUPS, and passed to Kvseq.create / HI.create
   - filemodes: additional open flags, appended to O_RDWR and O_CREAT

   The lock file is opened first, and an exclusive lock is acquired
   before the other two files are touched. On success, the result is
   (kv, hi, io_kv, io_hi, fd_lock, fsys_config): the lock is still held
   via fd_lock, and fsys_config is the default configuration that was
   applied to kv and hi.

   If anything fails, the descriptors opened so far are closed in reverse
   order, and the exception is re-raised. Files that were already created
   are not removed. *)
let create_filesys_files dir fsysname suffix params filemodes =
  let (fn_lock, fn_kv, fn_hi) = filenames dir fsysname in
  let fd_lock =
    Unix.openfile fn_lock ([Unix.O_RDWR; Unix.O_CREAT] @ filemodes) 0o666 in
  try
    lk_log true fn_lock Unix.F_LOCK;
    Netsys.restart (Unix.lockf fd_lock Unix.F_LOCK) 0;
    lk_log false fn_lock Unix.F_LOCK;
    let fd_kv =
      Unix.openfile fn_kv ([Unix.O_RDWR; Unix.O_CREAT] @ filemodes) 0o666 in
    try
      let fd_hi =
        Unix.openfile fn_hi ([Unix.O_RDWR; Unix.O_CREAT] @ filemodes) 0o666 in
      try
        let io_kv =
          ( object
              method file_descr = fd_kv
              method dispose_hint() = ()
            end
          ) in
        let io_hi =
          ( object
              method file_descr = fd_hi
              method dispose_hint() = ()
            end
          ) in
        let kv =
          Kvseq.create
            ~sbsize
            ~keyrepr:`Int8
            ~purpose:"FSYSDATA"
            ~buffer_size:65536
            ~suggested_hash_algo:params.ht_hash_algo
            io_kv in
        (* Initial superblock variables: inode size, total sizes of inodes
           and data (both empty), and the dups flag *)
        Superblock.set_variable
          (Kvseq.superblock kv) "ISZ" (Int64.of_int params.ht_inode_size);
        Superblock.set_variable
          (Kvseq.superblock kv) "ITOTSZ" 0L;
        Superblock.set_variable
          (Kvseq.superblock kv) "DTOTSZ" 0L;
        Superblock.set_variable
          (Kvseq.superblock kv) "HAVEDUPS"
          (if params.ht_have_dups then 1L else 0L);
        Kvseq.mark_superblock_as_dirty kv;
        let hi =
          HI.create
            ~sbsize
            ~htsize:(Int64.of_int params.ht_table_size)
            ~cellsz:(cellsz_of_params params)
            ~buffer_size:65536
            ~hash_algo:params.ht_hash_algo
            ~purpose:"FSYSIDX"
            kv
            io_hi in
        let fsys_config =
          { hindex_caching = true;
            data_caching = true;
            sync_every = default_syncdelta;
            big_readahead = false;
            fully_buffered_index = false;
            mutable_mtime = true;
            inode_relocatable = true;
            onsync = (fun () -> ());
            read_only_mode = false;
          } in
        configure_kv fsys_config kv hi;
        configure_hi fsys_config hi;
        (kv, hi, io_kv, io_hi, fd_lock, fsys_config)
      with
        | error ->
            Unix.close fd_hi;
            raise error
    with
      | error ->
          Unix.close fd_kv;
          raise error
  with
    | error ->
        Unix.close fd_lock;
        raise error
;;

(* Call f arg. Whenever the call fails with EDEADLK, sleep for one second
   and try again. There is no upper limit on the number of attempts. All
   other exceptions are passed through. *)
let rec handle_deadlock f arg =
  try
    f arg
  with
    | Unix.Unix_error(Unix.EDEADLK,_,_) ->
        Netsys.restart Unix.sleep 1;
        handle_deadlock f arg

(* Return Some t for the first flag of the form `Mtime t in flags, or None
   if there is no such flag. *)
let rec mtime_flag flags =
  (* TODO: dup in Seqdb_fsys_ao *)
  match flags with
    | [] -> None
    | `Mtime t :: _ -> Some t
    | _ :: flags' -> mtime_flag flags'

(* The implementation of ht_file_system_t for the file system fsysname in
   directory dir. The remaining arguments are the initial values of the
   instance variables kv_opt, hi_opt, lock_fd, lock_wr and fsys_config. *)
class ht_file_system dir fsysname kv_opt_init hi_opt_init lock_fd_init lock_wr_init fsys_config_init : ht_file_system_t =
object(self) val mutable kv_opt = kv_opt_init val mutable hi_opt = hi_opt_init val mutable acount = 0 val mutable lock_fd = lock_fd_init val mutable lock_wr = lock_wr_init (* Whether it is a write lock *) val mutable fsys_config = fsys_config_init method private activate ?(conservative=false) ?(rollback_flag=ref false) need_wr_lock = (* open kv/hi *) assert(kv_opt = None); assert(hi_opt = None); assert(lock_fd = None); if need_wr_lock && fsys_config.read_only_mode then failwith "Filesys: read-only mode"; let (fn_lock, fn_kv, fn_hi) = filenames dir fsysname in (* First try to get the lock. *) let fd_lock = Unix.openfile fn_lock [Unix.O_RDWR] 0 in ( try lock_fd <- Some fd_lock; let lock_cmd = if need_wr_lock then Unix.F_LOCK else Unix.F_RLOCK in lk_log true fn_lock lock_cmd; Netsys.restart (Unix.lockf fd_lock lock_cmd) 0; lk_log false fn_lock lock_cmd; lock_wr <- need_wr_lock; let fd_kv = Unix.openfile fn_kv [Unix.O_RDWR] 0 in try let fd_hi = Unix.openfile fn_hi [Unix.O_RDWR] 0 in try let io_kv = ( object method file_descr = fd_kv method dispose_hint() = () end ) in let io_hi = ( object method file_descr = fd_hi method dispose_hint() = () end ) in let kv = Kvseq.access ~buffer_size:65536 ~conservative io_kv in rollback_flag := Kvseq.rollback_flag kv; let hi = HI.access ~fully_buffered_index:fsys_config.fully_buffered_index ~buffer_size:65536 kv io_hi in configure_kv fsys_config kv hi; configure_hi fsys_config hi; kv_opt <- Some (kv, io_kv); hi_opt <- Some (hi, io_hi); acount <- acount + 1; with | error -> hi_opt <- None; Unix.close fd_hi; raise error with | error -> kv_opt <- None; Unix.close fd_kv; raise error with | error -> lock_fd <- None; Unix.close fd_lock; raise error ); if fsys_config.read_only_mode then ( (* Give up the lock after opening the files (which implies that the superblock is read) *) Unix.close fd_lock; lock_fd <- None; ) method get_config = fsys_config method configure new_config = fsys_config <- new_config; match kv_opt, hi_opt with | Some (kv, 
io_kv), Some (hi, io_hi) -> configure_kv fsys_config kv hi; configure_hi fsys_config hi; | None, None -> () | _ -> assert false method is_open = kv_opt <> None method check () = self # dispose(); let rollback_flag = ref false in self # activate ~conservative:true ~rollback_flag true; !rollback_flag (* Note: it is essential that the rollback is not immediately done. This way, Seqdb_fsys_ao can also check the size of its file. Otherwise, the sync would make this impossible! *) method willneed() = let (_, _) = self # kv_hi false in match kv_opt, hi_opt with | Some (_, _), Some (_, io_hi) -> let fd = io_hi # file_descr in Linux.fadvise fd 0L 0L Linux.FADV_WILLNEED; self # dispose(); | _ -> assert false method private flush() = match kv_opt, hi_opt with | Some (kv, io_kv), Some (hi, io_hi) -> Kvseq.flush kv; HI.flush hi; | None, None -> () | _ -> assert false method exclusive_access() = let (_, _) = self # kv_hi true in () method dispose() = ( match kv_opt, hi_opt with | Some (kv, io_kv), Some (hi, io_hi) -> Kvseq.flush kv; HI.flush hi; if not fsys_config.data_caching then Kvseq.fadvise_wontneed kv; if not fsys_config.hindex_caching then HI.fadvise_wontneed hi; Unix.close io_kv#file_descr; Unix.close io_hi#file_descr; kv_opt <- None; hi_opt <- None; | None, None -> () | _ -> assert false ); ( match lock_fd with | Some lfd -> let fn_lock = Filename.concat dir fsysname ^ ".lock" in lk_log true fn_lock Unix.F_ULOCK; Unix.close lfd; (* implicitly releases any lock *) lock_fd <- None | None -> () ) method private kv_hi ?(retry_on_deadlock = false) need_wr_lock = if kv_opt = None && hi_opt = None then self#activate need_wr_lock else ( if need_wr_lock && not lock_wr then ( (* Upgrade the lock from shared to exclusive: *) let fd = match lock_fd with | Some fd -> fd | None -> (* This can only happen in read-only mode. Intentionally unsupported. 
*) assert(fsys_config.read_only_mode); failwith "Filesys: read-only mode"; in let fn_lock = Filename.concat dir fsysname ^ ".lock" in lk_log true fn_lock Unix.F_LOCK; let f = if retry_on_deadlock then handle_deadlock else (fun g arg -> g arg) in f (Netsys.restart (Unix.lockf fd Unix.F_LOCK)) 0; lk_log false fn_lock Unix.F_LOCK; lock_wr <- true ) ); match kv_opt, hi_opt with | Some (kv,_), Some (hi,_) -> (kv, hi) | _ -> assert false method superblock_variable name = let (kv, _) = self # kv_hi false in let sb = Kvseq.superblock kv in Superblock.variable sb name method set_superblock_variable name v = let (kv, _) = self # kv_hi true in let sb = Kvseq.superblock kv in Superblock.set_variable sb name v; Kvseq.mark_superblock_as_dirty kv method checkpoint ?(soft=false) () = let (kv, hi) = self # kv_hi true in (* need excl. lock because the sb is written *) Kvseq.flush kv; HI.flush hi; if soft then ( let now = Int64.of_float(Unix.gettimeofday()) in let sb = Kvseq.superblock kv in let synctime = try Superblock.variable sb Sb_consts.synctime_name with Not_found -> 0L in let need_sync = fsys_config.sync_every >= 0 && now >= synctime +@ (~> (fsys_config.sync_every)) in if need_sync then Kvseq.sync kv ) else Kvseq.sync kv (* implies a sync of hi *) method private update_dtotsz delta = let (kv, _) = self # kv_hi true in let old_dtotsz = try Superblock.variable (Kvseq.superblock kv) "DTOTSZ" with Not_found -> 0L in Superblock.set_variable (Kvseq.superblock kv) "DTOTSZ" (old_dtotsz +@ delta) method private update_itotsz delta = let (kv, _) = self # kv_hi true in let old_itotsz = try Superblock.variable (Kvseq.superblock kv) "ITOTSZ" with Not_found -> 0L in Superblock.set_variable (Kvseq.superblock kv) "ITOTSZ" (old_itotsz +@ delta) method private read_inode hf ientry writable = (* may raise Not_found! 
*) let (kv, _) = self # kv_hi writable in if Kvseq.get_delflag ientry then raise Not_found; let ikey = Kvseq.get_key ientry in if String.length ikey < 3 then failwith "read_inode: bad key [test 0]"; let ikeyl = String.length ikey in if String.sub ikey (ikeyl-3) 3 <> "/I0" then failwith "read_inode: bad key [test 1]"; let istr = Kvseq.get_value ientry in if String.length istr mod 8 <> 0 then failwith "read_inode: bad inode length [test 2]"; let iarray = i64array_of_string istr in if Array.length iarray < 7 then failwith "read_inode: bad inode length [test 3]"; let cksum = iarray.(0) in if iarray.(1) <> 0L then failwith "read_inode: bad inode [test 4]"; let fileid = iarray.(2) in let lsize = iarray.(3) in if lsize < 0L then failwith "read_inode: bad inode [test 5]"; let ftype = iarray.(4) in if ftype < 0L || ftype > 255L then failwith "read_inode: bad inode [test 6]"; let mtime = iarray.(5) in let dcount = iarray.(6) in if (dcount *@ 2L) +@ 7L > (~> (Array.length iarray)) then failwith "read_inode: bad inode [test 7]"; let cksum' = compute_cksum iarray (~<< ((dcount *@ 2L) +@ 7L)) in if cksum <> cksum' then failwith "read_inode: bad inode [test 8]"; let data_pnt = ref [] in let total_size = ref 0L in for k = 0 to (~< dcount) - 1 do let pnt = Kvseq.pointer_of_string (write_int64 iarray.(2*k+7)) in let size = iarray.(2*k+8) in if not (Kvseq.validate_pointer kv pnt) then failwith "read_inode: bad inode [test 9]"; if size < 0L then failwith "read_inode: bad inode [test 10]"; data_pnt := (pnt, size) :: !data_pnt; let new_total_size = !total_size +@ size in if new_total_size < !total_size then failwith "read_inode: bad inode [test 11]"; (* int64 overflow *) total_size := new_total_size done; if lsize > !total_size then failwith "read_inode: bad inode [test 12]"; hf.hf_acount <- acount; hf.hf_ientry <- Some ientry; hf.hf_name <- String.sub ikey 0 (ikeyl-3); hf.hf_isize <- String.length istr; hf.hf_fileid <- fileid; hf.hf_lsize <- lsize; hf.hf_ftype <- Char.chr (Int64.to_int 
ftype); hf.hf_mtime <- ( if fsys_config.mutable_mtime then `Mutable mtime else `Immutable mtime ); hf.hf_data_pnt <- List.rev !data_pnt; hf.hf_writable <- writable; method private write_inode ?(enforce_new_entry=false) ?(unindexed=false) hf = let (kv, hi) = self # kv_hi true in assert(hf.hf_acount = acount); let dcount = List.length hf.hf_data_pnt in let n = 7 + 2*dcount in let s = ref hf.hf_isize in while 8*n > !s do s := 2 * !s done; let istr = String.make !s '\000' in let iarray = Array.make n 0L in iarray.(1) <- 0L; iarray.(2) <- hf.hf_fileid; iarray.(3) <- hf.hf_lsize; iarray.(4) <- Int64.of_int (Char.code hf.hf_ftype); iarray.(5) <- ( match hf.hf_mtime with | (`Mutable t | `Immutable t) -> t (* I LOVE or patterns *) ); iarray.(6) <- Int64.of_int dcount; let k = ref 7 in List.iter (fun (pnt,size) -> iarray.( !k ) <- Kvseq.int64_of_pointer pnt; iarray.( !k + 1) <- size; k := !k + 2 ) hf.hf_data_pnt; let cksum = compute_cksum iarray !k in iarray.(0) <- cksum; blit_i64array_to_string iarray istr; let data = { Kvseq.delflag = false; key = hf.hf_name ^ "/I0"; value = istr } in if not enforce_new_entry && !s = hf.hf_isize && hf.hf_ientry <> None then ( match hf.hf_ientry with | None -> assert false | Some e -> Kvseq.replace e data; ) else ( (* Inode is too small: allocate a new one *) if hf.hf_ientry <> None && not fsys_config.inode_relocatable then failwith "inodes not relocatable"; let d = ref 0L in ( match hf.hf_ientry with | None -> () | Some e -> d := !d -@ Kvseq.get_total_length e; Kvseq.delete e; ); let e = Kvseq.add kv data in d := !d +@ Kvseq.get_total_length e; let fileid = if hf.hf_fileid = 0L then Kvseq.int64_of_pointer(Kvseq.get_pointer e) else hf.hf_fileid in if fileid <> hf.hf_fileid then ( iarray.(2) <- fileid; let cksum = compute_cksum iarray !k in let s = write_int64 fileid in Kvseq.blit_from_string s 0 e 16L 8; let s = write_int64 cksum in Kvseq.blit_from_string s 0 e 0L 8; hf.hf_fileid <- fileid; ); self # update_itotsz !d; 
Kvseq.mark_superblock_as_dirty kv; hf.hf_ientry <- Some e; hf.hf_isize <- !s; (* We also have to change the pointer to the inode in the hindex: *) if not unindexed then ( try let e_idx = HI.lookup hi (hf.hf_name ^ "/I0") in HI.replace e_idx e with | Not_found -> (* Adding it for the first time (called from open_file_wr): *) ignore(HI.add hi e); self # check_hi_size() ); ) method private delete_inode hf = match hf.hf_ientry with | None -> () | Some e -> let s = Kvseq.get_total_length e in Kvseq.delete e; self # update_itotsz (0L -@ s) method private reread_inode hf = (* Similar to open_file_rd *) let wr = hf.hf_writable in let name = hf.hf_name in let (_, hi) = self # kv_hi false in let hi_entry = try HI.lookup hi (name ^ "/I0") with Not_found -> raise(File_not_found name) in try self # read_inode hf (HI.get_contents hi_entry) wr with | Not_found -> raise(File_not_found name) method private check_hi_size() = (* Log an alert if the hindex is filled more than 50 % *) let (_, hi) = self # kv_hi false in try let n = HI.num_entries hi in let t = HI.htsize hi in let m = t /@ 2L in if n > m then ( Log.logf `Alert "Filesys idx file is quite filled - consider compacting it: %s/%s.idx" dir fsysname ) with | _ -> () method guess_filepos name = let (_, hi) = self # kv_hi false in try let p = HI.pointer_hint hi (name ^ "/I0") in Some(Kvseq.int64_of_pointer p) with Not_found -> None method cmp_filepos name1 name2 = let (_, hi) = self # kv_hi false in let hi1_pos_opt = try Some(HI.pointer_hint hi (name1 ^ "/I0")) with Not_found -> None in let hi2_pos_opt = try Some(HI.pointer_hint hi (name2 ^ "/I0")) with Not_found -> None in match (hi1_pos_opt, hi2_pos_opt) with | None,None -> 0 | None, Some _ -> (-1) | Some _, None -> 1 | Some hi1_pos, Some hi2_pos -> let hi1_int = Kvseq.int64_of_pointer hi1_pos in let hi2_int = Kvseq.int64_of_pointer hi2_pos in Int64.compare hi1_int hi2_int method open_file_rd name ftl = let (kv, hi) = self # kv_hi false in let hi_entry = try HI.lookup hi (name 
^ "/I0") with Not_found -> raise(File_not_found name) in let hf = new_file_descr() in try let kv_entry = HI.get_contents hi_entry in if fsys_config.big_readahead then ( let fpos = Kvseq.int64_of_pointer(Kvseq.get_pointer kv_entry) in Kvseq.fadvise_willneed kv fpos 1048576L (* 1MB readahead *) ); self # read_inode hf kv_entry false; if ftl <> [] && not(List.mem hf.hf_ftype ftl) then raise(File_type_mismatch name); hf with | Not_found -> raise(File_not_found name) method open_file_wr name ftl creat_opt = self # open_file_wr_ext [] name ftl creat_opt method open_file_wr_ext flags name ftl creat_opt = let mflag = mtime_flag flags in let excl = List.mem `Excl flags in let (kv, hi) = self # kv_hi true in try let hi_entry = try HI.lookup hi (name ^ "/I0") with Not_found -> raise(File_not_found name) in let hf = new_file_descr() in try self # read_inode hf (HI.get_contents hi_entry) true; if excl then raise(File_exists name); if ftl <> [] && not(List.mem hf.hf_ftype ftl) then raise(File_type_mismatch name); (hf, false) with | Not_found -> raise(File_not_found name) with | File_not_found _ when creat_opt <> None -> ( match creat_opt with | None -> assert false | Some ft -> let isz = try Superblock.variable (Kvseq.superblock kv) "ISZ" with Not_found -> failwith "open_file_wr: ISZ missing" in let hf = new_file_descr() in hf.hf_acount <- acount; hf.hf_ientry <- None; hf.hf_writable <- true; hf.hf_name <- name; hf.hf_isize <- ~<< isz; (* hf.hf_fileid is set by write_inode if it is 0L *) hf.hf_lsize <- 0L; hf.hf_ftype <- ft; hf.hf_mtime <- ( match mflag with | Some t -> `Immutable t | None -> let t = Int64.of_float(Unix.time()) in if fsys_config.mutable_mtime then `Mutable t else `Immutable t ); hf.hf_data_pnt <- []; self # write_inode hf; (hf, true) ) method delete_file hf = if not hf.hf_open then failwith "filesys: descriptor is closed"; if not hf.hf_writable then failwith "filesys: descriptor is read-only"; let (_, hi) = self # kv_hi true in if hf.hf_acount <> acount then 
self # reread_inode hf; let dsize = List.fold_left (fun acc (_,size) -> size +@ acc) 0L hf.hf_data_pnt in (* We only mark the inode as deleted, but not the data strings. *) self # delete_inode hf; ( try let e = HI.lookup hi (hf.hf_name ^ "/I0") in HI.delete e with | Not_found -> () ); self # update_dtotsz (0L -@ dsize); hf.hf_open <- false method delete_name name = let (_, hi) = self # kv_hi true in let hi_entry = try HI.lookup hi (name ^ "/I0") with Not_found -> raise(File_not_found name) in let hf = new_file_descr() in ( try self # read_inode hf (HI.get_contents hi_entry) true; with | Not_found -> raise(File_not_found name) ); let dsize = List.fold_left (fun acc (_,size) -> size +@ acc) 0L hf.hf_data_pnt in (* We only mark the inode as deleted, but not the data strings. *) self # delete_inode hf; ( try let e = HI.lookup hi (hf.hf_name ^ "/I0") in HI.delete e with | Not_found -> () ); self # update_dtotsz (0L -@ dsize); hf.hf_open <- false method delete_name_from_index name = let (_, hi) = self # kv_hi true in if not self#have_dups (* Requires at least a shared lock! 
*) then invalid_arg "Seqdb_fsys_ht#delete_name_from_index: Only allowed if HAVE_DUPS is set"; ( try let e = HI.lookup hi (name ^ "/I0") in HI.delete e with | Not_found -> () ) method rename_file hf new_name = if not hf.hf_open then failwith "filesys: descriptor is closed"; if not hf.hf_writable then failwith "filesys: descriptor is read-only"; let (kv, hi) = self # kv_hi true in if hf.hf_acount <> acount then self # reread_inode hf; ( try let e = HI.lookup hi (new_name ^ "/I0") in let e' = HI.get_contents e in if not (Kvseq.get_delflag e') then raise (File_exists new_name) with | Not_found -> () (* From HI.lookup *) ); (* Write a new inode (with renamed key): *) let old_name = hf.hf_name in hf.hf_name <- new_name; ( match Kvseq.keyrepr kv with | `Lim8 _ -> (* We can directly rename the entry: *) let e = HI.lookup hi (old_name ^ "/I0") in let e' = HI.get_contents e in ( try HI.delete e with Not_found -> ()); Kvseq.rename e' (new_name ^ "/I0"); ignore(HI.add hi e'); self # check_hi_size(); hf.hf_ientry <- None; self # reread_inode hf | _ -> (* The following call deletes the old inode (from kvseq but not from the index), allocates a new one, and creates an index entry for it: *) self # write_inode ~enforce_new_entry:true hf; (* Update the index: *) ( match hf.hf_ientry with | None -> assert false | Some e_inode -> ( try (* Note that the following works even if the corresponding kvseq entry is already marked as deleted: *) let e = HI.lookup hi (old_name ^ "/I0") in HI.delete e; with | Not_found -> () ) ) ) method read_file hf s s_pos f_pos len = if not hf.hf_open then failwith "filesys: descriptor is closed"; if f_pos < 0L || len < 0 || s_pos < 0 || s_pos + len > String.length s then invalid_arg "read_file"; let (kv, _) = self # kv_hi false in if hf.hf_acount <> acount then self # reread_inode hf; if f_pos > hf.hf_lsize then failwith "read_file: file position out of range"; (* First advise, then read *) let sp = ref s_pos in let fp = ref f_pos in let tp = ref 0L in let 
n = ref len in List.iter (fun (pnt,size) -> let size' = max 0L (min size (hf.hf_lsize -@ !tp)) in (* Read from this data string? *) if !n > 0 && !fp >= !tp && !fp < !tp +@ size' then ( (* yes: advise to read the value in *) (* let pnt64 = Kvseq.int64_of_pointer pnt in *) let offset = !fp -@ !tp in let m = ~< (min (~> !n) (size' -@ offset)) in sp := !sp + m; fp := !fp +@ (~> m); n := !n - m ); tp := !tp +@ size ) hf.hf_data_pnt; (* Now actually read *) sp := s_pos; fp := f_pos; tp := 0L; n := len; List.iter (fun (pnt,size) -> let size' = max 0L (min size (hf.hf_lsize -@ !tp)) in (* Read from this data string? *) if !n > 0 && !fp >= !tp && !fp < !tp +@ size' then ( (* yes: read the value in *) let e = Kvseq.lookup kv pnt in check_data_key e; let v_len = Kvseq.get_value_length e in (* (* DEBUG *) if size <> v_len then ( let p = Kvseq.get_pointer e in eprintf "size <> v_len\n"; eprintf "size = %Ld, v_len = %Ld\n" size v_len; eprintf "p = %Ld\n" (Kvseq.int64_of_pointer p); List.iter (fun (pnt,size) -> eprintf "pnt = %Ld, size = %Ld\n" (Kvseq.int64_of_pointer pnt) size ) hf.hf_data_pnt ); *) if size <> v_len then failwith "Mismatching data region size"; let offset = !fp -@ !tp in let m = ~< (min (~> !n) (size' -@ offset)) in Kvseq.blit_to_string e offset s !sp m; sp := !sp + m; fp := !fp +@ (~> m); n := !n - m ); tp := !tp +@ size ) hf.hf_data_pnt; len - !n method write_file hf s s_pos f_pos len = if not hf.hf_open then failwith "filesys: descriptor is closed"; if not hf.hf_writable then failwith "filesys: descriptor is read-only"; if f_pos < 0L || len < 0 || s_pos < 0 || s_pos + len > String.length s then invalid_arg "write_file"; let (kv, _) = self # kv_hi true in if hf.hf_acount <> acount then self # reread_inode hf; if f_pos > hf.hf_lsize then failwith "write_file: file position out of range"; (* First fadvise, then write *) let sp = ref s_pos in let fp = ref f_pos in let tp = ref 0L in let n = ref len in List.iter (fun (pnt,size) -> (* Write to this data string? 
*) if !n > 0 && !fp >= !tp && !fp < !tp +@ size then ( (* yes, so give an advice: *) (* let pnt64 = Kvseq.int64_of_pointer pnt in *) let offset = (!fp -@ !tp) in let m = ~< (min (~> !n) (size -@ offset)) in sp := !sp + m; fp := !fp +@ (~> m); n := !n - m ); tp := !tp +@ size ) hf.hf_data_pnt; (* Now actually write *) sp := s_pos; fp := f_pos; tp := 0L; n := len; let to_repl = ref None in List.iter (fun (pnt,size) -> (* let size' = max 0L (min size (hf.hf_lsize -@ !tp)) in *) ( match !to_repl with | None -> () | Some (e, cur_e_pos, cur_s_pos, cur_len) -> Kvseq.blit_from_string s cur_s_pos e cur_e_pos cur_len; to_repl := None; ); (* Write to this data string? *) if !n > 0 && !fp >= !tp && !fp < !tp +@ size then ( (* yes: *) let e = Kvseq.lookup kv pnt in check_data_key e; let v_len = Kvseq.get_value_length e in assert(size = v_len); let offset = (!fp -@ !tp) in let m = ~< (min (~> !n) (size -@ offset)) in to_repl := Some(e, offset, !sp, m); sp := !sp + m; fp := !fp +@ (~> m); n := !n - m ); tp := !tp +@ size ) hf.hf_data_pnt; let d = !n in (* Maybe we have to extend the file. First try to extend the last block *) let is_last e = try ignore(Kvseq.next_entry e); false with End_of_file -> true in ( match !to_repl with | None -> if !n > 0 && hf.hf_data_pnt <> [] then ( let (pnt,size) = last hf.hf_data_pnt in let e = Kvseq.lookup kv pnt in if is_last e then ( (* We are appending to the file *) n := 0; fp := !fp +@ (~> len); Kvseq.blit_from_string s s_pos e size len; hf.hf_data_pnt <- map_last (fun (pnt, _) -> (pnt, Kvseq.get_value_length e)) hf.hf_data_pnt ) ) | Some (e, cur_e_pos, cur_s_pos, cur_len) -> let eff_len = if !n > 0 && is_last e then ( let m = cur_len + !n in n := 0; fp := !fp +@ (~> (m-cur_len)); m ) else cur_len in Kvseq.blit_from_string s cur_s_pos e cur_e_pos eff_len; if eff_len <> cur_len then ( (* update hf_data_pnt: *) hf.hf_data_pnt <- map_last (fun (pnt, _) -> (pnt, Kvseq.get_value_length e)) hf.hf_data_pnt ) ); (* Still to extend? 
Create a new block *) if !n > 0 then ( let v = String.create !n in String.blit s !sp v 0 !n; fp := !fp +@ (~> !n); let hex_fileid = sprintf "%016Lx" hf.hf_fileid in let data = { Kvseq.delflag = false; key = hex_fileid ^ "/D" ^ string_of_int (List.length hf.hf_data_pnt); value = v } in let e = Kvseq.add kv data in hf.hf_data_pnt <- hf.hf_data_pnt @ [ Kvseq.get_pointer e, ~> !n ]; ); (* Write new inode *) if !fp > hf.hf_lsize then hf.hf_lsize <- !fp; ( match hf.hf_mtime with | `Mutable _ -> hf.hf_mtime <- `Mutable(Int64.of_float (Unix.time())); | `Immutable _ -> () ); self # write_inode hf; self # update_dtotsz (~> d) method reserve hf n = () (* TODO *) method filepos hf = if not hf.hf_open then failwith "filesys: descriptor is closed"; let (_, _) = self # kv_hi false in if hf.hf_acount <> acount then self # reread_inode hf; match hf.hf_ientry with | None -> assert false | Some e -> Kvseq.int64_of_pointer (Kvseq.get_pointer e) method is_last_file hf = if not hf.hf_open then failwith "filesys: descriptor is closed"; let (kv, _) = self # kv_hi false in if hf.hf_acount <> acount then self # reread_inode hf; (* The only way to test this is to check whether we find another inode past this position (for Seqdb_fsys_ao this test is quite cheap because it usually evaluates to [true]) *) match hf.hf_ientry with | None -> assert false | Some e -> ( try let e' = self # find_next_inode kv e in (* DEBUG *) let p' = Kvseq.get_pointer e' in eprintf "is_last_file: p' = %Ld\n%!" 
(Kvseq.int64_of_pointer p'); false with | End_of_file -> true )

  (* Returns the logical size [hf_lsize] of the file behind [hf].
     Fails if the descriptor is closed. The inode is re-read first when
     the access counter stored in the descriptor differs from [acount].
   *)
  method file_size hf =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    let (_, _) = self # kv_hi false in
    if hf.hf_acount <> acount then self # reread_inode hf;
    hf.hf_lsize

  (* Returns the file type character [hf_ftype]; same checks and
     refresh logic as [file_size].
   *)
  method file_type hf =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    let (_, _) = self # kv_hi false in
    if hf.hf_acount <> acount then self # reread_inode hf;
    hf.hf_ftype

  (* Returns the modification time as int64, regardless of whether it is
     stored as mutable or immutable; same checks and refresh logic as
     [file_size].
   *)
  method file_mtime hf =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    let (_, _) = self # kv_hi false in
    if hf.hf_acount <> acount then self # reread_inode hf;
    ( match hf.hf_mtime with
        | (`Mutable t | `Immutable t) -> t
    )

  (* Sets the file type to [ft] and writes the inode back. Fails if the
     descriptor is closed or read-only.
   *)
  method set_file_type hf ft =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    if not hf.hf_writable then failwith "filesys: descriptor is read-only";
    let (_, _) = self # kv_hi true in
    if hf.hf_acount <> acount then self # reread_inode hf;
    hf.hf_ftype <- ft;
    self # write_inode hf

  (* Sets the modification time to [mt] and writes the inode back. Fails
     if the descriptor is closed or read-only, or if the configuration
     does not allow mutable mtimes.
   *)
  method set_file_mtime hf mt =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    if not hf.hf_writable then failwith "filesys: descriptor is read-only";
    if not(fsys_config.mutable_mtime) then failwith "immutable mtime";
    let (_, _) = self # kv_hi true in
    if hf.hf_acount <> acount then self # reread_inode hf;
    (* Even if hf_mtime is immutable set it. We have already caught the
       case that modifying the mtime is really forbidden.
     *)
    hf.hf_mtime <- `Mutable mt;
    self # write_inode hf

  (* Shrinks the logical size of the file to [size] and writes the inode
     back. Fails if the descriptor is closed or read-only, if [size] is
     negative, or if [size] is larger than the current logical size.

     The descriptor checks are done before the lock is taken and before
     the inode is re-read, like in the other setters.
   *)
  method truncate hf size =
    if not hf.hf_open then failwith "filesys: descriptor is closed";
    if not hf.hf_writable then failwith "filesys: descriptor is read-only";
    if size < 0L then failwith "truncate: negative size";
    let (_, _) = self # kv_hi true in
    if hf.hf_acount <> acount then self # reread_inode hf;
    if size > hf.hf_lsize then failwith "truncate: cannot extend file";
    hf.hf_lsize <- size;
    self # write_inode hf

  (* Only marks the descriptor as closed; nothing else is released here *)
  method close_file hf =
    hf.hf_open <- false

  (* Iterator visiting the inode entries in the order of the data file,
     beginning at [start_opt] (a pointer in string form) or at the first
     entry. Nothing is looked up before the first access to the
     iteration object.
   *)
  method private data_iterator start_opt : ht_file_descr x_file_system_iterator =
    ( object(iself)
        method start() =
          let (kv, _) = self # kv_hi false in
          Kvseq.fadvise_iterating kv;
          (* [None] means: the first inode has not been located yet *)
          let cur_e = ref None in
          let get_e() =
            match !cur_e with
              | None ->
                  (* Remember the first inode once it is found. Otherwise
                     every access before the first [next] would scan
                     again, and [next_recoverable] after a failed first
                     [next] would return the first inode a second time
                     instead of going on.
                   *)
                  ( let e = self#find_first_inode kv start_opt in
                    cur_e := Some e;
                    e
                  )
              | Some e -> e in
          ( object
              (* The key of an inode entry is the file name followed by
                 a three character suffix (see [is_inode])
               *)
              method current_name =
                let key = Kvseq.get_key (get_e()) in
                let l = String.length key in
                String.sub key 0 (l-3)
              method current_file =
                let hf = new_file_descr() in
                try
                  self # read_inode hf (get_e()) false;
                  hf
                with Not_found -> assert false
              method current_entry = (get_e())
              method next () =
                cur_e := Some(self # find_next_inode kv (get_e()))
              (* Like [next], but searches in recovery mode. If no entry
                 could be located so far, the search for the first inode
                 is repeated in recovery mode.
               *)
              method next_recoverable() =
                let e' =
                  match !cur_e with
                    | None -> self # find_first_inode ~recover:true kv start_opt
                    | Some e -> self # find_next_inode ~recover:true kv e in
                cur_e := Some e';
            end
          )
      end
    )

  method private twopass_iterator start_opt : ht_file_descr x_file_system_iterator =
    (* This version takes have_dups into account *)
    ( object(iself)
        method start() =
          (* The two passes: First collect all names, and remember their
             positions. Second pass is the real one, and dups are filtered
             out.
           *)
          let pass1 = (self#data_iterator start_opt) # start() in
          (* or End_of_file *)
          (* [names] maps a file name to the pointer of the entry seen
             last for this name ([Hashtbl.replace])
           *)
          let names = Hashtbl.create 10000 in
          ( try
              while true do
                ( try
                    let n = pass1#current_name in
                    let p = Kvseq.get_pointer (pass1#current_entry) in
                    Hashtbl.replace names n p;
                    pass1#next()
                  with
                    | End_of_file as e -> raise e
                    | Failure _ -> pass1#next_recoverable()
                )
              done
            with End_of_file -> ()
          );
          let pass2 = (self#data_iterator start_opt) # start() in
          (* Advance [pass2] while the current entry is not the one
             recorded for its name in the first pass
           *)
          let rec skip_dups failsafe =
            let n = pass2#current_name in
            let p = Kvseq.get_pointer (pass2#current_entry) in
            let is_dup =
              try Hashtbl.find names n <> p
              with Not_found -> false in
            if is_dup then (
              ( try pass2#next()
                with
                  | Failure _ when failsafe -> pass2#next_recoverable()
              );
              skip_dups failsafe
            ) in
          (* The first entry is only checked on the first access, so
             that [start] itself does not raise [End_of_file]
           *)
          let initial_skip = ref true in
          ( object
              method current_name =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                pass2#current_name
              method current_file =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                pass2#current_file
              method current_entry =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                pass2#current_entry
              method next() =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                pass2#next();
                skip_dups false
              method next_recoverable() =
                pass2#next_recoverable();
                skip_dups true;
                initial_skip := false
            end
          )
      end
    )

  method private indexcheck_iterator start_opt : ht_file_descr x_file_system_iterator =
    (* This version takes have_dups into account *)
    ( object(iself)
        method start() =
          let di = (self # data_iterator start_opt) # start() in
          let (_, hi) = self # kv_hi false in
          (* Advance [di] while the name of the current entry cannot be
             found in the index.
             NOTE(review): only the name is looked up; the pointer of the
             found index entry is not compared with the current entry -
             confirm that this is sufficient for duplicates.
           *)
          let rec skip_dups failsafe =
            let name = di#current_name in
            let exists_in_index =
              try ignore(HI.lookup hi (name ^ "/I0")); true
              with Not_found -> false in
            if not exists_in_index then (
              ( try di#next()
                with
                  (* The end of the iteration is not an error to recover
                     from: pass it on, as the two-pass iterator does
                   *)
                  | End_of_file as e -> raise e
                  | _ when failsafe -> di#next_recoverable()
              );
              skip_dups failsafe
            ) in
          let initial_skip = ref true in
          ( object
              method current_name =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                di#current_name
              method
current_file =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                di#current_file
              method current_entry =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                di#current_entry
              method next() =
                if !initial_skip then ( skip_dups false; initial_skip := false );
                di#next();
                skip_dups false
              method next_recoverable() =
                di#next_recoverable();
                skip_dups true;
                initial_skip := false
            end
          )
      end
    )

  method private eff_iterator start_opt dedup_mode =
    (* Select the right iterator: *)
    (* The decision is taken when [start] is called, not when the
       iterator object is created. [dedup_mode] only matters if the file
       system is flagged as having duplicates.
     *)
    ( object(iself)
        method start() =
          let i =
            if self#have_dups then (
              match dedup_mode with
                | `Off -> self # data_iterator start_opt
                | `Twopass -> self # twopass_iterator start_opt
                | `Indexcheck -> self # indexcheck_iterator start_opt
            )
            else
              self # data_iterator start_opt in
          i#start()
      end
    )

  (* Public variant of [eff_iterator], coerced to the plain iterator type *)
  method iterator start_opt dedup_mode =
    let i = self # eff_iterator start_opt dedup_mode in
    (i :> ht_file_descr file_system_iterator)

  (* Public variant of [eff_idx_iterator], coerced to the plain iterator
     type
   *)
  method idx_iterator() =
    (self # eff_idx_iterator() :> ht_file_descr file_system_iterator)

  (* Iterator visiting the inodes the index points to, in ascending
     order of their position. [start] raises [End_of_file] if there is
     no such inode, because the first one is looked up immediately.
   *)
  method private eff_idx_iterator() : ht_file_descr x_file_system_iterator =
    ( object(iself)
        method start() =
          (* First iterate over index, and collect all valid positions *)
          let (kv, hi) = self # kv_hi false in
          let l = ref [] in
          ( try
              let e = ref (HI.first_entry hi) in
              l := (HI.Container.string_of_pointer (HI.get_pointer !e)) :: !l;
              while true do
                e := HI.next_entry !e;
                l := (HI.Container.string_of_pointer (HI.get_pointer !e)) :: !l;
              done
            with
              | End_of_file -> ()
          );
          (* Now sort the positions. They are represented as strings. These
           * are int64 numbers in big endian representation.
           *)
          l := List.sort String.compare !l;
          (* Iterate over the positions *)
          (* [idx_find_inode] removes the positions it has tried from [l] *)
          let cur_e = ref (self # idx_find_inode kv l) in
          ( object(iiself)
              method current_name =
                let key = Kvseq.get_key !cur_e in
                let l = String.length key in
                String.sub key 0 (l-3)
              method current_file =
                let hf = new_file_descr() in
                try
                  self # read_inode hf !cur_e false;
                  hf
                with Not_found -> assert false
              method current_entry = !cur_e
              method next () =
                cur_e := self # idx_find_inode kv l
              method next_recoverable() =
                cur_e := self # idx_find_inode ~recover:true kv l
            end
          )
      end
    )

  (* An entry is an inode if it is not flagged as deleted and its key
     ends with "/I0"
   *)
  method private is_inode e =
    let is_del = Kvseq.get_delflag e in
    not is_del &&
      let key = Kvseq.get_key e in
      let l = String.length key in
      l >= 3 && (String.sub key (l-3) 3 = "/I0")

  (* Returns the first inode entry at or after the start position
     ([start_opt], or the first entry of [kv]). With [~recover:true] a
     [Failure] of the initial lookup is answered by searching with
     [Kvseq.recover_entry] from the start position.
   *)
  method private find_first_inode ?(recover = false) kv start_opt =
    let e_opt =
      try
        let e =
          match start_opt with
            | None -> Kvseq.first_entry kv (* or End_of_file *)
            | Some start -> Kvseq.lookup kv (Kvseq.pointer_of_string start) in
        Some e
      with
        | Failure _ when recover -> None in
    match e_opt with
      | Some e ->
          if self#is_inode e then
            e
          else
            self#find_next_inode ~recover kv e
      | None ->
          let startpnt =
            match start_opt with
              | None -> None
              | Some start -> Some(Kvseq.pointer_of_string start) in
          let e = Kvseq.recover_entry key_regexp kv startpnt in
          if self#is_inode e then
            e
          else
            self#find_next_inode ~recover kv e

  (* Returns the next inode entry after [e]. In recovery mode every step
     is done with [Kvseq.recover_entry], otherwise with
     [Kvseq.next_entry].
   *)
  method private find_next_inode ?(recover = false) kv e =
    let e' =
      if recover then
        Kvseq.recover_entry key_regexp kv (Some (Kvseq.get_pointer e))
      else
        Kvseq.next_entry e in
    if self#is_inode e' then
      e'
    else
      self # find_next_inode ~recover kv e'

  (* Takes positions from the front of [l] (a reference to a list of
     pointers in string form) until one of them is an inode, and returns
     this entry. Every position tried is removed from [l]. Raises
     [End_of_file] when [l] is exhausted. With [~recover:true] positions
     whose lookup or check raises an exception are skipped.

     The exception handler only covers the lookup of one position. The
     recursive call is outside of it, so that the final [End_of_file] is
     not caught and raised again by every pending call, and the
     recursion is a tail call.
   *)
  method private idx_find_inode ?(recover = false) kv l =
    match !l with
      | [] -> raise End_of_file
      | p :: l' ->
          l := l';
          let found =
            try
              let e = Kvseq.lookup kv (Kvseq.pointer_of_string p) in
              if self#is_inode e then Some e else None
            with
              | _ when recover -> None in
          ( match found with
              | Some e -> e
              | None -> self # idx_find_inode ~recover kv l
          )

  (* Builds a new index in the file with suffix ".idx.new" from the
     inodes delivered by the selected iterator. Returns [true] if the new
     index has replaced the old one, and [false] if the old files are
     kept because of errors.

     - [fault_tolerant]: install the new index even if there were errors
     - [repair]: delete inodes that cannot be read or added
     - [fully_buffered_index]: passed on to [HI.access]
     - [iteratortype]: [`Data] iterates over the data file (two-pass
       mode), [`Index] over the old index
     - [params]: table size, cell size and hash algorithm of the new
       index

     If an exception escapes, the descriptor of the new index file is
     closed and the exception is raised again.
   *)
  method reindex fault_tolerant repair fully_buffered_index iteratortype params =
    let (kv, hi) = self # kv_hi false in
    (* Only a shared lock is needed! *)
    self # flush();
    let fn_hi = Filename.concat dir fsysname ^ ".idx" in
    let fn_hi_new = Filename.concat dir fsysname ^ ".idx.new" in
    let fd_hi_new =
      Unix.openfile
        fn_hi_new [Unix.O_RDWR; Unix.O_CREAT; Unix.O_TRUNC ] 0o666 in
    try
      let io_hi_new =
        ( object
            method file_descr = fd_hi_new
            method dispose_hint() = ()
          end
        ) in
      let hi_new =
        HI.create
          ~sbsize
          ~htsize:(Int64.of_int params.ht_table_size)
          ~cellsz:(cellsz_of_params params)
          ~hash_algo:params.ht_hash_algo
          ~purpose:"FSYSIDX"
          kv
          io_hi_new in
      HI.flush hi_new;
      (* Access the index just created again, now with a large buffer *)
      let hi_new =
        HI.access
          ~buffer_size:(1024*1024)
          ~fully_buffered_index
          kv
          io_hi_new in
      HI.configure
        ~flush_every:max_int
        ~auto_fadvise:(not fsys_config.hindex_caching)
        hi_new;
      let have_errors = ref false in
      ( try
          let iterator =
            match iteratortype with
              | `Data -> self # eff_iterator None `Twopass
              | `Index -> self # eff_idx_iterator() in
          let iter = iterator # start() in
          (* The loop is left by [End_of_file] *)
          while true do
            (try
               let hf = iter # current_file in
               ( match hf.hf_ientry with
                   | None -> assert false
                   | Some e -> ignore(HI.add hi_new e)
               )
             with
               | error ->
                   Log.logf `Err "reindex: Exception at %s: %s "
                     iter#current_name (Printexc.to_string error);
                   have_errors := true;
                   if repair then (
                     let e = iter#current_entry in
                     Log.logf `Notice "reindex: Deleting inode %s"
                       iter#current_name;
                     Kvseq.delete e
                   )
            );
            (* Get the name before advancing, for the error message *)
            let last_name = iter#current_name in
            try
              iter # next()
            with
              | End_of_file as e -> raise e
              | error ->
                  Log.logf `Err "reindex: Exception after %s: %s "
                    last_name (Printexc.to_string error);
                  have_errors := true;
                  iter # next_recoverable();
                  Log.logf `Err "reindex: Skipping damaged file region. Next file is %s"
                    iter#current_name
          done
        with
          | End_of_file -> ()
      );
      HI.sync hi_new;
      configure_kv fsys_config kv hi_new;
      configure_hi fsys_config hi_new;
      if fault_tolerant || not !have_errors then (
        (* Now get write lock. Note that this may deadlock!
         *)
        let (_, _) = self # kv_hi ~retry_on_deadlock:true true in
        Unix.rename fn_hi_new fn_hi;
        hi_opt <- Some(hi_new, io_hi_new);
        true
      )
      else (
        (* NOTE(review): on this path [fd_hi_new] stays open and the
           ".idx.new" file is left behind. [configure_kv] has already
           been called with [hi_new] at this point - confirm whether the
           descriptor may be closed here.
         *)
        Log.logf `Notice "reindex: Keeping old files because of errors";
        false
      )
    with
      | error ->
          Unix.close fd_hi_new;
          raise error

  (* Whether the superblock variable "HAVEDUPS" exists and is non-zero *)
  method private have_dups =
    let (kv, _) = self # kv_hi false in
    try
      Superblock.variable (Kvseq.superblock kv) "HAVEDUPS" <> 0L
    with
      | Not_found -> false

  (* Returns the parameters of the file system as found in the
     superblocks of the data file and of the index. Fails if "ISZ" is
     missing or if "CELLSZ" is neither 1 nor 2; a missing "CELLSZ" counts
     as 1.
   *)
  method params =
    let (kv, hi) = self # kv_hi false in
    let isz =
      try Superblock.variable (Kvseq.superblock kv) "ISZ"
      with Not_found -> failwith "params: ISZ missing" in
    let cellsz =
      try Superblock.variable (HI.superblock hi) "CELLSZ"
      with Not_found -> 1L in
    { ht_inode_size = ~<< isz;
      ht_table_size = ~<< (HI.htsize hi);
      ht_hash_algo = HI.hash_algo hi;
      ht_index_type = ( match cellsz with
                          | 1L -> `Plain
                          | 2L -> `Stored_hashes
                          | _ -> failwith "params: invalid CELLSZ"
                      );
      ht_have_dups = self#have_dups
    }

  (* Returns statistics computed from the superblock variables "ITOTSZ",
     "DTOTSZ" and "FILESIZE" and from the entry counters of the index.
     The used size is the sum of "ITOTSZ" and "DTOTSZ", the dead size is
     the rest of "FILESIZE".
   *)
  method stats =
    let (kv, hi) = self # kv_hi false in
    let sb = Kvseq.superblock kv in
    let itotsz =
      try Superblock.variable sb "ITOTSZ"
      with Not_found -> assert false in
    let dtotsz =
      try Superblock.variable sb "DTOTSZ"
      with Not_found -> assert false in
    let used = itotsz +@ dtotsz in
    let fsize =
      try Superblock.variable sb "FILESIZE"
      with Not_found -> assert false in
    { ht_table_total = ~<< (HI.num_entries hi);
      ht_table_del = ~<< (HI.num_entries hi -@ HI.num_active_entries hi);
      ht_data = fsize;
      ht_dead_data = fsize -@ used;
      ht_used_data = used
    }

  (* Creates the files of a new file system [new_fsysname] in the same
     directory with the parameters of this one, and returns the object
     for it. [truncate] selects between [O_TRUNC] and [O_EXCL] for the
     creation.
   *)
  method create_same new_fsysname truncate =
    let flags =
      if truncate then [Unix.O_TRUNC] else [Unix.O_EXCL] in
    let (kv, hi, io_kv, io_hi, fd_lock, fsys_config) =
      create_filesys_files dir new_fsysname "" self#params flags in
    (* TODO: Some configs aren't effective, e.g.
       fully_buffered_index *)
    let sys =
      new ht_file_system
        dir new_fsysname (Some(kv,io_kv)) (Some(hi,io_hi)) (Some fd_lock) true
        fsys_config in
    sys

  (* Renames the lock file, the data file and the index file, in this
     order, to the names belonging to [new_fsysname]. The renames are
     separate calls: if one fails the previous ones are not undone.
   *)
  method rename_files new_fsysname =
    let (fn_lock_old, fn_kv_old, fn_hi_old) = filenames dir fsysname in
    let (fn_lock_new, fn_kv_new, fn_hi_new) = filenames dir new_fsysname in
    Unix.rename fn_lock_old fn_lock_new;
    Unix.rename fn_kv_old fn_kv_new;
    Unix.rename fn_hi_old fn_hi_new

end

(* A plugin can replace the file system object for a file system it is
   compatible with (see [filesys])
 *)
class type plugin =
object
  method impl_name : string
  method compatibility : ht_base -> string -> int
  method get_derived : ht_base -> string -> ht_file_system_t -> ht_file_system_t
end

(* Creates the base for directory [dir]: no file system is known yet,
   and no global lock is held
 *)
let new_base dir =
  { ht_dir = dir;
    ht_sys = Hashtbl.create 50;
    ht_glock_opt = None
  }

(* Returns the descriptor of the global lock file together with a flag
   saying whether it was opened by this call ([true]) or is the one
   already stored in the base ([false]). Returns [None] if there is no
   "GLOBAL.lock" file in the base directory.
 *)
let lock_fd b =
  match b.ht_glock_opt with
    | Some fd -> Some(fd,false)
    | None ->
        try
          let dir = b.ht_dir in
          Some
            (Unix.openfile (Filename.concat dir "GLOBAL.lock") [Unix.O_RDWR] 0,
             true
            )
        with
          | Unix.Unix_error(Unix.ENOENT,_,_) -> None

(* Gets the read lock on the global lock file, and stores the descriptor
   in the base. Fails if there is no "GLOBAL.lock" file. If locking
   fails, a descriptor opened by this call is closed again before the
   exception is passed on.
 *)
let lock_base b =
  match lock_fd b with
    | None ->
        failwith "Cannot lock the base: No GLOBAL.lock file"
    | Some(glock, opened) ->
        ( try
            Netsys.restart (Unix.lockf glock Unix.F_RLOCK) 0
          with
            | error ->
                if opened then Unix.close glock;
                raise error
        );
        b.ht_glock_opt <- Some glock

(* Creates the plain file system object for [fsysname] with the default
   configuration. Fails if the lock file of the file system does not
   exist.
 *)
let ht_filesys b fsysname =
  (* This function doesn't take plugins into account *)
  let fsys_config =
    { hindex_caching = true;
      data_caching = true;
      sync_every = default_syncdelta;
      big_readahead = false;
      fully_buffered_index = false;
      mutable_mtime = true;
      inode_relocatable = true;
      onsync = (fun () -> ());
      read_only_mode = false;
    } in
  let fn_lock = Filename.concat b.ht_dir fsysname ^ ".lock" in
  if Sys.file_exists fn_lock then (
    new ht_file_system b.ht_dir fsysname None None None false fsys_config
  )
  else
    failwith ("Filesys does not exist: " ^ fsysname)
;;

(* The registered plugins, latest first *)
let plugins = ref ([] : plugin list)

let register_plugin p =
  plugins := p :: !plugins

(* Returns the file system object for [fsysname]. Objects are cached in
   the base. For an unknown name the plain object is created, and handed
   to the plugin with the highest positive compatibility value, if any;
   of several plugins with the same value the one tried first wins.
 *)
let filesys b fsysname =
  try
    Hashtbl.find b.ht_sys fsysname
  with
    | Not_found ->
        let ht = ht_filesys b fsysname in
        let best = ref None in
        List.iter
          (fun p ->
             let n = p#compatibility b fsysname in
             if n > 0 then (
match !best with
                 | Some (_, best_n) ->
                     if n > best_n then best := Some(p,n)
                 | None ->
                     best := Some(p,n)
             )
          )
          !plugins;
        let ht' =
          match !best with
            | None -> ht
            | Some (p,_) -> p#get_derived b fsysname ht in
        Hashtbl.replace b.ht_sys fsysname ht';
        ht'
;;

(* Makes [ht] the object returned for [fsysname] from now on *)
let override b fsysname ht =
  Hashtbl.replace b.ht_sys fsysname ht
;;

let base_dir b =
  b.ht_dir
;;

(* Returns the names of the file systems in the base directory: the
   files with suffix ".data" (but not ".new.data") for which the
   corresponding ".lock" and ".idx" files exist, without the suffix
 *)
let names b =
  List.map
    (fun n -> Filename.chop_suffix n ".data")
    (List.filter
       (fun n ->
          Filename.check_suffix n ".data" &&
          (not (Filename.check_suffix n ".new.data")) &&
          ( let n_base = Filename.chop_suffix n ".data" in
            Sys.file_exists (Filename.concat b.ht_dir (n_base ^ ".lock")) &&
            Sys.file_exists (Filename.concat b.ht_dir (n_base ^ ".idx"))
          )
       )
       (Array.to_list (Sys.readdir b.ht_dir))
    )
;;

(* Same as [filesys], coerced to the generic file system type *)
let get_ht_filesys b fsysname =
  (filesys b fsysname :> ht_file_descr file_system)
;;

(* Changes the given configuration options of the file system, and
   leaves the options not passed as they are
 *)
let configure_filesys ?hindex_caching ?data_caching ?sync_every ?big_readahead
                      ?fully_buffered_index b fsysname =
  let fsys = filesys b fsysname in
  let old_config = fsys # get_config in
  let new_config =
    { old_config with
        hindex_caching = ( match hindex_caching with
                             | None -> old_config.hindex_caching
                             | Some flag -> flag
                         );
        data_caching = ( match data_caching with
                           | None -> old_config.data_caching
                           | Some flag -> flag
                       );
        sync_every = ( match sync_every with
                         | None -> old_config.sync_every
                         | Some n -> n
                     );
        big_readahead = ( match big_readahead with
                            | None -> old_config.big_readahead
                            | Some flag -> flag
                        );
        fully_buffered_index = ( match fully_buffered_index with
                                   | None -> old_config.fully_buffered_index
                                   | Some flag -> flag
                               );
    } in
  fsys # configure new_config
;;

(* Replaces the configuration of the file system by the result of
   applying [f] to the current one
 *)
let plugin_reconfigure b fsysname plugin f =
  (* plugin appears in the signature to ensure that only plugin implementers
     call this - otherwise dangerous - function
   *)
  let fsys = filesys b fsysname in
  let old_config = fsys # get_config in
  let new_config = f old_config in
  fsys # configure new_config
;;

let willneed_filesys b fsysname =
  (* We have to be a bit careful. The filesystem is in the default
     configuration, and shouldn't leave it in ht_sys in this state
   *)
  let is_known = Hashtbl.mem b.ht_sys fsysname in
  (filesys b fsysname) # willneed();
  if not is_known then
    Hashtbl.remove b.ht_sys fsysname
;;

let willneed_all_filesys b =
  let l = names b in
  List.iter (willneed_filesys b) l
;;

(* Helper of the check functions: runs [f()] while the exclusive lock on
   the global lock file is held. Returns [false] without running [f] if
   the lock is held by somebody else, and [true] after [f] has been run.
   If the base has no global lock file, [f] is run without locking.

   Afterwards a descriptor opened only for this purpose is closed; the
   descriptor stored in the base is put back to the read lock instead.
   This also happens if [f] raises an exception, which is then passed
   on.

   A refused lock is reported by the system as EAGAIN or as EACCES, so
   both mean that there are other users.
 *)
let with_exclusive_glock b f =
  let cont, glock_opt, opened =
    match lock_fd b with
      | None -> (true, None, false)
      | Some(glock, opened) ->
          ( try
              Unix.lockf glock Unix.F_TLOCK 0;
              (true, Some glock, opened)
            with
              | Unix.Unix_error((Unix.EAGAIN | Unix.EACCES),_,_) ->
                  if opened then Unix.close glock;
                  (false, None, false)
              | error ->
                  (* Do not leak the descriptor on unexpected errors *)
                  if opened then Unix.close glock;
                  raise error
          ) in
  let release() =
    match glock_opt with
      | None -> ()
      | Some glock ->
          if opened then
            Unix.close glock
          else
            Unix.lockf glock Unix.F_RLOCK 0 in
  if cont then (
    ( try
        f()
      with
        | error ->
            (* The error of [f] is the one to report, so a failure of
               the cleanup is ignored here
             *)
            ( try release() with _ -> () );
            raise error
    );
    release();
    true
  )
  else
    false
;;

(* Checks all file systems of the base, and rolls back those that need
   it. Nothing is done if the global lock cannot be obtained exclusively,
   i.e. if there are other users.
 *)
let check_all_filesys b =
  let checked =
    with_exclusive_glock b
      (fun () ->
         let l = names b in
         Log.logf `Notice "File system check for %d fsys files in directory %s"
           (List.length l) b.ht_dir;
         List.iter
           (fun n ->
              let fsys = filesys b n in
              let rb = fsys # check() in   (* This checks for integrity *)
              if rb then (
                fsys # checkpoint();       (* This performs the rollback! *)
                Log.logf `Alert "File system has been rolled back: %s" n;
              );
              fsys # dispose();
           )
           l;
         Log.logf `Notice "File system check done"
      ) in
  if not checked then
    Log.logf `Notice "Omitting file system check because there are already users"
;;

(* Like [check_all_filesys], but only for the file system [name] *)
let check_filesys b name =
  let checked =
    with_exclusive_glock b
      (fun () ->
         Log.logf `Notice "File system check for fsys %s/%s" b.ht_dir name;
         let fsys = filesys b name in
         let rb = fsys # check() in   (* This checks for integrity *)
         if rb then (
           fsys # checkpoint();       (* This performs the rollback!
                                       *)
           Log.logf `Alert "File system has been rolled back: %s/%s"
             b.ht_dir name;
         );
         fsys # dispose();
         Log.logf `Notice "File system check done"
      ) in
  if not checked then
    Log.logf `Notice "Omitting file system check because there are already users"
;;

(* Returns the data iterator of the file system, optionally starting at
   [at_filepos]. Duplicates are not filtered unless [dedup_mode] is
   given.
 *)
let get_iterator ?at_filepos ?(dedup_mode=`Off) b fsysname =
  let sys = filesys b fsysname in
  (sys # iterator at_filepos dedup_mode :> ht_file_descr file_system_iterator)
;;

(* Returns the iterator following the index of the file system *)
let get_idx_iterator b fsysname =
  let sys = filesys b fsysname in
  (sys # idx_iterator() :> ht_file_descr file_system_iterator)
;;

let filesys_params b name =
  let sys = filesys b name in
  sys # params
;;

(* Disposes all file system objects known to the base. The objects stay
   in the table of the base.
 *)
let dispose_all b =
  Hashtbl.iter
    (fun _ sys -> sys#dispose())
    b.ht_sys
;;

let checkpoint_and_dispose_all ?soft b =
  (* First release all locks - to avoid deadlocks for sure *)
  dispose_all b;
  let l = names b in
  List.iter
    (fun name ->
       let fsys = filesys b name in
       fsys#checkpoint ?soft ();
       fsys#dispose()
    )
    l
;;

let get_stats b name =
  let sys = filesys b name in
  sys # stats
;;

(* Rebuilds the index of the file system [name] with the parameters [p];
   see the [reindex] method of the file system object. By default
   errors are not tolerated, nothing is repaired, and the data file is
   iterated.
 *)
let reindex ?(fault_tolerant=false) ?(repair=false) ?(itype=`Data)
            ?(fully_buffered_index=false) b name p =
  let sys = filesys b name in
  sys # reindex fault_tolerant repair fully_buffered_index itype p
;;

(* Disposes the object for [name], and removes it from the base. Nothing
   happens if the name is unknown.
 *)
let forget b name =
  try
    let fsys = Hashtbl.find b.ht_sys name in
    fsys # dispose();
    Hashtbl.remove b.ht_sys name
  with
      Not_found -> ()
;;

(* Copies all files of the file system [name] into a new file system
   with the additional suffix ".new", and finally renames the new files
   to [name]. Returns [true] if the files have been replaced, and [false]
   if the old files are kept because of errors (only if
   [fault_tolerant] is not set).
   The argument [p] is not used by this function.
 *)
let compact ?(fault_tolerant=false) ?(itype=`Data)
            ?(fully_buffered_index=false) b name p =
  let sys = filesys b name in
  let name' = name ^ ".new" in
  forget b name';
  let sys' = sys#create_same name' true in
  configure_filesys
    ~fully_buffered_index
    ~hindex_caching:true ~data_caching:true ~sync_every:(-1) b name';
  let have_errors = ref false in
  (* Now copy all files from [sys] to [sys']: *)
  let s = String.create 1048576 (* 1M *) in
  ( try
      let iterator =
        match itype with
          | `Data -> sys # iterator None `Twopass
          | `Index -> sys # idx_iterator () in
      let iter =
iterator # start() in
      (* The loop is left by [End_of_file], raised by the iterator *)
      while true do
        (* This [name] is the name of the current file. It hides the name
           of the file system inside the loop only.
         *)
        let name = iter # current_name in
        let to_del = ref None in
        ( try
            let hf = iter # current_file in
            (* Here we would need [reserve]... *)
            let mtime = sys # file_mtime hf in
            let ftype = sys # file_type hf in
            let hf', _ =
              sys' # open_file_wr_ext [ `Mtime mtime ] name [] (Some ftype) in
            to_del := Some hf';   (* in case of an error *)
            (* Copy the contents in pieces of at most the length of [s].
               [k] is the number of bytes copied so far.
             *)
            let k = ref 0L in
            let size = sys # file_size hf in
            while !k < size do
              let n = sys # read_file hf s 0 !k (String.length s) in
              (* Reading nothing would make the loop run forever *)
              assert(n <> 0);
              sys' # write_file hf' s 0 !k n;
              k := !k +@ (~> n)
            done
          with
            | error ->
                Log.logf `Err "compact: exception at %s: %s"
                  name (Printexc.to_string error);
                have_errors := true;
                (* Remove the partial copy, if it has been created at all *)
                ( match !to_del with
                    | None -> ()
                    | Some hf' -> sys' # delete_file hf';
                )
        );
        try
          iter # next()
        with
          | End_of_file as e -> raise e
          | error ->
              Log.logf `Err "compact: Exception after %s: %s "
                name (Printexc.to_string error);
              have_errors := true;
              iter # next_recoverable();
              Log.logf `Err "compact: Skipping damaged file region. Next file is %s"
                iter#current_name
      done
    with
      | End_of_file -> ()
  );
  (* Finally turn [sys'] into [sys]: *)
  sys' # checkpoint();
  let flag =
    if fault_tolerant || not !have_errors then (
      (* Now get write lock: *)
      handle_deadlock sys # exclusive_access();
      (* Rename files: *)
      sys' # rename_files name;
      true
    )
    else (
      (* The files of the new file system are not removed here *)
      Log.logf `Notice "compact: Keeping old files because of errors";
      false
    ) in
  (* Fix up base: *)
  forget b name;
  forget b name';
  (* Unlock: *)
  sys' # dispose();
  sys # dispose();
  flag

(* Same as [filesys], coerced to the generic file system type *)
let get_filesys b fsysname =
  (filesys b fsysname :> ht_file_descr file_system)

(* Creates the files of the new file system [fsysname] with the
   parameters [params], registers the object in the base, and returns
   it. The files are created with [O_EXCL].
 *)
let create_ht_filesys b fsysname params =
  let (kv, hi, io_kv, io_hi, fd_lock, fsys_config) =
    create_filesys_files b.ht_dir fsysname "" params [Unix.O_EXCL] in
  let sys =
    new ht_file_system
      b.ht_dir fsysname (Some(kv,io_kv)) (Some(hi,io_hi)) (Some fd_lock) true
      fsys_config in
  Hashtbl.replace b.ht_sys fsysname sys;
  sys
;;

(* Same as [create_ht_filesys], coerced to the generic file system type *)
let create_filesys b fsysname params =
  let sys = create_ht_filesys b fsysname params in
  (sys :>
     ht_file_descr file_system)
;;