1+ use std:: path:: PathBuf ;
12use std:: sync:: Arc ;
23use std:: sync:: Mutex ;
34use std:: time:: Duration ;
@@ -10,6 +11,9 @@ use futures03::compat::Stream01CompatExt;
1011use futures03:: stream:: StreamExt ;
1112use futures03:: stream:: TryStreamExt ;
1213use lru_time_cache:: LruCache ;
14+ use object_store:: local:: LocalFileSystem ;
15+ use object_store:: path:: Path ;
16+ use object_store:: ObjectStore ;
1317use serde_json:: Value ;
1418
1519use crate :: derive:: CheapClone ;
@@ -29,22 +33,60 @@ enum Cache {
2933 Memory {
3034 cache : Arc < Mutex < LruCache < ContentPath , Vec < u8 > > > > ,
3135 } ,
36+ Disk {
37+ store : Arc < dyn ObjectStore > ,
38+ } ,
39+ }
40+
41+ fn log_err ( logger : & Logger , e : & object_store:: Error , log_not_found : bool ) {
42+ if log_not_found || !matches ! ( e, object_store:: Error :: NotFound { .. } ) {
43+ warn ! (
44+ logger,
45+ "Failed to get IPFS object from disk cache; fetching from IPFS" ;
46+ "error" => e. to_string( ) ,
47+ ) ;
48+ }
3249}
3350
3451impl Cache {
35- fn new ( capacity : usize ) -> Self {
36- Self :: Memory {
37- cache : Arc :: new ( Mutex :: new ( LruCache :: with_capacity ( capacity) ) ) ,
52+ fn new ( capacity : usize , path : Option < PathBuf > ) -> Self {
53+ match path {
54+ Some ( path) => {
55+ let fs = match LocalFileSystem :: new_with_prefix ( & path) {
56+ Err ( e) => {
57+ panic ! (
58+ "Failed to create IPFS file based cache at {}: {}" ,
59+ path. display( ) ,
60+ e
61+ ) ;
62+ }
63+ Ok ( fs) => fs,
64+ } ;
65+ Cache :: Disk {
66+ store : Arc :: new ( fs) ,
67+ }
68+ }
69+ None => Self :: Memory {
70+ cache : Arc :: new ( Mutex :: new ( LruCache :: with_capacity ( capacity) ) ) ,
71+ } ,
3872 }
3973 }
4074
41- async fn find ( & self , path : & ContentPath ) -> Option < Vec < u8 > > {
75+ async fn find ( & self , logger : & Logger , path : & ContentPath ) -> Option < Vec < u8 > > {
4276 match self {
4377 Cache :: Memory { cache } => cache. lock ( ) . unwrap ( ) . get ( path) . cloned ( ) ,
78+ Cache :: Disk { store } => {
79+ let log_err = |e : & object_store:: Error | log_err ( logger, e, false ) ;
80+
81+ let path = Path :: from ( path. cid ( ) . to_string ( ) ) ;
82+ let object = store. get ( & path) . await . inspect_err ( log_err) . ok ( ) ?;
83+ let data = object. bytes ( ) . await . inspect_err ( log_err) . ok ( ) ?;
84+ Some ( data. to_vec ( ) )
85+ }
4486 }
4587 }
4688
47- async fn insert ( & self , path : ContentPath , data : Vec < u8 > ) {
89+ async fn insert ( & self , logger : & Logger , path : ContentPath , data : Vec < u8 > ) {
4890 match self {
4991 Cache :: Memory { cache } => {
5092 let mut cache = cache. lock ( ) . unwrap ( ) ;
@@ -53,6 +95,15 @@ impl Cache {
5395 cache. insert ( path. clone ( ) , data. clone ( ) ) ;
5496 }
5597 }
98+ Cache :: Disk { store } => {
99+ let log_err = |e : & object_store:: Error | log_err ( logger, e, true ) ;
100+ let path = Path :: from ( path. cid ( ) . to_string ( ) ) ;
101+ store
102+ . put ( & path, data. into ( ) )
103+ . await
104+ . inspect_err ( log_err)
105+ . ok ( ) ;
106+ }
56107 }
57108 }
58109}
@@ -81,7 +132,10 @@ impl IpfsResolver {
81132
82133 Self {
83134 client,
84- cache : Cache :: new ( env. max_ipfs_cache_size as usize ) ,
135+ cache : Cache :: new (
136+ env. max_ipfs_cache_size as usize ,
137+ env. ipfs_cache_location . clone ( ) ,
138+ ) ,
85139 timeout : env. ipfs_timeout ,
86140 max_file_size : env. max_ipfs_file_bytes ,
87141 max_map_file_size : env. max_ipfs_map_file_size ,
@@ -111,7 +165,7 @@ impl LinkResolverTrait for IpfsResolver {
111165 let max_file_size = self . max_file_size ;
112166 let max_cache_file_size = self . max_cache_file_size ;
113167
114- if let Some ( data) = self . cache . find ( & path) . await {
168+ if let Some ( data) = self . cache . find ( & logger , & path) . await {
115169 trace ! ( logger, "IPFS cat cache hit" ; "hash" => path. to_string( ) ) ;
116170 return Ok ( data. to_owned ( ) ) ;
117171 }
@@ -132,7 +186,7 @@ impl LinkResolverTrait for IpfsResolver {
132186 . to_vec ( ) ;
133187
134188 if data. len ( ) <= max_cache_file_size {
135- self . cache . insert ( path. clone ( ) , data. clone ( ) ) . await ;
189+ self . cache . insert ( & logger , path. clone ( ) , data. clone ( ) ) . await ;
136190 } else {
137191 debug ! (
138192 logger,
0 commit comments