text block
text block
text block
import json
import pandas as pd
import numpy as np
import re
from sqlalchemy import create_engine
import psycopg2
from config import db_password
import time
# Add the clean movie function that takes in the argument, "movie".
def clean_movie(movie):
    """Return a tidied copy of one scraped Wikipedia movie record.

    Alternate-language title fields are gathered into a single
    ``'alt_titles'`` dictionary, and synonymous field names are folded
    into one canonical name.  The mapping passed in is left untouched.
    """
    # Fields that all carry some alternate title for the film.
    alt_title_fields = (
        'Also known as', 'Arabic', 'Cantonese', 'Chinese', 'French',
        'Hangul', 'Hebrew', 'Hepburn', 'Japanese', 'Literally',
        'Mandarin', 'McCune-Reischauer', 'Original title', 'Polish',
        'Revised Romanization', 'Romanized', 'Russian',
        'Simplified', 'Traditional', 'Yiddish',
    )
    # (existing name, canonical name) pairs, applied strictly in this order.
    # When several sources map to one target, the last one present wins.
    # 'Released' is first moved to 'Release Date' and then on to 'Release date'.
    renames = (
        ('Adaptation by', 'Writer(s)'),
        ('Country of origin', 'Country'),
        ('Directed by', 'Director'),
        ('Distributed by', 'Distributor'),
        ('Edited by', 'Editor(s)'),
        ('Length', 'Running time'),
        ('Original release', 'Release date'),
        ('Music by', 'Composer(s)'),
        ('Produced by', 'Producer(s)'),
        ('Producer', 'Producer(s)'),
        ('Productioncompanies ', 'Production company(s)'),
        ('Productioncompany ', 'Production company(s)'),
        ('Released', 'Release Date'),
        ('Release Date', 'Release date'),
        ('Screen story by', 'Writer(s)'),
        ('Screenplay by', 'Writer(s)'),
        ('Story by', 'Writer(s)'),
        ('Theme music composer', 'Composer(s)'),
        ('Written by', 'Writer(s)'),
    )

    cleaned = dict(movie)  # shallow copy so the caller's record is not mutated

    # Move every alternate-title field that is present under one key.
    found_titles = {
        field: cleaned[field] for field in alt_title_fields if field in cleaned
    }
    for field in found_titles:
        del cleaned[field]
    if found_titles:
        cleaned['alt_titles'] = found_titles

    # Fold synonymous column names together.
    for source, target in renames:
        if source in cleaned:
            cleaned[target] = cleaned.pop(source)

    return cleaned
# 1 Add the function that takes in three arguments;
# Wikipedia data, Kaggle metadata, and MovieLens rating data (from Kaggle)
# Dollar amounts written with a magnitude word, e.g. "$123.4 million" or
# "$1.2 billion"; the optional "i" also accepts the spelling "millon".
_FORM_ONE = r'\$\s*\d+\.?\d*\s*[mb]illi?on'
# Fully written-out dollar amounts such as "$123,456,789", with a comma or a
# period between three-digit groups, not followed by " million"/" billion".
_FORM_TWO = r'\$\s*\d{1,3}(?:[,\.]\d{3})+(?!\s[mb]illion)'


def _join_if_list(value):
    """Join a list of strings with single spaces; return anything else as-is."""
    return ' '.join(value) if isinstance(value, list) else value


