When I released version 3.0 of Ookii.CommandLine last year, I wasn't expecting to have another major release so soon. I still had some ideas for new features, like supporting automatic aliases based on prefixes of command line arguments, but they'd only warrant minor revisions.
But then, I started reading about .Net source generators, and I thought, that's something I could use. Ookii.CommandLine used reflection to determine the command line arguments at runtime, so a source generator would be able to do the same job at compile time. That would make it faster, make the library trim-friendly, and allow me to emit warnings and errors for unsupported argument settings, define positional arguments based on the order of the class members rather than explicit numbers, and extract default values from property initializers.
Ookii.CommandLine 4.0 now gives you that option. You can apply the GeneratedParserAttribute to any class that defines command line arguments, and the parser will be generated at compile time.
[GeneratedParser]
partial class MyArguments
{
    [CommandLineArgument(IsPositional = true)]
    [Description("A required positional argument.")]
    public required string Required { get; set; }

    [CommandLineArgument(IsPositional = true)]
    [Description("An optional positional argument.")]
    public int Optional { get; set; } = 42;

    [CommandLineArgument]
    [Description("An argument that can only be supplied by name.")]
    public DateTime Named { get; set; }

    [CommandLineArgument]
    [Description("A switch argument, which doesn't require a value.")]
    public bool Switch { get; set; }
}
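With the attribute applied, using the parser is about as simple as it gets. Here's a minimal sketch of what that can look like, assuming the static Parse() method that the source generator adds to the class:

static int Main(string[] args)
{
    // Assumes the generated static Parse() method; it returns null after
    // printing the error message and usage help if parsing failed.
    var arguments = MyArguments.Parse(args);
    if (arguments == null)
    {
        return 1;
    }

    // Use arguments.Required, arguments.Optional, arguments.Named, arguments.Switch...
    return 0;
}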
That still might not have warranted a major version bump, if trimming hadn't been a goal.
Ookii.CommandLine relied on the TypeConverter class to convert argument values from strings to their defined type, and I soon discovered that determining the right TypeConverter for any type at compile time wasn't really possible, and that calling TypeDescriptor.GetConverter() at runtime was inherently incompatible with trimming an application. I guess this is why System.CommandLine limits its supported types to a chosen handful, and everything else requires custom converters.
The solution was ArgumentConverter, a new way to convert argument values that replaces TypeConverter. Not only is it trim-friendly, it also enables converting arguments using ReadOnlySpan<char>, which means that the parser can avoid allocating strings when splitting arguments provided like -Name:value, another performance improvement.
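To illustrate the kind of allocation-free splitting this enables, here is a generic sketch (not the library's actual code) of separating -Name:value into a name and a value using spans:

// Generic illustration, not Ookii.CommandLine's actual implementation.
ReadOnlySpan<char> argument = "-Name:value".AsSpan(1);  // Strip the argument prefix.
int separator = argument.IndexOf(':');
ReadOnlySpan<char> name = argument[..separator];         // "Name", no string allocated.
ReadOnlySpan<char> value = argument[(separator + 1)..];  // "value", no string allocated.
// The spans can be handed to a converter directly; only the final converted
// value ever needs to be materialized.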
However, it does mean that any command line arguments classes that relied on a TypeConverter
will no longer function correctly without modifications, so the major version needed to change
again. While I was at it, I made a few other breaking changes, the biggest of which was probably the
removal of constructor parameters as a way to define command line arguments. Now, properties and
methods are your only choice. I was never a big fan of the way constructor parameters worked, and
how clumsy it was to add attributes to them. They were already de-emphasized in the documentation,
and were mostly left over from version 1.0 (sadly lost to time), where they were the only way to
define positional arguments. The work needed to support them with source generation meant they ended
up on the chopping block this time.
For these and other breaking changes, check the migration guide. As with version 3.0, most users probably won't need to make many changes.
Besides source generation and new argument converters, Ookii.CommandLine 4.0 gives you plenty of
other goodness, including the aforementioned automatic prefix aliases, support for C# 11 required
properties, a new attribute for value descriptions that allows for localization, canceling parsing
with success and accessing the remaining arguments afterwards, support for nested subcommands, Source Link integration, and more.
See the change log for everything that's new.
Get it on NuGet or GitHub. You can also try out the parser or subcommands on .Net Fiddle.
This is the fifth article about Jumbo.
Previously, we looked at how Jumbo required you to create your own job configurations, and how I tried to alleviate that with helpers for common job structures.
I wanted a way to make this easier, and while I didn't want to use a separate language like Hadoop's Pig did, I was inspired by its simple, imperative nature: you just specify a sequence of operations, and it turns that into a sequence of MapReduce jobs.
The JobBuilder would end up doing the same thing for Jumbo, except using a sequence of method calls in ordinary C# code, which would be 'compiled' into a job configuration.
This had two key goals: letting you compose a job out of high-level operations without having to manually define the right stage structure for each one, and letting you write those operations as ordinary methods instead of full task classes.
The first was accomplished by having the JobBuilder define helper functions for certain operations, such as AccumulateRecords, that knew how to create the kind of job structure appropriate for that operation. That may not seem much better than the old AccumulatorTask approach, but it had one key advantage: you could now incorporate this operation in a larger job structure, rather than that structure having to be your entire job.
It also meant the JobBuilder could apply certain heuristics, such as: if there is only one input task, you don't need a second aggregation stage; everything can be done locally. It's this kind of logic that lets WordCount in the quick start guide run in a single task if the input is small.
The second goal had some complications. I didn't really want to give Jumbo Jet's TaskExecutionUtility (the thing responsible for most of the logic of how to run tasks) the ability to invoke arbitrary functions. It would break too much of the logic of how tasks were constructed, and would require a lot of bespoke code for different types of task methods. No, I wanted to stick to ITask<TInput, TOutput> at that level, which meant JobBuilder had to translate between the two.
That meant dynamically generating MSIL to create task classes that invoked the methods the user specified. These classes would be generated at job submission time and saved to a generated assembly, which the TaskExecutionUtility could just load like a normal task type, without being aware of any of this. It was a fun challenge to figure out how to do this, and it made it much easier to create custom tasks.
The only limitation was that if the method you used for your task was not public static, I couldn't emit a direct call in the generated MSIL. In that case, I had to serialize the delegate you passed in, deserialize it during task execution, and call the method through it, which is much slower. Unfortunately, this meant that using lambdas (which never compile to a public method) was possible, but not ideal.
The first iteration of the JobBuilder meant you could now write code like this to create a job:
var builder = new JobBuilder();
var input = builder.CreateRecordReader<Utf8StringWritable>(_inputPath, typeof(LineRecordReader));
var collector = new RecordCollector<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>(
    null, null, _combinerTasks == 0 ? null : (int?)_combinerTasks);
builder.ProcessRecords(input, collector.CreateRecordWriter(), WordCount);
var output = builder.CreateRecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>(
    _outputPath, typeof(TextRecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>>),
    BlockSize, ReplicationFactor);
builder.AccumulateRecords(collector.CreateRecordReader(), output, WordCountAccumulator);

...

[AllowRecordReuse]
public static void WordCount(RecordReader<Utf8StringWritable> input,
    RecordWriter<KeyValuePairWritable<Utf8StringWritable, Int32Writable>> output)
{
    ...
}

[AllowRecordReuse]
public static void WordCountAccumulator(Utf8StringWritable key, Int32Writable value, Int32Writable newValue)
{
    ...
}
That was better: no more task classes, no more explicitly creating child stages, and the job was defined by basically two function calls: ProcessRecords and AccumulateRecords. The JobBuilder also kept track of all the assemblies used by each record reader, writer, partitioner, and task you used, and made sure they, and all their non-framework dependencies, would be uploaded to the DFS.
There were two things I didn't like about this, though: all the explicit generic type parameters, and the fact that you had to define your input, output, and channels (through the RecordCollector class) explicitly before calling the processing functions. It kind of meant you had to think about your stages backwards, because you had to define their output before you invoked their processing functions.
I wasn't quite satisfied with this. It still wasn't as easy as I wanted it to be. I tried to improve things by just adding even more specific helper methods, so you could write WordCount like this:
var input = new DfsInput(_inputPath, typeof(WordRecordReader));
var output = CreateDfsOutput(_outputPath, typeof(TextRecordWriter<Pair<Utf8String, int>>));
StageBuilder[] stages = builder.Count<Utf8String>(input, output);
((Channel)stages[0].Output).PartitionCount = _combinerTasks;
stages[0].StageId = "WordCountStage";
stages[1].StageId = "WordCountSumStage";
That was kind of cheating (I hadn't solved the problem, I'd just hidden it), and the way you customized things like the stage IDs and task count wasn't exactly great.
Eventually, I had an idea that completely changed how the job builder worked. Instead of having to define channels and outputs up front, each operation could directly serve as the input of the next operation, and if that created a channel, the resulting object could be used to customize that channel as well as the stage itself.
This new JobBuilder still had helpers for specific types of operations that could use custom logic (such as adding the local aggregation step), as well as helpers for custom channel types (sorting), and for complex operations like joins. And of course, you could still use custom task classes or manually define child stages if none of the helpers sufficed, combining them seamlessly with the helpers.
Finally, you could specify every step in a straightforward way:
var job = new JobBuilder();
var input = job.Read(InputPath, typeof(LineRecordReader));
var words = job.Map<Utf8String, Pair<Utf8String, int>>(input, MapWords);
var aggregated = job.GroupAggregate<Utf8String, int>(words, AggregateCounts);
job.Write(aggregated, OutputPath, typeof(TextRecordWriter<>));
This was the kind of simplicity I wanted. There were no more helpers for entire predefined job structures; every step was explicit while still being easy to use. Even emulating MapReduce was now simple:
var job = new JobBuilder();
var input = job.Read(InputPath, typeof(LineRecordReader));
var mapped = job.Map<Utf8String, Pair<Utf8String, int>>(input, MapWords);
var sorted = job.SpillSortCombine<Utf8String, int>(mapped, ReduceWordCount);
var reduced = job.Reduce<Utf8String, int, Pair<Utf8String, int>>(sorted, ReduceWordCount);
job.Write(reduced, OutputPath, typeof(TextRecordWriter<>));
Here, SpillSortCombine configures the channel in between the map and reduce, without the user needing to know that's what it does. And despite it being an operation that has no stages of its own, you can just apply it directly to DFS input and output; the JobBuilder makes sure a valid job structure is created.
The only limitation I could never get away from is the need for explicit generic type arguments. I was able to make things a bit easier: everywhere you specify a type directly, whether record readers, writers, partitioners, or even tasks (if not using methods), you can specify the "open" generic type (like typeof(TextRecordWriter<>) above), and the JobBuilder will instantiate it with the correct type for the records.
However, for the actual processing methods, which use delegates, it was always necessary to specify both the input and output types explicitly, because C# can't determine the type arguments from the parameters of a delegate target, only from its return type. LINQ doesn't suffer from this problem because the type of the input items is known: you pass in an IEnumerable<T> or IQueryable<T>. The types of input, words, etc. above (all implementing IStageInput), on the other hand, are not generic, so they can't be used to deduce the record type.
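To make that limitation concrete, here is a small, self-contained example (the method names are made up; it just demonstrates the C# inference rules involved):

using System;
using System.Collections.Generic;

class InferenceExample
{
    // C# infers T from the static type of a value you pass in...
    static void Consume<T>(IEnumerable<T> items) { }

    // ...but for a method group passed as a delegate, inference only uses the
    // method's return type, never its parameter types.
    static void Process<TInput, TOutput>(Action<TInput, TOutput> task) { }

    static void MyTask(string input, int output) { }

    static void Main()
    {
        Consume(new List<int>());      // OK: T = int is inferred from the argument.
        Process<string, int>(MyTask);  // TInput and TOutput must be spelled out.
        // Process(MyTask);            // Error: the type arguments cannot be inferred.
    }
}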
The only improvement I made was shortening a bunch of type names. Utf8StringWritable became Utf8String, KeyValuePairWritable became Pair, and Int32Writable (and a bunch of similar wrappers) went away in favor of being able to directly use types like int.
So, why not make IStageInput generic? Then the delegates would work, but you'd have to specify explicit types in other places, such as Read above, and when calling Process with a custom task class. There is no way to derive the record type from job.Read(InputPath, typeof(LineRecordReader)). I thought of using a constraint like this:
public IStageInput<TRecord> Read<TRecordReader, TRecord>(string inputPath)
    where TRecordReader : RecordReader<TRecord>
But, you still have to explicitly specify both generic arguments in that case.
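That's because C# has no partial type argument inference: as soon as you specify one type argument, you must specify them all, and TRecord doesn't appear in any parameter it could be inferred from. A self-contained illustration, with simplified stand-ins for the real types:

using System;

abstract class RecordReader<TRecord> { }
class LineRecordReader : RecordReader<string> { }
interface IStageInput<TRecord> { }
class StageInput<TRecord> : IStageInput<TRecord> { }

static class Job
{
    public static IStageInput<TRecord> Read<TRecordReader, TRecord>(string inputPath)
        where TRecordReader : RecordReader<TRecord>
        => new StageInput<TRecord>();
}

class PartialInferenceExample
{
    static void Main()
    {
        // Both type arguments have to be written out:
        IStageInput<string> input = Job.Read<LineRecordReader, string>("/input");
        Console.WriteLine(input);

        // Neither of these compiles:
        // Job.Read<LineRecordReader>("/input"); // CS0305: wrong number of type arguments.
        // Job.Read("/input");                   // CS0411: type arguments cannot be inferred.
    }
}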
Maybe I could do better now, if I gave this some more thought. I can't immediately think of anything that wouldn't have equal drawbacks, though. And, since I'm not developing Jumbo anymore, this is how it will stay.
Still, despite that little annoyance, I accomplished most of what I wanted with the final version of the JobBuilder. It feels like a pretty natural way of defining jobs, and even if you want to customize settings for stages and channels (for example, see advanced WordCount), it works pretty well.
It's probably my favorite aspect of Jumbo, something that's really different from anything in Hadoop (at least at the time), and I'm pretty proud of it. And, before I released Jumbo, nobody really knew about it. Since it had no real value to my research, I never published about it. It was only mentioned on one page in my dissertation, but that was it.
That's why I wanted to release Jumbo. It's why I wanted to write these articles about it. Even if nobody cares, the work I did on this is at least preserved now. And that's worth something, even if only to me.
This is the fourth article about Jumbo.
From the moment I started working on Jumbo, I knew I wanted two things: I wanted it to be more flexible than plain MapReduce, and I still wanted it to be easy to use.
While I didn't want to be limited to MapReduce, I also didn't want to allow arbitrary task graphs, because I felt that was too complex for what I wanted to do. By abstracting MapReduce into a linear sequence of "stages," of which you could have an arbitrary number, I feel I struck a good balance: alternative job structures were possible, but things never got too complicated.
Add to that things like making channel operations (such as sorting) optional, and suddenly you could use hash-table aggregation and do in a single job things that would've required several MapReduce jobs; and by allowing multiple input stages, you could do things that were traditionally hard to emulate in MapReduce, such as joins.
Still, this added flexibility did come with some complexity. In Hadoop, you write a map function, a reduce function, set up a simple configuration specifying what they are, and you're done. With Jumbo, you had to put a little more thought into what kind of structure is right for your job, and creating a JobConfiguration was a bit more complex.
From the earliest versions of Jumbo Jet I wrote, the job configuration was always an XML file, and it was always a serialization of the JobConfiguration class. However, the structure of that format changed quite a bit. Originally, you needed to add each task individually, which meant that if you had an input file with a thousand blocks, you needed to add a thousand TaskConfiguration elements to the configuration. Not exactly scalable.
Eventually, I switched this to having a StageConfiguration instead, and letting the JobServer figure things out from there. But you still needed to essentially build the job structure manually. At first, this still meant manually creating each stage and channel. Later, I added helpers, which meant that creating WordCount's configuration would look something like this:
var config = new JobConfiguration(new[] { Assembly.GetExecutingAssembly() });
var input = dfsClient.NameServer.GetFileSystemEntryInfo("/input");
var firstStage = config.AddInputStage("WordCount", input, typeof(WordCountTask), typeof(LineRecordReader));
var localAggregation = config.AddPointToPointStage("LocalAggregation", firstStage,
    typeof(WordCountAggregationTask), ChannelType.Pipeline, null, null);
var info = new InputStageInfo(localAggregation)
{
    ChannelType = ChannelType.File,
    PartitionerType = typeof(HashPartition<Pair<Utf8String, int>>)
};
config.AddStage("WordCountAggregation", localAggregation, typeof(WordCountAggregationTask), taskCount,
    info, "/wcoutput", typeof(TextRecordWriter));
var job = jetClient.JobServer.CreateJob();
jetClient.RunJob(job, config, dfsClient, new[] { Assembly.GetExecutingAssembly().Location });
Maybe it's not terrible, but it's not super friendly (and this was already better than how it started out). I tried to improve things by creating helpers for specific job types: a job like WordCount was an AccumulatorJob; a two-stage job with optional sorting (like MapReduce) was a BasicJob. This worked, but it kind of left you hanging if you wanted a different job structure.
Some things got easier quickly; instead of manually using the JetClient class, I introduced the concept of a job runner, a special class used by JetShell. But defining the structure of the job stayed like this for a long time, unless you could use a helper for a predefined job structure.
One of my first ideas for how to make creating jobs easier was to adapt LINQ (which was pretty new at the time). I even thought that something like that could have potential for publication. Unfortunately, Microsoft itself beat me to the punch by publishing a paper on DryadLINQ, so that was no longer an option.
After that, I no longer saw this as something I could publish a paper about, but just for my own gratification I still wanted a better way to create jobs.
I thought about alternative approaches. I could still do LINQ, but it would be complicated, and without a research purpose I didn't want to invest that kind of time. Hadoop had its own methods; while single jobs were easy, complex processing that required multiple jobs or joins was still hard even there, and the best solution at the time was Pig, which used a custom programming language, Pig Latin, for creating jobs. I didn't much like that either, since you'd often have to combine both Pig Latin and Java, and that didn't seem like a good option to me.
No, I wanted something that kept you in C#, and the solution I came up with was the JobBuilder, which we'll discuss next time.
When I released Ookii.CommandLine for C++, I realized I had quite a backlog of things I wanted to update in Ookii.CommandLine, not just for the C++ version, but for the .Net version as well.
The result is the release of Ookii.CommandLine 3.0 for .Net. This is the biggest release of Ookii.CommandLine yet, with many new features, including support for an additional, more POSIX-like argument syntax, argument validation and dependencies, automatic name transformations, an updated subcommand API, usage help color output, more powerful customization, and more.
Seriously, there's a lot. Even version 2.0, which was a substantial rewrite from the original, wasn't anywhere near as big. Unfortunately, that does mean there are some breaking changes, but I expect most users won't need to make too many changes.
One question you might have is, when is all this new stuff coming to the C++ version? Unfortunately, I don't have a good answer. I'll probably add at least some of the new features to the C++ version, but it probably won't be all at once, and I'm not going to give a timeline either. If there's any feature you want in particular, you should file an issue for it.
Get it on NuGet or GitHub. You can also try it out on .Net Fiddle, or try out subcommands.
This is the third article about Jumbo.
One of the things I knew I would need for Jumbo was a way for clients and servers to communicate. Clients would have to communicate with the servers, but servers also had to communicate with each other, mostly through heartbeats.
In some cases, writing a custom TCP server and protocol made sense, like client communication with the data server where large amounts of data had to be processed. But for most things, it was just a matter of invoking some function on the target server, like when creating a file on the NameServer or submitting a job to the JobServer. I figured it would be overkill to create completely custom protocols for this. In fact, being able to easily do this was one of the reasons I picked .Net.
WCF existed at the time, but I wasn't very familiar with it, and I'm not sure if Mono supported it back then. So, I decided to use good old-fashioned .Net Remoting. All I had to do was define some interfaces, like INameServerClientProtocol, derive the classes implementing them from MarshalByRefObject, set up some minimal configuration, and I was in business. This seemed to work great, both on .Net and Mono.
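For those who never used .Net Remoting, the "minimal configuration" really was minimal. Here is a simplified, illustrative sketch for .Net Framework/Mono; it is not Jumbo's actual code, and the interface member shown is made up:

using System;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Channels;
using System.Runtime.Remoting.Channels.Tcp;

public interface INameServerClientProtocol
{
    string GetFileSystemEntryInfo(string path); // Made-up member for illustration.
}

// The server implementation derives from MarshalByRefObject so calls are
// marshaled by reference over the channel.
public class NameServer : MarshalByRefObject, INameServerClientProtocol
{
    public string GetFileSystemEntryInfo(string path) => $"info for {path}";
}

public static class Program
{
    public static void Main()
    {
        // Server side: register a TCP channel and expose the object.
        ChannelServices.RegisterChannel(new TcpChannel(9000), false);
        RemotingConfiguration.RegisterWellKnownServiceType(
            typeof(NameServer), "NameServer", WellKnownObjectMode.Singleton);

        // Client side (normally a different process): get a transparent proxy
        // and call it like a local object.
        var proxy = (INameServerClientProtocol)Activator.GetObject(
            typeof(INameServerClientProtocol), "tcp://localhost:9000/NameServer");
        Console.WriteLine(proxy.GetFileSystemEntryInfo("/input"));
    }
}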
All was well, until I started scaling up on Linux clusters. Suddenly, I was faced with extremely long delays in remoting calls. This was particularly noticeable with heartbeats, which sometimes ended up taking tens of seconds to complete. Now, I wasn't running thousands of nodes, where scaling issues might be expected; I was running maybe forty at the most. A remoting call that takes a few milliseconds to complete shouldn't suddenly take 10+ seconds in that environment.
It took a lot of digging, but I eventually found the cause: it was a bug in Mono. Mono's remoting TCP server channel implementation used a thread pool to dispatch requests. Now, the .Net BCL has a built-in thread pool class, but remoting didn't use that. It used its own RemotingThreadPool, which was used nowhere else.
This thread pool had a fatal flaw. Once it reached a specific number of active threads, on the next request it would wait 500ms for an existing thread to become available, before creating a new one.
That's already not great. Worse, it did this synchronously on the thread that accepts new connections! Which meant that these delays would stack! If a connection attempt comes in, and one is already waiting, it can't accept this connection until the waiting one is done. And if that one then also hits the limit...
Basically, if you get 10 new connection attempts when the pool is already at the limit, the last one will end up waiting 5 seconds, instead of 500ms. This was the cause of my scalability problems, with a few dozen nodes all trying to send heartbeats to the single NameServer and JobServer.
I reported this bug, but it never got fixed. In fact, this problem is still in the code today, at least as of this writing.
I created my own patched version of Mono, which worked around the issue (I think I just removed the delay). But, needing a custom-built Mono wasn't great for the usability of the project. I eventually ended up writing my own RPC implementation, using Begin/End asynchronous methods, which performed better. Still, I refused to merge this into the main branch (trunk, since this was SVN), still waiting for a Mono fix that would never come.
Eventually, I did switch to using the custom RPC implementation permanently, because if I ever did want to release Jumbo, requiring users to patch and compile Mono wasn't really an option. And, it was probably for the better, since .Net Core no longer has remoting, so I would've needed a different solution anyway when making the port.
And yes, just like .Net Remoting, my custom RPC mechanism depends on BinaryFormatter, because it works much the same. And BinaryFormatter is deprecated and insecure. Since I have no interest in further developing Jumbo, that will remain that way, so do consider that if you run Jumbo on any of your machines.