GNBF5010 Course Project¶

Student id: 1155228903

Step 1: Implementation without dictionary¶

  • Workflow
In [1]:
def main_part1():
    # Example DNA sequence
    seq = ('ctccaaagaaattgtagttttcttctggcttagaggtagatcatcttggtccaatcagactgaaa'
           'tgccttgaggctagatttcagtctttgtGGCAGCTGgtgaatttctagtttgccttttcagctagggatt'
           'agctttttaggggtcccaatgcctagggagatttctaggtcctctgttccttgctgacctccaat')
    
    # Test the function with k=1 and k=2
    for k in [1, 2, 3, 4, 5]:
        counts = count_kmers_non_dict(seq, k)
        write2file_non_dict(counts, f"output_k{k}.txt")

# Implement a function that counts the number of k-mers in a DNA sequence
def count_kmers_non_dict(seq, k):
    # Convert the entire sequence to uppercase
    seq = seq.upper()
    counts = []
    for i in range(len(seq) - k + 1):
        kmer = seq[i:i+k]
        if all(n in "ATCG" for n in kmer):  # Check if kmer contains only ATCG
            found = False
            for j in range(len(counts)):
                if counts[j][0] == kmer:
                    counts[j] = (counts[j][0], counts[j][1] + 1)
                    found = True
                    break
            if not found:
                counts.append((kmer, 1))
    return counts

# Write the counts to a file
def write2file_non_dict(counts, filename):
    with open(filename, 'w') as f:
        for kmer, count in sorted(counts):  # Sort by kmer
            f.write(f"{kmer}:{count}\n")

# Run Part 1
if __name__ == "__main__":
    main_part1()
    print("<Implementation without dictionary, done!>")
<Implementation without dictionary, done!>

Step 2: Implementation with dictionary¶

  • Workflow
In [2]:
def main_part2():
    # Example DNA sequence
    seq = ('ctccaaagaaattgtagttttcttctggcttagaggtagatcatcttggtccaatcagactgaaa'
           'tgccttgaggctagatttcagtctttgtGGCAGCTGgtgaatttctagtttgccttttcagctagggatt'
           'agctttttaggggtcccaatgcctagggagatttctaggtcctctgttccttgctgacctccaat')
    
    # Test the function with k=1 and k=2
    for k in [1, 2, 3, 4, 5]:
        counts = count_kmers(seq, k)
        write2file(counts, f"output_k{k}.txt")

# Implement a function that counts the number of k-mers in a DNA sequence
def count_kmers(seq, k):
    # Convert the entire sequence to uppercase
    seq = seq.upper()
    counts = {}
    for i in range(len(seq) - k + 1):
        kmer = seq[i:i+k]
        if all(n in "ATCG" for n in kmer):  # Check if kmer contains only ATCG
            counts[kmer] = counts.get(kmer, 0) + 1
    return counts

# Write the counts to a file
def write2file(counts, filename):
    with open(filename, 'w') as f:
        for kmer, count in sorted(counts.items()):  # Sort by kmer
            f.write(f"{kmer}:{count}\n")

# Run Part 2
if __name__ == "__main__":
    main_part2()
    print("<Implementation with dictionary, done!>")
<Implementation with dictionary, done!>
  • Performance test
In [3]:
import time 

# Test the performance of the two implementations
def main():
    # Implementation without dictionary
    start_time1 = time.time()
    main_part1()
    end_time1 = time.time() 
    print("> Time for implementation without dictionary:", end_time1 - start_time1)
    # Implementation with dictionary
    start_time2 = time.time()
    main_part2()
    end_time2 = time.time() 
    print("> Time for implementation with dictionary:", end_time2 - start_time2)
    # Determine which implementation is faster
    if end_time1 - start_time1 < end_time2 - start_time2:
        print("Result: the implementation without dictionary is faster.")
    else:
        print("Result: the implementation with dictionary is faster.")

if __name__ == "__main__":
    main()
    print("<Performance test, done!>")
> Time for implementation without dictionary: 0.0055446624755859375
> Time for implementation with dictionary: 0.003008604049682617
Result: the implementation with dictionary is faster.
<Performance test, done!>

Step 3: Generalization¶

Function: count the occurrences of k-mers in a given biological sequence file.

Example Command: python count_kmers.py example_chromosome21.fasta 3 -c

  • python count_kmers.py: The command to run the script.
  • example_chromosome21.fasta: The input file in FASTA or FASTQ format.
  • 3: A single k-mer length (in this case, 3) to be counted in the input sequence file.
  • -c: The flag to compare the performance of the two k-mer counting implementations. When the comparison flag is used, it logs the time taken by each method to a log file named "log.txt".

Output file example (k=1):

A:41
C:41
G:47
T:71
In [ ]:
import time
import os
import argparse
from Bio import SeqIO

