So it’s been over 4 months since I last updated my blog — to me, it seems a lot longer. Maybe that’s to do with the lockdown? Who knows?
So, what have I been up to, other than drinking lots of beer like most people under lockdown? Well, I have been semi-productive — I would even say pretty productive; however, compared to some of my friends (one of whom launched a radio station while on lockdown), I will settle for semi-productive.
Inmoov Robot
For the first 3 months of the year, progress was fairly non-existent; however, thanks to the lockdown, I have come on leaps and bounds. I have rebuilt the head, neck and torso, fixed or improved tons of things, rewired most of it, and made a stand (which blew a hole in my ceiling and nearly killed me). I am waiting for some potentiometers (an absolute pain to find the right ones) for the arms, but I hope to have the biceps done soon. I have made a start printing the legs for it on my new CopyMaster 400 3D printer, which is pretty cool.
I have turned my conservatory into a robotics area especially for this robot which I have found has helped a lot.
Burf.co
So, on a mission to learn C# and Azure, I have completely rewritten (all 20 lines of it) the Burf.co search engine as Azure Functions running in Docker containers, written in C#. I actually really enjoyed doing this, and it has come further along in weeks than the old Java/Kotlin one did in months. I have Azure Functions that serve up the results to the website, parse and index websites, crawl sites, and even a chatbot for fun 🙂 (SearchAI.uk)
I have even fired up the old HP DL580 server (currently keeping the house warm) to see if I can process data faster.
CommonCrawl WET File Processing
So I decided to try and write a script that would download the WET files from the CommonCrawl (56000 files, 8TB compressed). These files contain 2.8 billion webpages or so and could be a really fun thing to process using ML etc.
Here is V1.02 of this script — it’s hacky at best, but it’s a start:
namespace SimonPlayGround
{
// Downloads CommonCrawl WET archives, decompresses them, parses each WARC
// "conversion" record into a Page (URL + plain-text body), and bulk-upserts
// the pages into a MongoDB collection, using a fixed-size pool of worker tasks.
class Program
{
    // Base URL of the CommonCrawl S3 bucket hosting the WET archives.
    private const string Path = "https://commoncrawl.s3.amazonaws.com/";
    // Local folder where path lists, .gz archives and decompressed WET files are written.
    private const string TargetFolder = @"z:\";
    // Number of path entries already processed in previous runs; used to trim the
    // work list on startup and incremented by each finished file.
    // NOTE(review): mutated from multiple worker tasks without Interlocked — the
    // count can under-report under contention. Confirm whether exactness matters.
    public static int Jump = 2320;
    // Maximum number of files downloaded/parsed concurrently.
    private const int Threads = 20;
    // Number of parsed pages buffered before each bulk write to MongoDB.
    private const int Block = 10000;
    // NOTE(review): unused in this file — presumably left over; verify before removing.
    private const string UserId = "simon";

    // Entry point: fetches the WET path list, skips the already-done tail, then
    // keeps a rolling pool of up to Threads in-flight Process() tasks until the
    // path list is exhausted.
    public static async Task Main(string[] args)
    {
        var client = new WebClient();
        var paths = new List<string>();
        var mongo = new MongoClient(new MongoClientSettings()
        {
            Server = new MongoServerAddress("192.168.0.150"),
            MaxConnectionPoolSize = 500
        });
        var db = mongo.GetDatabase("WEB");
        var collection = db.GetCollection<Page>("wet");
        // Download and decompress the master list of WET file paths to TargetFolder + "wet".
        DownloadPaths(client);
        foreach (var line in File.ReadLines(TargetFolder + "wet"))
        {
            paths.Add(line);
        }
        // hack to remove done ones
        // (drops Jump entries from the END of the list; work is consumed from the end below)
        for (int i = 0; i < Jump; i++)
        {
            paths.RemoveAt(paths.Count - 1);
        }
        var tasks = new List<Task>();
        // NOTE(review): the returned ParallelQuery is discarded — this call has no effect.
        tasks.AsParallel();
        // Prime the pool: start the first Threads workers, each taking the last path.
        // filename is a fresh local per iteration, so the closure captures the value
        // before it is removed from the list.
        for (var i = 0; i < Threads; i++)
        {
            var filename = paths.Last();
            tasks.Add(Task.Run(() => Process(filename, collection)));
            paths.RemoveAt(paths.Count - 1);
        }
        // Refill loop: whenever any task finishes, remove all completed tasks and
        // start one replacement per completion while paths remain.
        while (tasks.Any())
        {
            await Task.WhenAny(tasks);
            var finishedTasks = tasks.Where(t => t.IsCompleted).ToList();
            foreach (var finishedTask in finishedTasks)
            {
                tasks.Remove(finishedTask);
                if (paths.Count > 0)
                {
                    var filename = paths.Last();
                    tasks.Add(Task.Run(() => Process(filename, collection)));
                    paths.RemoveAt(paths.Count - 1);
                    Console.WriteLine($"Left {paths.Count} {tasks.Count} {Jump}");
                    // todo write here the number of files done
                }
            }
        }
    }

