SAX-like Xml parsing
For those of you who don’t know or can’t remember, SAX (Simple API for XML) is a technology for reading Xml in an event-based way. Instead of loading your Xml into a great big DOM and looping through it, or plucking out nodes via XPath or LINQ to XML, SAX has the Xml parser notify your application as it encounters each new node. Like XmlReader, SAX is a forward-only Xml reader, which makes it efficient for parsing large or streaming chunks of Xml.
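For contrast, here is the ordinary pull style that a SAX-like reader turns around: the application asks XmlReader for each node in turn, and the reader only moves forward without keeping earlier nodes in memory. This is a minimal sketch using only System.Xml; `PullLoopDemo` and `ElementNames` are hypothetical names chosen for illustration.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical helper: a plain pull loop over XmlReader.
public static class PullLoopDemo
{
    // Returns the name of every element start tag, in document order.
    public static List<string> ElementNames(string xml)
    {
        var names = new List<string>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            // The application pulls one node per Read() call; the reader never moves backwards.
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element)
                    names.Add(reader.Name);
            }
        }
        return names;
    }
}
```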
Reactive Extensions fits well with the SAX way of thinking because it is designed to “push” information to its consumer instead of making you “pull” information from it. At the same time, Reactive LINQ lets you query observable objects in a natural-feeling, pull-like language. So it occurred to me: why not combine XmlReader and Reactive Extensions to build a SAX-like Xml reader that you could use via Reactive LINQ!?
An observable XmlReader would let you subscribe to it and would then iterate over your Xml document for you, notifying you as each node is read. A programmer using it could easily write reactive LINQ expressions that select specific nodes in the Xml, resulting in code that looks and feels much like LINQ to XML but keeps the streaming performance benefits of XmlReader.
For example, imagine you wanted to find all the values in nodes of a certain name. You could write something like this…
XmlReader reader = XmlReader.Create("TreeOfLife.xml");
IObservable<XmlReader> RxReader = reader.ToObservable();
IObservable<string> NameFinder =
    from nodeReader in RxReader
    where nodeReader.NodeType == XmlNodeType.Element && nodeReader.Name == "NAME"
    select nodeReader.ReadElementContentAsString();
NameFinder.Subscribe((item) => names.Add(item));
I’ll dissect:
- Create an XmlReader
- Use my newly created extension method to turn that XmlReader into an IObservable.
- Construct a new IObservable
- Select all the XmlReaders in the IObservable (one for each node)
- Filter for node type and element name
- Select their string values
- Subscribe, which actually starts the XmlReader iterating and notifies you whenever a new name is available.
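One caveat with this pattern: ReadElementContentAsString() already advances the reader past the element’s end tag, and the loop inside ToObservable() then calls Read() again. When two matching elements sit side by side with no whitespace between them, the second one is skipped. The sketch below reproduces this with a plain XmlReader loop (no Rx) and shows the usual fix of calling Read() only when nothing else has advanced the reader. `NameReader` and its methods are hypothetical helpers.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical helpers contrasting the naive loop with the fixed one.
public static class NameReader
{
    // Naive loop: mirrors ToObservable() combined with ReadElementContentAsString().
    public static List<string> ReadNamesNaive(string xml)
    {
        var names = new List<string>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == "NAME")
                    names.Add(reader.ReadElementContentAsString()); // moves past </NAME>
            }
        }
        return names;
    }

    // Fixed loop: only call Read() when the content read did not already advance the reader.
    public static List<string> ReadNamesFixed(string xml)
    {
        var names = new List<string>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            reader.Read(); // position on the first node
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == "NAME")
                    names.Add(reader.ReadElementContentAsString());
                else
                    reader.Read();
            }
        }
        return names;
    }
}
```

With pretty-printed Xml, the whitespace node between elements absorbs the extra Read(), which is why the naive version often appears to work.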
Now let’s look at the code in my extension method. It’s surprisingly simple!
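public static IObservable<XmlReader> ToObservable(this XmlReader reader)
{
    // Observable.CreateWithDisposable is from early Rx builds; later releases call this overload Observable.Create.
    return Observable.CreateWithDisposable<XmlReader>(observer =>
    {
        try
        {
            // This loop runs synchronously inside Subscribe(), pushing the same
            // reader positioned on each successive node.
            while (reader.Read())
                observer.OnNext(reader);
            observer.OnCompleted();
        }
        catch (Exception e)
        {
            observer.OnError(e);
        }
        // Disposing the subscription disposes (closes) the reader.
        return reader;
    });
}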
All I am doing is looping over the XmlReader and passing the XmlReader itself (in its current state) to the observer. Voilà!
Canceling the operation
Suppose you want to cancel the operation mid-subscription. Because I create the observable via Observable.CreateWithDisposable and return the XmlReader itself as the IDisposable, you can cancel at any time by simply calling Dispose on the subscription:
IDisposable processor = NameFinder.Subscribe((item) => names.Add(item));
processor.Dispose();
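As written, the read loop in ToObservable() runs inside the subscribe callback, so Subscribe() only hands back the IDisposable after the whole document has been read, unless the subscription is moved to another thread (for example with SubscribeOn). Even then, disposing closes the XmlReader from a different thread than the one calling Read(), which is not safe because XmlReader instance members are not guaranteed to be thread-safe. A common alternative is cooperative cancellation: the loop checks a flag between Read() calls. The sketch below assumes nothing beyond the BCL’s IObservable&lt;T&gt;/IObserver&lt;T&gt; interfaces (no Rx); `CancellableXmlObservable`, its nested `Subscription`, and `ActionObserver<T>` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

// Hypothetical BCL-only sketch (no Rx): element names are pushed from a background
// task, and disposing the subscription stops the loop before its next Read().
public sealed class CancellableXmlObservable : IObservable<string>
{
    private readonly Func<XmlReader> _createReader;

    public CancellableXmlObservable(Func<XmlReader> createReader)
    {
        _createReader = createReader;
    }

    public IDisposable Subscribe(IObserver<string> observer)
    {
        var cts = new CancellationTokenSource();
        Task loop = Task.Run(() => ReadLoop(observer, cts.Token));
        return new Subscription(cts, loop);
    }

    private void ReadLoop(IObserver<string> observer, CancellationToken token)
    {
        try
        {
            // The reader is created, read, and disposed on this one thread only.
            using (XmlReader reader = _createReader())
            {
                while (!token.IsCancellationRequested && reader.Read())
                {
                    if (reader.NodeType == XmlNodeType.Element)
                        observer.OnNext(reader.Name);
                }
            }
            if (!token.IsCancellationRequested)
                observer.OnCompleted();
        }
        catch (Exception e)
        {
            // Like the post's version, exceptions thrown by OnNext are also routed to OnError.
            observer.OnError(e);
        }
    }

    // Returned from Subscribe; Dispose only requests cancellation.
    public sealed class Subscription : IDisposable
    {
        private readonly CancellationTokenSource _cts;

        internal Subscription(CancellationTokenSource cts, Task completion)
        {
            _cts = cts;
            Completion = completion;
        }

        // Completes when the read loop has exited.
        public Task Completion { get; }

        public void Dispose() => _cts.Cancel();
    }
}

// Hypothetical helper observer built from delegates.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action _onCompleted;
    private readonly Action<Exception> _onError;

    public ActionObserver(Action<T> onNext, Action onCompleted, Action<Exception> onError)
    {
        _onNext = onNext;
        _onCompleted = onCompleted;
        _onError = onError;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnCompleted() => _onCompleted();
    public void OnError(Exception error) => _onError(error);
}
```

Disposing from another thread is safe in this design because Dispose only touches the CancellationTokenSource; the reader itself never leaves the loop’s thread.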
More complex selections
Suppose you want to get only the child nodes within a parent node:
IObservable<string> NameFinder =
    from r1 in RxReader
    where r1.NodeType == XmlNodeType.Element && r1.GetAttribute("ID") == "76937"
    from r2 in r1.ReadSubtree().ToObservable()
    where r2.NodeType == XmlNodeType.Element && r2.Name == "NAME"
    select r2.ReadElementContentAsString();
I simply chain reactive LINQ clauses, creating a new reactive XmlReader that iterates over the sub-node via XmlReader.ReadSubtree().ToObservable().
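ReadSubtree() returns a new XmlReader scoped to the current element and its descendants, which is what keeps the inner query from running past the parent node. The same selection written as a plain XmlReader loop (no Rx) might look like the sketch below; `SubtreeDemo` and `NamesUnder` are hypothetical names, and the inner loop uses the skip-safe pattern for ReadElementContentAsString().

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical helper: NAME values under the element with a given ID attribute.
public static class SubtreeDemo
{
    public static List<string> NamesUnder(string xml, string id)
    {
        var names = new List<string>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            while (reader.Read())
            {
                if (reader.NodeType != XmlNodeType.Element || reader.GetAttribute("ID") != id)
                    continue;

                // The subtree reader only sees this element and its descendants.
                using (XmlReader sub = reader.ReadSubtree())
                {
                    sub.Read(); // move from the Initial state onto the element itself
                    while (!sub.EOF)
                    {
                        if (sub.NodeType == XmlNodeType.Element && sub.Name == "NAME")
                            names.Add(sub.ReadElementContentAsString()); // already advances
                        else
                            sub.Read();
                    }
                }
                // Closing the subtree reader leaves the outer reader at the end of this element.
            }
        }
        return names;
    }
}
```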
Here’s a demo of the code above using the Tree of Life Xml for the butterfly genus Danaus.
Here’s the source code for the project.
Nice post,
but wouldn’t you be able to write a LINQ to XML query and simply call (the Rx library version of) ToObservable() on it, in order to consume the query results in a reactive/push fashion?
Steven, thanks for the comment! From my understanding, LINQ to XML is not a forward-only technology and therefore requires that all the Xml be loaded into memory before it can be queried. For most Xml documents the performance and memory savings probably aren’t worth the trouble, but for large Xml documents my technique would have an advantage.
Nevertheless, that is a great idea and certainly a much simpler approach if all you’re after is reactive-style Xml parsing.
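A middle ground between the two approaches (not something from the post) is to keep the forward-only XmlReader for navigation but materialize each matching element as a small XElement with XNode.ReadFrom(), so each fragment can be queried with LINQ to XML while only one fragment is in memory at a time. A minimal sketch; `StreamingFragments` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;
using System.Xml.Linq;

// Hypothetical helper: lazily streams matching elements as XElement fragments.
public static class StreamingFragments
{
    public static IEnumerable<XElement> Elements(TextReader input, string elementName)
    {
        using (XmlReader reader = XmlReader.Create(input))
        {
            reader.MoveToContent();
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
                {
                    // ReadFrom consumes the whole element and leaves the reader on the next node.
                    yield return (XElement)XNode.ReadFrom(reader);
                }
                else
                {
                    reader.Read();
                }
            }
        }
    }
}
```

Because the method is lazy, the resulting sequence could then be fed to the Rx ToObservable() Steven mentions without first loading the whole document.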
😦
Tim, have you tried applying this “reader” to a TcpClient stream with an infinite Xml feed?
while (reader.Read())
reads the current message from the stream;
observer.OnNext(reader);
pushes to the observer a reader positioned on the NEXT message, while each message can be read from the stream only once.
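The underlying issue is that every OnNext call receives the same mutable XmlReader, so any observer that holds on to it after OnNext returns (for example, after ObserveOn hands the callback to another thread) sees whatever node the reader has moved to by then. One way around it, sketched here as an assumption rather than the post’s design, is to push an immutable snapshot of the node instead of the reader. `XmlNodeSnapshot` and `SnapshotDemo` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical immutable copy of the reader's current node; safe to keep after the reader moves on.
public sealed class XmlNodeSnapshot
{
    public XmlNodeType NodeType { get; }
    public string Name { get; }
    public string Value { get; }
    public int Depth { get; }

    public XmlNodeSnapshot(XmlReader reader)
    {
        NodeType = reader.NodeType;
        Name = reader.Name;
        Value = reader.Value;
        Depth = reader.Depth;
    }
}

public static class SnapshotDemo
{
    // Collects both the raw reader references and per-node snapshots for comparison.
    public static (List<XmlReader> references, List<XmlNodeSnapshot> snapshots) Collect(string xml)
    {
        var references = new List<XmlReader>();
        var snapshots = new List<XmlNodeSnapshot>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            while (reader.Read())
            {
                references.Add(reader);                     // the same object every time
                snapshots.Add(new XmlNodeSnapshot(reader)); // a copy of this node only
            }
        }
        return (references, snapshots);
    }
}
```

The trade-off is that observers can no longer call reader methods such as ReadElementContentAsString() or ReadSubtree(); whatever they need must be captured in the snapshot.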
Hi Tim!
I was wondering whether the extension method is async or not.
If not, how could we implement it asynchronously?
how about:
Observable.CreateWithDisposable(observer =>
Scheduler.CurrentThread.Schedule(() =>
Mike, you can just call:
NameFinder
.SubscribeOn(Scheduler.NewThread)
.ObserveOn(Scheduler.Dispatcher)
.Subscribe(…);
The extra ObserveOn method is optional and used to make sure your callbacks are called on the UI thread.
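Another option, available in later .NET versions (4.5 and up) rather than the Rx build used here, is XmlReader’s own asynchronous API: with XmlReaderSettings.Async set to true, methods such as ReadAsync() and ReadElementContentAsStringAsync() let the loop await I/O instead of blocking a thread. A minimal sketch; `AsyncNameReader` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Xml;

// Hypothetical helper: collects NAME values while awaiting the reader.
public static class AsyncNameReader
{
    public static async Task<List<string>> ReadNamesAsync(Stream input)
    {
        var settings = new XmlReaderSettings { Async = true }; // required for the *Async methods
        var names = new List<string>();
        using (XmlReader reader = XmlReader.Create(input, settings))
        {
            await reader.ReadAsync(); // position on the first node
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == "NAME")
                    names.Add(await reader.ReadElementContentAsStringAsync()); // already advances
                else
                    await reader.ReadAsync();
            }
        }
        return names;
    }
}
```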
Oops. Please remove my previous comment if possible.
Full sample of a streamed Xml reader:
using System;
using System.Concurrency;
using System.IO;
using System.Linq;
using System.Text;
using System.Xml;
using System.Xml.Serialization;

namespace BetradarLiveOdds.Test
{
    public class TestReader
    {
        [Serializable]
        public class Test
        {
            [XmlAttribute]
            public int id;
        }

        public void TestXmlReader()
        {
            XmlSerializer serializer = new XmlSerializer(typeof(Test));
            string fragmentElementName = "Test";
            // Several sibling <Test> elements with no single root element.
            string XmlData = "<Test id='1' /><Test id='2'/><Test id='3'/><Test id='4'/><Test id='5'/><Test id='6'/><Test id='7'/><Test id='8'/>";

            // The condition advances the shared reader to the next <Test> element.
            var observableReader = Observable.Generate(
                XmlReader.Create(new MemoryStream(Encoding.UTF8.GetBytes(XmlData)), GetReaderSettings()),
                reader => !reader.EOF && reader.ReadToFollowing(fragmentElementName),
                reader => reader,
                reader => reader.ReadSubtree(),
                Scheduler.NewThread);

            observableReader
                .Select(reader => ((Test)serializer.Deserialize(reader)).id)
                .Subscribe(Console.WriteLine);

            // Keep the console open while the background thread produces output.
            Console.ReadLine();
        }

        /// <summary>
        /// Returns XmlReader settings for streaming a sequence of Xml fragments.
        /// </summary>
        /// <returns>The configured XmlReaderSettings.</returns>
        private static XmlReaderSettings GetReaderSettings()
        {
            XmlReaderSettings settings = new XmlReaderSettings();
            settings.ConformanceLevel = ConformanceLevel.Fragment; // allow multiple top-level elements
            settings.CloseInput = false;
            settings.CheckCharacters = false;
            settings.DtdProcessing = DtdProcessing.Ignore;
            settings.IgnoreComments = false;
            settings.IgnoreProcessingInstructions = false;
            settings.IgnoreWhitespace = false;
            settings.MaxCharactersInDocument = 0; // 0 means no size limit
            settings.ValidationType = ValidationType.None;
            return settings;
        }
    }
}
The XmlData line in the sample should be:
string XmlData = "<Test id='1' /><Test id='2'/><Test id='3'/><Test id='4'/><Test id='5'/><Test id='6'/><Test id='7'/><Test id='8'/>";
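The same fragment-by-fragment idea also works without Rx, which makes it easier to see what each step does. This is a minimal sketch under the assumption that a synchronous loop is acceptable; `FragmentDemo` is a hypothetical name, and `TestFragment` mirrors the sample’s `Test` class (renamed, with an explicit root name of "Test"). ConformanceLevel.Fragment is what lets the reader accept several top-level `<Test>` elements.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml;
using System.Xml.Serialization;

// Mirrors the sample's Test class; the root element name is kept as "Test".
[XmlRoot("Test")]
public class TestFragment
{
    [XmlAttribute]
    public int id;
}

// Hypothetical helper: deserializes each <Test> fragment in turn.
public static class FragmentDemo
{
    public static List<int> ReadIds(string xmlData)
    {
        var serializer = new XmlSerializer(typeof(TestFragment));
        var settings = new XmlReaderSettings { ConformanceLevel = ConformanceLevel.Fragment };
        var ids = new List<int>();
        using (XmlReader reader = XmlReader.Create(new StringReader(xmlData), settings))
        {
            while (reader.ReadToFollowing("Test"))
            {
                // Deserialize only the current element; closing the subtree reader
                // leaves the outer reader ready for the next ReadToFollowing call.
                using (XmlReader fragment = reader.ReadSubtree())
                {
                    ids.Add(((TestFragment)serializer.Deserialize(fragment)).id);
                }
            }
        }
        return ids;
    }
}
```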
Hi Tim!
Thanks for your reply.
One more question about canceling an operation by using CreateWithDisposable.
Check the following code:
public IObservable ParseXML(string xmlString)
{
    return Observable.CreateWithDisposable(observer =>
        Scheduler.ThreadPool.Schedule(() =>
        {
            ….
        }));
}
What is missing so I could do the following:
IDisposable processor = ParseXML(xml).ObserveOnDispatcher().Subscribe((item) => names.Add(item));
processor.Dispose();
Currently, the dispose is not being called.