MapReduce for Hadoop via ‘mrjob’ in Python – Global Marine Traffic Dataset

Using the global marine traffic dataset, several MapReduce jobs are implemented in Python with the ‘mrjob’ library.

Task 1 – remove duplicate records (same MMSI and timestamp)

"""
command to run local test on screen:
python task1.py -r local data/big.csv > outputs/out1.csv
"""



from mrjob.job import MRJob


class MRWordCount(MRJob):
    """
    Remove duplicate records from the raw marine-traffic CSV.

    Two lines count as duplicates when their first two columns
    (MMSI and TIME) are identical; exactly one line per such key is
    written to the output, with a None key.

    NOTE: the class name is a leftover from mrjob's word-count template;
    this job does not count words.
    """

    def mapper(self, _, line):
        """
        Key each raw CSV line by its "MMSI,TIME" prefix.

        Only the first two columns are split off. They never contain
        commas (only ship and company names, further right, do), so a
        plain split is safe here. The comma is kept between the two
        fields so the key is unambiguous: without a separator, MMSI "12"
        + TIME "3..." and MMSI "1" + TIME "23..." would collide.
        Lines with fewer than two columns (e.g. blank lines) are keyed by
        their whole content instead of raising IndexError.
        """
        identifier = ",".join(line.split(",", 2)[:2])
        yield identifier, line

    def combiner(self, combi, line):
        """
        Keep one line per key on the map side to shrink the shuffle.

        The reducer cannot be reused as the combiner because it replaces
        the key with None.
        """
        yield combi, next(line)

    def reducer(self, combi, line):
        """
        Emit the first line seen for each MMSI+TIME key.

        `line` is an iterator over all lines that share the key; every
        line after the first is a duplicate and is dropped.
        """
        yield None, next(line)

        
if __name__ == "__main__":
    # hand control to mrjob's command-line runner (e.g. `-r local`)
    MRWordCount.run()

 

Task 2 – drop invalid records and remove duplicates

"""
command to run local test to file:
python task2.py -r local data/big.csv > outputs/out2.csv
"""
from mrjob.job import MRJob

def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into its fields. Double-quoted sections are
    respected, so commas inside Ship or Company names do not start a
    new field.

    The line is scanned once, character by character. Every double
    quote toggles "inside quotes" mode, and a comma separates fields
    only outside quotes. Double quotes and backslashes are dropped from
    the field text. (Presumably the raw data escapes quotes with
    backslashes; TODO confirm against the source file.) An unterminated
    quote runs to the end of the line instead of raising.

    Only the first 12 columns are kept (see check_validity for their
    meaning). Each field is converted to float if it contains a ".",
    otherwise to int. Fields that fail conversion stay strings, so an
    empty field stays ''.
    """
    fields = []
    chars = []
    in_quotes = False
    for char in string:
        if char == '"':
            # quotes only switch modes; they are never part of a field
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            fields.append(''.join(chars))
            chars = []
        elif char != '\\':
            chars.append(char)
    fields.append(''.join(chars))

    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    # convert numeric-looking fields; anything else stays a string
    ls_final = []
    for field in fields:
        try:
            ls_final.append(float(field) if "." in field else int(field))
        except ValueError:
            ls_final.append(field)

    return ls_final

def check_validity(ls):
    """
    Return True if a parsed row (output of string_to_list) passes all
    validity checks, else False.

    Column layout:
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions:
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null. Both 0 and an
       empty field ('') count as null.
       NOTE(review): treating 0 as null also rejects positions exactly
       on the equator or the prime meridian; confirm this is intended.
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180.
    3. TIME is a string of 23 characters. The specification said 25,
       but 23 seems to be the norm in the data.

    Rows that are too short, or whose coordinates are not numeric
    (e.g. a header line), are reported invalid instead of raising.
    """
    # a row needs at least MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, latitude, longitude = ls[:4]

    # condition 1: required fields are not null
    required = (mmsi, timestamp, latitude, longitude)
    if any(field in (0, '', None) for field in required):
        return False

    # condition 2: coordinates are numeric and inside the valid ranges
    for coordinate in (latitude, longitude):
        if not isinstance(coordinate, (int, float)):
            return False
    if not (-90 < latitude < 90 and -180 < longitude < 180):
        return False

    # condition 3: timestamp string has the expected length
    return isinstance(timestamp, str) and len(timestamp) == 23
    
    
class MRWordCount(MRJob):
    """
    Filter out invalid records and drop duplicates.

    Every CSV line is parsed with string_to_list and checked with
    check_validity. Valid rows are keyed by MMSI immediately followed
    by TIME, and one row per key is written out as (key, row).

    NOTE: the class name is a leftover from mrjob's word-count template.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """Emit (MMSI+TIME, parsed row) for each line that passes validation."""
        parsed = string_to_list(line)
        if not check_validity(parsed):
            return
        yield "{}{}".format(parsed[0], parsed[1]), parsed

    def reducer(self, key, value):
        """Keep only the first row received for a key; later ones are duplicates."""
        first_row = next(value)
        yield key, first_row

    # the reducer's output has the same (key, value) shape as its input,
    # so it doubles as a map-side combiner
    combiner = reducer
        

        