    // Full pipeline for one WET path: download + decompress, parse into MongoDB,
    // then delete the local decompressed file and bump the done-counter.
    public static async Task Process(string filename, IMongoCollection<Page> collection)
    {
        var file = await DownloadWetAsync(filename);
        await ParseWet(file, collection);
        Console.WriteLine($"FILE PROCESSED");
        File.Delete(file);
        // NOTE(review): non-atomic increment from concurrent tasks (see Jump above).
        Jump += 1;
        // todo write here that file was completed
    }

    // Streams a decompressed WET file line by line. A record starts at a
    // "WARC-Type: conversion" line and ends at the next "WARC/1.0" line; the
    // target URL is read from the "WARC-Target-URI: " header inside the record.
    // Pages are buffered and bulk-saved every Block records, with a final flush.
    public static async Task ParseWet(string filename, IMongoCollection<Page> collection)
    {
        using StreamReader sr = File.OpenText(filename);
        string s;
        StringBuilder sb = new StringBuilder();
        var foundDoc = false;   // currently inside a conversion record
        var foundURL = false;   // URL header already captured for this record
        var url = string.Empty;
        var count = 0;
        var pages = new List<Page>();
        Console.WriteLine($"Processing (unknown)");
        while ((s = sr.ReadLine()) != null)
        {
            if (foundDoc == false && s.Equals("WARC-Type: conversion"))
            {
                sb.Append(s + Environment.NewLine);
                foundDoc = true;
            }
            else if (foundDoc == true && s.Equals("WARC/1.0"))
            {
                // Record finished: extract the body. `from` points just past the
                // "Content-Length: " header name; the body is taken as everything
                // after the first newline that follows it (i.e. after the header's
                // numeric value line).
                var from = sb.ToString().IndexOf("Content-Length: ", StringComparison.Ordinal) + "Content-Length: ".Length;
                var text = sb.ToString()[@from..sb.Length];
                var body = text.Substring(text.IndexOf(Environment.NewLine, StringComparison.Ordinal) + 1);
                foundDoc = false;
                foundURL = false;
                sb.Clear();
                try
                {
                    count += 1;
                    pages.Add(new Page()
                    {
                        Url = url,
                        Body = body
                    });
                    if (count % 1000 == 0)
                    {
                        Console.WriteLine($"Procsessed {count} {DateTime.Now}");
                    }
                    if (count == Block)
                    {
                        count = 0;
                        await BulkSave(pages, collection);
                        Console.WriteLine($"{Block} done {DateTime.Now}");
                        pages.Clear();
                    }
                }
                // NOTE(review): swallows ALL exceptions for this record, including
                // BulkSave failures — data loss is silent here.
                catch
                {
                }
            }
            else if (foundDoc == true)
            {
                // Accumulate record lines; capture the first WARC-Target-URI header.
                sb.Append(s + Environment.NewLine);
                if (foundURL == false && s.StartsWith("WARC-Target-URI: "))
                {
                    var from = s.IndexOf("WARC-Target-URI: ", StringComparison.Ordinal) + "WARC-Target-URI: ".Length;
                    url = s[@from..s.Length];
                    foundURL = true;
                }
            }
        }
        // save any left over
        if (pages.Count > 0)
        {
            await BulkSave(pages, collection);
        }
    }

    // Bulk-upserts pages keyed on Url. SetOnInsert means an existing document's
    // body is NOT overwritten — the first-seen body for a URL wins.
    public static async Task BulkSave(List<Page> pages, IMongoCollection<Page> collection)
    {
        try
        {
            var updateOneModels = pages.Select(x =>
            {
                var filterDefinition = Builders<Page>.Filter.Eq(p => p.Url, x.Url);
                var updateDefinition = Builders<Page>.Update.SetOnInsert(p => p.Body, x.Body);
                return new UpdateOneModel<Page>(filterDefinition, updateDefinition) { IsUpsert = true };
            }).ToList();
            var resultWrites = await collection.BulkWriteAsync(updateOneModels);
            Console.WriteLine($"OK?: {resultWrites.IsAcknowledged} - Inserted Count: {resultWrites.InsertedCount} {resultWrites.ModifiedCount}");
            updateOneModels.Clear();
        }
        // NOTE(review): write failures are silently discarded.
        catch
        {
        }
    }

