It’s been a while

So it’s been over 4 months since I last updated my blog. To me, it seems a lot longer; maybe that’s to do with the lockdown? Who knows?

So, what have I been up to, apart from drinking lots of beer like most people under lockdown?  Well, I have been semi-productive. I would say pretty productive; however, compared to some of my friends (one of whom launched a radio station while on lockdown), I will settle for semi-productive.

InMoov Robot

For the first 3 months of the year, progress was fairly non-existent; however, due to the lockdown, I have come on in leaps and bounds.  I have rebuilt the head, neck and torso, fixed or improved tons of things, rewired most of it, and made a stand (which blew a hole in my ceiling and nearly killed me).  I am waiting for some potentiometers (an absolute pain to find the right ones) for the arms, but I hope to have the biceps done soon.  I have made a start on printing the legs for it with my new CopyMaster 400 3D printer, which is pretty cool.

I have turned my conservatory into a robotics area especially for this robot, which I have found has helped a lot.

Burf.co

So, on the mission to learn C# and Azure, I have completely rewritten (all 20 lines of it) the Burf.co Search Engine as Azure Functions written in C# and running in Docker containers.  I actually really enjoyed doing this, and again it’s come further along in weeks than the old Java/Kotlin one did in months.  I have Azure Functions that serve up the results to the website, parse and index websites, crawl sites, and there is even a chatbot for fun 🙂 (SearchAI.uk)

I have even fired up the old HP DL580 server (currently keeping the house warm) to see if I can process data faster.

CommonCrawl WET File Processing

So I decided to try and write a script that would download the WET files from the CommonCrawl (56000 files, 8TB compressed).  These files contain 2.8 billion webpages or so and could be a really fun thing to process using ML etc.

Here is V1.02 of this script; it’s hacky at best, but it’s a start:

 

namespace SimonPlayGround
{
    /// <summary>
    /// Console tool that downloads the CommonCrawl WET files listed in the
    /// CC-MAIN-2020-16 path index, extracts every "conversion" record from them and
    /// upserts the resulting pages into the "wet" collection of the "WEB" MongoDB database.
    /// </summary>
    class Program
    {
        private const string Path = "https://commoncrawl.s3.amazonaws.com/";
        private const string TargetFolder = @"z:\";

        /// <summary>
        /// Number of files already handled. Seeds how many entries are skipped at start-up
        /// and is incremented (atomically) each time a file has been fully processed.
        /// </summary>
        public static int Jump = 2320;

        private const int Threads = 20;
        private const int Block = 10000;
        private const string UserId = "simon";

        private const string RecordSeparator = "WARC/1.0";
        private const string ConversionMarker = "WARC-Type: conversion";
        private const string TargetUriHeader = "WARC-Target-URI: ";
        private const string ContentLengthHeader = "Content-Length: ";

        // Suffix for files that are still being written; they are renamed once complete so
        // the File.Exists checks never mistake an interrupted download/decompression for a finished one.
        private const string PartialSuffix = ".part";

        // One shared client for every download: creating an HttpClient per request
        // leaks sockets under sustained load.
        private static readonly HttpClient Http =
            new HttpClient(new HttpRetryMessageHandler(new HttpClientHandler()));

