scala - How to create a DataFrame using a string consisting of key-value pairs?

I'm getting logs from a firewall in CEF format as a string that looks like this:

ABC|XYZ|F123|1.0|DSE|DSE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\Device\HarddiskVolume2\Windows\System32 cs5="C:\Windows\system32\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q fileHash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patternDisposition=Detection. outcome=0

How can I create a DataFrame from this kind of string, where each key-value pair is written as key=value?

My objective is to infer the schema from this string dynamically, i.e., extract the keys from the left side of each = and build a schema from them.
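One way to let the keys drive the parsing is to locate every key= token and treat everything up to the next key as the value. This is a minimal sketch, assuming (as CEF requires) that an equals sign inside a value is escaped as \=; the CefExtension object is a hypothetical helper, not part of the code in this question. Unlike a value-only list, it keeps each key next to its value, so the keys can become column names.

```scala
import scala.util.matching.Regex

object CefExtension {
  // A key is a run of word characters at the start of the string or right after
  // whitespace, immediately followed by '='. An escaped "\=" never matches,
  // because '\' is not a word character.
  private val KeyPattern: Regex = """(?:^|(?<=\s))(\w+)=""".r

  /** Parses "k1=v1 k2=some value k3=v3" into (key, value) pairs, in input order. */
  def parse(ext: String): Vector[(String, String)] = {
    val keys = KeyPattern.findAllMatchIn(ext).toVector
    keys.zipWithIndex.map { case (m, i) =>
      // A value runs from just after '=' up to the start of the next key (or the end).
      val end = if (i + 1 < keys.length) keys(i + 1).start else ext.length
      m.group(1) -> ext.substring(m.end, end).trim.replace("\\=", "=")
    }
  }
}
```

Values with spaces (msg=Process accessed NTDS) and with quotes or backslashes (the cs5 value) come through intact, because only a word followed by = after whitespace starts a new pair.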

What I'm doing currently is pretty lame (IMHO) and not very dynamic, because the number of key-value pairs can change between different types of logs:

// Triple quotes: the log contains backslashes and double quotes that a plain literal cannot hold.
val a: String = """ABC|XYZ|F123|1.0|DSE|DCE|4|externalId=e705265d0d9e4d4fcb218b cn2=329160 cn1=3053998 dhost=SRV2019 duser=admin msg=Process accessed NTDS fname=ntdsutil.exe filePath=\Device\HarddiskVolume2\Windows\System32 cs5="C:\Windows\system32\ntdsutil.exe" "ac i ntds" ifm "create full ntdstest3" q q fileHash=80c8b68240a95 dntdom=adminDomain cn3=13311 rt=1610948650000 tactic=Credential Access technique=Credential Dumping objective=Gain Access patternDisposition=Detection. outcome=0"""

val ttype: String = "DCE"

type parseReturn = (String,String,List[String],Int)

import scala.collection.mutable.ListBuffer

def cefParser(a: String, ttype: String): parseReturn = {
    // split takes a regex, so the pipe must be escaped as "\\|"; the limit of 8
    // keeps any '|' inside the key=value extension in the last field.
    val firstPart = a.split("\\|", 8)
    if (firstPart.length == 8 && firstPart(4) == ttype) {
      // The 7 header fields, followed by the values parsed from the extension.
      val pD = ListBuffer.empty[String]
      pD ++= firstPart.take(7)
      pD ++= parseSecondPart(firstPart(7), ttype)
      (firstPart(2), ttype, pD.toList, pD.length)
    } else {
      // Malformed or irrelevant line; lift avoids an exception on lines with fewer than 3 fields.
      val temp: List[String] = List(a)
      (firstPart.lift(2).getOrElse(""), "IRRELEVANT", temp, temp.length)
    }
  }
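cefParser marks a skipped line with the string "IRRELEVANT", which later has to be filtered out by tuple position. A sketch of the same header check returning an Option instead, assuming the CEF convention that a literal pipe inside a header field is escaped as \|; the CefHeader object and its return tuple are illustrative, not part of the original code.

```scala
object CefHeader {
  // Split on '|' not preceded by a backslash, so an escaped "\|" stays inside its field.
  private val Sep = """(?<!\\)\|"""

  /** For lines whose field 4 equals ttype: (field 2, the 7 header fields, raw extension).
    * Malformed or irrelevant lines give None instead of a sentinel value. */
  def parse(line: String, ttype: String): Option[(String, Vector[String], String)] = {
    // A limit of 8 keeps any '|' inside the extension as part of the last field.
    val parts = line.split(Sep, 8)
    if (parts.length == 8 && parts(4) == ttype)
      Some((parts(2), parts.take(7).toVector, parts(7)))
    else None
  }
}
```

With this shape, lines.flatMap(l => CefHeader.parse(l, ttype)) keeps only the relevant lines, so no later filter on a magic string is needed.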

  

The method parseSecondPart is:

def parseSecondPart(m: String, ttype: String): ListBuffer[String] = ttype match {
    // auditActivity is defined elsewhere (not shown); any other ttype throws a MatchError.
    case auditActivity.ttype => parseAuditEvent(m)
  }

parseAuditEvent is another function that just replaces some text (key names) in the logs:

def parseAuditEvent(msg: String): ListBuffer[String] = {
    val updated_msg = msg.replace("cat=", "metadata_event_type=")
      .replace("destinationtranslatedaddress=", "event_user_ip=")
      .replace("duser=", "event_user_id=")
      .replace("deviceprocessname=", "event_service_name=")
      .replace("cn3=", "metadata_offset=")
      .replace("outcome=", "event_success=")
      .replace("devicecustomdate1=", "event_utc_timestamp=")
      .replace("rt=", "metadata_event_creation_time=")

    parseEvent(updated_msg)
  }
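Because String.replace works on raw substrings, .replace("rt=", ...) also rewrites the tail of any key that merely ends in "rt", such as start=. Renaming after the line has been split into (key, value) pairs avoids that. A minimal sketch, assuming the pairs are already available; auditRenames copies the mapping from parseAuditEvent, while renamesByType is a hypothetical per-log-type table keyed by the question's "DCE" value. Looking it up with getOrElse also avoids the MatchError that parseSecondPart throws for an unlisted type.

```scala
object KeyRenaming {
  // Same renames as parseAuditEvent, but matched against whole keys only.
  val auditRenames: Map[String, String] = Map(
    "cat" -> "metadata_event_type",
    "destinationtranslatedaddress" -> "event_user_ip",
    "duser" -> "event_user_id",
    "deviceprocessname" -> "event_service_name",
    "cn3" -> "metadata_offset",
    "outcome" -> "event_success",
    "devicecustomdate1" -> "event_utc_timestamp",
    "rt" -> "metadata_event_creation_time"
  )

  // Hypothetical table keyed by log type; unknown types keep their original keys.
  val renamesByType: Map[String, Map[String, String]] = Map("DCE" -> auditRenames)

  def rename(ttype: String, pairs: Seq[(String, String)]): Seq[(String, String)] = {
    val table = renamesByType.getOrElse(ttype, Map.empty[String, String])
    pairs.map { case (k, v) => table.getOrElse(k, k) -> v }
  }
}
```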

The final function extracts only the values:

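def parseEvent(msg: String): ListBuffer[String] = {
    // Protect escaped equals signs (CEF "\=") before splitting on "=";
    // the source needs "\\=" because "\=" is not a valid Scala escape.
    val newMsg = msg.replace("\\=", "$_equal_$")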
    val pD = new ListBuffer[String]()
    val splitData = newMsg.split("=")
    val mSize = splitData.size
    for (i <- 1 until mSize) {
      if(i < mSize-1) {
        val a = splitData(i).split(" ")
        val b = a.size-1
        val c = a.slice(0,b).mkString(" ")
        pD += c.replace("$_equal_$","=")
      } else if(i == mSize-1) {
        val a = splitData(i).replace("$_equal_$","=")
        pD += a
      } else {
        // Unreachable: i runs over 1 until mSize, so it never exceeds mSize - 1.
        logExceptions(newMsg)
      }
    }
    pD
  }

The returned tuple holds the parsed values as a List[String] in its third position, which I use to create a DataFrame as follows:

val df = ss.sqlContext
    .createDataFrame(tempRDD.filter(x => x._2 != "IRRELEVANT") // the "IRRELEVANT" tag is the 2nd tuple element
    .map(x => Row.fromSeq(x._3)), schema)
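Since the set of keys varies between log types, a fixed schema with positional values only lines up if every line carries the same keys in the same order. A minimal sketch, assuming each line has already been parsed into (key, value) pairs (for example by a key-aware parser rather than parseEvent, which keeps only values): take the union of keys as the columns and fill missing keys with None. DynamicSchema is a hypothetical helper.

```scala
object DynamicSchema {
  /** Union of keys across all records, in first-seen order. */
  def columns(records: Seq[Seq[(String, String)]]): Vector[String] =
    records.iterator.flatMap(_.iterator.map(_._1)).toVector.distinct

  /** One row per record, aligned to cols; a key missing from a record becomes None.
    * If a record repeats a key, the last occurrence wins (toMap semantics). */
  def rows(records: Seq[Seq[(String, String)]], cols: Vector[String]): Seq[Vector[Option[String]]] =
    records.map { r =>
      val m = r.toMap
      cols.map(m.get)
    }
}
```

In Spark, the columns could become StructType(cols.map(StructField(_, StringType, nullable = true))) and each row Row.fromSeq(row.map(_.orNull)), which SparkSession.createDataFrame(rowRDD, schema) accepts. Computing the column union needs one pass over the parsed data before the rows are built.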

People of Stack Overflow, I really need your help improving my code, both in performance and in approach. Any help or suggestions will be highly appreciated. Thanks in advance.

Question from: https://stackoverflow.com/questions/65885012/how-to-create-a-dataframe-using-a-string-consisting-of-key-value-pairs
