I'm receiving firewall logs in CEF format as strings that look like this:
ABC|XYZ|F123|1.0|DSE|DSE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\Device\HarddiskVolume2\Windows\System32 cs5="C:\Windows\system32\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q fileHash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patternDisposition=Detection. outcome=0
How can I create a DataFrame from this kind of string, where the key-value pairs are separated by = ?
My objective is to infer the schema dynamically from the keys in the string, i.e. extract the keys on the left-hand side of each = and build a schema from them.
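A minimal sketch of the kind of dynamic extraction meant here, not a tested solution. It assumes a key is a run of word characters that sits at the start of the extension or right after whitespace and is immediately followed by =, and that a literal = inside a value is escaped as \=. The names CefExtension and parse are made up for illustration. A value that itself contains an unescaped " word=" sequence would be split in the wrong place.

```scala
object CefExtension {
  // Hypothetical key rule: word characters at the start or after whitespace, then '='.
  private val KeyPattern = """(?:^|\s)(\w+)=""".r

  /** Returns the (key, value) pairs of a CEF extension in their original order. */
  def parse(extension: String): Seq[(String, String)] = {
    val matches = KeyPattern.findAllMatchIn(extension).toVector
    matches.zipWithIndex.map { case (m, i) =>
      // A value runs from the end of its "key=" up to the start of the next key.
      val valueEnd = if (i + 1 < matches.length) matches(i + 1).start else extension.length
      val raw = extension.substring(m.end, valueEnd)
      // Trim the separating whitespace and restore escaped equals signs.
      m.group(1) -> raw.trim.replace("\\=", "=")
    }
  }
}
```

Applied to the extension part of the sample log, this would keep multi-word values such as msg=Process accessed NTDS together, and the list of keys could then drive the schema.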
My current approach is pretty lame (IMHO) and not very dynamic, because the number of key-value pairs can change with the type of log:
import scala.collection.mutable.ListBuffer

// Triple quotes keep the embedded double quotes and backslashes literal.
val a: String = """ABC|XYZ|F123|1.0|DSE|DCE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\Device\HarddiskVolume2\Windows\System32 cs5="C:\Windows\system32\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q fileHash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patternDisposition=Detection. outcome=0"""
val ttype: String = "DCE"

type parseReturn = (String, String, List[String], Int)

def cefParser(a: String, ttype: String): parseReturn = {
  // "|" is a regex metacharacter, so it has to be escaped for split.
  val firstPart = a.split("\\|")
  if (firstPart.size == 8 && firstPart(4) == ttype) {
    val pD = new ListBuffer[String]()
    // The seven header fields first, then the values parsed from the extension.
    pD ++= firstPart.take(7)
    pD ++= parseSecondPart(firstPart(7), ttype)
    val parsed = pD.toList
    (firstPart(2), ttype, parsed, parsed.length)
  } else {
    val temp: List[String] = List(a)
    (firstPart(2), "IRRELEVANT", temp, temp.length)
  }
}
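One fragile spot in cefParser is that splitting on every pipe only yields exactly eight parts when the extension itself contains no | character. A small sketch of a more tolerant header split, assuming the seven header fields contain no pipes of their own (escaped pipes in the header are not handled here). CefHeader and split are hypothetical names.

```scala
object CefHeader {
  /** Returns (seven header fields, extension), or None if the line has fewer than eight parts. */
  def split(line: String): Option[(Vector[String], String)] = {
    // A limit of 8 stops after the seventh pipe, so later pipes stay in the extension.
    val parts = line.split("\\|", 8)
    if (parts.length == 8) Some((parts.take(7).toVector, parts(7))) else None
  }
}
```

Returning an Option also avoids indexing into the array when a line is not CEF at all.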
The method parseSecondPart is:
def parseSecondPart(m: String, ttype: String): ListBuffer[String] = ttype match {
  case auditActivity.ttype => parseAuditEvent(m)
}
parseAuditEvent is another function, which just renames some of the keys in the log:
def parseAuditEvent(msg: String): ListBuffer[String] = {
  // Plain substring replacements: a pattern such as "rt=" also matches inside longer keys like "start=".
  val updated_msg = msg
    .replace("cat=", "metadata_event_type=")
    .replace("destinationtranslatedaddress=", "event_user_ip=")
    .replace("duser=", "event_user_id=")
    .replace("deviceprocessname=", "event_service_name=")
    .replace("cn3=", "metadata_offset=")
    .replace("outcome=", "event_success=")
    .replace("devicecustomdate1=", "event_utc_timestamp=")
    .replace("rt=", "metadata_event_creation_time=")
  parseEvent(updated_msg)
}
The final function extracts only the values:
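def parseEvent(msg: String): ListBuffer[String] = {
  // Protect escaped equals signs (\=) inside values before splitting on "=".
  val newMsg = msg.replace("\\=", "$_equal_$")
  val pD = new ListBuffer[String]()
  val splitData = newMsg.split("=")
  val mSize = splitData.size
  for (i <- 1 until mSize) {
    if (i < mSize - 1) {
      // Each piece ends with the next key, so drop the last space-separated token.
      val a = splitData(i).split(" ")
      val b = a.size - 1
      val c = a.slice(0, b).mkString(" ")
      pD += c.replace("$_equal_$", "=")
    } else if (i == mSize - 1) {
      // The last piece is a value only.
      val a = splitData(i).replace("$_equal_$", "=")
      pD += a
    } else {
      // Never reached: i is always at most mSize - 1.
      logExceptions(newMsg)
    }
  }
  pD
}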
The returned tuple contains a List[String] in the third position, which I use to create a DataFrame as follows:
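// Assuming tempRDD holds cefParser results, the "IRRELEVANT" marker is the second tuple element.
val df = ss.sqlContext.createDataFrame(
  tempRDD
    .filter(x => x._2 != "IRRELEVANT")
    .map(x => Row.fromSeq(x._3)),
  schema
)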
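The schema passed to createDataFrame here is fixed up front. A rough sketch of deriving the column list from the data instead, assuming each record has already been parsed into (key, value) pairs. DynamicColumns, columns and align are hypothetical names, and every column is treated as a nullable string.

```scala
object DynamicColumns {
  /** Column names in first-seen order across all records. */
  def columns(records: Seq[Seq[(String, String)]]): Vector[String] =
    records.flatMap(_.map(_._1)).distinct.toVector

  /** One value per column, in column order; null where the record lacks the key. */
  def align(record: Seq[(String, String)], cols: Seq[String]): Seq[String] = {
    val byKey = record.toMap
    cols.map(c => byKey.get(c).orNull)
  }
}
```

With Spark, the column list could presumably be turned into a schema with StructType(cols.map(c => StructField(c, StringType, nullable = true))) and each aligned record into a Row via Row.fromSeq. Collecting the distinct keys over a large RDD would need its own distributed pass, which this sketch does not cover.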
People of Stack Overflow, I really need your help improving this code, in both performance and approach.
Any help or suggestions will be highly appreciated.
Thanks in advance.
question from:
https://stackoverflow.com/questions/65885012/how-to-create-a-dataframe-using-a-string-consisting-of-key-value-pairs