        /// <summary>
        /// Entry point: fetches the path index, skips the entries already done and keeps up to
        /// <c>Threads</c> files in flight until the list is exhausted.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static async Task Main(string[] args)
        {
            var mongo = new MongoClient(new MongoClientSettings()
            {
                Server = new MongoServerAddress("192.168.0.150"),
                MaxConnectionPoolSize = 500
            });

            var db = mongo.GetDatabase("WEB");
            var collection = db.GetCollection<Page>("wet");

            using (var client = new WebClient())
            {
                DownloadPaths(client);
            }

            var paths = File.ReadLines(TargetFolder + "wet")
                .Where(line => !string.IsNullOrWhiteSpace(line))
                .ToList();

            // hack to remove done ones: files are consumed from the end of the list, so the
            // last Jump entries are the ones earlier runs already handled.
            var alreadyDone = Math.Min(Math.Max(Jump, 0), paths.Count);
            paths.RemoveRange(paths.Count - alreadyDone, alreadyDone);

            var tasks = new List<Task>();

            while (tasks.Count < Threads && paths.Count > 0)
            {
                tasks.Add(StartNext(paths, collection));
            }

            while (tasks.Count > 0)
            {
                await Task.WhenAny(tasks);

                foreach (var finishedTask in tasks.Where(t => t.IsCompleted).ToList())
                {
                    tasks.Remove(finishedTask);

                    // Task.WhenAny never throws, so failures have to be surfaced explicitly.
                    if (finishedTask.IsFaulted)
                    {
                        Console.Error.WriteLine($"File failed: {finishedTask.Exception?.GetBaseException()}");
                    }

                    if (paths.Count > 0)
                    {
                        tasks.Add(StartNext(paths, collection));
                        Console.WriteLine($"Left {paths.Count} {tasks.Count} {Jump}");
                        // todo write here the number of files done
                    }
                }
            }
        }

        /// <summary>Removes the last path from the list and starts processing it on the thread pool.</summary>
        private static Task StartNext(List<string> paths, IMongoCollection<Page> collection)
        {
            var filename = paths[paths.Count - 1];
            paths.RemoveAt(paths.Count - 1);
            return Task.Run(() => Process(filename, collection));
        }

        /// <summary>
        /// Downloads, decompresses and parses one WET file, then deletes the decompressed copy
        /// and bumps <see cref="Jump"/>.
        /// </summary>
        /// <param name="filename">Path of the WET file relative to the CommonCrawl bucket root.</param>
        /// <param name="collection">Collection that receives the parsed pages.</param>
        public static async Task Process(string filename, IMongoCollection<Page> collection)
        {
            var file = await DownloadWetAsync(filename);

            await ParseWet(file, collection);
            Console.WriteLine($"FILE PROCESSED");
            File.Delete(file);

            // Several Process calls run concurrently; a plain "Jump += 1" can lose updates.
            Interlocked.Increment(ref Jump);
            // todo write here that file was completed
        }

        /// <summary>
        /// Reads a decompressed WET file line by line, turns every "conversion" record into a
        /// <see cref="Page"/> and saves the pages in batches of <c>Block</c>.
        /// </summary>
        /// <param name="filename">Path of the decompressed WET file on disk.</param>
        /// <param name="collection">Collection that receives the parsed pages.</param>
        public static async Task ParseWet(string filename, IMongoCollection<Page> collection)
        {
            using StreamReader sr = File.OpenText(filename);
            var record = new StringBuilder();
            var pages = new List<Page>();
            var inRecord = false;
            string url = null;
            string line;

            Console.WriteLine($"Processing {filename}");

            while ((line = sr.ReadLine()) != null)
            {
                if (!inRecord)
                {
                    if (line.Equals(ConversionMarker))
                    {
                        record.AppendLine(line);
                        inRecord = true;
                        // Reset so a record without a target URI cannot inherit the previous one.
                        url = null;
                    }

                    continue;
                }

                if (line.Equals(RecordSeparator))
                {
                    inRecord = false;
                    await CompleteRecordAsync(record, url, pages, collection);
                    continue;
                }

                record.AppendLine(line);

                // Only the first matching line counts; later matches would be body text.
                if (url == null && line.StartsWith(TargetUriHeader, StringComparison.Ordinal))
                {
                    url = line.Substring(TargetUriHeader.Length);
                }
            }

            // The last record of a file has no following "WARC/1.0" line to terminate it.
            if (inRecord)
            {
                await CompleteRecordAsync(record, url, pages, collection);
            }

            // save any left over
            if (pages.Count > 0)
            {
                await BulkSave(pages, collection);
            }
        }