    // DelegatingHandler that wraps every HTTP send in a Polly retry policy:
    // up to 10 retries with exponential backoff (3^attempt seconds) on
    // HttpRequestException, TaskCanceledException, or any non-success status code.
    public class HttpRetryMessageHandler : DelegatingHandler
    {
        public HttpRetryMessageHandler(HttpClientHandler handler) : base(handler) { }
        protected override Task<HttpResponseMessage> SendAsync(
            HttpRequestMessage request,
            CancellationToken cancellationToken) =>
            Policy
                .Handle<HttpRequestException>()
                .Or<TaskCanceledException>()
                .OrResult<HttpResponseMessage>(x => !x.IsSuccessStatusCode)
                .WaitAndRetryAsync(10, retryAttempt => TimeSpan.FromSeconds(Math.Pow(3, retryAttempt)))
                .ExecuteAsync(() => base.SendAsync(request, cancellationToken));
    }

    // Downloads one WET .gz archive (streamed to disk, skipped if already present),
    // decompresses it next to the archive (".gz" suffix stripped), and returns the
    // decompressed file's path.
    // NOTE(review): creates a new HttpClient per call — normally a shared instance
    // is preferred to avoid socket exhaustion; confirm this is acceptable here.
    public static async Task<string> DownloadWetAsync(string line)
    {
        var filename = line.Split('/').Last();
        if (!File.Exists(TargetFolder + filename))
        {
            Console.WriteLine($"downloading (unknown)");
            using (HttpClient client = new HttpClient(new HttpRetryMessageHandler(new HttpClientHandler())))
            {
                // ResponseHeadersRead streams the body instead of buffering it in memory.
                using (HttpResponseMessage response = await client.GetAsync(Path + line, HttpCompletionOption.ResponseHeadersRead))
                using (Stream streamToReadFrom = await response.Content.ReadAsStreamAsync())
                {
                    using (Stream streamToWriteTo = File.Open(TargetFolder + filename, FileMode.Create))
                    {
                        await streamToReadFrom.CopyToAsync(streamToWriteTo);
                    }
                }
            }
        }
        else
        {
            Console.WriteLine($"GZ exist (unknown)");
        }
        // Strip the trailing ".gz" to get the decompressed WET filename.
        var wetFile = TargetFolder + filename.Substring(0, filename.Length - 3);
        if (!File.Exists(wetFile))
        {
            Console.WriteLine($"Decompressing (unknown)");
            DecompressGZip(TargetFolder + filename, wetFile);
        }
        else
        {
            Console.WriteLine($"WET exist {wetFile}");
        }
        return wetFile;
    }

    // Fetches the CC-MAIN-2020-16 crawl's wet.paths.gz listing and decompresses
    // it to TargetFolder + "wet" (one WET file path per line).
    public static void DownloadPaths(WebClient client)
    {
        client.DownloadFile("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2020-16/wet.paths.gz", TargetFolder + "wet.gz");
        DecompressGZip(TargetFolder + "wet.gz", TargetFolder + "wet");
    }

    // Decompresses a gzip file to destRoot via SharpZipLib's GZipInputStream.
    // NOTE(review): reads the ENTIRE decompressed content into one string before
    // writing — memory-heavy for large WET files, and text-based (re-encodes the
    // bytes); confirm that is intended for this data.
    public static void DecompressGZip(String fileRoot, String destRoot)
    {
        using FileStream fileStram = new FileStream(fileRoot, FileMode.Open, FileAccess.Read);
        using GZipInputStream zipStream = new GZipInputStream(fileStram);
        using StreamReader sr = new StreamReader(zipStream);
        var data = sr.ReadToEnd();
        File.WriteAllText(destRoot, data);
    }
}
/// <summary>
/// A crawled web page as persisted to the MongoDB "wet" collection:
/// the record's target URL plus its extracted plain-text body.
/// </summary>
public class Page
{
    /// <summary>MongoDB document identifier.</summary>
    [BsonId]
    public ObjectId Id { get; set; }

    /// <summary>The page URL (stored under the "url" field; used as the upsert key).</summary>
    [BsonElement("url")]
    public string Url { get; set; }

    /// <summary>The page's text content (stored under the "body" field).</summary>
    [BsonElement("body")]
    public string Body { get; set; }
}
}