Based on the global marine traffic dataset, several MapReduce jobs are implemented in Python using the `mrjob` library.
Task 1
"""
command to run local test on screen:
python task1.py -r local data/big.csv > outputs/out1.csv
"""
from mrjob.job import MRJob
class MRWordCount(MRJob):
    """
    Task 1: remove duplicate records from the raw CSV.

    Two lines count as duplicates when their first two columns
    (MMSI and TIME) are equal. For every (MMSI, TIME) pair exactly one
    raw line is emitted, unchanged, with a None key.
    """

    def mapper(self, _, line):
        """Yield ("MMSI,TIME", raw line) for each line with >= 2 columns."""
        # A plain comma split is sufficient here: commas occur in ship and
        # company names, but never in the first two columns, so row[0] and
        # row[1] are reliable.
        row = line.split(",")
        # Blank or truncated lines have no TIME column; skip them instead
        # of failing the whole job with an IndexError.
        if len(row) < 2:
            return
        # Join MMSI and TIME with a comma. Neither split field can contain
        # one, so the key is unambiguous; a bare concatenation lets e.g.
        # '12' + '34...' and '123' + '4...' collide.
        identifier = row[0] + "," + row[1]
        yield identifier, line

    def reducer(self, combi, line):
        """Emit one raw line per MMSI,TIME key and drop the rest."""
        # `combi` is the MMSI,TIME key and `line` iterates over all raw
        # lines sharing it; everything after the first one is a duplicate.
        yield None, next(line)
if __name__ == '__main__':
    # Script entry point: hand control to mrjob's command-line runner.
    MRWordCount.run()
Task 2
"""
command to run local test to file:
python task2.py -r local data/big.csv > outputs/out2.csv
"""
from mrjob.job import MRJob
def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into fields while respecting quoted fields, so
    commas inside Ship or Company names do not break the column layout.
    Backslash-escaped quotes inside a quoted field are supported.

    After splitting, every remaining backslash and double quote is
    removed from each field. Only the first 12 fields are kept. Each
    field is converted to float if it contains a '.', otherwise to int;
    when conversion fails, the cleaned string is kept.
    """
    import csv

    # The csv module handles quoted commas and escaped quotes correctly.
    # The previous placeholder/replace approach restored repeated quoted
    # values as the literal "XXX" and raised IndexError on an unbalanced
    # quote.
    fields = next(csv.reader([string], escapechar='\\'), [])
    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    ls_final = []
    for field in fields:
        # same clean-up as before: drop stray backslashes and quotes
        field = field.replace('\\', '').replace('"', '')
        try:
            # append as float or int
            if "." in field:
                ls_final.append(float(field))
            else:
                ls_final.append(int(field))
        except ValueError:
            # not numeric: keep as str
            ls_final.append(field)
    return ls_final
def check_validity(ls):
    """
    Return True if a parsed row passes all validity conditions.

    Column layout of ``ls`` (as produced by string_to_list):
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions (all must hold):
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null, where null
       means None, an empty string or 0,
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180,
    3. TIME is a string of 23 characters (the specification says 25;
       changed to 23 since that seems to be the norm).

    Rows with fewer than four columns are invalid. Malformed rows make
    the function return False rather than raise.
    """
    # too short to hold MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, lat, lon = ls[0], ls[1], ls[2], ls[3]

    # condition 1: required fields present. The old check tested TIME and
    # LATITUDE (twice) but never MMSI or LONGITUDE.
    condition1 = all(
        value not in (None, "", 0) for value in (mmsi, timestamp, lat, lon)
    )
    # condition 2: numeric coordinates in range. The old check bounded
    # LONGITUDE by +/-90 and raised TypeError on string coordinates.
    condition2 = (
        isinstance(lat, (int, float))
        and isinstance(lon, (int, float))
        and -90 < lat < 90
        and -180 < lon < 180
    )
    # condition 3: timestamp string of the expected length; len() on a
    # numeric TIME used to raise TypeError.
    condition3 = isinstance(timestamp, str) and len(timestamp) == 23

    return condition1 and condition2 and condition3
class MRWordCount(MRJob):
    """
    Task 2: keep only valid rows and drop duplicate (MMSI, TIME) records.

    Every CSV line is parsed by string_to_list and filtered by
    check_validity. Surviving rows are keyed by str(MMSI) + str(TIME),
    and the reducer (reused as combiner) passes on one row per key.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """Yield (MMSI+TIME id, parsed row) for each valid CSV line."""
        fields = string_to_list(line)
        # Guard clause: invalid rows produce no output at all.
        if not check_validity(fields):
            return
        record_id = str(fields[0]) + str(fields[1])
        yield record_id, fields

    def reducer(self, key, value):
        """Emit one row per MMSI+TIME id; the remaining rows are duplicates."""
        # Invalid rows never reach this point; the mapper filtered them.
        yield key, next(value)

    combiner = reducer