        /// <summary>
        /// Converts the buffered record into a page, appends it to the batch and saves the batch
        /// once it holds <c>Block</c> pages. Always clears the record buffer.
        /// </summary>
        private static async Task CompleteRecordAsync(
            StringBuilder record,
            string url,
            List<Page> pages,
            IMongoCollection<Page> collection)
        {
            var body = ExtractBody(record.ToString());
            record.Clear();

            if (string.IsNullOrEmpty(url) || body == null)
            {
                Console.Error.WriteLine("Skipping record without a WARC-Target-URI or Content-Length header");
                return;
            }

            pages.Add(new Page()
            {
                Url = url,
                Body = body
            });

            if (pages.Count % 1000 == 0)
            {
                Console.WriteLine($"Procsessed {pages.Count} {DateTime.Now}");
            }

            if (pages.Count == Block)
            {
                await BulkSave(pages, collection);
                Console.WriteLine($"{Block} done {DateTime.Now}");
                pages.Clear();
            }
        }

        /// <summary>
        /// Returns everything after the line holding the Content-Length header,
        /// or <c>null</c> when the record has no such header.
        /// </summary>
        private static string ExtractBody(string record)
        {
            var header = record.IndexOf(ContentLengthHeader, StringComparison.Ordinal);
            if (header < 0)
            {
                return null;
            }

            var lineEnd = record.IndexOf(
                Environment.NewLine,
                header + ContentLengthHeader.Length,
                StringComparison.Ordinal);
            if (lineEnd < 0)
            {
                return string.Empty;
            }

            // Skip the whole line terminator ("\r\n" is two characters), not just one char.
            return record.Substring(lineEnd + Environment.NewLine.Length);
        }

        /// <summary>
        /// Upserts the pages keyed by URL; the body is only written when the document is
        /// newly inserted. Best-effort: failures are logged, not rethrown.
        /// </summary>
        /// <param name="pages">Pages to store; a null or empty list is a no-op.</param>
        /// <param name="collection">Target collection.</param>
        public static async Task BulkSave(List<Page> pages, IMongoCollection<Page> collection)
        {
            // BulkWriteAsync rejects an empty request list.
            if (pages == null || pages.Count == 0)
            {
                return;
            }

            try
            {
                var updateOneModels = pages.Select(x =>
                {
                    var filterDefinition = Builders<Page>.Filter.Eq(p => p.Url, x.Url);
                    var updateDefinition = Builders<Page>.Update.SetOnInsert(p => p.Body, x.Body);

                    return new UpdateOneModel<Page>(filterDefinition, updateDefinition) { IsUpsert = true };
                }).ToList();

                var resultWrites = await collection.BulkWriteAsync(updateOneModels);

                // New documents created by an upsert are reported under Upserts, not InsertedCount.
                Console.WriteLine($"OK?: {resultWrites.IsAcknowledged} - Inserted Count: {resultWrites.InsertedCount} {resultWrites.ModifiedCount} Upserted: {resultWrites.Upserts.Count}");
            }
            catch (Exception ex)
            {
                // Keep going with the next batch, but never lose a batch silently.
                Console.Error.WriteLine($"Bulk save of {pages.Count} pages failed: {ex}");
            }
        }

        /// <summary>
        /// Retries a request up to 10 times with a 3^attempt second back-off when it throws
        /// <see cref="HttpRequestException"/> / <see cref="TaskCanceledException"/> or returns
        /// a non-success status code.
        /// </summary>
        public class HttpRetryMessageHandler : DelegatingHandler
        {
            public HttpRetryMessageHandler(HttpClientHandler handler) : base(handler) { }

            // The token is handed to the policy so that once the caller cancels (for example the
            // HttpClient timeout fires) the back-off stops instead of running out its remaining waits.
            protected override Task<HttpResponseMessage> SendAsync(
                HttpRequestMessage request,
                CancellationToken cancellationToken) =>
                Policy
                    .Handle<HttpRequestException>()
                    .Or<TaskCanceledException>()
                    .OrResult<HttpResponseMessage>(x => !x.IsSuccessStatusCode)
                    .WaitAndRetryAsync(10, retryAttempt => TimeSpan.FromSeconds(Math.Pow(3, retryAttempt)))
                    .ExecuteAsync(ct => base.SendAsync(request, ct), cancellationToken);
        }

