How to pull Nomad task logs incrementally


I'm trying to write Python code that pulls HashiCorp Nomad task logs for the raw_exec driver incrementally (i.e. pull only the new logs since the last pull). The incremental pull is done every 10-20 seconds.

To achieve this I'm using the last offset from the previous pull, as described in the API reference.

For the first pull I use startoffset=0, and for the following pulls I use the last offset I received:

response2 = requests.get(f"{nomadapiurl}client/fs/logs/{alloc['ID']}?task={task}&type={logtype}&origin=start&offset={startoffset}")

Log files for the task are limited in size and rotate every few minutes, and since the offset is not relative to a specific file, I'm not sure how I can pull only the new logs generated since the last pull.
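To illustrate, here is a simplified sketch of the polling loop (the base URL, allocation, task, and log type values are placeholders; in the real script they come from the jobs/allocations APIs):

    import time
    import requests

    # placeholder values for the sketch
    nomadapiurl = "http://localhost:4646/v1/"
    alloc = {'ID': '<alloc-id>'}
    task = "mytask"
    logtype = "stdout"

    startoffset = 0
    while True:
        response2 = requests.get(
            f"{nomadapiurl}client/fs/logs/{alloc['ID']}"
            f"?task={task}&type={logtype}&origin=start&offset={startoffset}"
        )
        # ...decode the framed JSON response, write out the new data,
        # and remember the last Offset as the next startoffset...
        time.sleep(15)  # incremental pull every 10-20 seconds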

Any help will be appreciated.


There are 2 best solutions below

KamilCuk:

Nomad does not keep any state about the log files for you. You have to save the stream content yourself, diff it, find the position in the logs where you last were, and start streaming from there. You can download the whole log file with nomad alloc fs.
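For illustration, a rough sketch of that approach in Python (the allocation-relative log path, the fixed file index, and the state handling are assumptions for the sketch, not something Nomad gives you):

    import subprocess

    def pull_new_logs(alloc_id, task, logtype, state):
        """Download the whole current log file and return only the part not seen yet."""
        # assumed path layout inside the allocation directory; rotation to
        # higher-numbered files is not handled in this sketch
        path = f"alloc/logs/{task}.{logtype}.0"
        content = subprocess.run(
            ["nomad", "alloc", "fs", alloc_id, path],
            capture_output=True, text=True, check=True,
        ).stdout
        previous = state.get("previous", "")
        if content.startswith(previous):
            new = content[len(previous):]  # the same file simply grew
        else:
            new = content                  # the file was truncated or rotated
        state["previous"] = content
        return new

With rotation you would also have to look at the higher-numbered files, which is exactly the complication the question is about.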

The nomad alloc logs --help even has this:

-n
Sets the tail location in best-efforted number of lines relative to the end
of the logs.

The important part is "best-efforted": the command opens the log stream, waits for some time, and only after that starts printing the logs, hoping that these are only fresh log lines.

You might be interested in implementing proper Nomad log collection, for example with Grafana Loki and promtail. To solve the problem you are having, promtail runs on the host, reads the plain-text log files in the Nomad data directory, and saves the file positions so it knows where it stopped streaming.
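If your collector runs on the Nomad client host itself, the same idea can be sketched in plain Python: tail the text log files under the allocation directory and persist the read positions (the data_dir location and file layout below are assumptions, check your client configuration):

    import glob
    import json
    import os

    DATA_DIR = "/opt/nomad/data"       # assumed Nomad data_dir
    POSITIONS_FILE = "positions.json"  # remembers how far each file was read

    def tail_alloc_logs(alloc_id, task):
        positions = {}
        if os.path.isfile(POSITIONS_FILE):
            with open(POSITIONS_FILE) as f:
                positions = json.load(f)
        # task log files are expected under <data_dir>/alloc/<alloc-id>/alloc/logs/
        pattern = os.path.join(DATA_DIR, "alloc", alloc_id,
                               "alloc", "logs", f"{task}.stdout.*")
        for logfile in sorted(glob.glob(pattern)):
            with open(logfile) as f:
                f.seek(positions.get(logfile, 0))
                new_data = f.read()           # everything written since the last run
                positions[logfile] = f.tell()
            if new_data:
                print(new_data, end="")
        with open(POSITIONS_FILE, "w") as f:
            json.dump(positions, f)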

A bit of self-promotion: with my nomad-watch project you should be able to run nomad-watch -f -n0 job thejob to stream only the newest logs. What it does is wait for 0.5 seconds to let Nomad transfer the logs and only then start streaming them. But in this case it should do exactly the same as nomad alloc logs -tail -n 0.

See also https://github.com/hashicorp/nomad/blob/61941d820448d1b83e16f726c51c14cab30986e1/command/alloc_logs.go#L238 and https://github.com/hashicorp/nomad/blob/61941d820448d1b83e16f726c51c14cab30986e1/command/alloc_logs.go#L295 . Nomad uses a delay of 1 second (1*time.Second) before starting to stream "new" logs.

Bottom line: what is really missing is some structure in the logs that would store the timestamp of each printed line. Consider opening an issue at https://github.com/hashicorp/nomad/issues (or finding an existing one), so we can upvote it.

Haim Marko:

I solved this issue using the following code. It is a bit complex, but it works.

  1. All the logs are pulled via the API every 10 seconds (the framed response is decoded as sketched below).
  2. The state of the last chunk that was appended to the log file I'm building is saved in a state file.
  3. On the next pull, the state file is used to append only the logs that were created after the last pull.
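For context: the logs endpoint answers with a stream of JSON frames, each carrying the file name, the offset, and base64-encoded data, and the code decodes them one by one. Reduced to just the frame decoding, that part looks roughly like this:

    import base64
    import json

    def decode_log_frames(body):
        """Yield (file, offset, text) for every JSON frame in a logs API response body."""
        decoder = json.JSONDecoder()
        text = body.decode('utf-8')
        while text:
            frame, used = decoder.raw_decode(text)
            text = text[used:]
            if 'Data' in frame:
                data = base64.b64decode(frame['Data']).decode('utf-8', 'ignore')
                yield frame.get('File'), frame.get('Offset'), data

The full code: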
                if parselog:
                    #get previous stderr and stdout log parsing state
                    offsetconfig = {'stderr': {'offset':0, 'file':'', 'data': '', "full": True, "len": 0, 'seq': 0}, 'stdout':{'offset':0, 'file':'', 'data': '', "full": True, "len": 0, 'seq': 0}}
                    try:
                        if os.path.isfile(cacherunningfile):                            
                            f = open(cacherunningfile)
                            offsetconfig = json.load(f)
                            f.close()
                    except Exception as e:                  
                        logging.debug(f"cannot load data from file:{cacherunningfile} - {e}") 
                        offsetconfig = {'stderr': {'offset':0, 'file':'', 'data': '', "full": True, "len": 0, 'seq': 0}, 'stdout':{'offset':0, 'file':'', 'data': '', "full": True, "len": 0, 'seq': 0}}

                    #iterate over log types         
                    for logtype in ['stderr','stdout']:
                        #when set to True, new data will be appended
                        append = False
                        appended = False

                        #this is the last offset in the file
                        lastoffset = offsetconfig[logtype]['offset']

                        #the last log file number (appears in the suffix of the file name)
                        try:
                            lastlogfilenum = int(re.search(r'\d+$', offsetconfig[logtype]['file']).group())
                        except:
                            lastlogfilenum = 0

                        #initialize dict to hold the previous data object
                        prevjsonobj = {'Full': offsetconfig[logtype]['full'],'Data': offsetconfig[logtype]['data'], 'File': offsetconfig[logtype]['file'], 'Offset': offsetconfig[logtype]['offset'], 'Len': offsetconfig[logtype]['len'], 'Seq': offsetconfig[logtype]['seq'] }

                        #start offset from the beginning of the log
                        startoffset = 0

                        #log file to update 
                        alloclogfile = os.path.join(jobdir,logtype+'log_'+alloc['ID']+'.log')

                        #collect the logs
                        logging.debug(f"collecting logs alloc:{alloc['ID']} logs type:{logtype} offset:{startoffset} using logs API")
                        response2 = requests.get(f"{nomadapiurl}client/fs/logs/{alloc['ID']}?task={task}&type={logtype}&origin=start")
                        
                        #holds the part of the logs that needs to be appended to the file
                        loginc = ''
                        
                        if response2.ok:
                            if re.search(rb"(\d|\S)", response2.content, re.M|re.I):
                                logging.debug("log for job:"+alloc['ID']+" is available using api")
                                jsonresp = response2.content.decode('utf-8')
                                jsondec = json.JSONDecoder()
                                while jsonresp:
                                    jsonobj, json_len = jsondec.raw_decode(jsonresp)
                                    jsonresp = jsonresp[json_len:]
                                    
                                    if 'File' in jsonobj and 'Offset' in jsonobj and 'Data' in jsonobj:
                                        currentfile = jsonobj['File']
                                        currentdatasize = len(jsonobj['Data'])
                                        try:
                                            currlogfilenum = int(re.search(r'\d+$', currentfile).group())
                                        except:
                                            #this shouldn't happen
                                            currlogfilenum = 0                                      
                                        
                                        #a full chunk has a data size of 87384; partial chunks are handled differently
                                        currentfull = True
                                        if currentdatasize < 87384:
                                            currentfull = False 
                                        jsonobj['Full'] = currentfull
                                        jsonobj['Len'] = currentdatasize
                                        jsonobj['Seq'] = (currlogfilenum+1)*1000000+int(jsonobj['Offset']/65536)

                                        #part of the log that wasn't processed yet
                                        #different offset
                                        if (currlogfilenum == lastlogfilenum and jsonobj['Offset'] > prevjsonobj['Offset']):
                                            append = True
                                        #changes within the same sequence
                                        if (jsonobj['Seq']==prevjsonobj['Seq'] and jsonobj['Len'] > prevjsonobj['Len']):
                                            append = True 
                                        #different file
                                        if (currlogfilenum > lastlogfilenum):
                                            append = True 
                                            
                                    
                                        if append:
                                            #convert base64 to string and include prevstring if it was used
                                            try:
                                                logstring = base64.b64decode(jsonobj['Data']).decode('utf-8','ignore')
                                            except:
                                                logstring = ''

                                            #capture the length of the log string 
                                            logstringlen = len(logstring)

                                            #process partial data (smaller than 65536)
                                            if not prevjsonobj['Full'] and not appended:
                                                
                                                prevlogstring = base64.b64decode(prevjsonobj['Data']).decode('utf-8','ignore')
                                                prevlogstringlen = len(prevlogstring)
                                                if jsonobj['Seq'] == prevjsonobj['Seq']:
                                                    #same seq addition 
                                                    logstring = logstring[-(logstringlen-prevlogstringlen):]
                                                    logstringlen = len(logstring)
                                                    #print("processing same seq addition:", jsonobj['Seq'], prevjsonobj['Seq'], prevlogstringlen, logstringlen)
                                                elif jsonobj['Seq'] > prevjsonobj['Seq'] and jsonobj['Seq']-prevjsonobj['Seq'] < 2000000:
                                                    #dual seq addition
                                                    logstring = logstring[-(65536-prevlogstringlen):]
                                                    logstringlen = len(logstring)           
                                                    #print("processing dual seq addition:", jsonobj['Seq'], prevjsonobj['Seq'], prevlogstringlen, logstringlen)                                     
                                            
                                            #mark to prevent additional append on file change 
                                            appended = True

                                            #normalize log string   
                                            logstringnormalize = logstring.replace("\\n","\n")
                                            
                                            #append normalized string to the included part of the log 
                                            loginc += logstringnormalize                                            

                                            logging.debug(f"incrementing log type:{logtype} job:{alloc['ID']}  offset:{jsonobj['Offset']} seq:{jsonobj['Seq']} len:{jsonobj['Len']} logfile:{jsonobj['File']} Full:{jsonobj['Full']}")
                                                
                                            offsetconfig[logtype]['file'] = jsonobj['File']                                             
                                            offsetconfig[logtype]['offset'] = jsonobj['Offset']
                                            offsetconfig[logtype]['data'] = jsonobj['Data']
                                            offsetconfig[logtype]['full'] = jsonobj['Full']
                                            offsetconfig[logtype]['seq'] = jsonobj['Seq']
                                            offsetconfig[logtype]['len'] = jsonobj['Len']


                                            #keep previous data 
                                            prevjsonobj = jsonobj

                            else:
                                logging.debug(f"log type:{logtype} job:{alloc['ID']} was not updated since last request")
                            
                            #write incremental log to file
                            try:
                                f = open(alloclogfile,'a')
                                f.write(loginc)
                                f.close()
                            except Exception as e:                  
                                logging.error(f"cannot append data to file:{alloclogfile} - {e}") 
                                exit(1)                             
                        else:
                            logging.info(f"log for type:{logtype} job:{alloc['ID']} does NOT exist")
                            
                    try:
                        f = open(cacherunningfile,'w')
                        json.dump(offsetconfig,f)
                        f.close()
                    except Exception as e:
                        logging.error(f"cannot save state data to file:{cacherunningfile} - {e}")
                        exit(1)
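For reference, the state file (cacherunningfile) simply persists, per log type, the last frame that was appended. A saved state looks roughly like this (the file name and values are made-up examples; seq is (file number + 1) * 1000000 plus the offset divided by 65536):

    {
      "stderr": {"offset": 0, "file": "", "data": "", "full": true, "len": 0, "seq": 0},
      "stdout": {
        "offset": 131072,
        "file": "alloc/logs/mytask.stdout.1",
        "data": "<base64 of the last chunk>",
        "full": true,
        "len": 87384,
        "seq": 2000002
      }
    }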