def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Count k-mers in a sequence file")
    parser.add_argument("input_file", help="Input sequence file (FASTA or FASTQ)")
    parser.add_argument("kmer_lengths", help="Comma-separated list of k-mer lengths")
    parser.add_argument("-c", action="store_true", help="Compare performance of k-mer counting functions")
    args = parser.parse_args()

    # Read sequences from input file
    input_file = args.input_file
    kmer_lengths = list(map(int, args.kmer_lengths.split(',')))
    compare = args.c
    sequences = read_sequence(input_file)

    # Count k-mers for each k-mer length
    for k in kmer_lengths:
        # Count k-mers using the implementation without dictionary
        start_time = time.time()
        total_counts = {}
        for seq in sequences:
            counts = count_kmers(seq, k)
            for kmer, count in counts.items():
                total_counts[kmer] = total_counts.get(kmer, 0) + count
        end_time = time.time()
        elapsed_time = end_time - start_time

        output_file = f"kmer_counts_k{k}.txt"
        with open(output_file, "w") as f:
            for kmer, count in total_counts.items():
                f.write(f"{kmer}:{count}\n")

        # Count k-mers using the implementation with dictionary if compare (-c) is True
        if compare and len(kmer_lengths) == 1:
            start_time_non_dict = time.time()
            total_counts_non_dict = []
            for seq in sequences:
                counts_non_dict = count_kmers_non_dict(seq, k)
                for kmer, count in counts_non_dict:
                    found = False
                    for i in range(len(total_counts_non_dict)):
                        if total_counts_non_dict[i][0] == kmer:
                            total_counts_non_dict[i] = (total_counts_non_dict[i][0], total_counts_non_dict[i][1] + count)
                            found = True
                            break
                    if not found:
                        total_counts_non_dict.append((kmer, count))
            end_time_non_dict = time.time()
            elapsed_time_non_dict = end_time_non_dict - start_time_non_dict

            # Write the results to the log file
            with open("log.txt", "a") as log_file:
                log_file.write(f"> k={k}, dict_time={elapsed_time}, non_dict_time={elapsed_time_non_dict}\n")

# Part 1: Implementation with dictionary
def count_kmers(seq, k):
    counts = {}
    for i in range(len(seq) - k + 1):
        kmer = seq[i:i+k]
        if all(n in "ATCG" for n in kmer):
            counts[kmer] = counts.get(kmer, 0) + 1
    return counts

# Part 2: Implementation without dictionary
def count_kmers_non_dict(seq, k):
    counts = []
    for i in range(len(seq) - k + 1):
        kmer = seq[i:i+k]
        if all(n in "ATCG" for n in kmer):
            found = False
            for j in range(len(counts)):
                if counts[j][0] == kmer:
                    counts[j] = (counts[j][0], counts[j][1] + 1)
                    found = True
                    break
            if not found:
                counts.append((kmer, 1))
    return counts

# Read sequences from the input file (FASTA or FASTQ)
def read_sequence(file_path):
    file_ext = os.path.splitext(file_path)[1].lower()
    if file_ext in ['.fasta', '.fa', '.fna']:
        format = 'fasta'
    elif file_ext in ['.fastq', '.fq']:
        format = 'fastq'
    else:
        raise ValueError("Unsupported file format")
    
    sequences = []
    with open(file_path, "r") as handle:
        for record in SeqIO.parse(handle, format):
            sequences.append(str(record.seq))
    return sequences

if __name__ == "__main__":
    main()

Step 4: Extra task¶

Function: Find best matches to a protein query in a protein database using a simple method based on the Jaccard index of tetrapeptide composition.

Example Command: python simple_search.py query.fasta ecoli-k12.fasta

  • python simple_search.py: The command to run the script.
  • query.fasta: The input file containing one or more protein sequences in FASTA format.
  • ecoli-k12.fasta: The input file containing the protein database to be searched against in FASTA format.

Output file example:

QUERY_ID SUBJECT_ID SIMILARITY_SCORE
Query1 Subject1 0.90
Query1 Subject2 0.85
Query1 Subject3 0.80
Query1 Subject4 0.72
Query1 Subject5 0.50
Query2 Subject1 0.98
Query2 Subject2 0.50
Query2 Subject3 0.45
...
In [ ]:
import argparse
from count_kmers import count_kmers, read_sequence
from Bio import SeqIO

def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Find best matches to a protein query in a protein database")
    parser.add_argument("query_file", help="Protein query file (FASTA)")
    parser.add_argument("database_file", help="Protein database file (FASTA)")
    args = parser.parse_args()

    # Read sequences and headers from the input files
    query_sequences = read_sequence(args.query_file)
    database_sequences = read_sequence(args.database_file)
    query_headers = read_fasta_headers(args.query_file)
    database_headers = read_fasta_headers(args.database_file)

    # Find the best matches for each query sequence in the database
    results = []
    for query_id, query_seq in enumerate(query_sequences):
        query_kmers = set(count_kmers(query_seq, 4).keys())
        # Calculate Jaccard index for each pair of query and subject sequences
        scores = []
        for subject_id, subject_seq in enumerate(database_sequences):
            subject_kmers = set(count_kmers(subject_seq, 4).keys())
            score = jaccard_index(query_kmers, subject_kmers)
            scores.append((subject_id, score))
        # Sort the scores in descending order and select the top 5 hits
        scores.sort(key=lambda x: x[1], reverse=True)
        top_hits = scores[:5]
        for subject_id, score in top_hits:
            results.append((query_headers[query_id], database_headers[subject_id], score))

    # Write the results to the results file
    with open("search_results.txt", "w") as f:
        f.write("QUERY_ID\tSUBJECT_ID\tSIMILARITY_SCORE\n")
        for result in results:
            f.write(f"{result[0]}\t{result[1]}\t{result[2]:.0f}\n")

# Calculate the Jaccard index between two sets
def jaccard_index(set1, set2):
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return (intersection / union) * 100 if union != 0 else 0

# Read FASTA headers from the input file
def read_fasta_headers(file_path):
    headers = []
    with open(file_path, "r") as handle:
        for record in SeqIO.parse(handle, "fasta"):
            headers.append(record.id)
    return headers

if __name__ == "__main__":
    main()