(* $Id: seqdb_fsys_ao.ml 16180 2008-01-18 20:57:28Z gerd $ *)

(* Append-only (ao) layer on top of the [Seqdb_fsys_ht] file system.
   Besides the data managed by [ht], an ao file system owns a companion
   key/value file [fsyspath.time] whose entries map an mtime (key) to the
   file position of a file created at that time (value). These time marks
   let [get_ao_iterator] start iterating near a requested mtime.
 *)

open Seqdb_fsys_types
open Seqdb_fsys_ht
open Seqdb_containers
open Printf

(** Parameters specific to the append-only layer. *)
type ao_params =
    { ao_time_mark_period : int;
        (* Minimum mtime difference between two consecutive time marks;
           stored as superblock variable TIMEMP of the time file. *)
    }

(** Raised when an operation would violate the append-only restrictions.
    The argument is the path of the file system. *)
exception Append_only_restriction of string

(** Decode a string written by [write_int64] back to an [int64]. *)
let read_int64 s = Rtypes.int64_of_int8 (Rtypes.read_int8 s 0)

(** Encode an [int64] as a string (used for time file keys and values). *)
let write_int64 n = Rtypes.int8_as_string (Rtypes.int8_of_int64 n)

let ( +@ ) = Int64.add
let ( -@ ) = Int64.sub
let ( *@ ) = Int64.mul
let ( /@ ) = Int64.div
let ( ~> ) = Int64.of_int   (* ">" = "convert to the bigger type" *)
let ( ~< ) = Int64.to_int   (* "<" = "convert to the smaller type" *)

(** Apply the buffering policy used for every handle on a time file. *)
let configure_tm tm =
  Kvseq.configure ~flush_every:50 ~auto_sync:None ~auto_fadvise:false tm