if __name__ == '__main__':
    # Script entry point: hand control to mrjob's command-line runner.
    MRWordCount.run()
Task 3
"""
command to run local test to file:
python task3.py -r local data/big.csv > outputs/out3.csv
"""
from mrjob.job import MRJob
def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into fields while respecting quoted fields, so
    commas inside Ship or Company names do not break the column layout.
    Backslash-escaped quotes inside a quoted field are supported.

    After splitting, every remaining backslash and double quote is
    removed from each field. Only the first 12 fields are kept. Each
    field is converted to float if it contains a '.', otherwise to int;
    when conversion fails, the cleaned string is kept.
    """
    import csv

    # The csv module handles quoted commas and escaped quotes correctly.
    # The previous placeholder/replace approach restored repeated quoted
    # values as the literal "XXX" and raised IndexError on an unbalanced
    # quote.
    fields = next(csv.reader([string], escapechar='\\'), [])
    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    ls_final = []
    for field in fields:
        # same clean-up as before: drop stray backslashes and quotes
        field = field.replace('\\', '').replace('"', '')
        try:
            # append as float or int
            if "." in field:
                ls_final.append(float(field))
            else:
                ls_final.append(int(field))
        except ValueError:
            # not numeric: keep as str
            ls_final.append(field)
    return ls_final
def timestr_to_sec(time_str):
    """
    Convert a timestamp string to whole seconds since the Unix epoch.

    INPUT: timestamp string accepted by dateutil.parser.parse
    OUTPUT: int, seconds since 1970-01-01 00:00:00 UTC; any sub-second
            part is discarded

    Timezone-aware strings are converted to UTC. Naive strings are taken
    to already be UTC (NOTE(review): assumption; confirm that the
    dataset's TIME column is UTC).
    Unparseable strings make dateutil raise (ValueError / ParserError).
    """
    import calendar
    import dateutil.parser
    parsed = dateutil.parser.parse(time_str)
    # strftime('%s') is a glibc extension (it fails on Windows). It also
    # reads the datetime as machine-local time and ignores tzinfo. timegm
    # on the UTC time tuple is portable and independent of the host TZ.
    return calendar.timegm(parsed.utctimetuple())
def check_validity(ls):
    """
    Return True if a parsed row passes all validity conditions.

    Column layout of ``ls`` (as produced by string_to_list):
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions (all must hold):
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null, where null
       means None, an empty string or 0,
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180,
    3. TIME is a string of 23 characters (the specification says 25;
       changed to 23 since that seems to be the norm).

    Rows with fewer than four columns are invalid. Malformed rows make
    the function return False rather than raise.
    """
    # too short to hold MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, lat, lon = ls[0], ls[1], ls[2], ls[3]

    # condition 1: required fields present. The old check tested TIME and
    # LATITUDE (twice) but never MMSI or LONGITUDE.
    condition1 = all(
        value not in (None, "", 0) for value in (mmsi, timestamp, lat, lon)
    )
    # condition 2: numeric coordinates in range. The old check bounded
    # LONGITUDE by +/-90 and raised TypeError on string coordinates.
    condition2 = (
        isinstance(lat, (int, float))
        and isinstance(lon, (int, float))
        and -90 < lat < 90
        and -180 < lon < 180
    )
    # condition 3: timestamp string of the expected length; len() on a
    # numeric TIME used to raise TypeError.
    condition3 = isinstance(timestamp, str) and len(timestamp) == 23

    return condition1 and condition2 and condition3
class MRWordCount(MRJob):
    """
    Task 3: for every valid, de-duplicated record, output the TIME as
    seconds since the Unix epoch.

    Key: str(MMSI) + str(TIME); value: timestr_to_sec(TIME).
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """Yield (MMSI+TIME id, epoch seconds) for each valid CSV line."""
        # convert line to list
        row = string_to_list(line)
        # Validate BEFORE parsing the timestamp. Previously TIME was parsed
        # for every line, so any invalid row with an unparseable TIME made
        # timestr_to_sec raise and failed the whole job.
        if not check_validity(row):
            return
        # create unique ID from MMSI and timestamp
        identifier = str(row[0]) + str(row[1])
        # get unix seconds from timestring
        yield identifier, timestr_to_sec(row[1])

    def reducer(self, key, value):
        """Emit one epoch-seconds value per MMSI+TIME id (drops duplicates)."""
        yield key, next(value)

    combiner = reducer
if __name__ == '__main__':
    # Script entry point: hand control to mrjob's command-line runner.
    MRWordCount.run()
Task 4
"""
command to run local test to file:
python task4.py -r local data/big.csv > outputs/out4.csv
"""
from mrjob.job import MRJob
def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into fields while respecting quoted fields, so
    commas inside Ship or Company names do not break the column layout.
    Backslash-escaped quotes inside a quoted field are supported.

    After splitting, every remaining backslash and double quote is
    removed from each field. Only the first 12 fields are kept. Each
    field is converted to float if it contains a '.', otherwise to int;
    when conversion fails, the cleaned string is kept.
    """
    import csv

    # The csv module handles quoted commas and escaped quotes correctly.
    # The previous placeholder/replace approach restored repeated quoted
    # values as the literal "XXX" and raised IndexError on an unbalanced
    # quote.
    fields = next(csv.reader([string], escapechar='\\'), [])
    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    ls_final = []
    for field in fields:
        # same clean-up as before: drop stray backslashes and quotes
        field = field.replace('\\', '').replace('"', '')
        try:
            # append as float or int
            if "." in field:
                ls_final.append(float(field))
            else:
                ls_final.append(int(field))
        except ValueError:
            # not numeric: keep as str
            ls_final.append(field)
    return ls_final
def timestr_to_sec(time_str):
    """
    Convert a timestamp string to whole seconds since the Unix epoch.

    INPUT: timestamp string accepted by dateutil.parser.parse
    OUTPUT: int, seconds since 1970-01-01 00:00:00 UTC; any sub-second
            part is discarded

    Timezone-aware strings are converted to UTC. Naive strings are taken
    to already be UTC (NOTE(review): assumption; confirm that the
    dataset's TIME column is UTC).
    Unparseable strings make dateutil raise (ValueError / ParserError).
    """
    import calendar
    import dateutil.parser
    parsed = dateutil.parser.parse(time_str)
    # strftime('%s') is a glibc extension (it fails on Windows). It also
    # reads the datetime as machine-local time and ignores tzinfo. timegm
    # on the UTC time tuple is portable and independent of the host TZ.
    return calendar.timegm(parsed.utctimetuple())
def check_validity(ls):
    """
    Return True if a parsed row passes all validity conditions.

    Column layout of ``ls`` (as produced by string_to_list):
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions (all must hold):
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null, where null
       means None, an empty string or 0,
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180,
    3. TIME is a string of 23 characters (the specification says 25;
       changed to 23 since that seems to be the norm).

    Rows with fewer than four columns are invalid. Malformed rows make
    the function return False rather than raise.
    """
    # too short to hold MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, lat, lon = ls[0], ls[1], ls[2], ls[3]

    # condition 1: required fields present. The old check tested TIME and
    # LATITUDE (twice) but never MMSI or LONGITUDE.
    condition1 = all(
        value not in (None, "", 0) for value in (mmsi, timestamp, lat, lon)
    )
    # condition 2: numeric coordinates in range. The old check bounded
    # LONGITUDE by +/-90 and raised TypeError on string coordinates.
    condition2 = (
        isinstance(lat, (int, float))
        and isinstance(lon, (int, float))
        and -90 < lat < 90
        and -180 < lon < 180
    )
    # condition 3: timestamp string of the expected length; len() on a
    # numeric TIME used to raise TypeError.
    condition3 = isinstance(timestamp, str) and len(timestamp) == 23

    return condition1 and condition2 and condition3
class MRWordCount(MRJob):
    """
    Task 4: keep at most one valid row per ship (MMSI) per minute.

    Key: "mmsi:<MMSI>-min:<minutes since the Unix epoch>"; value: the
    parsed row. The reducer (reused as combiner) keeps one row per key.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """Yield (MMSI+minute id, parsed row) for each valid CSV line."""
        # convert line to list
        row = string_to_list(line)
        # Validate BEFORE parsing the timestamp. Previously TIME was parsed
        # for every line, so any invalid row with an unparseable TIME made
        # timestr_to_sec raise and failed the whole job.
        if not check_validity(row):
            return
        # get unix seconds from timestring
        sec = timestr_to_sec(row[1])
        # Minute bucket. Floor division stays correct for negative
        # (pre-1970) values, whereas int(sec/60) truncated toward zero.
        minute = sec // 60
        # extract MMSI as ID for signal
        mmsi = row[0]
        # unique identifier from MMSI and minute, used as key
        identifier = "mmsi:" + str(mmsi) + "-min:" + str(minute)
        yield identifier, row

    def reducer(self, key, value):
        """Emit one row per MMSI+minute id; the remaining rows are dropped."""
        yield key, next(value)

    combiner = reducer
if __name__ == '__main__':
    # Script entry point: hand control to mrjob's command-line runner.
    MRWordCount.run()
Task 5
"""
command to run local test to file:
python task5.py -r local data/big.csv > outputs/out5.csv
"""
from mrjob.job import MRJob
def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into fields while respecting quoted fields, so
    commas inside Ship or Company names do not break the column layout.
    Backslash-escaped quotes inside a quoted field are supported.

    After splitting, every remaining backslash and double quote is
    removed from each field. Only the first 12 fields are kept. Each
    field is converted to float if it contains a '.', otherwise to int;
    when conversion fails, the cleaned string is kept.
    """
    import csv

    # The csv module handles quoted commas and escaped quotes correctly.
    # The previous placeholder/replace approach restored repeated quoted
    # values as the literal "XXX" and raised IndexError on an unbalanced
    # quote.
    fields = next(csv.reader([string], escapechar='\\'), [])
    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    ls_final = []
    for field in fields:
        # same clean-up as before: drop stray backslashes and quotes
        field = field.replace('\\', '').replace('"', '')
        try:
            # append as float or int
            if "." in field:
                ls_final.append(float(field))
            else:
                ls_final.append(int(field))
        except ValueError:
            # not numeric: keep as str
            ls_final.append(field)
    return ls_final
def check_validity(ls):
    """
    Return True if a parsed row passes all validity conditions.

    Column layout of ``ls`` (as produced by string_to_list):
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions (all must hold):
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null, where null
       means None, an empty string or 0,
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180,
    3. TIME is a string of 23 characters (the specification says 25;
       changed to 23 since that seems to be the norm).

    Rows with fewer than four columns are invalid. Malformed rows make
    the function return False rather than raise.
    """
    # too short to hold MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, lat, lon = ls[0], ls[1], ls[2], ls[3]

    # condition 1: required fields present. The old check tested TIME and
    # LATITUDE (twice) but never MMSI or LONGITUDE.
    condition1 = all(
        value not in (None, "", 0) for value in (mmsi, timestamp, lat, lon)
    )
    # condition 2: numeric coordinates in range. The old check bounded
    # LONGITUDE by +/-90 and raised TypeError on string coordinates.
    condition2 = (
        isinstance(lat, (int, float))
        and isinstance(lon, (int, float))
        and -90 < lat < 90
        and -180 < lon < 180
    )
    # condition 3: timestamp string of the expected length; len() on a
    # numeric TIME used to raise TypeError.
    condition3 = isinstance(timestamp, str) and len(timestamp) == 23

    return condition1 and condition2 and condition3
class MRWordCount(MRJob):
    """
    Task 5: list the distinct (ship name, MMSI) combinations among the
    valid rows.

    Key: "name:<NAME>-mmsi:<MMSI>"; value: [NAME, MMSI]. The reducer
    (reused as combiner) emits each combination once.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """Yield (name+MMSI id, [name, MMSI]) for each valid CSV line."""
        # convert line to list
        row = string_to_list(line)
        # Validate first: rows that fail the check are dropped before any
        # further column is read.
        if not check_validity(row):
            return
        # NAME is column 9. check_validity only guarantees four columns,
        # and the old code read row[9] unconditionally, so any line with
        # fewer than 10 columns crashed the job with IndexError. Such rows
        # carry no name to report and are skipped.
        if len(row) <= 9:
            return
        ship_name = row[9]
        ship_mmsi = row[0]
        # unique ID key for the mmsi+name combination
        identifier = "name:" + str(ship_name) + "-mmsi:" + str(ship_mmsi)
        yield identifier, [ship_name, ship_mmsi]

    def reducer(self, key, value):
        """Emit each name+MMSI combination once; duplicates are dropped."""
        yield key, next(value)

    combiner = reducer
if __name__ == '__main__':
    # Script entry point: hand control to mrjob's command-line runner.
    MRWordCount.run()