if __name__ == "__main__":
    # hand control to mrjob's command-line runner (e.g. `-r local`)
    MRWordCount.run()

 

Task 3 – convert the timestamp of each valid record to Unix seconds

"""
command to run local test to file:
python task3.py -r local data/big.csv > outputs/out3.csv
"""
from mrjob.job import MRJob

def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into its fields. Double-quoted sections are
    respected, so commas inside Ship or Company names do not start a
    new field.

    The line is scanned once, character by character. Every double
    quote toggles "inside quotes" mode, and a comma separates fields
    only outside quotes. Double quotes and backslashes are dropped from
    the field text. (Presumably the raw data escapes quotes with
    backslashes; TODO confirm against the source file.) An unterminated
    quote runs to the end of the line instead of raising.

    Only the first 12 columns are kept (see check_validity for their
    meaning). Each field is converted to float if it contains a ".",
    otherwise to int. Fields that fail conversion stay strings, so an
    empty field stays ''.
    """
    fields = []
    chars = []
    in_quotes = False
    for char in string:
        if char == '"':
            # quotes only switch modes; they are never part of a field
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            fields.append(''.join(chars))
            chars = []
        elif char != '\\':
            chars.append(char)
    fields.append(''.join(chars))

    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    # convert numeric-looking fields; anything else stays a string
    ls_final = []
    for field in fields:
        try:
            ls_final.append(float(field) if "." in field else int(field))
        except ValueError:
            ls_final.append(field)

    return ls_final

def timestr_to_sec(time_str):
    """
    Convert a timestamp string to whole seconds since the Unix epoch.

    The string is parsed with dateutil, so any format it understands is
    accepted. Timezone-aware values (e.g. "... GMT" or "+01:00") are
    converted to UTC first. Naive values are treated as UTC (assumption;
    confirm against the dataset). Fractional seconds are discarded.

    Raises ValueError (dateutil's ParserError) or OverflowError when the
    string cannot be parsed.
    """
    import calendar
    import dateutil.parser

    parse_timestamp = dateutil.parser.parse(time_str)
    # strftime('%s') is deliberately avoided: it is a non-portable glibc
    # extension and reads the time in the host's local timezone, ignoring
    # the parsed offset. utctimetuple() applies the offset (if any) and
    # drops microseconds; timegm() reads that tuple as UTC on any host.
    seconds = calendar.timegm(parse_timestamp.utctimetuple())
    return seconds

def check_validity(ls):
    """
    Return True if a parsed row (output of string_to_list) passes all
    validity checks, else False.

    Column layout:
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions:
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null. Both 0 and an
       empty field ('') count as null.
       NOTE(review): treating 0 as null also rejects positions exactly
       on the equator or the prime meridian; confirm this is intended.
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180.
    3. TIME is a string of 23 characters. The specification said 25,
       but 23 seems to be the norm in the data.

    Rows that are too short, or whose coordinates are not numeric
    (e.g. a header line), are reported invalid instead of raising.
    """
    # a row needs at least MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, latitude, longitude = ls[:4]

    # condition 1: required fields are not null
    required = (mmsi, timestamp, latitude, longitude)
    if any(field in (0, '', None) for field in required):
        return False

    # condition 2: coordinates are numeric and inside the valid ranges
    for coordinate in (latitude, longitude):
        if not isinstance(coordinate, (int, float)):
            return False
    if not (-90 < latitude < 90 and -180 < longitude < 180):
        return False

    # condition 3: timestamp string has the expected length
    return isinstance(timestamp, str) and len(timestamp) == 23
    
    
class MRWordCount(MRJob):
    """
    Convert the timestamp of every valid, de-duplicated record to seconds
    since the Unix epoch.

    Output: one (MMSI+TIME key, unix seconds) pair per unique record.

    NOTE: the class name is a leftover from mrjob's word-count template.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """
        Parse a CSV line, drop invalid rows, and emit (key, seconds).

        Validation is acted on BEFORE the timestamp is parsed. Otherwise
        rows rejected by check_validity (e.g. an empty or wrongly sized
        TIME) still reach dateutil, which raises and aborts the whole
        job. A TIME that passes validation but still cannot be parsed is
        skipped and counted under the 'validation' / 'unparsable_time'
        job counter.
        """
        row = string_to_list(line)

        # yield only if validity check is positive
        if not check_validity(row):
            return

        # get unix seconds from timestring
        try:
            sec = timestr_to_sec(row[1])
        except (ValueError, OverflowError):
            self.increment_counter('validation', 'unparsable_time', 1)
            return

        # unique ID from MMSI and timestamp; check_validity requires TIME
        # to be exactly 23 characters, so the concatenation is unambiguous
        identifier = str(row[0]) + str(row[1])
        yield identifier, sec

    def reducer(self, key, value):
        """
        Emit the first value for each MMSI+TIME key.

        The key identifies one record, so any further values for it are
        duplicates and are dropped.
        """
        yield key, next(value)

    # the reducer keeps one (key, value) pair per key, so it is also a
    # valid map-side combiner
    combiner = reducer
        

        
if __name__ == "__main__":
    # hand control to mrjob's command-line runner (e.g. `-r local`)
    MRWordCount.run()

 

Task 4 – keep one valid record per ship (MMSI) per minute

"""
command to run local test to file:
python task4.py -r local data/big.csv > outputs/out4.csv
"""
from mrjob.job import MRJob

def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into its fields. Double-quoted sections are
    respected, so commas inside Ship or Company names do not start a
    new field.

    The line is scanned once, character by character. Every double
    quote toggles "inside quotes" mode, and a comma separates fields
    only outside quotes. Double quotes and backslashes are dropped from
    the field text. (Presumably the raw data escapes quotes with
    backslashes; TODO confirm against the source file.) An unterminated
    quote runs to the end of the line instead of raising.

    Only the first 12 columns are kept (see check_validity for their
    meaning). Each field is converted to float if it contains a ".",
    otherwise to int. Fields that fail conversion stay strings, so an
    empty field stays ''.
    """
    fields = []
    chars = []
    in_quotes = False
    for char in string:
        if char == '"':
            # quotes only switch modes; they are never part of a field
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            fields.append(''.join(chars))
            chars = []
        elif char != '\\':
            chars.append(char)
    fields.append(''.join(chars))

    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    # convert numeric-looking fields; anything else stays a string
    ls_final = []
    for field in fields:
        try:
            ls_final.append(float(field) if "." in field else int(field))
        except ValueError:
            ls_final.append(field)

    return ls_final

def timestr_to_sec(time_str):
    """
    Convert a timestamp string to whole seconds since the Unix epoch.

    The string is parsed with dateutil, so any format it understands is
    accepted. Timezone-aware values (e.g. "... GMT" or "+01:00") are
    converted to UTC first. Naive values are treated as UTC (assumption;
    confirm against the dataset). Fractional seconds are discarded.

    Raises ValueError (dateutil's ParserError) or OverflowError when the
    string cannot be parsed.
    """
    import calendar
    import dateutil.parser

    parse_timestamp = dateutil.parser.parse(time_str)
    # strftime('%s') is deliberately avoided: it is a non-portable glibc
    # extension and reads the time in the host's local timezone, ignoring
    # the parsed offset. utctimetuple() applies the offset (if any) and
    # drops microseconds; timegm() reads that tuple as UTC on any host.
    seconds = calendar.timegm(parse_timestamp.utctimetuple())
    return seconds

def check_validity(ls):
    """
    Return True if a parsed row (output of string_to_list) passes all
    validity checks, else False.

    Column layout:
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions:
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null. Both 0 and an
       empty field ('') count as null.
       NOTE(review): treating 0 as null also rejects positions exactly
       on the equator or the prime meridian; confirm this is intended.
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180.
    3. TIME is a string of 23 characters. The specification said 25,
       but 23 seems to be the norm in the data.

    Rows that are too short, or whose coordinates are not numeric
    (e.g. a header line), are reported invalid instead of raising.
    """
    # a row needs at least MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, latitude, longitude = ls[:4]

    # condition 1: required fields are not null
    required = (mmsi, timestamp, latitude, longitude)
    if any(field in (0, '', None) for field in required):
        return False

    # condition 2: coordinates are numeric and inside the valid ranges
    for coordinate in (latitude, longitude):
        if not isinstance(coordinate, (int, float)):
            return False
    if not (-90 < latitude < 90 and -180 < longitude < 180):
        return False

    # condition 3: timestamp string has the expected length
    return isinstance(timestamp, str) and len(timestamp) == 23
    
    
class MRWordCount(MRJob):
    """
    Keep one valid record per ship (MMSI) per minute.

    Output: ("mmsi:<MMSI>-min:<minute since epoch>", parsed row) pairs.

    NOTE: the class name is a leftover from mrjob's word-count template.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """
        Parse a CSV line, drop invalid rows, and key valid ones by MMSI
        and minute.

        Validation is acted on BEFORE the timestamp is parsed. Otherwise
        rows rejected by check_validity (e.g. an empty or wrongly sized
        TIME) still reach dateutil, which raises and aborts the whole
        job. A TIME that passes validation but still cannot be parsed is
        skipped and counted under the 'validation' / 'unparsable_time'
        job counter.
        """
        row = string_to_list(line)

        # yield only if validity check is positive
        if not check_validity(row):
            return

        # get unix seconds from timestring
        try:
            sec = timestr_to_sec(row[1])
        except (ValueError, OverflowError):
            self.increment_counter('validation', 'unparsable_time', 1)
            return

        # floor division; int(sec / 60) truncates toward zero, which would
        # put the seconds on both sides of the epoch into the same minute
        minute = sec // 60

        # unique identifier from MMSI and minute, used as key
        mmsi = row[0]
        identifier = "mmsi:" + str(mmsi) + "-min:" + str(minute)
        yield identifier, row

    def reducer(self, key, value):
        """
        Emit the first row for each MMSI+minute key.

        Any further rows for the same ship in the same minute are dropped.
        """
        yield key, next(value)

    # the reducer keeps one (key, value) pair per key, so it is also a
    # valid map-side combiner
    combiner = reducer
        

        
if __name__ == "__main__":
    # hand control to mrjob's command-line runner (e.g. `-r local`)
    MRWordCount.run()

 

Task 5 – list the unique ship name / MMSI combinations

"""
command to run local test to file:
python task5.py -r local data/big.csv > outputs/out5.csv
"""
from mrjob.job import MRJob

def string_to_list(string):
    """
    INPUT: String from CSV
    OUTPUT: list

    Split one CSV line into its fields. Double-quoted sections are
    respected, so commas inside Ship or Company names do not start a
    new field.

    The line is scanned once, character by character. Every double
    quote toggles "inside quotes" mode, and a comma separates fields
    only outside quotes. Double quotes and backslashes are dropped from
    the field text. (Presumably the raw data escapes quotes with
    backslashes; TODO confirm against the source file.) An unterminated
    quote runs to the end of the line instead of raising.

    Only the first 12 columns are kept (see check_validity for their
    meaning). Each field is converted to float if it contains a ".",
    otherwise to int. Fields that fail conversion stay strings, so an
    empty field stays ''.
    """
    fields = []
    chars = []
    in_quotes = False
    for char in string:
        if char == '"':
            # quotes only switch modes; they are never part of a field
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            fields.append(''.join(chars))
            chars = []
        elif char != '\\':
            chars.append(char)
    fields.append(''.join(chars))

    # keep only relevant columns (shown in documentation)
    fields = fields[:12]

    # convert numeric-looking fields; anything else stays a string
    ls_final = []
    for field in fields:
        try:
            ls_final.append(float(field) if "." in field else int(field))
        except ValueError:
            ls_final.append(field)

    return ls_final


def check_validity(ls):
    """
    Return True if a parsed row (output of string_to_list) passes all
    validity checks, else False.

    Column layout:
    0 - MMSI
    1 - TIME
    2 - LATITUDE
    3 - LONGITUDE
    4 - COG
    5 - SOG
    6 - HEADING
    7 - NAVSTAT
    8 - IMO
    9 - NAME
    10 - CALLSIGN
    11 - TYPE

    Conditions:
    1. MMSI, TIME, LATITUDE and LONGITUDE are not null. Both 0 and an
       empty field ('') count as null.
       NOTE(review): treating 0 as null also rejects positions exactly
       on the equator or the prime meridian; confirm this is intended.
    2. LATITUDE and LONGITUDE are numbers with
       -90 < LATITUDE < 90 and -180 < LONGITUDE < 180.
    3. TIME is a string of 23 characters. The specification said 25,
       but 23 seems to be the norm in the data.

    Rows that are too short, or whose coordinates are not numeric
    (e.g. a header line), are reported invalid instead of raising.
    """
    # a row needs at least MMSI, TIME, LATITUDE and LONGITUDE
    if len(ls) < 4:
        return False
    mmsi, timestamp, latitude, longitude = ls[:4]

    # condition 1: required fields are not null
    required = (mmsi, timestamp, latitude, longitude)
    if any(field in (0, '', None) for field in required):
        return False

    # condition 2: coordinates are numeric and inside the valid ranges
    for coordinate in (latitude, longitude):
        if not isinstance(coordinate, (int, float)):
            return False
    if not (-90 < latitude < 90 and -180 < longitude < 180):
        return False

    # condition 3: timestamp string has the expected length
    return isinstance(timestamp, str) and len(timestamp) == 23
    
    
class MRWordCount(MRJob):
    """
    List the unique (ship name, MMSI) combinations among valid records.

    Output: ("name:<NAME>-mmsi:<MMSI>", [NAME, MMSI]) pairs, one per
    distinct combination.

    NOTE: the class name is a leftover from mrjob's word-count template.
    """

    # 8 cores machine, leave 1 free
    JOBCONF = {'mapreduce.job.reduces': '7'}

    def mapper(self, _, line):
        """
        Parse a CSV line and emit its name/MMSI pair if the row is valid.

        The NAME column (index 9) is read only after validation and a
        length check. Rows cut off before that column are skipped and
        counted under the 'validation' / 'missing_name_column' job
        counter, instead of raising IndexError and aborting the job.
        """
        row = string_to_list(line)

        # yield only if validity check is positive
        if not check_validity(row):
            return

        if len(row) < 10:
            self.increment_counter('validation', 'missing_name_column', 1)
            return

        # get shipname and mmsi info
        ship_name = row[9]
        ship_mmsi = row[0]

        # unique key for the name + MMSI combination
        identifier = "name:" + str(ship_name) + "-mmsi:" + str(ship_mmsi)
        yield identifier, [ship_name, ship_mmsi]

    def reducer(self, key, value):
        """
        Emit one [name, MMSI] value per distinct name + MMSI key.

        All values for a key are identical, so only the first is kept.
        """
        yield key, next(value)

    # the reducer keeps one (key, value) pair per key, so it is also a
    # valid map-side combiner
    combiner = reducer
        

        
if __name__ == "__main__":
    # hand control to mrjob's command-line runner (e.g. `-r local`)
    MRWordCount.run()

 
Further Reading
Recent Updates