def _parse_dollars(s):
    """Convert one extracted dollar string to a float, or NaN if unrecognised."""
    if not isinstance(s, str):
        return np.nan

    # "$###.# million": strip "$", whitespace and letters, then scale.
    if re.match(r'\$\s*\d+\.?\d*\s*milli?on', s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10 ** 6

    # "$###.# billion": same clean-up, larger scale.
    if re.match(r'\$\s*\d+\.?\d*\s*billi?on', s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[a-zA-Z]', '', s)) * 10 ** 9

    # "$###,###,###": the pattern accepts "," or "." between digit groups, so
    # both are removed.  Stripping commas only would hand "1.234.567" to
    # float(), which raises ValueError.
    if re.match(_FORM_TWO, s, flags=re.IGNORECASE):
        return float(re.sub(r'\$|\s|[,.]', '', s))

    return np.nan


def _extract_dollars(column):
    """Pull the first recognised dollar amount out of each string as a float."""
    matches = column.str.extract(f'({_FORM_ONE}|{_FORM_TWO})', flags=re.IGNORECASE)
    return matches[0].apply(_parse_dollars)


def _clean_wiki_movies(wiki_movies_raw):
    """Build the cleaned Wikipedia movies DataFrame from the raw JSON records."""
    # Keep entries that have a director and an IMDb link and are not TV shows
    # (no episode count), then normalise each record with clean_movie.
    wiki_movies = [clean_movie(movie) for movie in wiki_movies_raw
                   if ('Director' in movie or 'Directed by' in movie)
                   and 'imdb_link' in movie
                   and 'No. of episodes' not in movie]
    wiki_movies_df = pd.DataFrame(wiki_movies)

    # Extract the IMDb ID from the link and de-duplicate on it.  A failure is
    # reported and processing continues, as in the original pipeline.
    try:
        wiki_movies_df['imdb_id'] = wiki_movies_df['imdb_link'].str.extract(r'(tt\d{7})')
        wiki_movies_df.drop_duplicates(subset='imdb_id', inplace=True)
    except Exception as error:
        # Print the caught error itself, not the Exception class.
        print(error)

    # Keep only columns in which fewer than 90% of the values are null.
    null_limit = len(wiki_movies_df) * 0.9
    wiki_columns_to_keep = [column for column in wiki_movies_df.columns
                            if wiki_movies_df[column].isnull().sum() < null_limit]
    # .copy() so the column assignments below act on an independent frame.
    wiki_movies_df = wiki_movies_df[wiki_columns_to_keep].copy()

    # Box office: flatten list values, then parse the dollar amount.
    box_office = wiki_movies_df['Box office'].apply(_join_if_list)
    wiki_movies_df['box_office'] = _extract_dollars(box_office)
    wiki_movies_df.drop('Box office', axis=1, inplace=True)

    # Budget: collapse "$x-y" ranges to the upper figure and strip "[n]"
    # citation markers before parsing.
    budget = wiki_movies_df['Budget'].dropna().map(_join_if_list)
    budget = budget.str.replace(r'\$.*[-—–](?![a-z])', '$', regex=True)
    # regex=True is required: without it newer pandas treats the pattern as a
    # literal string and no citation marker is ever removed.
    budget = budget.str.replace(r'\[\d+\]\s*', '', regex=True)
    wiki_movies_df['budget'] = _extract_dollars(budget)

    # Release date: try the most specific date format first.
    release_date = wiki_movies_df['Release date'].dropna().apply(_join_if_list)
    date_form_one = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s[123]\d,\s\d{4}'
    date_form_two = r'\d{4}.[01]\d.[123]\d'
    date_form_three = r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s\d{4}'
    date_form_four = r'\d{4}'
    # NOTE(review): infer_datetime_format is kept from the original code; it
    # appears to be deprecated in pandas 2.x, so confirm the pinned version.
    wiki_movies_df['release_date'] = pd.to_datetime(
        release_date.str.extract(
            f'({date_form_one}|{date_form_two}|{date_form_three}|{date_form_four})')[0],
        infer_datetime_format=True)

    # Running time: capture "H hours M" in groups 0/1 or "M m..." in group 2,
    # then express everything in minutes.
    running_time = wiki_movies_df['Running time'].dropna().apply(_join_if_list)
    running_time_extract = running_time.str.extract(r'(\d+)\s*ho?u?r?s?\s*(\d*)|(\d+)\s*m')
    running_time_extract = running_time_extract.apply(
        lambda col: pd.to_numeric(col, errors='coerce')).fillna(0)
    wiki_movies_df['running_time'] = running_time_extract.apply(
        lambda row: row[0] * 60 + row[1] if row[2] == 0 else row[2], axis=1)
    wiki_movies_df.drop('Running time', axis=1, inplace=True)

    return wiki_movies_df


def _clean_kaggle_metadata(kaggle_metadata):
    """Drop adult titles from the Kaggle metadata and convert column types."""
    kaggle_metadata = kaggle_metadata[kaggle_metadata['adult'] == 'False'].drop('adult', axis='columns')
    kaggle_metadata['video'] = kaggle_metadata['video'] == 'True'
    kaggle_metadata['budget'] = kaggle_metadata['budget'].astype(int)
    kaggle_metadata['id'] = pd.to_numeric(kaggle_metadata['id'], errors='raise')
    kaggle_metadata['popularity'] = pd.to_numeric(kaggle_metadata['popularity'], errors='raise')
    kaggle_metadata['release_date'] = pd.to_datetime(kaggle_metadata['release_date'])
    return kaggle_metadata


def _fill_missing_kaggle_data(df, kaggle_column, wiki_column):
    """Replace zeros in kaggle_column with the wiki value, then drop wiki_column."""
    df[kaggle_column] = df.apply(
        lambda row: row[wiki_column] if row[kaggle_column] == 0 else row[kaggle_column],
        axis=1)
    df.drop(columns=wiki_column, inplace=True)


def _merge_movies(wiki_movies_df, kaggle_metadata):
    """Merge the Wikipedia and Kaggle frames on imdb_id into the movies frame."""
    movies_df = pd.merge(wiki_movies_df, kaggle_metadata, on='imdb_id',
                         suffixes=('_wiki', '_kaggle'))
    movies_df.drop(columns=['title_wiki', 'release_date_wiki', 'Language',
                            'Production company(s)'], inplace=True)

    # Kaggle values win; the Wikipedia value is used only where Kaggle has 0.
    _fill_missing_kaggle_data(movies_df, 'runtime', 'running_time')
    _fill_missing_kaggle_data(movies_df, 'budget_kaggle', 'budget_wiki')
    _fill_missing_kaggle_data(movies_df, 'revenue', 'box_office')

    # Select and order the output columns.
    movies_df = movies_df.loc[:, ['imdb_id', 'id', 'title_kaggle', 'original_title', 'tagline', 'belongs_to_collection', 'url', 'imdb_link',
                                  'runtime', 'budget_kaggle', 'revenue', 'release_date_kaggle', 'popularity', 'vote_average', 'vote_count',
                                  'genres', 'original_language', 'overview', 'spoken_languages', 'Country',
                                  'production_companies', 'production_countries', 'Distributor',
                                  'Producer(s)', 'Director', 'Starring', 'Cinematography', 'Editor(s)', 'Writer(s)', 'Composer(s)', 'Based on'
                                  ]]
    movies_df.rename({'id': 'kaggle_id',
                      'title_kaggle': 'title',
                      'url': 'wikipedia_url',
                      'budget_kaggle': 'budget',
                      'release_date_kaggle': 'release_date',
                      'Country': 'country',
                      'Distributor': 'distributor',
                      'Producer(s)': 'producers',
                      'Director': 'director',
                      'Starring': 'starring',
                      'Cinematography': 'cinematography',
                      'Editor(s)': 'editors',
                      'Writer(s)': 'writers',
                      'Composer(s)': 'composers',
                      'Based on': 'based_on'
                      }, axis='columns', inplace=True)
    return movies_df


def _add_rating_counts(movies_df, ratings):
    """Left-join per-movie counts of each rating value onto the movies frame."""
    rating_counts = ratings.groupby(['movieId', 'rating'], as_index=False).count() \
        .rename({'userId': 'count'}, axis=1) \
        .pivot(index='movieId', columns='rating', values='count')
    rating_counts.columns = ['rating_' + str(col) for col in rating_counts.columns]
    movies_with_ratings_df = pd.merge(movies_df, rating_counts, left_on='kaggle_id',
                                      right_index=True, how='left')
    # Movies with no ratings at a given value get a count of 0 instead of NaN.
    movies_with_ratings_df[rating_counts.columns] = \
        movies_with_ratings_df[rating_counts.columns].fillna(0)
    return movies_with_ratings_df


def _load_to_postgres(movies_df, ratings_data):
    """Write the movies frame and the raw ratings CSV to the movie_data database."""
    # "postgresql://" is the dialect name SQLAlchemy documents; the shorter
    # "postgres://" alias is rejected by SQLAlchemy 1.4 and later.
    db_string = f"postgresql://postgres:{db_password}@localhost:5432/movie_data"
    engine = create_engine(db_string)

    movies_df.to_sql(name='movies', con=engine, if_exists='replace')

    # Stream the ratings file in chunks so it never has to fit in one INSERT.
    # NOTE(review): chunks are appended, so re-running the load without first
    # dropping the "ratings" table will duplicate its rows.
    rows_imported = 0
    start_time = time.time()
    # Read from the path the caller passed in, not from a module-level global.
    for data in pd.read_csv(ratings_data, chunksize=1000000):
        print(f'importing rows {rows_imported} to {rows_imported + len(data)}...', end='')
        data.to_sql(name='ratings', con=engine, if_exists='append')
        rows_imported += len(data)
        print(f'Done. {time.time() - start_time} total seconds elapsed')


def extract_transform_load(wiki_data, kaggle_data, ratings_data):
    """Run the movie ETL: read the three sources, clean and merge, load to Postgres.

    Args:
        wiki_data: Path to the Wikipedia movies JSON file.
        kaggle_data: Path to the Kaggle movie metadata CSV file.
        ratings_data: Path to the MovieLens ratings CSV file.

    Returns:
        None.  The merged movies table is written to the ``movies`` table and
        the raw ratings are appended to the ``ratings`` table.
    """
    # low_memory=False parses each file in one pass, avoiding mixed per-chunk
    # dtypes; the Kaggle clean-up compares 'adult' and 'video' against strings.
    kaggle_metadata = pd.read_csv(kaggle_data, low_memory=False)
    ratings = pd.read_csv(ratings_data, low_memory=False)

    # JSON is read as UTF-8 explicitly instead of the platform default encoding.
    with open(wiki_data, mode='r', encoding='utf-8') as file:
        wiki_movies_raw = json.load(file)

    wiki_movies_df = _clean_wiki_movies(wiki_movies_raw)
    kaggle_metadata = _clean_kaggle_metadata(kaggle_metadata)
    movies_df = _merge_movies(wiki_movies_df, kaggle_metadata)

    # NOTE(review): kept from the original pipeline, but the result is neither
    # returned nor written to the database.
    movies_with_ratings_df = _add_rating_counts(movies_df, ratings)

    _load_to_postgres(movies_df, ratings_data)
# 10. Create the path to your file directory and variables for the three files.
# file_dir already ends with a path separator, so file names are appended to
# it directly; adding another "/" would produce "../Resource//<name>".
file_dir = '../Resource/'
# The Wikipedia data
wiki_file = f'{file_dir}wikipedia-movies.json'
# The Kaggle metadata
kaggle_file = f'{file_dir}movies_metadata.csv'
# The MovieLens rating data.
ratings_file = f'{file_dir}ratings.csv'
# Run the full extract-transform-load pipeline on the three files.
extract_transform_load(wiki_file, kaggle_file, ratings_file)
importing rows 0 to 1000000...Done. 137.85508799552917 total seconds elapsed
importing rows 1000000 to 2000000...Done. 273.88098549842834 total seconds elapsed
importing rows 2000000 to 3000000...Done. 406.1924214363098 total seconds elapsed
importing rows 3000000 to 4000000...Done. 539.3021867275238 total seconds elapsed
importing rows 4000000 to 5000000...Done. 671.0426962375641 total seconds elapsed
importing rows 5000000 to 6000000...Done. 802.4983911514282 total seconds elapsed
importing rows 6000000 to 7000000...Done. 934.711020231247 total seconds elapsed
importing rows 7000000 to 8000000...Done. 1065.2456183433533 total seconds elapsed
importing rows 8000000 to 9000000...Done. 1196.2444491386414 total seconds elapsed
importing rows 9000000 to 10000000...Done. 1327.53653383255 total seconds elapsed
importing rows 10000000 to 11000000...Done. 1458.803716659546 total seconds elapsed
importing rows 11000000 to 12000000...Done. 1592.3240537643433 total seconds elapsed
importing rows 12000000 to 13000000...Done. 1722.772427558899 total seconds elapsed
importing rows 13000000 to 14000000...Done. 1853.881242275238 total seconds elapsed
importing rows 14000000 to 15000000...Done. 2030.8236718177795 total seconds elapsed
importing rows 15000000 to 16000000...Done. 2162.319616317749 total seconds elapsed
importing rows 16000000 to 17000000...Done. 2290.9916903972626 total seconds elapsed
importing rows 17000000 to 18000000...Done. 2429.507855653763 total seconds elapsed
importing rows 18000000 to 19000000...Done. 2572.5861887931824 total seconds elapsed
importing rows 19000000 to 20000000...Done. 2715.2314240932465 total seconds elapsed
importing rows 20000000 to 21000000...Done. 2857.637410402298 total seconds elapsed
importing rows 21000000 to 22000000...Done. 2999.3215153217316 total seconds elapsed
importing rows 22000000 to 23000000...Done. 3146.1462848186493 total seconds elapsed
importing rows 23000000 to 24000000...Done. 3295.7891578674316 total seconds elapsed
importing rows 24000000 to 25000000...Done. 3440.9191019535065 total seconds elapsed
importing rows 25000000 to 26000000...Done. 3576.793061733246 total seconds elapsed
importing rows 26000000 to 26024289...Done. 3580.56000995636 total seconds elapsed