@@ -1,8 +1,5 @@
-use std::sync::Arc;
-
 use derive_builder::Builder;
 use spider::website::Website;
-use tokio::{runtime::Handle, sync::RwLock};
 
 use swiftide_core::{
     indexing::{IndexingStream, Node},
@@ -38,7 +35,7 @@ impl ScrapingLoader {
 
 impl Loader for ScrapingLoader {
     fn into_stream(mut self) -> IndexingStream {
-        let (tx, rx) = std::sync::mpsc::channel();
+        let (tx, rx) = tokio::sync::mpsc::channel(1000);
         let mut spider_rx = self
             .spider_website
             .subscribe(0)
@@ -58,7 +55,7 @@ impl Loader for ScrapingLoader {
 
                 tracing::debug!(?node, "[Spider] Received node from spider");
 
-                if let Err(error) = tx.send(node) {
+                if let Err(error) = tx.send(node).await {
                     tracing::error!(?error, "[Spider] Failed to send node to stream");
                     break;
                 }
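The two hunks above go together: swapping `std::sync::mpsc` for a bounded `tokio::sync::mpsc` channel is what forces the `.await` on `send`, since tokio's bounded channel suspends the producer when the buffer (here 1000 nodes) is full rather than queueing unboundedly. A minimal standalone sketch of that backpressure behaviour (not code from this change):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // A deliberately tiny buffer so the backpressure is easy to see.
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    tokio::spawn(async move {
        for i in 0..5 {
            // `send` is async: it completes once buffer space is free,
            // and errors if the receiver has been dropped.
            if let Err(error) = tx.send(i).await {
                eprintln!("receiver gone: {error}");
                break;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got {i}");
    }
}
```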
@@ -69,14 +66,14 @@ impl Loader for ScrapingLoader {
 
         let _scrape_thread = tokio::spawn(async move {
             tracing::info!("[Spider] Starting scrape loop");
-            spider_website.scrape().await;
-            spider_website.unsubscribe();
+            // TODO: It would be much nicer if this used `scrape` instead, as it is supposedly
+            // more concurrent
+            spider_website.crawl().await;
             tracing::info!("[Spider] Scrape loop finished");
         });
 
         // NOTE: Handles should stay alive because of rx, but feels a bit fishy
-
-        IndexingStream::iter(rx)
+        rx.into()
     }
 
     fn into_stream_boxed(self: Box<Self>) -> IndexingStream {
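The `rx.into()` at the end of `into_stream` implies `IndexingStream` has a `From` impl for the tokio receiver; that impl is not shown in this diff. One common way to write such an adapter is via `tokio_stream::wrappers::ReceiverStream`, which turns a receiver into a `futures` `Stream` — a sketch under that assumption:

```rust
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<String>(16);
    tx.send("hello".into()).await.unwrap();
    drop(tx); // closing the sender ends the stream

    // ReceiverStream is the usual adapter from a tokio receiver to a Stream;
    // an `impl From<Receiver<_>> for IndexingStream` would likely wrap this.
    let mut stream = ReceiverStream::new(rx);
    while let Some(item) = stream.next().await {
        println!("{item}");
    }
}
```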
@@ -86,11 +83,12 @@ impl Loader for ScrapingLoader {
 
 #[cfg(test)]
 mod tests {
-    use crate::scraping::loader::ScrapingLoader;
-    use futures_util::StreamExt as _;
+    use super::*;
+    use anyhow::Result;
+    use futures_util::StreamExt;
     use swiftide_core::indexing::Loader;
     use wiremock::matchers::{method, path};
-    use wiremock::{Mock, MockServer, ResponseTemplate};
+    use wiremock::{Mock, MockServer, Request, ResponseTemplate};
 
     #[test_log::test(tokio::test(flavor = "multi_thread"))]
     async fn test_scraping_loader_with_wiremock() {
@@ -109,17 +107,58 @@ mod tests {
         let loader = ScrapingLoader::from_url(mock_server.uri());
 
         // Execute the into_stream method
-        let mut stream = loader.into_stream();
+        let stream = loader.into_stream();
 
         // Process the stream to check if we get the expected result
-        while let Some(node) = stream.next().await {
-            tracing::info!(?node, "Received node from stream");
-            // Assert the scraped content against expected content
-            assert_eq!(node.unwrap().chunk, body);
-        }
+        let nodes = stream.collect::<Vec<Result<Node>>>().await;
+
+        assert_eq!(nodes.len(), 1);
+
+        let first_node = nodes.first().unwrap().as_ref().unwrap();
+
+        assert_eq!(first_node.chunk, body);
+    }
+
+    #[test_log::test(tokio::test(flavor = "multi_thread"))]
+    async fn test_scraping_loader_multiple_pages() {
+        // Set up the wiremock server to simulate the remote web server
+        let mock_server = MockServer::start().await;
+
+        // Mocked response for the page we will scrape
+        let body = "<html><body><h1>Test Page</h1><a href=\"/other\">link</a></body></html>";
+        Mock::given(method("GET"))
+            .and(path("/"))
+            .respond_with(ResponseTemplate::new(200).set_body_string(body))
+            .mount(&mock_server)
+            .await;
+
+        let body2 = "<html><body><h1>Test Page 2</h1></body></html>";
+        Mock::given(method("GET"))
+            .and(path("/other"))
+            .respond_with(move |_req: &Request| {
+                std::thread::sleep(std::time::Duration::from_secs(1));
+                ResponseTemplate::new(200).set_body_string(body2)
+            })
+            .mount(&mock_server)
+            .await;
+
+        // Create an instance of ScrapingLoader using the mock server's URL
+        let loader = ScrapingLoader::from_url(mock_server.uri());
+
+        // Execute the into_stream method
+        let stream = loader.into_stream();
+
+        // Process the stream to check if we get the expected result
+        let mut nodes = stream.collect::<Vec<Result<Node>>>().await;
+
+        assert_eq!(nodes.len(), 2);
+
+        let first_node = nodes.pop().unwrap().unwrap();
+
+        assert_eq!(first_node.chunk, body2);
 
-        tracing::info!("Stream finished");
+        let second_node = nodes.pop().unwrap().unwrap();
 
-        drop(stream);
+        assert_eq!(second_node.chunk, body);
     }
 }
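One caveat on the new multi-page test: the `std::thread::sleep` inside `respond_with` blocks a runtime worker thread (tolerable here because the test runs on the multi-thread flavor), and it exists only to make `/other` arrive last so the ordering assertions hold. If blocking is a concern, wiremock's `ResponseTemplate::set_delay` should achieve the same stagger without tying up a thread; a sketch using only that API, not code from this change:

```rust
use std::time::Duration;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

#[tokio::main]
async fn main() {
    let mock_server = MockServer::start().await;

    // Delay the /other response by one second without blocking a worker thread.
    Mock::given(method("GET"))
        .and(path("/other"))
        .respond_with(
            ResponseTemplate::new(200)
                .set_body_string("<html><body><h1>Test Page 2</h1></body></html>")
                .set_delay(Duration::from_secs(1)),
        )
        .mount(&mock_server)
        .await;
}
```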