@@ -23,13 +23,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
2323 self .db .close ()
2424
2525
26- def _create_duckdb_table (db , table_name = DUCKDB_TABLE ):
27- sql_connection = frappe .db .get_connection ()
28- df = pd .read_sql ("SELECT * FROM `tabWeb Page View` LIMIT 0" , sql_connection )
29- db .register ("df" , df )
30- db .execute (f"CREATE TABLE IF NOT EXISTS { table_name } AS SELECT * FROM df" )
31-
32-
3326def _get_date_filter (from_date : str | None = None , to_date : str | None = None ):
3427 if not from_date or not to_date :
3528 return ""
@@ -58,28 +51,24 @@ def _get_route_filter(route: str | None = None, route_filter_type: str = "wildca
5851 return f"path LIKE '%{ route } %'"
5952
6053
61- def reset_duckdb_table (table_name = DUCKDB_TABLE ):
54+ def setup_duckdb_table (table_name = DUCKDB_TABLE ):
6255 with DuckDBConnection () as db :
63- print ("Reading Web Page View data from Frappe database..." )
64- start_time = time .time ()
6556 sql_connection = frappe .db .get_connection ()
66- df = pd .read_sql ("SELECT * FROM `tabWeb Page View`" , sql_connection )
67-
68- read_time = time .time ()
69- print (f"Read { len (df )} records in { read_time - start_time :.2f} seconds" )
70-
71- print (f"Creating DuckDB table '{ table_name } ' from DataFrame..." )
57+ df = pd .read_sql ("SELECT * FROM `tabWeb Page View`" , sql_connection ) # type: ignore
7258 db .register ("df" , df )
7359 db .execute (f"CREATE OR REPLACE TABLE { table_name } AS SELECT * FROM df" )
74-
75- insert_time = time .time ()
76- print (f"Created table in { insert_time - read_time :.2f} seconds" )
7760 print (f"Successfully ingested { len (df )} records into DuckDB" )
7861
7962
8063def ingest_web_page_views_to_duckdb (table_name = DUCKDB_TABLE ):
8164 with DuckDBConnection () as db :
82- _create_duckdb_table (db , table_name )
65+ table_exists = db .execute (
66+ f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{ table_name } '"
67+ ).fetchone ()
68+ if table_exists and table_exists [0 ] == 0 :
69+ setup_duckdb_table (table_name )
70+ return
71+
8372 result = db .execute (f"SELECT MAX(creation) FROM { table_name } " ).fetchone ()
8473 last_record = result [0 ] if result and result [0 ] else None
8574
@@ -99,7 +88,6 @@ def ingest_web_page_views_to_duckdb(table_name=DUCKDB_TABLE):
9988 last_record = result [0 ] if result and result [0 ] else None
10089
10190 filters = {"creation" : [">" , last_record ]} if last_record else {}
102- start_time = time .time ()
10391 records = frappe .get_all (
10492 "Web Page View" ,
10593 filters = filters ,
@@ -108,19 +96,14 @@ def ingest_web_page_views_to_duckdb(table_name=DUCKDB_TABLE):
10896 limit = page_size ,
10997 order_by = "creation asc" ,
11098 )
111- end_time = time .time ()
112- print (f"Query execution time: { end_time - start_time :.2f} seconds" )
11399
114100 if not records :
115101 break
116102
117- start_insert_time = time .time ()
118103 db .executemany (
119104 f"INSERT INTO { table_name } (creation, is_unique, path, referrer, time_zone, user_agent) VALUES (?, ?, ?, ?, ?, ?)" ,
120105 records ,
121106 )
122- end_insert_time = time .time ()
123- print (f"Insert execution time: { end_insert_time - start_insert_time :.2f} seconds" )
124107
125108 processed += len (records )
126109 progress = (processed / total_count ) * 100 if total_count > 0 else 100
0 commit comments