(** [open_time_file ?conservative fsyspath] opens the existing time file
    [fsyspath ^ ".time"] for reading and writing, and returns the [Kvseq]
    handle together with the I/O object wrapping the descriptor. The caller
    is responsible for closing [io_time#file_descr]. If the file cannot be
    accessed as a [Kvseq], the descriptor is closed before re-raising. *)
let open_time_file ?conservative fsyspath =
  let fn_time = fsyspath ^ ".time" in
  let fd_time = Unix.openfile fn_time [Unix.O_RDWR] 0 in
  let io_time =
    ( object
        method file_descr = fd_time
        method dispose_hint() = ()
      end
    ) in
  try
    let tm =
      Kvseq.access ~buffer_size:4096 ~chunk_size:4096 ?conservative io_time in
    configure_tm tm;
    (tm, io_time)
  with
    | error ->
        (* Do not leak the descriptor when the file cannot be accessed *)
        Unix.close fd_time;
        raise error

(** [with_time_file ?conservative fsyspath f] opens the time file of
    [fsyspath], returns [f tm], and closes the descriptor exactly once,
    whether [f] returns normally or raises. *)
let with_time_file ?conservative fsyspath f =
  let (tm, io_time) = open_time_file ?conservative fsyspath in
  let result =
    ( try f tm
      with
        | error ->
            Unix.close io_time#file_descr;
            raise error
    ) in
  Unix.close io_time#file_descr;
  result

(** [ao_params fsyspath] reads the ao parameters from the superblock of
    the time file of [fsyspath].
    @raise Failure if the superblock has no TIMEMP variable. *)
let ao_params fsyspath =
  let timemp =
    with_time_file fsyspath
      (fun tm ->
         let sb = Kvseq.superblock tm in
         try Superblock.variable sb "TIMEMP"
         with
           | Not_found ->
               failwith "Seqdb_fsys_ao: superblock variable TIMEMP not found"
      ) in
  { ao_time_mark_period = Int64.to_int timemp }

let sbsize = 4096 (* CHECK: dup in Seqdb_fsys_ht *)

(** [create_time_file ht fsyspath ao_params flags] creates the time file
    [fsyspath ^ ".time"] (opened with O_RDWR, O_CREAT and the extra
    [flags]) as a [Kvseq] with fixed 8-byte keys and values, and records
    [ao_params.ao_time_mark_period] as TIMEMP in its superblock. Returns the
    handle and the I/O object; the caller must flush and close it. [ht] is
    not used. If setting up the [Kvseq] fails, the descriptor is closed
    before re-raising. *)
let create_time_file ht fsyspath ao_params flags =
  let fn_time = fsyspath ^ ".time" in
  let fd_time =
    Unix.openfile fn_time ([Unix.O_RDWR; Unix.O_CREAT] @ flags) 0o666 in
  let io_time =
    ( object
        method file_descr = fd_time
        method dispose_hint() = ()
      end
    ) in
  try
    let tm =
      Kvseq.create
        ~sbsize ~keyrepr:(`Fixed 8) ~valrepr:(`Fixed 8)
        ~supports_deletions:false ~fileincr:65536L ~purpose:"FSYSTIME"
        ~buffer_size:4096 ~chunk_size:4096 io_time in
    let sb = Kvseq.superblock tm in
    Superblock.set_variable
      sb "TIMEMP" (Int64.of_int ao_params.ao_time_mark_period);
    Kvseq.mark_superblock_as_dirty tm;
    (tm, io_time)
  with
    | error ->
        (* Do not leak the descriptor when the file cannot be set up *)
        Unix.close fd_time;
        raise error

(** Flush the time file handle [tm] and close the descriptor of [io_time].
    The descriptor is closed even if flushing raises; the exception is
    then re-raised. *)
let flush_and_close_time_file tm io_time =
  ( try Kvseq.flush tm
    with
      | error ->
          Unix.close io_time#file_descr;
          raise error
  );
  Unix.close io_time#file_descr

(** Open the time file of [fsyspath] in conservative mode and, if its
    rollback flag is set, sync it. The descriptor is always closed. *)
let rollback fsyspath =
  with_time_file ~conservative:true fsyspath
    (fun tm -> if Kvseq.rollback_flag tm then Kvseq.sync tm)

(** Return [Some t] for the first [`Mtime t] in [flags], or [None]. *)
let rec mtime_flag flags =  (* TODO: dup in Seqdb_fsys_ht *)
  match flags with
    | [] -> None
    | `Mtime t :: _ -> Some t
    | _ :: rest -> mtime_flag rest

(** [check_mtime tm t] is true if [t] is not smaller than the LASTMT
    superblock variable of [tm] (the mtime recorded for the most recently
    created file), or if LASTMT is not set yet. *)
let check_mtime tm t =
  try
    let sb = Kvseq.superblock tm in
    let lastmt = Superblock.variable sb "LASTMT" in  (* or Not_found *)
    t >= lastmt
  with
    | Not_found -> true

(** The append-only file system. It wraps the file system [ht] stored as
    [fsysname] in [dir] and enforces these restrictions:
    - files can only be created, never reopened for writing;
    - only the last file may be written to;
    - [truncate] and [set_file_mtime] are forbidden, [reserve] is a no-op;
    - a rename must not need a bigger inode.
    Every file creation may add a time mark to the time file. *)
class ao_file_system ht dir fsysname : ht_file_system_t =
  (* Define the ao layer on top of ht *)
  let fsyspath = Filename.concat dir fsysname in
object(self)
  initializer (
    let config = ht#get_config in
    let config' =
      { config with
          mutable_mtime = false;
          inode_relocatable = false;
          onsync = self # sync
      } in
    ht#configure config'
  )

  (* Lazily opened time file handle; closed again by [dispose] *)
  val mutable tm_io_opt = None

  method private tm =
    match tm_io_opt with
      | None ->
          let (tm, io_time) = open_time_file fsyspath in
          tm_io_opt <- Some(tm, io_time);
          tm
      | Some (tm, _) ->
          tm

  method private maybe_write_timemark fd =
    (* This method is only called as post-action to file creation. Because
       of this we know that the fsys is exclusively locked. So adding a new
       entry to the time file is permitted.
     *)
    let now = ht#file_mtime fd in
    let tm = self#tm in
    let sb = Kvseq.superblock tm in
    let do_write, timemn =
      try
        let timemp = Superblock.variable sb "TIMEMP" in
        let timemark = Superblock.variable sb "TIMEMARK" in
        let timemn = Superblock.variable sb "TIMEMN" in
        (* In order to avoid that many timemarks are written for sparse
           file regions (only a few files within a TIMEMARK period), we
           limit the number of timemarks. TIMEMN counts the files created
           since the last timemark, including the timemarked file itself,
           so a new timemark is only written once at least 999 files have
           been counted, i.e. timemarks are at least 999 files apart.
         *)
        ( (now >= timemark +@ timemp) && (timemn >= 999L), timemn )
      with
        | Not_found ->
            (* No timemark yet: this file gets the first one *)
            (true, 0L)
    in
    let timemn' =
      if do_write then (
        let _entry =
          Kvseq.add tm
            { Kvseq.delflag = false;
              key = write_int64 now;
              value = write_int64 (ht#filepos fd);
            } in
        Superblock.set_variable sb "TIMEMARK" now;
        1L
      )
      else
        timemn +@ 1L in
    (* In any case update the superblock: *)
    Superblock.set_variable sb "TIMEMN" timemn';
    Superblock.set_variable sb "LASTMT" now;
    Kvseq.mark_superblock_as_dirty tm

  (* Installed as the onsync hook of [ht] by the initializer *)
  method private sync() =
    Kvseq.sync self#tm

  method exclusive_access = ht#exclusive_access

  method open_file_rd filename filetypes =
    ht#open_file_rd filename filetypes

  method open_file_wr filename filetypes crtype =
    (* Restriction: only allow creation of new files, not opening existing
       files *)
    let fd, created =
      try ht#open_file_wr_ext [`Excl] filename filetypes crtype
      with
        | Seqdb_fsys_types.File_exists _ ->
            raise(Append_only_restriction fsyspath) in
    self#maybe_write_timemark fd;
    (fd, created)

  method open_file_wr_ext flags filename filetypes crtype =
    (* Restriction: only allow creation of new files, not opening existing
       files. Furthermore, if an `Mtime flag is given, this time must not
       be smaller than the mtime of the last file in the fsys.
     *)
    ht#exclusive_access();  (* get write lock immediately to avoid deadlocks *)
    ( match mtime_flag flags with
        | Some t ->
            if not (check_mtime self#tm t) then
              raise(Append_only_restriction fsyspath)
        | None -> ()
    );
    let (fd, created) =
      try ht#open_file_wr_ext (`Excl :: flags) filename filetypes crtype
      with
        | Seqdb_fsys_types.File_exists _ when not (List.mem `Excl flags) ->
            (* A caller passing `Excl itself gets File_exists unchanged *)
            raise(Append_only_restriction fsyspath) in
    self#maybe_write_timemark fd;
    (fd, created)

  method file_size fd = ht#file_size fd

  method file_type fd = ht#file_type fd

  method file_mtime fd = ht#file_mtime fd

  method set_file_type fd t = ht#set_file_type fd t

  method set_file_mtime fd tm = raise(Append_only_restriction fsyspath)

  method read_file fd buf bufpos filepos len =
    ht#read_file fd buf bufpos filepos len

  method write_file fd buf bufpos filepos len =
    (* Restriction: only allowed if fd is still the last file *)
    self # exclusive_access();
    if not (ht#is_last_file fd) then
      raise(Append_only_restriction fsyspath);
    ht#write_file fd buf bufpos filepos len

  method truncate fd len = raise(Append_only_restriction fsyspath)

  method reserve fd len =
    (* Restriction: this is a no-op *)
    ()

  method close_file fd = ht#close_file fd

  method delete_name name = ht#delete_name name

  method delete_file fd = ht#delete_file fd

  method delete_name_from_index name = ht#delete_name_from_index name

  method rename_file fd name =
    (* Restriction: it is not allowed that the name is so long that a
       bigger inode must be allocated. Because we disable relocatable
       inodes in [ht], we get this specific failure here in case a new
       inode would be needed. NOTE: this matches the exact Failure message
       raised by ht; both places must be kept in sync.
     *)
    try ht#rename_file fd name
    with
      | Failure "inodes not relocatable" ->
          raise(Append_only_restriction fsyspath)

  method filepos fd = ht#filepos fd

  method is_last_file fd = ht#is_last_file fd

  method cmp_filepos name1 name2 = ht#cmp_filepos name1 name2

  method guess_filepos name = ht#guess_filepos name

  method checkpoint ?soft () = ht#checkpoint ?soft ()

  method dispose() =
    ( match tm_io_opt with
        | None -> ()
        | Some(tm, io_time) ->
            (* Forget the handle first so that a failing flush cannot leave
               a stale handle behind; the descriptor is closed either way *)
            tm_io_opt <- None;
            flush_and_close_time_file tm io_time
    );
    ht#dispose()

  method superblock_variable name = ht#superblock_variable name

  method set_superblock_variable name v = ht#set_superblock_variable name v

  (* ------------- Plugin API only: ---------------- *)

  method check() =
    (* We first check [ht]. If a rollback has to be done there, it is wise
       to also check whether the time file needs a rollback. Because our
       sync is a post action of the sync of ht, the rollback is ensured to
       go back to a valid size then.
       CHECK: The case that ht does not need a rollback, but the time file
       does, is not handled here.
     *)
    let rb = ht#check() in
    (* ht#check disposes descriptors, so no time file handle may be open *)
    assert (match tm_io_opt with None -> true | Some _ -> false);
    if rb then rollback fsyspath;
    rb

  method get_config = ht#get_config

  method configure cfg =
    (* Changing read_only_mode is only permitted when files are closed *)
    let old_cfg = ht#get_config in
    if old_cfg.read_only_mode <> cfg.read_only_mode && ht#is_open then
      failwith "Seqdb_fsys_ao#configure: Cannot change read_only_mode while files are open";
    ht#configure cfg

  method willneed = ht#willneed

  method params = ht#params

  method stats = ht#stats

  method is_open = ht#is_open

  method iterator = ht#iterator

  method idx_iterator = ht#idx_iterator

  method reindex = ht#reindex

  method create_same new_fsysname truncate =
    let ht' = ht#create_same new_fsysname truncate in
    let new_fsyspath = Filename.concat dir new_fsysname in
    let flags = if truncate then [Unix.O_TRUNC] else [Unix.O_EXCL] in
    let (tm, io_time) =
      create_time_file ht new_fsyspath (ao_params fsyspath) flags in
    flush_and_close_time_file tm io_time;
    ht'#set_superblock_variable "HAVEAO" 1L;
    new ao_file_system ht' dir new_fsysname

  method rename_files new_fsysname =
    ht#rename_files new_fsysname;
    let new_fsyspath = Filename.concat dir new_fsysname in
    let fn_time_old = fsyspath ^ ".time" in
    let fn_time_new = new_fsyspath ^ ".time" in
    Unix.rename fn_time_old fn_time_new
end

(** Wrap an existing [ht] file system [name] of [base] into the ao layer. *)
let derive_ao_filesys base name ht =
  new ao_file_system ht (Seqdb_fsys_ht.base_dir base) name

(** Create a new ao file system [name] in [base]: the underlying [ht] file
    system, its time file, and the HAVEAO superblock marker. The new ao
    object is registered with [Seqdb_fsys_ht.override]. *)
let create_ao_filesys base name params ao_params =
  let ht = Seqdb_fsys_ht.create_ht_filesys base name params in
  let fsyspath = Filename.concat (Seqdb_fsys_ht.base_dir base) name in
  let (tm, io_time) = create_time_file ht fsyspath ao_params [Unix.O_EXCL] in
  flush_and_close_time_file tm io_time;
  ht#set_superblock_variable "HAVEAO" 1L;
  let ao = derive_ao_filesys base name ht in
  Seqdb_fsys_ht.override base name ao;
  (ao :> ht_file_descr file_system)

(** Read the ao parameters of the file system [name] in [base]. *)
let filesys_ao_params base name =
  let fsyspath = Filename.concat (Seqdb_fsys_ht.base_dir base) name in
  ao_params fsyspath

(** The mtime key of a time file entry. *)
let time_key e = read_int64 (Kvseq.get_key e)

(** The encoded file position stored in a time file entry. *)
let time_val e = Kvseq.get_value e

(** [get_ao_iterator ?at_mtime base name] returns an iterator over the file
    system. Without [at_mtime] this is the plain [ht] iterator. With
    [at_mtime], iteration starts at the closest earlier time mark found in
    the time file, and the returned iterator skips files whose mtime is
    smaller than [at_mtime] on first access. *)
let get_ao_iterator ?at_mtime base name =
  let fsyspath = Filename.concat (Seqdb_fsys_ht.base_dir base) name in
  let at_filepos =
    match at_mtime with
      | Some at_m ->
          (* Get a shared lock: *)
          let ao = Seqdb_fsys_ht.get_filesys base name in
          ignore(ao # guess_filepos "");
          (* First iterate over tm until a record e is found with:
             - the mtime key of e is less than at_m
             - e is the last record, OR the next record has an mtime key
               greater than or equal to at_m
             If the first record is already not earlier than at_m, it is
             used as is. An empty time file yields no start position.
           *)
          with_time_file fsyspath
            (fun tm ->
               try
                 let next_opt e =
                   try Some(Kvseq.next_entry e) with End_of_file -> None in
                 let e = ref (Kvseq.first_entry tm) in  (* or End_of_file *)
                 let e_next_opt = ref (next_opt !e) in
                 while time_key !e < at_m &&
                       ( match !e_next_opt with
                           | None -> false
                           | Some e_next -> time_key e_next < at_m
                       ) do
                   match !e_next_opt with
                     | None -> assert false
                     | Some e_next ->
                         e := e_next;
                         e_next_opt := next_opt !e
                 done;
                 (* The filepos of e is our starting position for the data
                    file iteration! *)
                 Some (time_val !e)
               with
                 | End_of_file -> None
            )
      | None -> None in
  let it =
    Seqdb_fsys_ht.get_iterator ?at_filepos ~dedup_mode:`Indexcheck base name in
  (* Now [it] is positioned at the timemark. The actually demanded mtime
     may be somewhere later in the file, however, so we have to advance to
     this position.
   *)
  match at_mtime with
    | Some mtime ->
        ( object
            method start() =
              let ao = Seqdb_fsys_ht.get_filesys base name in
              let i = it#start() in
              let init = ref false in
              let rec do_init failsafe =
                (* Advance to the demanded position: *)
                try
                  while ao#file_mtime i#current_file < mtime do
                    i#next()
                  done;
                  init := true
                with
                  | Failure _ when failsafe ->
                      i#next_recoverable();
                      do_init failsafe
              in
              ( object
                  method current_name =
                    if not !init then do_init false;
                    i#current_name
                  method current_file =
                    if not !init then do_init false;
                    i#current_file
                  method next() =
                    if not !init then do_init false;
                    i#next()
                  method next_recoverable() =
                    if not !init then do_init true;
                    i#next_recoverable()
                end
              )
          end
        )
    | None ->
        it

(** Plugin descriptor: a file system is handled by this layer when its
    time file exists. *)
let ao_plugin =
  ( object
      method impl_name = "Seqdb_fsys_ao"
      method compatibility base name =
        let fsyspath = Filename.concat (Seqdb_fsys_ht.base_dir base) name in
        if Sys.file_exists (fsyspath ^ ".time") then 10 else 0
      method get_derived base name ht =
        derive_ao_filesys base name ht
    end : Seqdb_fsys_ht.plugin
  )

(** Reconfigure the ao file system [name] in [base]; only [read_only_mode]
    is changed, and only when given. *)
let configure_ao_filesys ?read_only_mode base name =
  Seqdb_fsys_ht.plugin_reconfigure base name ao_plugin
    (fun cfg ->
       { cfg with
           read_only_mode = ( match read_only_mode with
                                | Some m -> m
                                | None -> cfg.read_only_mode
                            )
       }
    )

(** Register the ao plugin with [Seqdb_fsys_ht]. *)
let init() =
  Seqdb_fsys_ht.register_plugin ao_plugin