        /// <summary>
        /// Makes sure the decompressed WET file for <paramref name="line"/> exists in the target
        /// folder, downloading and decompressing it when needed.
        /// </summary>
        /// <param name="line">Path of the .gz file relative to the CommonCrawl bucket root.</param>
        /// <returns>Full path of the decompressed WET file.</returns>
        public static async Task<string> DownloadWetAsync(string line)
        {
            var filename = line.Split('/').Last();
            var gzFile = TargetFolder + filename;

            if (!File.Exists(gzFile))
            {
                Console.WriteLine($"downloading {filename}");

                var partialFile = gzFile + PartialSuffix;

                using (HttpResponseMessage response = await Http.GetAsync(Path + line, HttpCompletionOption.ResponseHeadersRead))
                {
                    // Without this an error page would be stored as if it were the archive.
                    response.EnsureSuccessStatusCode();

                    using (Stream streamToReadFrom = await response.Content.ReadAsStreamAsync())
                    using (Stream streamToWriteTo = File.Open(partialFile, FileMode.Create))
                    {
                        await streamToReadFrom.CopyToAsync(streamToWriteTo);
                    }
                }

                File.Move(partialFile, gzFile);
            }
            else
            {
                Console.WriteLine($"GZ exist {filename}");
            }

            // Drop the ".gz" extension; fall back to a distinct name if it is missing.
            var wetName = filename.EndsWith(".gz", StringComparison.OrdinalIgnoreCase)
                ? filename.Substring(0, filename.Length - 3)
                : filename + ".wet";
            var wetFile = TargetFolder + wetName;

            if (!File.Exists(wetFile))
            {
                Console.WriteLine($"Decompressing {filename}");
                DecompressGZip(gzFile, wetFile);
            }
            else
            {
                Console.WriteLine($"WET exist {wetFile}");
            }

            return wetFile;
        }

        /// <summary>
        /// Downloads the CC-MAIN-2020-16 WET path index and decompresses it to the file "wet"
        /// in the target folder.
        /// </summary>
        /// <param name="client">Client used for the download; the caller keeps ownership.</param>
        public static void DownloadPaths(WebClient client)
        {
            client.DownloadFile(Path + "crawl-data/CC-MAIN-2020-16/wet.paths.gz", TargetFolder + "wet.gz");
            DecompressGZip(TargetFolder + "wet.gz", TargetFolder + "wet");
        }


        /// <summary>
        /// Decompresses a gzip file to <paramref name="destRoot"/>, replacing any existing file.
        /// </summary>
        /// <param name="fileRoot">Path of the .gz file to read.</param>
        /// <param name="destRoot">Path of the decompressed file to write.</param>
        public static void DecompressGZip(String fileRoot, String destRoot)
        {
            var partialFile = destRoot + PartialSuffix;

            // Stream the bytes across instead of materialising the whole file as one string.
            using (FileStream source = new FileStream(fileRoot, FileMode.Open, FileAccess.Read))
            using (GZipInputStream zipStream = new GZipInputStream(source))
            using (FileStream target = new FileStream(partialFile, FileMode.Create, FileAccess.Write))
            {
                zipStream.CopyTo(target);
            }

            File.Delete(destRoot);
            File.Move(partialFile, destRoot);
        }
    }

    /// <summary>
    /// A stored web page: its address and its text, plus the MongoDB document id.
    /// </summary>
    public class Page
    {
        /// <summary>Document identifier (mapped as the BSON id).</summary>
        [BsonId]
        public ObjectId Id { get; set; }

        /// <summary>Address of the page; serialized as the "url" element.</summary>
        [BsonElement("url")]
        public string Url { get; set; }

        /// <summary>Text of the page; serialized as the "body" element.</summary>
        [BsonElement("body")]
        public string Body { get; set; }
    }
}

Leave a Reply