I want to import paycheck data from a CSV file located in Azure Blob Storage into an Azure database using ADF. Since paycheck schedules are fixed, if a batch for a given PaycheckDate is already in the table, the existing rows for that PaycheckDate should be cleaned up before importing, and the (new) entries in the file for that PaycheckDate should then be imported. (There are no unique identifiers in the file to pick out a particular entry.)
The file:
There is a CSV file, for example Paycheck_202402.csv, in the PaycheckContainer container of Blob Storage; here are its schema and data (about 28K rows per file):
CompanyId,EmployeeId,PaycheckDate,Amount,Description
100,e1234,2024-02-09,150.5,Overtime
100,e1234,2024-02-09,1500.0,Salary
100,e1234,2024-02-23,305.25,Overtime
100,e1234,2024-02-23,1500.0,Salary
200,e2222,2024-02-09,50.5,Tip
200,e2222,2024-02-09,500.0,Salary
400,e5555,2024-02-15,1000,Compensate
400,e5555,2024-02-23,1500.0,Salary
For the target, a table called factPaychecks, here is the simple schema:
CompanyId INT,
EmployeeId VARCHAR,
PaycheckDate DATE,
Amount DOUBLE(10,2),
Description VARCHAR
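For reference, a minimal DDL sketch of the target, assuming the database is Azure SQL Database (where DOUBLE(10,2) is not a valid type, so DECIMAL(10,2) stands in; the VARCHAR lengths are also assumptions):

```sql
-- Hypothetical DDL sketch; DECIMAL replaces DOUBLE for T-SQL,
-- and the VARCHAR lengths are placeholders
CREATE TABLE factPaychecks (
    CompanyId    INT,
    EmployeeId   VARCHAR(20),
    PaycheckDate DATE,
    Amount       DECIMAL(10,2),
    Description  VARCHAR(100)
);
```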
What I'd like to do is:
- extract the unique PaycheckDates from the file into an array, so the expected result is arrCheckDates = [2024-02-09, 2024-02-15, 2024-02-23],
- pass arrCheckDates to a For-Each activity,
- then iterate a DELETE query against factPaychecks for each given PaycheckDate (see the sketch after this list):
  DELETE FROM [factPaychecks] WHERE [PaycheckDate] = $PaycheckDate
- and finally import the entries in the file into factPaychecks.
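As a sketch of the per-date delete: inside the For-Each, a Script activity (or a Stored procedure activity) could run the statement with the current item spliced in via ADF dynamic content. The use of a Script activity and the item shape are my assumptions, based on the cache-sink output described below:

```sql
-- Dynamic content for a Script activity inside the For-Each;
-- @{item().PaycheckDate} is the current date from arrCheckDates
DELETE FROM factPaychecks
WHERE PaycheckDate = '@{item().PaycheckDate}';
```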
I tried:
1. DataFlow (DFImportPCK) for extracting PaycheckDates and sinking to Cache:
   1.1. ((Source)) from CSV
   1.2. ((Aggregate)) [Group by] PaycheckDate with [Aggregates] NumOfRows(PaycheckDate)
   1.3. ((Select)) PaycheckDate only - checked the result is DATE type
   1.4. ((Sink)) [Type] Cache, [Options] checked "Write to activity output", [Key column] List of columns (no columns assigned) --> Inspect/Data Preview: DATE type
https://i.stack.imgur.com/v7Atu.png
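For clarity, the ((Aggregate)) step boils down to one grouping column and one count; a sketch of the settings in data flow expression language, where count() is my assumption for what backs NumOfRows:

```
// ((Aggregate)) settings in DFImportPCK
// Group by column:      PaycheckDate
// Aggregate expression: NumOfRows = count(PaycheckDate)
```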
2. Assign the returned value to an array-type variable varArrPaycheckDates in the pipeline (PLImportPCK):
   varArrPaycheckDates = @activity('DFImportPCK').output.runStatus.output.Sink2Cache.value
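For context, Sink2Cache.value comes back as an array of row objects keyed by column name; with the sink as configured above, it looks roughly like this (one element, with the date as epoch milliseconds, which matches the two symptoms listed next):

```json
[
  { "PaycheckDate": 1688688000000 }
]
```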
There are a couple of issues:
1. In DataFlow (DFImportPCK), I could see only 1 entry, 2024-02-09, at ((Aggregate)):
   PaycheckDate 2024-02-09, NumOfRows 1000
   The same holds for the subsequent steps; even at ((Sink)) I could see 2024-02-09 only. But there should be 3 elements, as above: [2024-02-09, 2024-02-15, 2024-02-23].
2. DataFlow (DFImportPCK) returns a (LONG) INTEGER value, 1688688000000, not the DATE 2024-02-09.
Could you guide me on how I can achieve my goal, or point out what I'm missing?
Thanks!
The issue is with the DATE type. First you need to cast it to string type, for example in a Derived Column transformation before the sink.
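A minimal sketch of that cast, assuming the column name PaycheckDate and a yyyy-MM-dd format:

```
// Derived Column expression (data flow expression language);
// formats the DATE column as a yyyy-MM-dd string so the cache
// sink returns readable dates instead of epoch milliseconds
toString(PaycheckDate, 'yyyy-MM-dd')
```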
Then, in the Data Flow activity settings, set the Logging level to None and uncheck First row only.
In the Data Flow activity output you can then see the array of PaycheckDate values. Next, add a For-Each loop to iterate over this array, with the items expression @activity('DFImportPCK').output.runStatus.output.Sink2Cache.value.
Inside the For-Each, take an Append variable activity with the expression @item().PaycheckDate to store all the values in a single array. After the For-Each executes, the append variable will look like this:
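Illustratively, for the sample file above (the exact strings depend on the format passed to toString):

```json
["2024-02-09", "2024-02-15", "2024-02-23"]
```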